手写rpc和redis

发布时间:2024年01月23日

rpc框架搭建
consumer 消费者应用
provider 提供的服务
Provider-common 公共类模块
rpc 架构
service-Registration 服务注册与发现
nacos nacos配置中心
load-balancing 负载均衡
redis-trench 手写redis实现和连接

package com.trench.protocol;

import com.trench.enumUtil.RedisRepEnum;
import redis.clients.jedis.util.SafeEncoder;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Encodes Redis commands in the RESP (REdis Serialization Protocol) request
 * format: an array header {@code *<count>} followed by one bulk string
 * {@code $<byte-length>CRLF<bytes>CRLF} per element, where the first element
 * is the command name and the remaining elements are its arguments.
 */
public class Protocol {

    /** Prefix of a RESP bulk-string header ({@code $<byte-length>}). */
    public static final  String DOLLAR="$";

    /** Prefix of a RESP array header ({@code *<element-count>}). */
    public static final String STAR="*";

    /** RESP line terminator (CRLF); the constant name is historical. */
    public static final String BLANK="\r\n";

    /**
     * Writes one command frame to {@code outputStream}.
     *
     * <p>The frame is assembled in memory and handed to the stream in a single
     * {@code write} call. The stream is not flushed here.
     *
     * @param outputStream destination stream
     * @param redisRepEnum command to send; its {@code name()} is used as the command word
     * @param args         raw argument bytes, written verbatim (binary safe)
     * @throws java.io.UncheckedIOException if writing to {@code outputStream} fails
     */
    public static void  sendCommand(OutputStream outputStream, RedisRepEnum redisRepEnum,byte [] ... args){
        java.io.ByteArrayOutputStream frame = new java.io.ByteArrayOutputStream();

        // The array holds the command word plus every argument, hence "+ 1".
        byte[] arrayHeader = (STAR + (args.length + 1) + BLANK).getBytes(StandardCharsets.UTF_8);
        frame.write(arrayHeader, 0, arrayHeader.length);

        // Use name() for both the length and the payload so they can never disagree.
        writeBulk(frame, redisRepEnum.name().getBytes(StandardCharsets.UTF_8));
        for (byte[] arg : args) {
            writeBulk(frame, arg);
        }

        try {
            outputStream.write(frame.toByteArray());
        } catch (IOException e) {
            // A command that was not sent must not look like a success to the caller.
            throw new java.io.UncheckedIOException("Failed to send Redis command " + redisRepEnum.name(), e);
        }
    }

    /**
     * Appends one bulk string to the frame. The payload bytes are copied as-is,
     * so the declared length always matches what is actually sent.
     */
    private static void writeBulk(java.io.ByteArrayOutputStream frame, byte[] payload) {
        byte[] header = (DOLLAR + payload.length + BLANK).getBytes(StandardCharsets.UTF_8);
        byte[] terminator = BLANK.getBytes(StandardCharsets.UTF_8);
        frame.write(header, 0, header.length);
        frame.write(payload, 0, payload.length);
        frame.write(terminator, 0, terminator.length);
    }

    /**
     * Encodes a number as the bytes of its decimal string form, which is how
     * numeric command arguments are passed to {@link #sendCommand}.
     *
     * @param value number to encode
     * @return encoded bytes of {@code String.valueOf(value)}
     */
    public static final byte[] toByteArray(long value) {
        return SafeEncoder.encode(String.valueOf(value));
    }
}

package com.trench.api;

import com.trench.connection.Connetion;
import com.trench.enumUtil.RedisRepEnum;
import com.trench.protocol.Protocol;
import com.trench.util.SerializeUtils;
import redis.clients.jedis.BuilderFactory;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;

/**
 * Minimal Redis client: each method sends one command over a single
 * {@link Connetion} and, where a result is returned, reads it back with
 * {@code getData()}.
 *
 * NOTE(review): {@link #set}, {@link #delete} and {@link #expire} send a command
 * but never read the server's reply. Whether {@code Connetion} discards pending
 * replies is not visible here - confirm that a later {@code getData()} cannot
 * pick up a stale reply.
 */
public class Client {
    Connetion connetion;

    /**
     * Creates a client bound to the given server.
     *
     * @param host server host name
     * @param port server port
     */
    public Client(String host,Integer port){
        connetion=new Connetion(port,host);
    }

    /**
     * Sends {@code SET key value}. The value is passed through
     * {@code SerializeUtils.serialize} before being sent.
     *
     * NOTE(review): {@link #get} returns the raw reply without a matching
     * deserialization step - verify the two are meant to be asymmetric.
     */
    public void set(final  String key, final String values){
        connetion.sendCommand( RedisRepEnum.SET,key.getBytes(StandardCharsets.UTF_8), SerializeUtils.serialize(values));
    }

    /**
     * Sends {@code GET key} and returns whatever the connection decodes as the reply.
     */
    public Object get(final String key){
        connetion.sendCommand(RedisRepEnum.GET,key.getBytes(StandardCharsets.UTF_8));
        return connetion.getData();
    }

    /**
     * Removes a key by sending {@code GETDEL key}.
     *
     * NOTE(review): GETDEL only applies to string values; DEL may be the
     * intended command - confirm against RedisRepEnum and the target server version.
     */
    public void  delete(final String key){
        connetion.sendCommand(RedisRepEnum.GETDEL,key.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Sets a time-to-live on a key by sending {@code EXPIRE key seconds}.
     * The original sent EXISTS here, which never sets a timeout.
     * Assumes RedisRepEnum declares an EXPIRE constant - TODO confirm.
     *
     * @param key     key to expire
     * @param seconds time-to-live in seconds
     */
    public void expire(String key, long seconds){
        connetion.sendCommand(RedisRepEnum.EXPIRE, key.getBytes(StandardCharsets.UTF_8), Protocol.toByteArray(seconds));
    }

    /**
     * Sends {@code EXISTS key}.
     *
     * @return {@code true} when the reply is the integer 1
     */
    public boolean exists(final String key) {
        connetion.sendCommand(RedisRepEnum.EXISTS, key.getBytes(StandardCharsets.UTF_8));
        Long reply = (Long) connetion.getData();
        return reply == 1L;
    }

    /**
     * Sends {@code KEYS pattern} and converts the reply into a set of strings.
     *
     * @param key the pattern to match (parameter name kept for compatibility)
     */
    public Set<String> keys(final String key){
        connetion.sendCommand(RedisRepEnum.KEYS,key.getBytes(StandardCharsets.UTF_8));
        // The reply is cast to a list first, as before, so a non-list reply still fails fast.
        List<?> reply = (List<?>) connetion.getData();
        return BuilderFactory.STRING_SET.build(reply);
    }
}

rpc框架核心代码

package com.trench.protocol;

import com.trench.SerializeUtils;
import com.trench.frawork.Invocation;
import com.trench.nacos.dome.NacosHttp;
import org.apache.commons.io.IOUtils;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/**
 * Client side of the RPC transport: posts an {@link Invocation} to a provider
 * over HTTP and returns the provider's textual result.
 */
public class HttpClient {
    /**
     * Sends one invocation to {@code hostName:port} and returns the response body.
     *
     * <p>The request body is the {@code Invocation} written with Java object
     * serialization, and the response body is read as UTF-8 text; both match
     * what the server-side handler reads and writes.
     *
     * @param hostName   provider host
     * @param port       provider port
     * @param invocation call description to transmit
     * @return response body decoded as UTF-8
     * @throws IOException if the URL is malformed, the connection fails, or the
     *                     server answers with an error status
     */
    public String send(String hostName, Integer port, Invocation invocation) throws IOException {
        // Protocol, path and HTTP method come from the user's configuration exposed by NacosHttp.
        NacosHttp nacosHttp=new NacosHttp();

        URL url=new URL(nacosHttp.getHttp(),hostName,port,nacosHttp.getFile());
        HttpURLConnection httpURLConnection=(HttpURLConnection)url.openConnection();
        httpURLConnection.setRequestMethod(nacosHttp.getRequestMethod());
        httpURLConnection.setDoOutput(true);

        // Write the Invocation itself: the server casts readObject() directly to
        // Invocation, so wrapping it in a pre-serialized byte[] cannot be decoded.
        try (ObjectOutputStream oss = new ObjectOutputStream(httpURLConnection.getOutputStream())) {
            oss.writeObject(invocation);
            oss.flush();
        }

        // The server replies with the raw UTF-8 bytes of the result string, so
        // decode with an explicit charset and close the stream when done.
        try (InputStream inputStream = httpURLConnection.getInputStream()) {
            return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        }
    }
}

启动tomcat
package com.trench.protocol;

import org.apache.catalina.*;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.core.StandardEngine;
import org.apache.catalina.core.StandardHost;
import org.apache.catalina.startup.Tomcat;

/**
 * Boots an embedded Tomcat that routes every request to {@code DispatcherServlet}.
 */
public class HttpServer {

    /**
     * Assembles engine, host, root context and connector by hand, starts Tomcat
     * and then blocks in {@code await()}.
     *
     * @param hostname name used for the virtual host and as the engine's default host
     * @param port     port the connector listens on
     */
    public void start(String hostname,Integer port){
        Tomcat tomcat=new Tomcat();
        Service tomcatService = tomcat.getServer().findService("Tomcat");

        // Root web application (empty context path).
        final String rootPath="";
        Context rootContext=new StandardContext();
        rootContext.setPath(rootPath);
        rootContext.addLifecycleListener(new Tomcat.FixContextListener());

        // Virtual host that owns the root context.
        Host virtualHost=new StandardHost();
        virtualHost.setName(hostname);
        virtualHost.addChild(rootContext);

        // Engine that routes to the virtual host.
        Engine engine=new StandardEngine();
        engine.setDefaultHost(hostname);
        engine.addChild(virtualHost);

        Connector connector=new Connector();
        connector.setPort(port);

        tomcatService.setContainer(engine);
        tomcatService.addConnector(connector);

        // Register the dispatcher and map every path to it.
        tomcat.addServlet(rootPath,"dispatcher",new DispatcherServlet());
        rootContext.addServletMappingDecoded("/*","dispatcher");

        try {
            tomcat.start();
            tomcat.getServer().await();
        }catch (LifecycleException startupFailure){
            startupFailure.printStackTrace();
        }
    }
}

package com.trench.protocol;

import com.trench.frawork.Invocation;
import com.trench.register.LocalRegister;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

/**
 * Server side of the RPC transport: decodes an {@link Invocation} from the
 * request, invokes the registered implementation reflectively and writes the
 * result back as UTF-8 text.
 */
public class HttpServerHandler extends HttpServer {
    /**
     * Handles one RPC request. This is also the place to hook in extras such
     * as heartbeat monitoring.
     *
     * <p>On an I/O or reflection failure the stack trace is printed and the
     * response status is set to 500, so the caller can tell a failed call from
     * an empty result.
     *
     * @param request  request whose body is a Java-serialized {@code Invocation}
     * @param response response that receives the UTF-8 bytes of the result string
     */
    public void handler(HttpServletRequest request, HttpServletResponse response){
        try {
            // NOTE(security): this is Java-native deserialization of the request body.
            // It is only safe when every caller is trusted; otherwise install an
            // ObjectInputFilter or switch to a data-only format.
            Invocation invocation = (Invocation)new ObjectInputStream(request.getInputStream()).readObject();
            String interfaceName =invocation.getInterfaceName(); // name of the requested interface
            Class<?> aClass = LocalRegister.get(interfaceName,invocation.getVersion());

            Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParameterTypes());
            // Class.newInstance() is deprecated; going through the constructor wraps
            // constructor failures in InvocationTargetException instead of leaking them.
            Object target = aClass.getDeclaredConstructor().newInstance();
            String invoke = (String) method.invoke(target, invocation.getParameter());

            IOUtils.write(invoke.getBytes(StandardCharsets.UTF_8),response.getOutputStream());

        } catch (IOException | ReflectiveOperationException e) {
            // ReflectiveOperationException covers every reflection failure caught before:
            // ClassNotFound, NoSuchMethod, InvocationTarget, IllegalAccess, Instantiation.
            e.printStackTrace();
            response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
        }
    }
}

相关的GitHub仓库地址: master分支源代码

文章来源:https://blog.csdn.net/zjhzyw/article/details/135768369
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。