rpc框架搭建
consumer 消费者应用
provider 提供的服务
Provider-common 公共类模块
rpc 架构
service-Registration 服务发现
nacos nacos配置中心
load-balancing 负载均衡
redis-trench 手写redis实现和链接
package com.trench.protocol;
import com.trench.enumUtil.RedisRepEnum;
import redis.clients.jedis.util.SafeEncoder;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
public class Protocol {
public static final String DOLLAR="$";
public static final String STAR="*";
public static final String BLANK="\r\n";
public static void sendCommand(OutputStream outputStream, RedisRepEnum redisRepEnum,byte [] ... args){
StringBuffer str=new StringBuffer();
str.append(STAR).append(args.length-1).append(BLANK);
str.append(DOLLAR).append(redisRepEnum.name().length()).append(BLANK);
str.append(redisRepEnum).append(BLANK);
Arrays.stream(args).forEach(arg->{
str.append(DOLLAR).append(arg.length).append(BLANK);
str.append(new String(arg)).append(BLANK);
});
try {
outputStream.write(str.toString().getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
}
}
public static final byte[] toByteArray(long value) {
return SafeEncoder.encode(String.valueOf(value));
}
}
package com.trench.api;
import com.trench.connection.Connetion;
import com.trench.enumUtil.RedisRepEnum;
import com.trench.protocol.Protocol;
import com.trench.util.SerializeUtils;
import redis.clients.jedis.BuilderFactory;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
public class Client {
Connetion connetion;
public Client(String host,Integer port){
connetion=new Connetion(port,host);
}
public void set(final String key, final String values){
connetion.sendCommand( RedisRepEnum.SET,key.getBytes(StandardCharsets.UTF_8), SerializeUtils.serialize(values));
}
public Object get(final String key){
connetion.sendCommand(RedisRepEnum.GET,key.getBytes(StandardCharsets.UTF_8));
return connetion.getData();
}
public void delete(final String key){
connetion.sendCommand(RedisRepEnum.GETDEL,key.getBytes(StandardCharsets.UTF_8));
}
//封装redis的过期时间
public void expire(String key, long seconds){
connetion.sendCommand(RedisRepEnum.EXISTS, key.getBytes(StandardCharsets.UTF_8), Protocol.toByteArray(seconds));
}
//是否存在key
public boolean exists(final String key) {
connetion.sendCommand(RedisRepEnum.EXISTS, key.getBytes(StandardCharsets.UTF_8));
return (Long)connetion.getData()==1L;
}
//查找key中set包含
public Set<String> keys(final String key){
connetion.sendCommand(RedisRepEnum.KEYS,key.getBytes(StandardCharsets.UTF_8));
return (Set) BuilderFactory.STRING_SET.build((List)connetion.getData());
}
}
rpc框架核心代码
package com.trench.protocol;
import com.trench.SerializeUtils;
import com.trench.frawork.Invocation;
import com.trench.nacos.dome.NacosHttp;
import org.apache.commons.io.IOUtils;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
public class HttpClient {
public String send(String hostName, Integer port, Invocation invocation) throws IOException {
//读取nacos中的配置用户的请求方式。如http POST get等
NacosHttp nacosHttp=new NacosHttp();
try {
URL url=new URL(nacosHttp.getHttp(),hostName,port,nacosHttp.getFile());
HttpURLConnection httpURLConnection= (HttpURLConnection)url.openConnection();
httpURLConnection.setRequestMethod(nacosHttp.getRequestMethod());
httpURLConnection.setDoOutput(true);
//配置
OutputStream outputStream=httpURLConnection.getOutputStream();
ObjectOutputStream oss = new ObjectOutputStream(outputStream);
oss.writeObject(SerializeUtils.serialize(invocation));
oss.flush();
oss.close();
InputStream inputStream = httpURLConnection.getInputStream();
return (String) SerializeUtils.deSerialize(IOUtils.toString(inputStream).getBytes(StandardCharsets.UTF_8));
} catch (MalformedURLException e) {
throw e;
} catch (IOException e) {
throw e;
}
}
}
启动tomcat
package com.trench.protocol;
import org.apache.catalina.*;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.core.StandardEngine;
import org.apache.catalina.core.StandardHost;
import org.apache.catalina.startup.Tomcat;
public class HttpServer {
public void start(String hostname,Integer port){
//读取用户配置
Tomcat tomcat=new Tomcat();
Server server = tomcat.getServer();
Service service = server.findService("Tomcat");
Connector connector=new Connector();
connector.setPort(port);
Engine engine=new StandardEngine();
engine.setDefaultHost(hostname);
Host host=new StandardHost();
host.setName(hostname);
String contextPash="";
Context context=new StandardContext();
context.setPath(contextPash);
context.addLifecycleListener(new Tomcat.FixContextListener());
host.addChild(context);
engine.addChild(host);
service.setContainer(engine);
service.addConnector(connector);
tomcat.addServlet(contextPash,"dispatcher",new DispatcherServlet());
context.addServletMappingDecoded("/*","dispatcher");
try {
tomcat.start();
tomcat.getServer().await();
}catch (LifecycleException e){
e.printStackTrace();
}
}
}
package com.trench.protocol;
import com.trench.frawork.Invocation;
import com.trench.register.LocalRegister;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
public class HttpServerHandler extends HttpServer {
public void handler(HttpServletRequest request, HttpServletResponse response){
//可自行添加为心跳监听等操作
//处理请求
try {
Invocation invocation = (Invocation)new ObjectInputStream(request.getInputStream()).readObject();
String interfaceName =invocation.getInterfaceName();//接口名称
Class aClass = LocalRegister.get(interfaceName,invocation.getVersion());
Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParameterTypes());
String invoke = (String) method.invoke(aClass.newInstance(), invocation.getParameter());
IOUtils.write(invoke.getBytes(StandardCharsets.UTF_8),response.getOutputStream());
} catch (IOException e) {
e.printStackTrace();
}catch (ClassNotFoundException e){
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
}
}
}
相关的gitub仓库地址: master分支源代码