手写netty通信框架以及常见问题

发布时间:2024年01月13日

目录

通信框架设计

实现功能点

通信模型

消息定义

可靠性设计

代码

服务端代码

常见netty问题

如何让netty支持百万长连接?

1. 操作系统层面优化

2. netty层面优化

? ? ? ? 2.1 设置合理线程

? ? ? ? 2.2 心跳优化

????????2.3 合理使用内存池

? ? ? ? 2.4 IO线程与业务线程剥离

3. JVM层面优化?

什么是水平触发(LT)和边缘触发(ET)?

DNS解析域名全过程?


通信框架设计

实现功能点

  1. 基于netty的NIO通信框架, 提供高性能的异步通信能力
  2. 支持消息编解码, 实现POJO的序列化和反序列化
  3. 消息安全验证, 防篡改
  4. 支持IP白名单接入
  5. 链路鉴权校验
  6. 客户端自动重连

通信模型

运行流程

  1. 客户端发送握手请求消息, 并携带节点ID和身份认证信息
  2. 服务端对握手消息请求合法性校验, 包括节点ID有效校验, 节点重复登录, IP是否合法等.校验通过后, 发送握手应答消息给客户端
  3. 链路建立成功后, 客户端发送业务消息
  4. 链路建立成功后, 客户端定时发送心跳消息
  5. 服务端发送业务消息
  6. 服务端定时发送心跳消息
  7. 服务端退出时, 服务端关闭连接, 客户端感知到服务端关闭连接后, 被动关闭客户端连接。并且开启异步线程, 客户端尝试重连

消息定义

  1. 消息头
    1. 消息id
    2. md5缺省摘要
    3. 消息类型
  2. 消息体

可靠性设计

心跳机制

重连机制

重复登录校验

md5缺省摘要校验

代码

服务端代码

1. maven依赖

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>
        <dependency>
            <groupId>de.javakaffee</groupId>
            <artifactId>kryo-serializers</artifactId>
            <version>0.42</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <!-- logback 依赖 -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
        </dependency>

2. Model代码

@Data
public final class MyMessage {

    private MsgHeader msgHeader;

    private Object body;
}

@Data
public final class MsgHeader {

    /*消息体的MD5摘要*/
    private String md5;

    /*消息的ID,因为是同步处理模式,不考虑应答消息需要填入请求消息ID*/
    private long msgID;

    /*消息类型*/
    private byte type;

    /*消息优先级*/
    private byte priority;

    private Map<String, Object> attachment = new HashMap<String, Object>();
}

public enum MessageType {

    SERVICE_REQ((byte) 0),/*业务请求消息*/
    SERVICE_RESP((byte) 1), /*TWO_WAY消息,需要业务应答*/
    ONE_WAY((byte) 2), /*无需应答的业务请求消息*/
    LOGIN_REQ((byte) 3), /*登录请求消息*/
    LOGIN_RESP((byte) 4), /*登录响应消息*/
    HEARTBEAT_REQ((byte) 5), /*心跳请求消息*/
    HEARTBEAT_RESP((byte) 6);/*心跳应答消息*/

    private byte value;

    private MessageType(byte value) {
        this.value = value;
    }

    public byte value() {
        return this.value;
    }
}

3. NettyServer

@Slf4j
public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("nt_boss"));
        EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("nt_worker"));
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ServerInit());
        sb.bind(SERVER_PORT).sync();
        log.info("netty server start ip = {}   port = {}", SERVER_IP, SERVER_PORT);
    }
}

public class ServerInit extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel sc) throws Exception {
        // 解决粘包 半包问题
        sc.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
        sc.pipeline().addLast(new LengthFieldPrepender(2));
        // 序列化成消息对象
        sc.pipeline().addLast(new KryoDecoder());
        sc.pipeline().addLast(new KryoEncoder());
        /*处理心跳超时*/
        sc.pipeline().addLast(new ReadTimeoutHandler(15));
        // 处理登录鉴权
        sc.pipeline().addLast(new LoginAuthRespHandler());
        // 心跳
        sc.pipeline().addLast(new HeartBeatRespHandler());
        // 处理真正业务
        sc.pipeline().addLast(new ServerBusiHandler());
    }
}

4. 业务Handler

@Slf4j
public class ServerBusiHandler extends SimpleChannelInboundHandler<MyMessage> {

    private static BlockingQueue<Runnable> taskQueue  = new ArrayBlockingQueue<Runnable>(3000);
    private static ExecutorService executorService = new ThreadPoolExecutor(NettyRuntime.availableProcessors(),
            NettyRuntime.availableProcessors() * 2,60, TimeUnit.SECONDS,taskQueue);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) {
        // 校验MD5
        String headMd5 = msg.getMsgHeader().getMd5();
        String calcMd5 = EncryptUtils.encryptObj(msg.getBody());
        if(!headMd5.equals(calcMd5)){
            log.error("报文md5检查不通过:"+headMd5+" vs "+calcMd5+",关闭连接");
            ctx.writeAndFlush(buildBusiResp("报文md5检查不通过,关闭连接"));
            ctx.close();
        }
        log.info(msg.toString());
        if(msg.getMsgHeader().getType() == MessageType.ONE_WAY.value()){
            log.debug("ONE_WAY类型消息,异步处理");
            executorService.execute(() -> {
                // 处理消息 msg
            });
        }else{
            log.debug("TWO_WAY类型消息,应答");
            ctx.writeAndFlush(buildBusiResp("OK"));
        }
    }

    private MyMessage buildBusiResp(String result) {
        MyMessage message = new MyMessage();
        MsgHeader msgHeader = new MsgHeader();
        msgHeader.setType(MessageType.SERVICE_RESP.value());
        message.setMsgHeader(msgHeader);
        message.setBody(result);
        return message;
    }
}

常见netty问题

如何让netty支持百万长连接?

1. 操作系统层面优化

? ? ? ? 1.1 修改操作系统允许当前用户进程打开的句柄数量

# 查看允许当前用户进程打开的句柄数量
ulimit -n

# 修改数量
ulimit -n 1000000

? ? ? ? 1.2 修改软限制和硬限制

# 修改配置文件
vim /etc/security/limits.conf

# 文末添加
* soft nofile 1000000
* hard nofile 1000000


# 修改配置文件
vim /etc/pam.d/login

# 文末添加
session required /lib/security/pam_limits.so

? ? ? ? ?1.3 修改Linux系统级能打开最大文件数限制

# 查看Linux能打开最大文件数
cat /proc/sys/fs/file-max

# 修改限制
vim /etc/sysctl.conf
# 文末添加
fs.file_max = 1000000
# 配置立即生效
sysctl -p

2. netty层面优化

? ? ? ? 2.1 设置合理线程

????????对于 Nety 服务端,通常只需要启动一个监听端口用于端侧设备接入即可,但是如果服务 端集群实例比较少,甚至是单机(或者双机冷备)部署,在端侧设备在短时间内大量接入时,需要 对服务端的监听方式和线程模型做优化,以满足短时间内(例如 30s)百万级的端侧设备接入的 需要。 服务端可以监听多个端口,利用主从 Reactor 线程模型做接入优化,前端通过 SLB 做 4 层 门 7 层负载均衡。

?????????IO线程优化: 先使用默认构造函数的线程(CPU*2)压测, jstack监控堆栈, 如果都停留在Selectorlmpl. lockAndDoSelect, 表明IO线程比较空闲, 无须调整.如果停留在读/写操作, 可适当调大线程.

? ? ? ? 2.2 心跳优化

心跳优化策略:

  1. 及时检测失效连接, 将其剔除, 防止句柄占用, 导致OOM等问题
  2. 设置合理心跳周期, 防止心跳定时任务积压, 造成频繁FullGC
  3. 使用netty提供的链路空闲检测机制, 不要自己创建定时任务, 增加系统负担

心跳失败判断:

  1. 连续 N 次心跳检测都没有收到对方的 Pong 应答消息或者 Ping 请求消息,则认为链路 已经发生逻辑失效,这被称为心跳超时。
  2. 在读取和发送心跳消息的时候如果直接发生了 IO 异常,说明链路已经失效,这被称为 心跳失败。无论发生心跳超时还是心跳失败,都需要关闭链路,由客户端发起重连操作,保证链 路能够恢复正常。

Nety 提供了三种链路空闲检测机制:

  1. 读空闲,链路持续时间 T 没有读取到任何消息
  2. 写空闲,链路持续时间 T 没有发送任何消息
  3. 读写空闲,链路持续时间 T 没有接收或者发送任何消息
????????2.3 合理使用内存池

????????Nety 内存池从实现上可以分为两类:堆外直接内存和堆内存。由于 Byte Buf 主要用于网 络 IO 读写,因此采用堆外直接内存会减少一次从用户堆内存到内核态的字节数组拷贝,所以 性能更高。由于 DirectByteBuf 的创建成本比较高,因此如果使用 DirectByteBuf,则需要配合内存池使用.

? ? ? ? 2.4 IO线程与业务线程剥离

3. JVM层面优化?

  • 调整应用内存, 最少16G以上.
  • 垃圾收集器用G1或者ZGC
  • 结合具体业务, 监控JVM, 调整堆大小

什么是水平触发(LT)和边缘触发(ET)?

  • Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait() 会通知处理程序去读写。如果这次没有把数据一次性全部读写完,那么下次调用 epoll_wait() 时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会 一直通知你。
  • Edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait() 会通知处理程序去读写。如果这次没有把数据全部读写完,那么下次调用 epoll_wait()时, 它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会 通知你。这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符

select(),poll()模型都是水平触发模式,信号驱动 IO 是边缘触发模式,epoll()模型即支 持水平触发,也支持边缘触发,默认是水平触发。JDK 中的 select 实现是水平触发,而 Netty 提供的 Epoll 的实现中是边缘触发。

DNS解析域名全过程?

如将域名www.google.com解析成IP地址

  1. 用户在浏览器中输入域名www.google.com。
  2. 浏览器首先会检查本地的hosts文件和浏览器自身的DNS缓存,看是否有对应的IP地址。如果有,就直接访问这个IP地址对应的网站;如果没有,则向本地DNS服务器发起查询请求。
  3. 本地DNS服务器收到查询请求后,会查看自己的DNS缓存,看是否有对应的IP地址。如果有,就返回这个IP地址;如果没有,则向根DNS服务器发起查询请求。
  4. 根DNS服务器收到查询请求后,会将负责.com顶级域的顶级域名服务器的地址返回给本地DNS服务器。
  5. 本地DNS服务器收到顶级域名服务器的地址后,向其发起查询请求。顶级域名服务器会将负责google.com域的权威DNS服务器的地址返回给本地DNS服务器。
  6. 本地DNS服务器收到权威DNS服务器的地址后,向其发起查询请求。权威DNS服务器会将www.google.com域名对应的IP地址返回给本地DNS服务器。
  7. 本地DNS服务器收到IP地址后,将其返回给用户的浏览器。
  8. 浏览器收到IP地址后,就可以通过这个IP地址访问www.google.com网站。

文章来源:https://blog.csdn.net/weixin_64027360/article/details/135492407
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。