提示:dubbo底层实现,手写dubbo框架。手写rpc框架、用servlet实现dubbo、用servlet实现rpc框架
上次开会,同时们讨论了一个dubbo框架的事务问题。才猛然意识到,自己之前手写过dubbo框架,结果都还给老师了。趁此机会,回忆并记录一下,方便自己日后查阅。本人水平有限,如有误导,欢迎斧正,一起学习,共同进步!
手写rpc原理时,一共有三个项目,分别是api项目、consumer项目、provider项目。其中consumer和provider都依赖了api。其中的
api项目中:请求的参数定义、接口的定义 等一些公共规范。
provider服务提供方:接口的具体实现(具体的功能)、socket的服务器端
consumer服务消费方:准备了请求参数、socket的客户端。
项目启动起来,创建一个socket的服务端,不断的监听某一个端口,等待着socket的客户端的链接,然后通过socket.getInputSream来拿到socket传递过来的对象,拿到对象以后,就是具体实现的参数(比如说UserServiceImpl的addUser的方法的请求参数),通过这个参数,去调用这个方法,调用完成以后,拿到一个返回结果。将这个返回结果以socket.getOutputSream的writeObject方法返回给请求端。
项目通过动态代理来生成一个目标(UserService)的代理对象(此次是jdk动态代理)。因为是jdk动态代理,所以是实现了InvocationHandler接口,具体的执行的invoke方法,在invoke方法中,去发起了socket的调用(指定了socket的host、port,参数,然后socket服务器那边就会监听到)。拿到这个代理对象以后,调用这个对象的addUser方法,并拿到响应结果。因为这个动态代理对象有这个目标对象的全部方法,所以可以直接调用。
这个是UserService。也就是定义的规范
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 17:16
* @desc: 服务的标准(对外暴露的api规范)
*/
public interface UserService {
public UserDTO addUser(UserDTO userDTO);
}
这个是UserDTO 也就是addUser方法的请求参数
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 17:16
* @desc: 是在网络中传输的dto,
*/
@Data
public class UserDTO implements Serializable {
/**
* 为了保证系列化和反序列化的安全性,可以加一个id
*/
private static final long serialVersionUID = -7085411221862236858L;
private String name;
private String age;
private String userId;
}
这个是RPCCommonReqDTO,也就是网络传输中的对象,里面有一个Object类型的参数,这个里面可以放上面的UserDTO
/**
* @author: ZhengTianLiang
* @date: 2021/07/12 22:34
* @desc: 定义的统一的在网络中数据传输的规则
*/
@Data
public class RPCCommonReqDTO implements Serializable {
private static final long serialVersionUID = 666960806401175269L;
// 方法名称
private String methodName;
// 类的权限定路径
private String classpath;
// 方法的参数
private Object[] args;
}
项目跑起来,调用startup方法,监听一个端口。监听某一个端口时它会等着某个客户端的链接。然后来一个socket对象,就创建一个线程去处理这个socket对象,所以具体的操作这个socket的方法,就需要写在runnable接口的run方法里面。run方法中,通过构造器的方式,将socket注入进来了,然后通过socket获取到输入流对象,然后拿到传输过来的对象,然后将这个对象,交给ServiceDispatch对象进行服务的分发,然后将ServiceDispatch对象分发完以后,返回的对象,通过网络(socket,OutputStream)返回给请求端
这个是serviceImpl,是上面的规范的具体实现
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 17:28
* @desc: 这个是,api项目中的。UserService 接口的实现类
* 就是说,rpc-api项目,只是单纯的定义规范,
* rpc-provider项目,是具体实现(提供服务的)
* prc-consumer项目,是服务的消费方
*/
public class UserServiceImpl implements UserService {
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 17:29
* @desc: 服务的具体实现
*/
public UserDTO addUser(UserDTO userDTO) {
System.out.println("接收到的dto:" + userDTO);
userDTO.setUserId(new Random().nextInt(1000000) + "");
System.out.println("返回的dto:" + userDTO);
return userDTO;
}
}
dispatch,进行服务的分发,有点类似于nginx的服务转发
package com.csdn.dispatch;
import com.csdn.dao.RPCCommonReqDTO;
import java.lang.reflect.Method;
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 18:00
* @desc: 用来做网络的分发
*/
public class ServiceDispatch {
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 18:00
* @desc: 做服务的分发
* 若是不用反射的话,可能是
* 当type=1 ---> 调用某个方法(比如说addUser)
* 当type=2 ---> 调用某个方法(比如说selectUser)
* ...
* 你至少要写一个枚举,type=1,2,3.。。 可能还要提供一份文档
*
* 但是若是用了反射,则省下了这些操作,直接提供不同的参数,就调用了不同的方法
*/
public static Object dispatch(Object reqObj) {
// 基于方法的反射的调用,来实现服务的分发。
RPCCommonReqDTO rpcCommonReqDTO = (RPCCommonReqDTO) reqObj;
Object[] args = rpcCommonReqDTO.getArgs();
String classpath = rpcCommonReqDTO.getClasspath();
String methodName = rpcCommonReqDTO.getMethodName();
// 这个是存放形参的类型的数组的,是因为反射调用方法时,要传递的
Class [] types = new Class[args.length];
for (int i=0;i<args.length;i++){
types[i] = args[i].getClass();
}
Object respObj = null;
try {
Class<?> clazz = Class.forName(classpath);
Method method = clazz.getDeclaredMethod(methodName, types);
// 调用方法 第一个参数是对象,第二个参数是方法的参数值(参数值,不是参数的clazz对象)
// Object obj = clazz.getConstructor().newInstance(); Object result = m2.invoke(obj,10,20);
respObj = method.invoke(clazz.newInstance(),args);
}catch (Exception e){
e.printStackTrace();
}
return respObj;
}
}
netServer,也就是socket的服务端,其中的main,是让socket服务端启动,开始监听socket请求
package com.csdn.net;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 17:34
* @desc: 提供网络上的服务的传输 的功能的类
*/
public class NetServer {
// 总不能说,我这个服务器,只能处理一个线程吧,所以此时需要一个socket对应一个线程,因此我们创建一个线程池对象
private static final ExecutorService threadPool = Executors.newFixedThreadPool(100);
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 17:35
* @desc: 提供服务的实现。port代表的是端口
*
* 流程是:项目跑起来,调用startup方法,监听一个端口。监听某一个端口时它会等着某个客户端的链接
* 然后来一个socket对象,就创建一个线程去处理这个socket对象,
* 所以具体的操作这个socket的方法,就需要写在runnable接口的run方法里面。
* run方法中,通过构造器的方式,将socket注入进来了,然后通过socket获取到输入流对象,
* 然后拿到传输过来的对象,然后将这个对象,交给ServiceDispatch对象进行服务的分发,
* 然后将ServiceDispatch对象分发完以后,返回的对象,通过网络(socket,OutputStream)返回给请求端
*/
public static void startup(int port) throws IOException {
ServerSocket serverSocket = new ServerSocket(port);
while (true){
// 阻塞的等待着,socket客户端的链接
Socket socket = serverSocket.accept();
// 让线程池去提交一个任务。submit中可以是 实现了runnable接口的类
threadPool.submit(new RPCThreadProcessor(socket));
// socket是通过输入、输出流的操作来实现通讯。这样是一种典型的nio(同步阻塞io),nio是会导致线程等待的。
// socket.getInputStream();
// socket.getOutputStream();
}
}
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 17:34
* @desc: 写一个启动类,去启动这个socket的服务端,去不断的监听 是否有socket请求进来
*/
public static void main(String[] args) throws IOException {
startup(9999);
}
}
RPCThreadProcessor,每一个线程的具体的操作
package com.csdn.net;
import com.csdn.dispatch.ServiceDispatch;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 17:44
* @desc: 线程的处理类(线程池中的线程对象 , 都这样处理)
*/
public class RPCThreadProcessor implements Runnable {
/**
* 因为我们这个线程是要操作socket对象的,所以我们为了方便,自己通过构造器方式注入一个socker对象
* spring中的注入方式有三种,getter方法、构造器方式、注解方式(@Component/@Repository/@Controller/@Service)
*/
private Socket socket;
public RPCThreadProcessor(Socket socket) {
this.socket = socket;
}
/**
* @author: ZhengTianLiang
* @date: 2021/07/11 17:54
* @desc: 具体的操作应该写在这里,流程的步骤我写在了:NetServer的解释中
*/
public void run() {
// 因为在网络中传输的是java对象(dto对象),所以可以用ObjectInputStream而不是InputStream
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
try {
ois = new ObjectInputStream(socket.getInputStream());
// 在网络传输的过程中,传输过来的一个对象
Object reqObj = ois.readObject();
// 将网络中传输过来的对象,交给这个分发器去分发。拿到的返回对象,是要往请求端返回的
Object respObj = ServiceDispatch.dispatch(reqObj);
oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeObject(respObj);
oos.flush();
}catch (Exception e){
e.printStackTrace();
}finally {
try {
ois.close();
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
首先 通过动态工厂的方式,拿到一个目标对象的动态代理对象。然后封装请求参数,发起调用。调用的过程中,最终会跑到实现了InvocationHandler接口的 invoke 方法中,所以我们在invoke方法中调用了 socket的工具类,去发起socket请求。并接收到服务提供方的返回结果,然后拿到这个返回结果。完成一次rpc远程调用。
NetClient,也就是socket客户端的工具类
package com.csdn.net;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author: ZhengTianLiang
* @date: 2021/07/25 10:24
* @desc: 是socket的工具类
*/
public class NetClient {
/**
* @author: ZhengTianLiang
* @date: 2021/07/25 10:24
* @desc: 是socket的实现,发出具体的网络请求。 host代表目标主机、port代表目标端口,obj是请求入参
*/
public static Object callRemoteService(String host,int port,Object obj){
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
Object o = null;
try {
Socket socket = new Socket(host,port);
oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeObject(obj);
oos.flush();
ois = new ObjectInputStream(socket.getInputStream());
// 网络传过来的响应对象
o = ois.readObject();
}catch (Exception e){
e.printStackTrace();
}finally {
try {
ois.close();
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return o;
}
}
ProxyFactory,也就是生成目标对象(UserServiceImpl)的动态工厂
package com.csdn.proxy;
import java.lang.reflect.Proxy;
/**
* @author: ZhengTianLiang
* @date: 2021/07/25 9:56
* @desc: 代理工厂对象,用来动态的生成 各种 serviceImpl 对象
*/
public class ProxyFactory {
/**
* @author: ZhengTianLiang
* @date: 2021/07/25 9:57
* @param: 想创建什么类型的对象,就传入什么类型的clazz对象
* @desc: 使用jdk动态代理来生成 传入参数的 动态代理对象
*/
public static <T> T getProxyInstance(Class<T> interfaceClazz){
/*
这个里面有三个参数,分别是:classLoader、clazz数组、InvocationHandler
当前类的clazzLoader: 类名.class().getClassLoader
class数组:接口列表(要创建的对象),是一个数组
InvocationHandler:实现了InvocationHandler 接口的实现类
*/
return (T) Proxy.newProxyInstance(ProxyFactory.class.getClassLoader(),
new Class[]{interfaceClazz},new RPCInvocationHandler());
}
}
InvocationHandler,其中的invoke方法,也就是具体的执行(里面调用了socket工具类去发请求)。
package com.csdn.proxy;
import com.csdn.dao.RPCCommonReqDTO;
import com.csdn.net.NetClient;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
/**
* @author: ZhengTianLiang
* @date: 2021/07/25 10:04
* @desc: 是实现了InvocationHandler接口的实现类,jdk动态代理需要实现这个接口。
* 其中的 invoke 方法是具体的实现
*/
public class RPCInvocationHandler implements InvocationHandler {
/**
* @author: ZhengTianLiang
* @date: 2021/07/25 10:04
* @desc: 动态代理可以对 保护目标对象、实例对象进行强化,
* 这个invoke 方法内就是强化的具体
* 这个invoke方法,进行网络请求的封装(因为你调用目标对象serviceImpl的addUser方法,
* 最终是执行的这个invoke方法,所以我们可以把网络请求封装到这个invoke方法中去)
*/
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 封装网络请求
RPCCommonReqDTO rpcCommonReqDTO = new RPCCommonReqDTO();
rpcCommonReqDTO.setMethodName(method.getName());
rpcCommonReqDTO.setArgs(args);
// 这个目前是写死的,二期优化会是动态的
rpcCommonReqDTO.setClasspath("com.csdn.api.impl.UserServiceImpl");
// 调用socket的工具类来发起请求
Object responseObj = NetClient.callRemoteService("localhost", 9999, rpcCommonReqDTO);
return responseObj;
}
}
测试如下:
package com.csdn;
import com.csdn.api.UserService;
import com.csdn.dao.UserDTO;
import com.csdn.proxy.ProxyFactory;
/**
* @author: ZhengTianLiang
* @date: 2021/07/25 9:55
* @desc: 测试发起实例(这个项目是一个rpc Consumer项目 , 是测试消费方的代码)
*/
public class Test {
/**
* @author: ZhengTianLiang
* @date: 2021/07/25 9:55
* @desc: 测试发起消费。通过自己动态代理工厂生成的,服务提供方的serviceImpl(我本身是服务消费方),
* 通过这个动态代理生成的 服务提供方的动态代理对象,来具体的执行serviceImpl中的方法,来实现rpc远程调用
*
*
* 首先 通过动态工厂的方式,拿到一个目标对象的动态代理对象。然后封装请求参数,发起调用。
* 调用的过程中,最终会跑到实现了InvocationHandler接口的 invoke 方法中,所以我们在invoke方法
* 中调用了 socket的工具类,去发起socket请求。并接收到服务提供方的返回结果,然后拿到这个返回结果。
* 完成一次rpc远程调用。
*/
public static void main(String[] args) {
UserService proxyInstance = ProxyFactory.getProxyInstance(UserService.class);
// 封装请求参数
UserDTO userDTO = new UserDTO();
userDTO.setName("userName");
userDTO.setAge("18");
UserDTO responsePojo = proxyInstance.addUser(userDTO);
System.out.println("传入的参数是:" + userDTO);
System.out.println("传入的参数是:" + responsePojo);
}
}
这只是dubbo框架的一个简单的实现,内部东西还很多。此次只是分享一下他最核心的rpc调用的流程。以后有机会了,会继续分享一些更细致的东西。