欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > 手撕RPC——实现简单的RPC调用

手撕RPC——实现简单的RPC调用

2024/10/25 23:00:33 来源:https://blog.csdn.net/pipihan21/article/details/139896472  浏览:    关键词:手撕RPC——实现简单的RPC调用

手撕RPC——实现简单的RPC调用

  • 一、场景设计
  • 二、设计思路
    • 2.1 客户端的设计
    • 2.2 服务端的设计
    • 2.3 通信设计
  • 三、代码实现
    • 3.1 定义用户信息
    • 3.2 用户服务接口
    • 3.3 用户服务接口实现
    • 3.4 定义消息格式
    • 3.5 实现动态代理类
    • 3.6 封装信息传输类
    • 3.7 定义服务端Server接口
    • 3.8 实现RpcServer接口
    • 3.9 实现WorkThread类
    • 3.10 实现本地服务存放器
    • 3.11 客户端主程序
    • 3.12 服务端主程序

一、场景设计

现在A,B位于不同的服务器上,但现在A想调用B的某个方法,如何实现呢?

服务端B
有一个用户表

  1. UserService 里有一个功能:getUserByUserId(Integer id)
  2. UserServiceImpl 实现了UserService接口和方法

客户端A
调用getUserByUserId方法, 内部传一个Id给服务端,服务端查询到User对象返回给客户端

如何实现以上调用过程呢?

二、设计思路

主要考虑客户端、服务端、以及双方如何通信才能实现此功能

2.1 客户端的设计

  1. 调用getUserByUserId方法时,内部将调用信息处理后发送给服务端B,告诉B我要获取User
  2. 外部调用方法,内部进行其它的处理——这种场景我们可以使用动态代理的方式,改写原本方法的处理逻辑

2.2 服务端的设计

  1. 监听到A的请求后,接收A的调用信息,并根据信息得到A想调用的服务与方法
  2. 根据信息找到对应的服务,进行调用后将结果发送回给A

2.3 通信设计

  1. 使用Java的socket网络编程进行通信
  2. 为了方便A ,B之间 对接收的消息进行处理,我们需要将请求信息和返回信息封装成统一的消息格式

三、代码实现

在此部分我们将理论转化为代码,分别实现客户端和服务端。

项目目录结构
在这里插入图片描述

3.1 定义用户信息

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {// 客户端和服务端共有的private Integer id;private String userName;private Boolean sex;
}

3.2 用户服务接口

public interface UserService {// 客户端通过这个接口调用服务端的实现类User getUserByUserId(Integer id);//新增一个功能Integer insertUserId(User user);
}

3.3 用户服务接口实现

public class UserServiceImpl implements UserService {@Overridepublic User getUserByUserId(Integer id) {System.out.println("客户端查询了"+id+"的用户");// 模拟从数据库中取用户的行为Random random = new Random();User user = User.builder().userName(UUID.randomUUID().toString()).id(id).sex(random.nextBoolean()).build();return user;}@Overridepublic Integer insertUserId(User user) {System.out.println("插入数据成功"+user.getUserName());return user.getId();}
}

3.4 定义消息格式

//定义请求信息格式RpcRequest
@Data
@Builder
public class RpcRequest implements Serializable {//服务类名,客户端只知道接口private String interfaceName;//调用的方法名private String methodName;//参数列表private Object[] params;//参数类型private Class<?>[] paramsType;
}//定义返回信息格式RpcResponse(类似http格式)
@Data
@Builder
public class RpcResponse implements Serializable {//状态码private int code;//状态信息private String message;//具体数据private Object data;//构造成功信息public static RpcResponse sussess(Object data){return RpcResponse.builder().code(200).data(data).build();}//构造失败信息public static RpcResponse fail(){return RpcResponse.builder().code(500).message("服务器发生错误").build();}
}

3.5 实现动态代理类

@AllArgsConstructor
public class ClientProxy implements InvocationHandler {//传入参数service接口的class对象,反射封装成一个requestprivate String host;private int port;//jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端)@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//构建requestRpcRequest request=RpcRequest.builder().interfaceName(method.getDeclaringClass().getName()).methodName(method.getName()).params(args).paramsType(method.getParameterTypes()).build();//IOClient.sendRequest 和服务端进行数据传输RpcResponse response= IOClient.sendRequest(host,port,request);return response.getData();}public <T>T getProxy(Class<T> clazz){Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);return (T)o;}
}

3.6 封装信息传输类

public class IOClient {//这里负责底层与服务端的通信,发送request,返回responsepublic static RpcResponse sendRequest(String host, int port, RpcRequest request){try {Socket socket=new Socket(host, port);ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());oos.writeObject(request);oos.flush();RpcResponse response=(RpcResponse) ois.readObject();return response;} catch (IOException | ClassNotFoundException e) {e.printStackTrace();return null;}}
}

3.7 定义服务端Server接口

public interface RpcServer {//开启监听void start(int port);void stop();
}

3.8 实现RpcServer接口

@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {private ServiceProvider serviceProvide;@Overridepublic void start(int port) {try {ServerSocket serverSocket=new ServerSocket(port);System.out.println("服务器启动了");while (true) {//如果没有连接,会堵塞在这里Socket socket = serverSocket.accept();//有连接,创建一个新的线程执行处理new Thread(new WorkThread(socket,serviceProvide)).start();}} catch (IOException e) {e.printStackTrace();}}@Overridepublic void stop() {}
}

3.9 实现WorkThread类

WorkThread类负责启动线程和客户端进行数据传输,WorkThread类中的getResponse方法负责解析收到的request信息,寻找服务进行调用并返回结果。

@AllArgsConstructor
public class WorkThread implements Runnable{private Socket socket;private ServiceProvider serviceProvide;@Overridepublic void run() {try {ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());//读取客户端传过来的requestRpcRequest rpcRequest = (RpcRequest) ois.readObject();//反射调用服务方法获取返回值RpcResponse rpcResponse=getResponse(rpcRequest);//向客户端写入responseoos.writeObject(rpcResponse);oos.flush();} catch (IOException | ClassNotFoundException e) {e.printStackTrace();}}private RpcResponse getResponse(RpcRequest rpcRequest){//得到服务名String interfaceName=rpcRequest.getInterfaceName();//得到服务端相应服务实现类Object service = serviceProvide.getService(interfaceName);//反射调用方法Method method=null;try {method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());Object invoke=method.invoke(service,rpcRequest.getParams());return RpcResponse.sussess(invoke);} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {e.printStackTrace();System.out.println("方法执行错误");return RpcResponse.fail();}}
}

3.10 实现本地服务存放器

因为一个服务器会有多个服务,所以需要设置一个本地服务存放器serviceProvider存放服务,在接收到服务端的request信息之后,我们在本地服务存放器找到需要的服务,通过反射调用方法,得到结果并返回

//本地服务存放器
public class ServiceProvider {//集合中存放服务的实例private Map<String,Object> interfaceProvider;public ServiceProvider(){this.interfaceProvider=new HashMap<>();}//本地注册服务public void provideServiceInterface(Object service){String serviceName=service.getClass().getName();Class<?>[] interfaceName=service.getClass().getInterfaces();for (Class<?> clazz:interfaceName){interfaceProvider.put(clazz.getName(),service);}}//获取服务实例public Object getService(String interfaceName){return interfaceProvider.get(interfaceName);}
}

3.11 客户端主程序

public class TestClient {public static void main(String[] args) {ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999);UserService proxy=clientProxy.getProxy(UserService.class);User user = proxy.getUserByUserId(1);System.out.println("从服务端得到的user="+user.toString());User u=User.builder().id(100).userName("wxx").sex(true).build();Integer id = proxy.insertUserId(u);System.out.println("向服务端插入user的id"+id);}
}

3.12 服务端主程序

public class TestServer {public static void main(String[] args) {UserService userService=new UserServiceImpl();ServiceProvider serviceProvider=new ServiceProvider();serviceProvider.provideServiceInterface(userService);RpcServer rpcServer=new SimpleRPCRPCServer(serviceProvider);rpcServer.start(9999);}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com