目录
心跳检测是用于保障服务端与客户端之间通信连接状态的实时监控。客户端不断向服务端发送心跳包(心跳包就是一组数据,自行定义,能够进行区分就好)。服务端在一定时间范围内能够正常接收客户端心跳包的话,就认为连接正常活跃;如果服务端在一定时间内没有接收到客户端心跳包的话,就认为连接出现中断或异常,那么就可以进行连接断开、释放资源等操作,从而保证节省服务端连接资源。
实现心跳检测,将通过由Netty提供的以下类:
1、IdleStateHandler
主要有三个关键参数,通过构造函数传入:
读空闲时间,readerIdleTime/readerIdleTimeSeconds
写空闲时间,writerIdleTime/writerIdleTimeSeconds
读写空闲时间,allIdleTime/allIdleTimeSeconds
如果对某一个参数不感兴趣,只需要将对应参数设置为0即可
2、触发事件
当通道有一段时间没有执行读取、写入或同时执行这两种操作时,触发IdleStateEvent。
没有读取操作,将触发?IdleState.READER_IDLE事件
没有写入操作,将触发IdleState.WRITER_IDLE事件
没有读写事件,将触发IdleState.ALL_IDLE事件
3、处理事件
当触发上诉事件时,我们需要接收并且进行对应处理,通过重写handler中的userEventTriggered方法,进行事件处理,参考如下:
public class MyHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(new PingMessage());
}
}
}
}
接下来,将通过实际代码,来实现自己的心跳检测。
本次实现,服务端只关注读空闲事件,也就是如果服务端在3秒内没有收到客户端的心跳包,将会关闭与客户端之间的连接。
1、ServerChannelInitializer,用于与每个客户端Channel的初始化
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 服务端只关注读事件,如果3秒内没有收到客户端的消息,将会触发IdleState.READER_IDLE事件,将由HeartbeatHandler进行处理
pipeline.addLast(new IdleStateHandler(3, 0, 0));
pipeline.addLast(new LineBasedFrameDecoder(1024));
pipeline.addLast(new StringDecoder());
// 处理客户端读超时
pipeline.addLast(new HeartbeatHandler());
}
}
2、HeartbeatHandler,用于处理IdleStateEvent事件,服务端只关注读空闲事件,当触发读空闲事件时,将会关闭与客户端之间的连接
public class HeartbeatHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 处理IdleState.READER_IDLE时间
if(evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
IdleState idleState = ((IdleStateEvent) evt).state();
// 如果是触发的是读空闲时间,说明已经超过n秒没有收到客户端心跳包
if(idleState == IdleState.READER_IDLE) {
System.out.println("超过n秒没有收到客户端心跳, channel: " + ctx.channel());
// 关闭channel,避免造成更多资源占用
ctx.close();
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接收到客户端数据, channel: " + ctx.channel() + ", 数据: " + msg.toString());
}
}
3、NettyServer,用于创建并启动服务端
public class NettyServer {
public void bind(Integer port){
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerChannelInitializer());
// 阻塞, 等待启动
ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
System.out.println("server启动成功!");
// 阻塞,等到关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
childGroup.shutdownGracefully();
parentGroup.shutdownGracefully();
}
}
}
客户端只关注写空闲事件,如果在2秒内没有任何写操作,将会触发写空闲事件,向服务端发送心跳包。
1、ClientChannelInitializer,用于与服务端通信的Channel初始化
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 客户端只关注写事件,如果超过2秒没有发送数据,则发送心跳包
pipeline.addLast(new IdleStateHandler(0, 2, 0));
pipeline.addLast(new LineBasedFrameDecoder(1024));
pipeline.addLast(new StringEncoder());
pipeline.addLast(new HeartbeatHandler());
}
}
2、HeartbeatHandler,处理写空闲事件
public class HeartbeatHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
IdleState idleState = idleStateEvent.state();
if(idleState == IdleState.WRITER_IDLE) {
ctx.writeAndFlush("两秒没有写数据,发送心跳包\n");
System.out.println("超过两秒没有写数据,发送心跳包");
}
}
}
}
3、NettyClient,初始化客户端并且启动
public class NettyClient {
public void connect(String remoteIP, Integer port) {
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.AUTO_READ, true)
.handler(new ClientChannelInitializer());
// 阻塞,等待连接
ChannelFuture channelFuture = bootstrap.connect(remoteIP, port).sync();
System.out.println("client启动成功!");
// 阻塞,等待关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workGroup.shutdownGracefully();
}
}
}
1、启动服务端
new NettyServer().bind(8000);
2、启动客户端
new NettyClient().connect("127.0.0.1", 8000);
3、查看运行结果:
1)客户端
client启动成功!
超过两秒没有写数据,发送心跳包
超过两秒没有写数据,发送心跳包
超过两秒没有写数据,发送心跳包
...
2)服务端
server启动成功!
接收到客户端数据, channel: [id: 0x093d838a, L:/127.0.0.1:8000 - R:/127.0.0.1:54593], 数据: 两秒没有写数据,发送心跳包
接收到客户端数据, channel: [id: 0x093d838a, L:/127.0.0.1:8000 - R:/127.0.0.1:54593], 数据: 两秒没有写数据,发送心跳包
接收到客户端数据, channel: [id: 0x093d838a, L:/127.0.0.1:8000 - R:/127.0.0.1:54593], 数据: 两秒没有写数据,发送心跳包
...
4、对比
可将ClientChannelInitializer中的以下语句进行注释
pipeline.addLast(new HeartbeatHandler());
重启客户端,观察结果:
1)服务端
server启动成功!
超过n秒没有收到客户端心跳, channel: [id: 0xd8e262c0, L:/127.0.0.1:8000 - R:/127.0.0.1:54718]
2)客户端,channel关闭退出
client启动成功!
Process finished with exit code 0