该处理器接口用于处理io事件,和拦截io操作 ,并且调用传递给pipeline中的下一个处理器。
它下面有2个子接口,并且它们都有对应的适配器类,并且还有1个复合的ChannelDuplexHandler类,用于处理入站io事件和出站io操作
上下文对象
状态管理
使用AttributeKey
尽管推荐使用成员变量去存储handler自身的状态数据,但是由于某些原因,你可能并不想为每个连接都去创建1个新的handler。在这种情况下,可以使用ChannelHandlerContext提供的AttributeKey来解决这个问题(如下所示),但是目前ChannelHandlerContext#attr和ChannelHandlerContext#hasAttr都被弃用了,推荐使用Channel#attr(AttributeKey)和Channel#hasAttr。这样就可以让同一个handler实例在多个pipeline中都可以使用了。
public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
private final AttributeKey<Boolean> auth = AttributeKey.valueOf("auth");
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
Attribute<Boolean> attr = ctx.attr(auth);
if (msg instanceof LoginMessage) {
authenticate((LoginMessage) msg);
attr.set(true);
} else if (message instanceof GetDataMessage) {
if (Boolean.TRUE.equals(attr.get())) {
ctx.writeAndFlush(fetchSecret((GetDataMessage) msg));
} else {
fail();
}
}
}
}
@Sharable注解
ChannelHandler的API
ChannelHandler 作为顶层接口,它并不具备太多功能,它仅仅只提供了三个 API:
API | 描述 |
---|---|
handlerAdded() | 当ChannelHandler 添加到 ChannelPipeline 中时被调用 |
handlerRemoved() | 当 ChannelHandler 被从 ChannelPipeline 移除时调用 |
exceptionCaught() | 当 ChannelHandler 在处理过程中出现异常时调用 |
从 ChannelHandler 提供的 API 中我们可以看出,它并不直接参与 Channel 的数据加工过程,而是用来响应 ChannelPipeline 链和异常处理的,对于 Channel 的数据加工则由它的子接口处理:
public interface ChannelHandler {
// 当ChannelHandler 添加到 ChannelPipeline 中时被调用
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
// 当 ChannelHandler 被从 ChannelPipeline 移除时调用
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
// 当 ChannelHandler 在处理过程中出现异常时调用
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
public abstract class ChannelHandlerAdapter implements ChannelHandler {
boolean added;
protected void ensureNotSharable() {
if (isSharable()) {
throw new IllegalStateException("ChannelHandler " + getClass().getName() + " is not allowed to be shared");
}
}
public boolean isSharable() {
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Skip
@Override
@Deprecated
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
public interface ChannelInboundHandler extends ChannelHandler {
// Channel 被注册到EventLoop 时
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
// Channel 从 EventLoop 中取消时
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
// Channel 处于活跃状态,可以读写时
void channelActive(ChannelHandlerContext ctx) throws Exception;
// Channel 不再是活动状态且不再连接它的远程节点时
void channelInactive(ChannelHandlerContext ctx) throws Exception;
// Channel 读取数据时
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
// Channel 从上一个读操作完成时
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
// ChannelInboundHandler.fireUserEventTriggered()方法被调用时
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
// Channel 的可写状态发生改变时
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
出处:大明哥的死磕netty专栏 — 精密数据工匠:探索 Netty ChannelHandler 的奥秘
Channel 是有状态的,而且 Channel 也提供了判断 Channel 当前状态的 API,如下:
状态 | 描述 |
---|---|
ChannelUnregistered | Channel 已经被创建,但还未注册到 EventLoop。此时 isOpen() 返回 true,但 isRegistered() 返回 false。 |
ChannelRegistered | Channel 已经被注册到 EventLoop。此时 isRegistered() 返回 true,但 isActive() 返回 false。 |
ChannelActive | Channel 已经处于活动状态并可以接收与发送数据。此时 isActive() 返回 true。 |
ChannelInactive | Channel 没有连接到远程节点 |
状态变更如下:
当 Channel 的状态发生改变时,会生成相对应的事件,这些事件会被转发给 ChannelHandler,而 ChannelHandler 中会有相对应的方法来对其进行响应。在 ChannelHandler 中定义一些与这生命周期相关的 API,如 channelRegistered() 、channelUnregistered() 、channelActive() 、channelInactive()等等,后面大明哥会详细介绍这些 API。
调用时机如下:
@Slf4j
public class NettyServer11 {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(16);
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new CustomizeInboundHandler());
}
});
try {
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8080)).sync();
log.info("=======服务器启动成功=======");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
@Slf4j
public class CustomizeInboundHandler implements ChannelInboundHandler {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("【handlerAdded】- handler 添加到 ChannelPipeline");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("【channelRegistered】- handler 注册到 eventLoop");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("【channelActive】- Channel 准备就绪");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("【channelRead】- Channel 中有可读数据");
if (msg instanceof ByteBuf) {
try {
ByteBuf byteBuf = (ByteBuf) msg;
String code = byteBuf.toString(StandardCharsets.UTF_8);
if ("evt".equals(code)) {
ctx.fireUserEventTriggered("JUST A EVT~");
} else if ("ex".equals(code)) {
throw new NullPointerException("NULL POINTER~");
} else if ("write".equals(code)) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer().writeBytes("Great!Well Done~".getBytes());
ctx.channel().writeAndFlush(buf);
} else {
log.info("服务端收到客户端发送的消息: {}", code);
}
} finally {
ReferenceCountUtil.release(msg);
}
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("【channelReadComplete】- Channel 读取数据完成");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("【channelInactive】- Channel 被关闭,不在活跃");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log.info("【channelUnregistered】- Channel 从 EventLoop 中被取消");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("【handlerRemoved】- handler 从 ChannelPipeline 中移除");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("【exceptionCaught】 - ChannelHandler处理发生异常");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log.info("【userEventTriggered】 - 激发自定义事件: {}", evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
log.info("【channelWritabilityChanged】 - 可写状态改变");
}
}
@Slf4j
public class NettyClient11 {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Channel channel = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf byteBuf = (ByteBuf) msg;
log.info("客户端收到服务端发送的消息: {}", byteBuf.toString(StandardCharsets.UTF_8));
ReferenceCountUtil.release(msg);
}
}
});
}
})
.connect("127.0.0.1", 8080)
.sync()
.channel();
log.info("=======客户端连接服务器成功=======");
Scanner sc = new Scanner(System.in);
while (true) {
System.out.print("输入:");
String line = sc.nextLine();
if (line == null || line.length() == 0) {
continue;
}
if ("close".equals(line)) {
channel.close().sync();
eventLoopGroup.shutdownGracefully();
break;
}
// 输入内容
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
byteBuf.writeBytes(line.getBytes());
channel.writeAndFlush(byteBuf);
System.out.println("=======发送成功=======");
}
}
}
客户端依次发送:evt、ex、write、halo、close这几个字符串,观察日志输出
服务端日志
[15:57:54] [main] com.zzhua.test11.NettyServer11 [33] - =======服务器启动成功=======
[15:58:01] [nioEventLoopGroup-3-1] 【handlerAdded】- handler 添加到 ChannelPipeline
[15:58:01] [nioEventLoopGroup-3-1] 【channelRegistered】- handler 注册到 eventLoop
[15:58:01] [nioEventLoopGroup-3-1] 【channelActive】- Channel 准备就绪
[15:58:11] [nioEventLoopGroup-3-1] 【channelRead】- Channel 中有可读数据
[15:58:11] [nioEventLoopGroup-3-1] 【channelReadComplete】- Channel 读取数据完成
[15:58:22] [nioEventLoopGroup-3-1] 【channelRead】- Channel 中有可读数据
[15:58:22] [nioEventLoopGroup-3-1] 【exceptionCaught】 - ChannelHandler处理发生异常
[15:58:22] [nioEventLoopGroup-3-1] 【channelReadComplete】- Channel 读取数据完成
[15:58:31] [nioEventLoopGroup-3-1] 【channelRead】- Channel 中有可读数据
[15:58:31] [nioEventLoopGroup-3-1] 【channelReadComplete】- Channel 读取数据完成
[15:58:46] [nioEventLoopGroup-3-1] 【channelRead】- Channel 中有可读数据
[15:58:46] [nioEventLoopGroup-3-1] 服务端收到客户端发送的消息: halo
[15:58:46] [nioEventLoopGroup-3-1] 【channelReadComplete】- Channel 读取数据完成
[15:59:01] [nioEventLoopGroup-3-1] 【channelReadComplete】- Channel 读取数据完成
[15:59:01] [nioEventLoopGroup-3-1] 【channelInactive】- Channel 被关闭,不在活跃
[15:59:01] [nioEventLoopGroup-3-1] 【channelUnregistered】- Channel 从 EventLoop 中被取消
[15:59:01] [nioEventLoopGroup-3-1] 【handlerRemoved】- handler 从 ChannelPipeline 中移除
客户端日志
[15:58:01] [main] com.zzhua.test11.NettyClient11 [48] - =======客户端连接服务器成功=======
输入:evt
=======发送成功=======
输入:ex
=======发送成功=======
输入:write
=======发送成功=======
输入:[15:58:31] [INFO ] [nioEventLoopGroup-2-1] com.zzhua.test11.NettyClient11 [37] - 客户端收到服务端发送的消息: Great!Well Done~
halo
=======发送成功=======
输入:close
Disconnected from the target VM, address: '127.0.0.1:65133', transport: 'socket'
执行过程
在整个生命周期中,响应方法执行顺序如下:
handlerAdded()
-> channelRegistered()
-> channelActive ()
channelRead()
-> channelReadComplete()
channelReadComplete()
-> channelInactive()
-> channelUnregistered()
-> handlerRemoved()
这里大明哥对 ChannelHandler 生命周期的方法做一个总结:
handlerAdded()
:ChannelHandler 被加入到 Pipeline 时触发。当服务端检测到新链接后,会将 ChannelHandler 构建成一个双向链表(下篇文章介绍),该方法被触发表示在当前 Channel 中已经添加了一个 ChannelHandler 业务处理链了》。channelRegistered()
:当 Channel 注册到 EventLoop 中时被触发。该方法被触发了,表明当前 Channel 已经绑定到了某一个 EventLoop 中了。channelActive()
:Channel 连接就绪时触发。该方法被触发,说明当前 Channel 已经处于活跃状态了,可以进行数据读写了。channelRead()
:当 Channel 有数据可读时触发。客户端向服务端发送数据,都会触发该方法,该方法被调用说明有数据可读。而且我们自定义业务 handler 时都是重写该方法。channelReadComplete()
:当 Channel 数据读完时触发。服务端每次读完数据后都会触发该方法,表明数据已读取完毕。channelInactive()
:当 Channel 断开连接时触发。该方法被触发,说明 Channel 已经不再是活跃状态了,连接已经关闭了。channelUnregistered()
:当 Channel 取消注册时触发:连接关闭后,我们就要取消该 Channel 与 EventLoop 的绑定关系了。handlerRemoved()
:当 ChannelHandler 被从 ChannelPipeline 中移除时触发。将与该 Channel 绑定的 ChannelPipeline 中的 ChannelHandler 业务处理链全部移除。public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Skip
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
@Skip
@Override
@SuppressWarnings("deprecation")
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean autoRelease;
protected SimpleChannelInboundHandler() {
this(true);
}
protected SimpleChannelInboundHandler(boolean autoRelease) {
matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
this.autoRelease = autoRelease;
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
this(inboundMessageType, true);
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
matcher = TypeParameterMatcher.get(inboundMessageType);
this.autoRelease = autoRelease;
}
public boolean acceptInboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
}
在使用 ChannelInboundHandlerAdapter 的时候,需要注意的是我们需要显示地释放与池化 ByteBuf 实例相关的内存,Netty 为此专门提供了一个方法 ReferenceCountUtil.release()
,即我们需要在 ChannelInboundHandler 的链的末尾需要使用该方法来释放内存,如下:
public class ByteBufReleaseHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
//释放msg
ReferenceCountUtil.release(msg);
}
}
但是有些小伙伴有时候会忘记这点,会带来不必要的麻烦,那有没有更好的方法呢?Netty 提供了一个类来帮助我们简化这个过程: SimpleChannelInboundHandler,对于我们业务处理的类,采用继承 SimpleChannelInboundHandler 而不是 ChannelInboundHandlerAdapter 就可以解决了。
使用 SimpleChannelInboundHandler 我们就不需要显示释放资源了,是不是非常人性化。
public interface ChannelOutboundHandler extends ChannelHandler {
// 请求将 Channel 绑定到本地地址时
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
// 请求将 Channel 连接到远程节点时
void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
// 请求将 Channel 从远程节点断开时
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// 请求关闭 Channel 时
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// 请求将 Channel 从它的 EventLoop 注销时
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// 请求从 Channel 中读取数据时
void read(ChannelHandlerContext ctx) throws Exception;
// 请求通过 Channel 将入队数据刷入远程节点时
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
// 请求通过 Channel 将数据写入远程节点时
void flush(ChannelHandlerContext ctx) throws Exception;
}
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.disconnect(promise);
}
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.close(promise);
}
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
继承自ChannelInboundHandlerAdapter,实现了ChannelOutboundHandler接口。
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.disconnect(promise);
}
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}