Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析

发布时间:2023年12月23日


在这里插入图片描述


概述

在这里插入图片描述


心跳检测

Netty 的心跳检测机制是一种用于保持网络连接活跃的机制,它通过定期发送和接收特定的消息(心跳包)来确保客户端和服务器之间的连接仍然有效。这种机制对于需要长时间保持连接的应用程序(如实时通信、监控、推送服务等)非常重要,因为它可以帮助检测连接是否因网络问题或客户端崩溃而断开。

Netty 提供了心跳检测机制,用于检测连接是否仍然处于活动状态。在 TCP 连接中,如果连接断开了,服务端和客户端不会立即知道它已经断开。因此,通过发送心跳消息并等待对方的响应,可以检测连接是否仍然处于活动状态。

Netty 提供了两种方式来实现心跳检测:

  1. 使用 TCP 层的 KeepAlive 机制。该机制默认的心跳时间是 2 小时,依赖操作系统实现,不够灵活。

  2. 使用 Netty 的 IdleStateHandler。IdleStateHandler 是 Netty 提供的空闲状态处理器,可以自定义检测间隔时间。通过设置 IdleStateHandler 的构造函数中的参数,可以指定读空闲检测的时间、写空闲检测的时间和读写空闲检测的时间。将它们设置为 0 表示禁用该类型的空闲检测。

IdleStateHandler 的构造函数如下:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
  • readerIdleTime:读的空闲时间,超出此时间就会发送一个心跳检测包,检测是否连接。
  • writerIdleTime:写的空闲时间,超出此时间就会发送一个心跳检测包,检测是否连接。
  • allIdleTime:读写的空闲时间,超出此时间就会发送一个心跳检测包,检测是否连接。
  • unit:空闲时间单位,默认为秒(TimeUnit.SECONDS)。

当满足上述其中一个条件后,就会自动触发 IdleStateEvent,会传递给管道中的下一个 handler 的 user在这里插入代码片EventTriggered 事件去处理。

在服务端,可以添加 IdleStateHandler 心跳检测处理器,并添加自定义处理 handler 类实现 userEventTriggered() 方法作为超时事件的逻辑处理。

例如,可以设置每 5 秒进行一次读检测,如果 5 秒内 ChannelRead() 方法未被调用则触发一次 userEventTrigger() 方法。

在客户端,可以启动客户端后,先发送一个 “hello” 消息,然后等候服务端心跳信息 “ping”,收到心跳后,回复心跳响应 “ok”。心跳消息可以根据需要进行定义。

通过 Netty 的心跳检测机制,可以有效地维护长连接,保证连接的有效性,避免浪费服务端资源。


Code

package com.artisan.heartbeat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class HeartBeatArtisanServer {
    public static void main(String[] args) throws Exception {
        // 创建主从事件循环组
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 创建服务器启动类
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 配置事件循环组
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel作为通道
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 初始化子通道
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline(); // 获取管道
                            pipeline.addLast("decoder", new StringDecoder()); // 添加解码器
                            pipeline.addLast("encoder", new StringEncoder()); // 添加编码器
                            // 添加空闲状态处理器,设置3秒的读空闲时间
                            pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                            pipeline.addLast(new HeartBeatArtisanServerHandler()); // 添加自定义处理器
                        }
                    });
            System.out.println("netty server start。。"); // 打印启动信息
            // 绑定端口并同步等待成功
            ChannelFuture future = bootstrap.bind(9000).sync();
            // 等待服务器socket关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace(); // 打印异常信息
        } finally {
            // 释放资源,优雅地关闭事件循环组
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

一个简单的 Netty 服务器示例,它使用 NioServerSocketChannel 作为服务器通道,并设置了一个 IdleStateHandler 来处理空闲状态。如果客户端在 3 秒内没有发送任何消息,服务器将触发一个 IdleStateEvent 事件,并传递给管道中的下一个处理器,即 HeartBeatArtisanServerHandler。这个自定义处理器需要实现 userEventTriggered 方法来处理这个事件,例如发送一个心跳包以保持连接活跃。

main 方法中,我们创建了一个 ServerBootstrap 实例,并配置了事件循环组、通道类型、空闲状态处理器和自定义处理器。然后,我们绑定了一个端口,并等待服务器启动和关闭。

注意:在实际应用中,HeartBeatArtisanServerHandler 类需要实现 userEventTriggered 方法来处理 IdleStateEvent,请继续看


这段代码是一个Netty的ChannelHandler,用于处理服务器端的心跳包。

package com.artisan.heartbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class HeartBeatArtisanServerHandler extends SimpleChannelInboundHandler<String> {
    // 定义一个计数器,用于记录读空闲的次数
    int readIdleTimes = 0;
    // 当从通道读取到消息时,该方法将被调用
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
        // 如果接收到的消息是"Heartbeat Packet",则回复"ok"
        if ("Heartbeat Packet".equals(s)) {
            ctx.channel().writeAndFlush("ok");
        } else {
            // 如果是其他信息,打印其他信息处理 ...
            System.out.println(" 其他信息处理 ... ");
        }
    }
    // 当Netty触发超时事件时,该方法将被调用
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
        // 定义一个字符串,用于存储超时事件的类型
        String eventType = null;
        // 根据事件的状态进行切换
        switch (event.state()) {
            case READER_IDLE:
                eventType = "读空闲";
                // 读空闲的计数加1
                readIdleTimes++; 
                break;
            case WRITER_IDLE:
                eventType = "写空闲";
                // 不处理
                break;
            case ALL_IDLE:
                eventType = "读写空闲";
                // 不处理
                break;
        }
        
        // 打印出超时事件的信息
        System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
        // 如果读空闲的次数超过3次,则关闭连接,释放更多资源
        if (readIdleTimes > 3) {
            System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
            ctx.channel().writeAndFlush("idle close");
            ctx.channel().close();
        }
    }
    // 当通道激活时,即连接成功建立时,该方法将被调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
    }
}

这段代码定义了一个Netty的ChannelHandler,用于处理网络连接中的心跳包。这个处理器继承自SimpleChannelInboundHandler<String>,意味着它主要用于处理字符串类型的消息。

  • channelRead0方法:当从通道读取到消息时,该方法将被调用。在这里,它检查接收到的消息是否是"Heartbeat Packet",如果是,则回复"ok",否则打印其他信息处理。
  • userEventTriggered方法:该方法用于处理Netty的超时事件。Netty会定期检查通道是否处于空闲状态,这里的空闲指的是没有读写操作发生。如果有超时事件,Netty将触发此方法。在这个方法中,它统计读空闲的次数,如果超过3次,则发送"idle close"消息并关闭连接。
  • channelActive方法:当通道激活时,即连接成功建立时,该方法将被调用。在这里,它打印出连接的远程地址。

简而言之: 这个处理器主要处理三种类型的超时事件:读空闲、写空闲和读写空闲。当接收到心跳包时,会回复"ok",如果读空闲的次数超过3次,则会关闭连接。


【Client】

这段代码是一个简单的Netty客户端示例,用于发送心跳包到服务器。以下是详细的中文注释:

package com.artisan.heartbeat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Random;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class HeartBeatArtisanClient {
    public static void main(String[] args) throws Exception {
        // 创建一个NioEventLoopGroup,用于处理Netty的事件循环
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            // 创建一个Bootstrap对象,用于配置客户端
            Bootstrap bootstrap = new Bootstrap();
            // 设置EventLoopGroup和Channel类型
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    // 添加ChannelInitializer,用于初始化ChannelPipeline
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 获取ChannelPipeline并添加解码器、编码器和心跳处理handler
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new HeartBeatArtisanClientHandler());
                        }
                    });
            // 打印出Netty客户端启动信息
            System.out.println("netty client start。。");
            // 连接到服务器,并获取Channel对象
            Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
            // 定义要发送的心跳包文本
            String text = "Heartbeat Packet";
            // 创建一个Random对象,用于生成随机数
            Random random = new Random();
            // 在循环中随机发送心跳包
            while (channel.isActive()) {
                int num = random.nextInt(8);
                Thread.sleep(num * 1000);
                channel.writeAndFlush(text);
            }
        } catch (Exception e) {
            // 打印异常信息
            e.printStackTrace();
        } finally {
            // 优雅地关闭EventLoopGroup,释放资源
            eventLoopGroup.shutdownGracefully();
        }
    }
    
}

HeartBeatArtisanClientHandler,它继承自SimpleChannelInboundHandler<String>。这个处理器用于处理从服务器接收到的字符串消息。

package com.artisan.heartbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class HeartBeatArtisanClientHandler extends SimpleChannelInboundHandler<String> {
    // 重写channelRead0方法以处理接收到的消息
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 打印出接收到的消息
        System.out.println(" client received :" + msg);
        // 如果接收到的消息是"idle close",则关闭连接
        if (msg != null && msg.equals("idle close")) {
            // 打印出服务端关闭连接的信息
            System.out.println(" 服务端关闭连接,客户端也关闭");
            // 关闭客户端连接
            ctx.channel().closeFuture();
        }
    }
}

在这个处理器中,当接收到消息时,channelRead0方法会被调用。如果接收到的是"idle close"消息,处理器会打印一条信息表明服务端已经关闭了连接,并立即关闭客户端的连接。这种情况下,客户端不会继续发送心跳包,因为服务端已经不再接受连接。


请看 客户端中 HeartBeatArtisanClient

在这里插入图片描述

再看下 HeartBeatArtisanServer端的设置

在这里插入图片描述

模拟心跳超时

按照如上的设计, 随机 0 - 7

我们先后启动Server和Client观察

在这里插入图片描述

在这里插入图片描述

正常情况

在这里插入图片描述

重新测试一下

在这里插入图片描述

在这里插入图片描述

消息逐一对应 。


IdleStateHandler源码分析

在这里插入图片描述

channelRead

我们先看下IdleStateHandler中的channelRead方法

在这里插入图片描述
只是进行了透传,不做任何业务逻辑处理,让channelPipe中的下一个handler处理channelRead方法.

channelActive

在这里插入图片描述

initialize

initialize的方法,这是IdleStateHandler的核心源码

private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

在这个方法中,首先通过switch语句检查当前的状态。如果状态已经是1或2,则不需要进行初始化。状态1通常表示初始化完成,状态2表示通道已经关闭。

然后将状态设置为1,并调用initOutputChanged方法来处理输出变化,这通常是为了确保通道的输出缓冲区大小与初始化时设置的大小相匹配。
接下来,设置最后读取和写入的时间为当前时间,这些时间戳将用于后续的空闲状态检测。
然后,根据设置的读空闲超时时间(readerIdleTimeNanos)、写空闲超时时间(writerIdleTimeNanos)和所有空闲超时时间(allIdleTimeNanos),分别调度对应的超时任务。这些任务会在指定的超时时间后被执行,以处理通道的空闲状态。

  • ReaderIdleTimeoutTask:如果通道在readerIdleTimeNanos内没有读取操作,这个任务将被触发。
  • WriterIdleTimeoutTask:如果通道在writerIdleTimeNanos内没有写入操作,这个任务将被触发。
  • AllIdleTimeoutTask:如果通道在allIdleTimeNanos内既没有读取操作也没有写入操作,这个任务将被触发。

一般我们都是使用 readerIdleTimeout ,所以我们需要重点关注

 if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }

继续 ReaderIdleTimeoutTask
在这里插入图片描述

直接看run方法

@Override
        protected void run(ChannelHandlerContext ctx) {
            // 定义下一次触发空闲状态事件前的延迟时间
            long nextDelay = readerIdleTimeNanos;
            // 如果当前不在读取状态
            if (!reading) {
                // 减去自上次读取时间以来的纳秒数,以调整下一次的延迟
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            // 如果延迟时间小于等于0,说明读已经空闲了足够长的时间
            if (nextDelay <= 0) {
                // 设置一个新的超时时间,并通过回调通知
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                // 是否是第一次触发读空闲事件
                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false; // 将标志设置为false,以避免后续的重复触发

                try {
                    // 创建一个空闲状态事件
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    // 在通道上下文中触发这个事件
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    // 如果发生异常,传播这个异常到通道上下文
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // 如果在超时之前发生了读取操作,安排一个较短延迟的超时
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

  1. run方法首先定义了一个nextDelay变量,这个变量表示下一次触发空闲状态事件前的延迟时间。这个延迟时间由readerIdleTimeNanos决定,它是在IdleStateHandler构造函数中设置的。
  2. 如果reading标志为false,意味着通道当前不在读取状态,那么会从nextDelay中减去自上次读取时间以来的纳秒数,以调整下一次的延迟。
  3. 如果nextDelay小于或等于0,这意味着读者已经空闲了足够长的时间,需要设置一个新的超时时间,并通过回调通知。这里使用了schedule方法在ctx通道上下文中安排一个IdleStateHandler的执行,执行时间为readerIdleTimeNanos
  4. firstReaderIdleEvent标志用于标识是否是第一次触发读者空闲事件。如果是第一次,这个标志会被设置为false,以避免后续的重复触发。
  5. 代码尝试创建一个IdleStateEvent事件,并使用channelIdle方法在通道上下文中触发这个事件。这个事件会被传递给在ChannelPipeline中注册的IdleStateHandler的回调。
  6. 如果在上述过程中发生异常,使用ctx.fireExceptionCaught(t)方法在通道上下文中传播这个异常。
  7. 如果nextDelay大于0,说明在超时之前发生了读取操作,因此会安排一个较短延迟的超时。

我们举个例子来理解下哈

nextDelay -= ticksInNanos() - lastReadTime;

用当前时间减去最后一次channelRead方法调用的时间,假如这个结果是6s ,说明最后一次调用channelRead已经是6s
之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了

那就会触发 channelIdle(ctx, event);

源码如下

    /**
     * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
     * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
     */
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

如果没有超时则不触发userEventTriggered方法。

没有超时则走

// 如果在超时之前发生了读取操作,安排一个较短延迟的超时
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);

这个run方法是Netty处理通道空闲状态的关键部分,它确保了在通道长时间未进行读取操作时能够触发相应的处理逻辑,从而避免资源浪费和潜在的连接问题

在这里插入图片描述

文章来源:https://blog.csdn.net/yangshangwei/article/details/135173900
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。