Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和Spring框架无缝集成。现在已成为Apache的开源项目。
RPC(Remote Procedure Call)—远程过程调用
,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互作用编程,如果涉及的软件采用面向对象编程(java),那么远程过程调用亦可称作远程调用
或远程方法调用
。只要支持网络传输的协议就是RPC协议,RPC是一种框架。
按照官网架构图,模块内容设计如下
服务提供者:提供API,启动的时候要注册服务
服务消费者:从注册中心获取服务,调用子服务
注册中心:保存服务配置
RPC协议:
由于模块之间还要引用jar包,于是在手写实现时以包的形式代表各个模块
定义一个服务接口
public interface HelloService {
public void sayHello(String username);
}
-------------------------------------------------------------
public class HelloServiceImpl implements HelloService {
public void sayHello(String username) {
System.out.println("Hello:"+username);
}
}
注册服务,启动tomcat
这里用到了策略模式和简单工厂模式,提供了两种注册服务的策略
public static void main(String[] args) {
// 注册服务
// 远程注册
URL url = new URL("localhost", 8080);
RemoteMapRegister.regist(HelloService.class.getName(), url);
// 服务:实现类
// 本地注册
LocalRegister.regist(HelloService.class.getName(), HelloServiceImpl.class);
// 协议工厂
Protocol protocol = ProtocolFactory.getProtocol();
protocol.start(url);
}
服务注册形式
Map<interfaceName, List<URL>>
两个数据bean
Invocation
要实现Serializable,在服务消费端设值后序列化成对象流传输,然后在服务提供端转为对象,获取接口名,从注册中心获取实现类,从而调用方法。
@Data
@AllArgsConstructor
public class Invocation implements Serializable {
private String interfaceName;
private String methodName;
private Object[] params;
private Class[] paramType;
}
URL
@Data
@AllArgsConstructor
public class URL implements Serializable {
private String hostname;
private Integer port;
}
具体实现
本地注册
public class LocalRegister {
private static Map<String, Class> map = new HashMap<>();
/**
* 注册服务(暴露接口)
* @param interfaceName
* @param implClass
*/
public static void regist(String interfaceName, Class implClass) {
map.put(interfaceName, implClass);
}
/**
* 从注册中心获取实现类(发现服务)
* @param interfaceName
* @return
*/
public static Class get(String interfaceName) {
return map.get(interfaceName);
}
}
远程注册
public class RemoteMapRegister {
private static Map<String, List<URL>> REGISTER = new HashMap<>();
public static void regist(String interfaceName, URL url){
List<URL> list = REGISTER.get(interfaceName);
if (list == null) {
list = new ArrayList<>();
}
list.add(url);
REGISTER.put(interfaceName, list);
saveFile();
}
public static List<URL> get(String interfaceName) {
REGISTER = getFile();
List<URL> list = REGISTER.get(interfaceName);
return list;
}
private static void saveFile() {
try {
FileOutputStream fileOutputStream = new FileOutputStream("./temp.txt");
ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
objectOutputStream.writeObject(REGISTER);
} catch (IOException e) {
e.printStackTrace();
}
}
private static Map<String, List<URL>> getFile() {
try {
FileInputStream fileInputStream = new FileInputStream("./temp.txt");
ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
return (Map<String, List<URL>>) objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
}
引入内嵌tomcat依赖
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>9.0.12</version>
</dependency>
tomcat结构 server.xml
<Server port="8005" shutdown="SHUTDOWN">
<Service name="Catalina">
<Connector port="8080" protocol="HTTP/1.1"
connectionTimeout="20000"
redirectPort="8443"
URIEncoding="UTF-8"/>
<Engine name="Catalina" defaultHost="localhost">
<Host name="localhost" appBase="webapps"
unpackWARs="true" autoDeploy="true">
<Context path="" doBase="WORKDIR" reloadable="true"/>
</Host>
</Engine>
</Service>
</Server>
是不是很熟悉,根据这个xml结构构建一个tomcat启动类
public class HttpServer {
public void start(String hostname,Integer port){
// 实例一个tomcat
Tomcat tomcat = new Tomcat();
// 构建server
Server server = tomcat.getServer();
/**
* 在getServer的时候,就在方法内部执行了
* Service service = new StandardService();
* service.setName("Tomcat");
* server.addService(service);
*/
// 获取service
Service service = server.findService("Tomcat");
// 构建Connector
Connector connector = new Connector();
connector.setPort(port);
connector.setURIEncoding("UTF-8");
// 构建Engine
Engine engine = new StandardEngine();
engine.setDefaultHost(hostname);
// 构建Host
Host host = new StandardHost();
host.setName(hostname);
// 构建Context
String contextPath = "";
Context context = new StandardContext();
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());// 生命周期监听器
// 然后按照server.xml,一层层把子节点添加到父节点
host.addChild(context);
engine.addChild(host);
service.setContainer(engine);
service.addConnector(connector);
// service在getServer时就被添加到server节点了
// tomcat是一个servlet,设置路径与映射
tomcat.addServlet(contextPath,"dispatcher",new DispatcherServlet());
context.addServletMappingDecoded("/*","dispatcher");
try {
tomcat.start();// 启动tomcat
tomcat.getServer().await();// 接受请求
}catch (LifecycleException e){
e.printStackTrace();
}
}
}
HttpServerHandler 所有http请求交给HttpServerHandler处理,即服务消费端的远程调用
public class DispatcherServlet extends HttpServlet{
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// 方便后期在此拓展服务
new HttpServerHandler().handler(req, resp);
}
}
public class HttpServerHandler {
public void handler(HttpServletRequest req, HttpServletResponse resp){
try{
// Http请求流转为对象
Invocation invocation = (Invocation) new ObjectInputStream(req.getInputStream()).readObject();
String interfaceName = invocation.getInterfaceName();
// 寻找注册中心的实现类,通过反射执行方法
Class implClass = LocalRegister.get(interfaceName);
Method method = implClass.getMethod(invocation.getMethodName(), invocation.getParamType());
String result = (String) method.invoke(implClass.newInstance(), invocation.getParams());
// 将结果返回
System.out.println("tomcat:" + result);
IOUtils.write(result, resp.getOutputStream());
}catch (Exception e){
e.printStackTrace();
}
}
}
注意: URL一定要重写equals与hashCode方法,否则Register.get(new URL("localhost",8080),invocation.getInterfaceName());
时为null。
ProtocolFactory 协议工厂
public class ProtocolFactory {
public static Protocol getProtocol() {
// 简单工厂模式
String name = System.getProperty("protocolName");
if (name == null || name.equals("")) name = "http";
switch (name) {
case "http":
return new HttpProtocol();
case "dubbo":
return new DubboProtocol();
default:
break;
}
return new HttpProtocol();
}
}
HttpProtocol协议实现类
public class HttpProtocol implements Protocol {
@Override
public void start(URL url) {
HttpServer httpServer = new HttpServer();
httpServer.start(url.getHostname(), url.getPort());
}
@Override
public String send(URL url, Invocation invocation) {
HttpClient httpClient = new HttpClient();
return httpClient.send(url.getHostname(), url.getPort(),invocation);
}
}
public class consumer {
public static void main(String[] args) {
HelloService helloService = ProxyFactory.getProxy(HelloService.class);
String result = helloService.sayHello("国王");
System.out.println(result);
}
}
HttpClient
public class HttpClient {
public String send(String hostname, Integer port, Invocation invocation) {
try {
// 进行http连接
URL url = new URL("http", hostname, port, "/");
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setRequestMethod("POST");
httpURLConnection.setDoOutput(true);
OutputStream outputStream = httpURLConnection.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(outputStream);
oos.writeObject(invocation);
oos.flush();
oos.close();
InputStream inputStream = httpURLConnection.getInputStream();
String result = IOUtils.toString(inputStream);
return result;
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
HttpProtocol
public class HttpProtocol implements Protocol {
@Override
public void start(URL url) {
HttpServer httpServer = new HttpServer();
httpServer.start(url.getHostname(), url.getPort());
}
@Override
public String send(URL url, Invocation invocation) {
HttpClient httpClient = new HttpClient();
return httpClient.send(url.getHostname(), url.getPort(),invocation);
}
}
先启动服务提供者
再启动服务消费者
dubbo是直接引入接口jar包,调用接口方法就可以获取结果,于是使用到了动态代理返回一个代理对象。
public class ProxyFactory<T> {
@SuppressWarnings("unchecked")
public static <T> T getProxy(final Class interfaceClass) {
return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String mock = System.getProperty("mock");
if (mock != null && mock.startsWith("return:")) {
String result = mock.replace("return:", "");
return result;
}
Invocation invocation = new Invocation(interfaceClass.getName(), method.getName(), args, method.getParameterTypes());
// List<URL> urlList = ZookeeperRegister.get(interfaceClass.getName());
List<URL> urlList = RemoteMapRegister.get(interfaceClass.getName());
URL url = LoadBalance.random(urlList);
Protocol protocol = ProtocolFactory.getProtocol();
String result = protocol.send(url, invocation);
return result;
}
});
}
}
因为消费端与服务端是两个进程,消费端是获取不到服务端的REGISTER
的,所以需要在服务端注册时将URL写入文本,然后在消费端根据interfaceName随机调度已发布服务的服务器地址。
public class RemoteMapRegister {
private static Map<String, List<URL>> REGISTER = new HashMap<>();
public static void regist(String interfaceName, URL url){
List<URL> list = REGISTER.get(interfaceName);
if (list == null) {
list = new ArrayList<>();
}
list.add(url);
REGISTER.put(interfaceName, list);
saveFile();
}
public static List<URL> get(String interfaceName) {
REGISTER = getFile();
List<URL> list = REGISTER.get(interfaceName);
return list;
}
private static void saveFile() {
try {
FileOutputStream fileOutputStream = new FileOutputStream("./temp.txt");
ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
objectOutputStream.writeObject(REGISTER);
} catch (IOException e) {
e.printStackTrace();
}
}
private static Map<String, List<URL>> getFile() {
try {
FileInputStream fileInputStream = new FileInputStream("./temp.txt");
ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
return (Map<String, List<URL>>) objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
}
最后贴一个项目地址: