Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.Copy
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
注意:netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO
如果使用传统NIO,其工作量大,bug 多
Netty 对 API 进行增强,使之更易用,如
public class HelloServer {
public static void main(String[] args) {
// 1、启动器,负责装配netty组件,启动服务器
new ServerBootstrap()
// 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector
.group(new NioEventLoopGroup())
// 3、选择服务器的 ServerSocketChannel 实现
.channel(NioServerSocketChannel.class)
// 4、child 负责处理读写,该方法决定了 child 执行哪些操作
// ChannelInitializer 处理器(仅执行一次)
// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 5、SocketChannel的处理器,使用StringDecoder解码,ByteBuf=>String
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 6、SocketChannel的业务处理,使用上一个处理器的处理结果
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
});
}
// 7、ServerSocketChannel绑定8080端口
}).bind(8080);
}
}Copy
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
// 选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现
.channel(NioSocketChannel.class)
// ChannelInitializer 处理器(仅执行一次)
// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
// 消息会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出
channel.pipeline().addLast(new StringEncoder());
}
})
// 指定要连接的服务器和端口
.connect(new InetSocketAddress("localhost", 8080))
// Netty 中很多方法都是异步的,如 connect
// 这时需要使用 sync 方法等待 connect 建立连接完毕
.sync()
// 获取 channel 对象,它即为通道抽象,可以进行数据读写操作
.channel()
// 写入消息并清空缓冲区
.writeAndFlush("hello world");
}
}Copy
左:客户端 右:服务器端
channel 可以理解为数据的通道
msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型对象,最后输出又变成 ByteBuf
handler 可以理解为数据的处理工序
工序有多道,
合在一起就是 pipeline(传递途径)
,pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
handler 分 Inbound 和 Outbound 两类
eventLoop 可以理解为处理数据的工人
事件循环对象 EventLoop
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件
它的继承关系如下
事件循环组 EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
public class TestEventLoop {
public static void main(String[] args) {
// 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
EventLoopGroup group = new NioEventLoopGroup(2);
// 通过next方法可以获得下一个 EventLoop
System.out.println(group.next());
System.out.println(group.next());
// 通过EventLoop执行普通任务
group.next().execute(()->{
System.out.println(Thread.currentThread().getName() + " hello");
});
// 通过EventLoop执行定时任务
group.next().scheduleAtFixedRate(()->{
System.out.println(Thread.currentThread().getName() + " hello2");
}, 0, 1, TimeUnit.SECONDS);
// 优雅地关闭
group.shutdownGracefully();
}
}Copy
输出结果如下
io.netty.channel.nio.NioEventLoop@7bb11784
io.netty.channel.nio.NioEventLoop@33a10788
nioEventLoopGroup-2-1 hello
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2Copy
关闭 EventLoopGroup
优雅关闭 shutdownGracefully
方法。该方法会首先切换 EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}Copy
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);
// 此处打断点调试,调用 channel.writeAndFlush(...);
System.in.read();
}
}Copy
Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件
public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
// 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
...
}
}Copy
多个客户端分别发送 hello
结果
nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4Copy
可以看出,一个EventLoop可以负责多个Channel,且EventLoop一旦与Channel绑定,则一直负责处理该Channel中的事件
当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理
public class MyServer {
public static void main(String[] args) {
// 增加自定义的非NioEventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理
socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
// 调用下一个handler
ctx.fireChannelRead(msg);
}
})
// 该handler绑定自定义的Group
.addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}Copy
启动四个客户端发送数据
nioEventLoopGroup-4-1 hello1
defaultEventLoopGroup-2-1 hello1
nioEventLoopGroup-4-2 hello2
defaultEventLoopGroup-2-2 hello2
nioEventLoopGroup-4-1 hello3
defaultEventLoopGroup-2-3 hello3
nioEventLoopGroup-4-2 hello4
defaultEventLoopGroup-2-4 hello4Copy
可以看出,客户端与服务器之间的事件,被nioEventLoopGroup和defaultEventLoopGroup分别处理
不同的EventLoopGroup切换的实现原理如下
由上面的图可以看出,当handler中绑定的Group不同时,需要切换Group来执行不同的任务
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获得下一个EventLoop, excutor 即为 EventLoopGroup
EventExecutor executor = next.executor();
// 如果下一个EventLoop 在当前的 EventLoopGroup中
if (executor.inEventLoop()) {
// 使用当前 EventLoopGroup 中的 EventLoop 来处理任务
next.invokeChannelRead(m);
} else {
// 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}Copy
Channel 的常用方法
拆分客户端代码
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
// NIO线程:NioEventLoop 中的线程
.connect(new InetSocketAddress("localhost", 8080));
// 该方法用于等待连接真正建立
channelFuture.sync();
// 获取客户端-服务器之间的Channel对象
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
System.in.read();
}
}Copy
如果我们去掉channelFuture.sync()
方法,会服务器无法收到hello world
这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()
方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端
所以需要通过channelFuture.sync()
方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程
下面还有一种方法,用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO 线程(去执行connect操作的线程)
addListener方法
通过这种方法可以在NIO线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
// NIO线程:NioEventLoop 中的线程
.connect(new InetSocketAddress("localhost", 8080));
// 当connect方法执行完毕后,也就是连接真正建立后
// 会在NIO线程中调用operationComplete方法
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
}
});
System.in.read();
}
}Copy
public class ReadClient {
public static void main(String[] args) throws InterruptedException {
// 创建EventLoopGroup,使用完毕后关闭
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync();
Channel channel = channelFuture.channel();
Scanner scanner = new Scanner(System.in);
// 创建一个线程用于输入并向服务器发送
new Thread(()->{
while (true) {
String msg = scanner.next();
if ("q".equals(msg)) {
// 关闭操作是异步的,在NIO线程中执行
channel.close();
break;
}
channel.writeAndFlush(msg);
}
}, "inputThread").start();
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("waiting close...");
// 同步等待NIO线程执行完close操作
closeFuture.sync();
// 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的
System.out.println("关闭之后执行一些额外操作...");
// 关闭EventLoopGroup
group.shutdownGracefully();
}
}Copy
关闭channel
当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作
如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现
通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();
// 同步等待NIO线程执行完close操作
closeFuture.sync();Copy
调用closeFuture.addListener方法,添加close的后续操作
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 等待channel关闭后才执行的操作
System.out.println("关闭之后执行一些额外操作...");
// 关闭EventLoopGroup
group.shutdownGracefully();
}
});Copy
netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口
netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
public class JdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "JdkFuture");
}
};
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);
// 获得Future对象
Future<Integer> future = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(1);
return 50;
}
});
// 通过阻塞的方式,获得运行结果
System.out.println(future.get());
}
}Copy
public class NettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
// 获得 EventLoop 对象
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 50;
}
});
// 主线程中获取结果
System.out.println(Thread.currentThread().getName() + " 获取结果");
System.out.println("getNow " + future.getNow());
System.out.println("get " + future.get());
// NIO线程中异步获取结果
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println(Thread.currentThread().getName() + " 获取结果");
System.out.println("getNow " + future.getNow());
}
});
}
}Copy
运行结果
main 获取结果
getNow null
get 50
nioEventLoopGroup-2-1 获取结果
getNow 50Copy
Netty中的Future对象,可以通过EventLoop的sumbit()方法得到
Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果
public class NettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建EventLoop
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
// 创建Promise对象,用于存放结果
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 自定义线程向Promise中存放结果
promise.setSuccess(50);
}).start();
// 主线程从Promise中获取结果
System.out.println(Thread.currentThread().getName() + " " + promise.get());
}
}Copy
public class PipeLineServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 在socketChannel的pipeline中添加handler
// pipeline中handler是带有head与tail节点的双向链表,的实际结构为
// head <-> handler1 <-> ... <-> handler4 <->tail
// Inbound主要处理入站操作,一般为读操作,发生入站操作时会触发Inbound方法
// 入站时,handler是从head向后调用的
socketChannel.pipeline().addLast("handler1" ,new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(Thread.currentThread().getName() + " Inbound handler 1");
// 父类该方法内部会调用fireChannelRead
// 将数据传递给下一个handler
super.channelRead(ctx, msg);
}
});
socketChannel.pipeline().addLast("handler2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(Thread.currentThread().getName() + " Inbound handler 2");
// 执行write操作,使得Outbound的方法能够得到调用
socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));
super.channelRead(ctx, msg);
}
});
// Outbound主要处理出站操作,一般为写操作,发生出站操作时会触发Outbound方法
// 出站时,handler的调用是从tail向前调用的
socketChannel.pipeline().addLast("handler3" ,new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(Thread.currentThread().getName() + " Outbound handler 1");
super.write(ctx, msg, promise);
}
});
socketChannel.pipeline().addLast("handler4" ,new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(Thread.currentThread().getName() + " Outbound handler 2");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}Copy
运行结果如下
nioEventLoopGroup-2-2 Inbound handler 1
nioEventLoopGroup-2-2 Inbound handler 2
nioEventLoopGroup-2-2 Outbound handler 2
nioEventLoopGroup-2-2 Outbound handler 1Copy
通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler
handler需要放入通道的pipeline中,才能根据放入顺序来使用handler
具体结构如下
调用顺序如下
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找OutboundHandler
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler
EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("4");
super.write(ctx, msg, promise);
}
};
// 用于测试Handler的Channel
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 执行Inbound操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
// 执行Outbound操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
}
}Copy
调试工具方法
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}Copy
该方法可以帮助我们更为详细地查看ByteBuf中的内容
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
ByteBufUtil.log(buffer);
// 向buffer中写入数据
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 20; i++) {
sb.append("a");
}
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
// 查看写入结果
ByteBufUtil.log(buffer);
}
}Copy
运行结果
read index:0 write index:0 capacity:16
read index:0 write index:20 capacity:64
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000010| 61 61 61 61 |aaaa |
+--------+-------------------------------------------------+----------------+Copy
ByteBuf通过ByteBufAllocator
选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小
当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作
如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()
来创建
通过该方法创建的ByteBuf,使用的是基于直接内存的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);Copy
可以使用下面的代码来创建池化基于堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);Copy
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);Copy
验证
public class ByteBufStudy {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println(buffer.getClass());
buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
System.out.println(buffer.getClass());
buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
System.out.println(buffer.getClass());
}
}Copy
// 使用池化的直接内存
class io.netty.buffer.PooledUnsafeDirectByteBuf
// 使用池化的堆内存
class io.netty.buffer.PooledUnsafeHeapByteBuf
// 使用池化的直接内存
class io.netty.buffer.PooledUnsafeDirectByteBufCopy
池化的最大意义在于可以重用 ByteBuf,优点有
池化功能是否开启,可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type={unpooled|pooled}Copy
ByteBuf主要有以下几个组成部分
最大容量与当前容量
java.lang.IndexOutOfBoundsException
异常读写操作不同于ByteBuffer只用position进行控制,
ByteBuf分别由读指针和写指针两个指针控制
。进行读写操作时,无需进行模式的切换
常用方法如下
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 | |
writeInt(int value) | 写入 int 值 | Big Endian(大端写入),即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian(小端写入),即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 | |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] | |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 | CharSequence为字符串类的父类,第二个参数为对应的字符集 |
注意
- 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
- 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)
使用方法
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
ByteBufUtil.log(buffer);
// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
ByteBufUtil.log(buffer);
buffer.writeInt(5);
ByteBufUtil.log(buffer);
buffer.writeIntLE(6);
ByteBufUtil.log(buffer);
buffer.writeLong(7);
ByteBufUtil.log(buffer);
}
}Copy
运行结果
read index:0 write index:0 capacity:16
read index:0 write index:4 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
read index:0 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
read index:0 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 |............ |
+--------+-------------------------------------------------+----------------+
read index:0 write index:20 capacity:20
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07 |.... |
+--------+-------------------------------------------------+----------------+Copy
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作
buffer.writeLong(7);
ByteBufUtil.log(buffer);Copy
// 扩容前
read index:0 write index:12 capacity:16
...
// 扩容后
read index:0 write index:20 capacity:20
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07 |.... |
+--------+-------------------------------------------------+----------------+Copy
如何写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容
如果写入后数据大小超过 512 字节,则选择下一个 2
n
扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException
异常
Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)
...Copy
读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针
如果需要重复读取,需要调用buffer.markReaderIndex()
对读指针进行标记,并通过buffer.resetReaderIndex()
将读指针恢复到mark标记的位置
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
buffer.writeInt(5);
// 读取4个字节
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
ByteBufUtil.log(buffer);
// 通过mark与reset实现重复读取
buffer.markReaderIndex();
System.out.println(buffer.readInt());
ByteBufUtil.log(buffer);
// 恢复到mark标记处
buffer.resetReaderIndex();
ByteBufUtil.log(buffer);
}
}Copy
1
2
3
4
read index:4 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 |.... |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16
read index:4 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 |.... |
+--------+-------------------------------------------------+----------------+Copy
还有以 get 开头的一系列方法,这些方法不会改变读指针的位置
由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release
起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
入站 ByteBuf 处理原则
出站 ByteBuf 处理原则
异常处理原则
有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
while (!buffer.release()) {}Copy
当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中
TailConext中释放ByteBuf的源码
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
// 具体的释放方法
ReferenceCountUtil.release(msg);
}
}Copy
判断传过来的是否为ByteBuf,是的话才需要释放
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}Copy
ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用
修改原ByteBuf中的值,也会影响切片后得到的ByteBuf
public class TestSlice {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
// 将buffer分成两部分
ByteBuf slice1 = buffer.slice(0, 5);
ByteBuf slice2 = buffer.slice(5, 5);
// 需要让分片的buffer引用计数加一
// 避免原Buffer释放导致分片buffer无法使用
slice1.retain();
slice2.retain();
ByteBufUtil.log(slice1);
ByteBufUtil.log(slice2);
// 更改原始buffer中的值
System.out.println("===========修改原buffer中的值===========");
buffer.setByte(0,5);
System.out.println("===========打印slice1===========");
ByteBufUtil.log(slice1);
}
}Copy
运行结果
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 |..... |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a |..... |
+--------+-------------------------------------------------+----------------+
===========修改原buffer中的值===========
===========打印slice1===========
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 02 03 04 05 |..... |
+--------+-------------------------------------------------+----------------+Copy
public class StudyServer {
static final Logger log = LoggerFactory.getLogger(StudyServer.class);
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连接建立时会执行该方法
log.debug("connected {}", ctx.channel());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接断开时会执行该方法
log.debug("disconnect {}", ctx.channel());
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
log.debug("{} binding...", channelFuture.channel());
channelFuture.sync();
log.debug("{} bound...", channelFuture.channel());
// 关闭channel
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stopped");
}
}
public static void main(String[] args) {
new StudyServer().start();
}
}Copy
客户端代码
public class StudyClient {
static final Logger log = LoggerFactory.getLogger(StudyClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connected...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
// 每次发送16个字节的数据,共发送10次
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}Copy
服务器接收结果
7999 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x5b43ecb0, L:/127.0.0.1:8080 - R:/127.0.0.1:53797] READ: 160B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000020| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000030| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000040| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000050| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000060| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000070| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+Copy
可见虽然客户端是分别以16字节为单位,通过channel向服务器发送了10次数据,可是服务器端却只接收了一次,接收数据的大小为160B,即客户端发送的数据总大小,这就是粘包现象
将客户端-服务器之间的channel容量进行调整
服务器代码
// 调整channel的容量
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);Copy
注意
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10) 影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍
服务器接收结果
5901 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc73284f3, L:/127.0.0.1:8080 - R:/127.0.0.1:49679] READ: 36B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000020| 00 01 02 03 |.... |
+--------+-------------------------------------------------+----------------+
5901 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc73284f3, L:/127.0.0.1:8080 - R:/127.0.0.1:49679] READ: 40B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000010| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000020| 04 05 06 07 08 09 0a 0b |........ |
+--------+-------------------------------------------------+----------------+
5901 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc73284f3, L:/127.0.0.1:8080 - R:/127.0.0.1:49679] READ: 40B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 0c 0d 0e 0f 00 01 02 03 04 05 06 07 08 09 0a 0b |................|
|00000010| 0c 0d 0e 0f 00 01 02 03 04 05 06 07 08 09 0a 0b |................|
|00000020| 0c 0d 0e 0f 00 01 02 03 |........ |
+--------+-------------------------------------------------+----------------+
5901 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc73284f3, L:/127.0.0.1:8080 - R:/127.0.0.1:49679] READ: 40B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000010| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000020| 04 05 06 07 08 09 0a 0b |........ |
+--------+-------------------------------------------------+----------------+
5901 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc73284f3, L:/127.0.0.1:8080 - R:/127.0.0.1:49679] READ: 4B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 0c 0d 0e 0f |.... |
+--------+-------------------------------------------------+----------------+Copy
可见客户端每次发送的数据,因channel容量不足,无法将发送的数据一次性接收,便产生了半包现象
发生粘包与半包现象的本质是因为 TCP 是流式协议,消息无边界
客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象
客户端代码改进
修改channelActive方法
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
// 使用短链接,每次发送完毕后就断开连接
ctx.channel().close();
}Copy
将发送步骤整体封装为send()方法,调用10次send()方法,模拟发送10次数据
public static void main(String[] args) {
// 发送10次
for (int i = 0; i < 10; i++) {
send();
}
}Copy
运行结果
6452 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3eb6a684, L:/127.0.0.1:8080 - R:/127.0.0.1:65024] ACTIVE
6468 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3eb6a684, L:/127.0.0.1:8080 - R:/127.0.0.1:65024] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
6468 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3eb6a684, L:/127.0.0.1:8080 ! R:/127.0.0.1:65024] INACTIVE
6483 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x7dcc31ff, L:/127.0.0.1:8080 - R:/127.0.0.1:65057] ACTIVE
6483 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x7dcc31ff, L:/127.0.0.1:8080 - R:/127.0.0.1:65057] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
6483 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x7dcc31ff, L:/127.0.0.1:8080 ! R:/127.0.0.1:65057] INACTIVE
...Copy
客户端先于服务器建立连接,此时控制台打印ACTIVE
,之后客户端向服务器发送了16B的数据,发送后断开连接,此时控制台打印INACTIVE
,可见未出现粘包现象
客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder
对数据进行定长解码,具体使用方法如下
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));Copy
客户端代码
客户端发送数据的代码如下
// 约定最大长度为16
final int maxLength = 16;
// 被发送的数据
char c = 'a';
// 向服务器发送10个报文
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer(maxLength);
// 定长byte数组,未使用部分会以0进行填充
byte[] bytes = new byte[maxLength];
// 生成长度为0~15的数据
for (int j = 0; j < (int)(Math.random()*(maxLength-1)); j++) {
bytes[j] = (byte) c;
}
buffer.writeBytes(bytes);
c++;
// 将数据发送给服务器
ctx.writeAndFlush(buffer);
}Copy
服务器代码
使用FixedLengthFrameDecoder
对粘包数据进行拆分,该handler需要添加在LoggingHandler
之前,保证数据被打印时已被拆分
// 通过定长解码器对粘包数据进行拆分
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));Copy
运行结果
8222 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xbc122d07, L:/127.0.0.1:8080 - R:/127.0.0.1:52954] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 00 00 00 00 00 00 00 00 00 00 00 00 |aaaa............|
+--------+-------------------------------------------------+----------------+
8222 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xbc122d07, L:/127.0.0.1:8080 - R:/127.0.0.1:52954] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 00 00 00 00 00 00 00 00 00 00 00 00 00 |bbb.............|
+--------+-------------------------------------------------+----------------+
8222 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xbc122d07, L:/127.0.0.1:8080 - R:/127.0.0.1:52954] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |cc..............|
+--------+-------------------------------------------------+----------------+
...Copy
行解码器的是通过分隔符对数据进行拆分来解决粘包半包问题的
可以通过LineBasedFrameDecoder(int maxLength)
来拆分以换行符(\n)为分隔符的数据,也可以通过DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)
来指定通过什么分隔符来拆分数据(可以传入多个分隔符)
两种解码器都需要传入数据的最大长度,若超出最大长度,会抛出TooLongFrameException
异常
以换行符 \n 为分隔符
客户端代码
// 约定最大长度为 64
final int maxLength = 64;
// 被发送的数据
char c = 'a';
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer(maxLength);
// 生成长度为0~62的数据
Random random = new Random();
StringBuilder sb = new StringBuilder();
for (int j = 0; j < (int)(random.nextInt(maxLength-2)); j++) {
sb.append(c);
}
// 数据以 \n 结尾
sb.append("\n");
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
c++;
// 将数据发送给服务器
ctx.writeAndFlush(buffer);
}Copy
服务器代码
// 通过行解码器对粘包数据进行拆分,以 \n 为分隔符
// 需要指定最大长度
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(64));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));Copy
运行结果
4184 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9d6ac701, L:/127.0.0.1:8080 - R:/127.0.0.1:58282] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaa |
+--------+-------------------------------------------------+----------------+
4184 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9d6ac701, L:/127.0.0.1:8080 - R:/127.0.0.1:58282] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 62 62 62 62 62 62 62 62 |bbbbbbbbbbb |
+--------+-------------------------------------------------+----------------+
4184 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9d6ac701, L:/127.0.0.1:8080 - R:/127.0.0.1:58282] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 |cc |
+--------+-------------------------------------------------+----------------+
...Copy
以自定义分隔符 \c 为分隔符
客户端代码
...
// 数据以 \c 结尾
sb.append("\\c");
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
...Copy
服务器代码
// 将分隔符放入ByteBuf中
ByteBuf bufSet = ch.alloc().buffer().writeBytes("\\c".getBytes(StandardCharsets.UTF_8));
// 通过行解码器对粘包数据进行拆分,以 \c 为分隔符
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(64, ch.alloc().buffer().writeBytes(bufSet)));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));Copy
运行结果
8246 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x86215ccd, L:/127.0.0.1:8080 - R:/127.0.0.1:65159] READ: 14B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaa |
+--------+-------------------------------------------------+----------------+
8247 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x86215ccd, L:/127.0.0.1:8080 - R:/127.0.0.1:65159] READ: 3B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 |bbb |
+--------+-------------------------------------------------+----------------+
...Copy
在传送数据时可以在数据中添加一个用于表示有用数据长度的字段,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的
LengthFieldBasedFrameDecoder
解码器可以提供更为丰富的拆分方法,其构造方法有五个参数
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip)Copy
参数解析
参数图解
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+Copy
从0开始即为长度标识,长度标识长度为2个字节
0x000C 即为后面 HELLO, WORLD
的长度
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+Copy
从0开始即为长度标识,长度标识长度为2个字节,读取时从第二个字节开始读取(此处即跳过长度标识)
因为跳过了用于表示长度的2个字节,所以此处直接读取HELLO, WORLD
lengthFieldOffset = 2 (= the length of Header 1)
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+Copy
长度标识前面还有2个字节的其他内容(0xCAFE),第三个字节开始才是长度标识,长度表示长度为3个字节(0x00000C)
Header1中有附加信息,读取长度标识时需要跳过这些附加信息来获取长度
lengthFieldOffset = 0
lengthFieldLength = 3
lengthAdjustment = 2 (= the length of Header 1)
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
| 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+Copy
从0开始即为长度标识,长度标识长度为3个字节,长度标识之后还有2个字节的其他内容(0xCAFE)
长度标识(0x00000C)表示的是从其后lengthAdjustment(2个字节)开始的数据的长度,即HELLO, WORLD
,不包括0xCAFE
lengthFieldOffset = 1 (= the length of HDR1)
lengthFieldLength = 2
lengthAdjustment = 1 (= the length of HDR2)
initialBytesToStrip = 3 (= the length of HDR1 + LEN)
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+Copy
长度标识前面有1个字节的其他内容,后面也有1个字节的其他内容,读取时从长度标识之后3个字节处开始读取,即读取 0xFE HELLO, WORLD
使用
通过 EmbeddedChannel 对 handler 进行测试
public class EncoderStudy {
public static void main(String[] args) {
// 模拟服务器
// 使用EmbeddedChannel测试handler
EmbeddedChannel channel = new EmbeddedChannel(
// 数据最大长度为1KB,长度标识前后各有1个字节的附加信息,长度标识长度为4个字节(int)
new LengthFieldBasedFrameDecoder(1024, 1, 4, 1, 0),
new LoggingHandler(LogLevel.DEBUG)
);
// 模拟客户端,写入数据
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
send(buffer, "Hello");
channel.writeInbound(buffer);
send(buffer, "World");
channel.writeInbound(buffer);
}
private static void send(ByteBuf buf, String msg) {
// 得到数据的长度
int length = msg.length();
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
// 将数据信息写入buf
// 写入长度标识前的其他信息
buf.writeByte(0xCA);
// 写入数据长度标识
buf.writeInt(length);
// 写入长度标识后的其他信息
buf.writeByte(0xFE);
// 写入具体的数据
buf.writeBytes(bytes);
}
}Copy
运行结果
146 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ca 00 00 00 05 fe 48 65 6c 6c 6f |......Hello |
+--------+-------------------------------------------------+----------------+
146 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ca 00 00 00 05 fe 57 6f 72 6c 64 |......World |
+--------+-------------------------------------------------+----------------+Copy
TCP/IP 中消息传输基于流的方式,没有边界
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
如果我们要向Redis服务器发送一条set name Nyima
的指令,需要遵守如下协议
// 该指令一共有3部分,每条指令之后都要添加回车与换行符
*3\r\n
// 第一个指令的长度是3
$3\r\n
// 第一个指令是set指令
set\r\n
// 下面的指令以此类推
$4\r\n
name\r\n
$5\r\n
Nyima\r\nCopy
客户端代码如下
public class RedisClient {
static final Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 打印日志
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 回车与换行符
final byte[] LINE = {'\r','\n'};
// 获得ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 连接建立后,向Redis中发送一条指令,注意添加回车与换行
// set name Nyima
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$5".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("Nyima".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
});
}
})
.connect(new InetSocketAddress("localhost", 6379));
channelFuture.sync();
// 关闭channel
channelFuture.channel().close().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭group
group.shutdownGracefully();
}
}
}Copy
控制台打印结果
1600 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x28c994f1, L:/127.0.0.1:60792 - R:localhost/127.0.0.1:6379] WRITE: 34B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3..$3..set..$4.|
|00000010| 0a 6e 61 6d 65 0d 0a 24 35 0d 0a 4e 79 69 6d 61 |.name..$5..Nyima|
|00000020| 0d 0a |.. |
+--------+-------------------------------------------------+----------------+Copy
Redis中查询执行结果
HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec
作为服务器端的解码器与编码器,来处理HTTP请求
// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder
// Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodecCopy
服务器代码
public class HttpServer {
static final Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 作为服务器,使用 HttpServerCodec 作为编码器与解码器
ch.pipeline().addLast(new HttpServerCodec());
// 服务器只处理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
// 获得请求uri
log.debug(msg.uri());
// 获得完整响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);
// 写回响应
ctx.writeAndFlush(response);
}
});
}
})
.bind(8080);
}
}Copy
服务器负责处理请求并响应浏览器。所以只需要处理HTTP请求即可
// 服务器只处理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()Copy
获得请求后,需要返回响应给浏览器。需要创建响应对象DefaultFullHttpResponse
,设置HTTP版本号及状态码,为避免浏览器获得响应后,因为获得CONTENT_LENGTH
而一直空转,需要添加CONTENT_LENGTH
字段,表明响应体中数据的具体长度
// 获得完整响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);Copy
运行结果
浏览器
控制台
// 请求内容
1714 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] READ: 688B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 |.Connection: kee|
|00000040| 70 2d 61 6c 69 76 65 0d 0a 50 72 61 67 6d 61 3a |p-alive..Pragma:|
....
// 响应内容
1716 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] WRITE: 61B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 43 6f 6e 74 65 6e 74 2d 4c 65 6e 67 74 68 3a |.Content-Length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e |, World!</h1> |
+--------+-------------------------------------------------+----------------+Copy
魔数:用来在第一时间判定接收的数据是否为无效数据包
版本号:可以支持协议的升级
序列化算法
:消息正文到底采用哪种序列化反序列化方式
指令类型:是登录、注册、单聊、群聊… 跟业务相关
请求序号:为了双工通信,提供异步能力
正文长度
消息正文
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 设置魔数 4个字节
out.writeBytes(new byte[]{'N','Y','I','M'});
// 设置版本号 1个字节
out.writeByte(1);
// 设置序列化方式 1个字节
out.writeByte(1);
// 设置指令类型 1个字节
out.writeByte(msg.getMessageType());
// 设置请求序号 4个字节
out.writeInt(msg.getSequenceId());
// 为了补齐为16个字节,填充1个字节的数据
out.writeByte(0xff);
// 获得序列化后的msg
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 获得并设置正文长度 长度用4个字节标识
out.writeInt(bytes.length);
// 设置消息正文
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 获取魔数
int magic = in.readInt();
// 获取版本号
byte version = in.readByte();
// 获得序列化方式
byte seqType = in.readByte();
// 获得指令类型
byte messageType = in.readByte();
// 获得请求序号
int sequenceId = in.readInt();
// 移除补齐字节
in.readByte();
// 获得正文长度
int length = in.readInt();
// 获得正文
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
// 将信息放入List中,传递给下一个handler
out.add(message);
// 打印获得的信息正文
System.out.println("===========魔数===========");
System.out.println(magic);
System.out.println("===========版本号===========");
System.out.println(version);
System.out.println("===========序列化方法===========");
System.out.println(seqType);
System.out.println("===========指令类型===========");
System.out.println(messageType);
System.out.println("===========请求序号===========");
System.out.println(sequenceId);
System.out.println("===========正文长度===========");
System.out.println(length);
System.out.println("===========正文===========");
System.out.println(message);
}
}Copy
编码器与解码器方法源于父类ByteToMessageCodec,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类。此处使用了自定义类Message,代表消息
public class MessageCodec extends ByteToMessageCodec<Message>Copy
编码器负责将附加信息与正文信息写入到ByteBuf中,其中附加信息总字节数最好为2n,不足需要补齐。正文内容如果为对象,需要通过序列化将其放入到ByteBuf中
解码器负责将ByteBuf中的信息取出,并放入List中,该List用于将信息传递给下一个handler
编写测试类
public class TestCodec {
static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel();
// 添加解码器,避免粘包半包问题
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
channel.pipeline().addLast(new MessageCodec());
LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");
// 测试编码与解码
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, user, byteBuf);
channel.writeInbound(byteBuf);
}
}Copy
运行结果
为了提高handler的复用率,可以将handler创建为handler对象,然后在不同的channel中使用该handler对象进行处理操作
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一个handler对象,提高复用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);Copy
但是并不是所有的handler都能通过这种方法来提高复用率的,例如LengthFieldBasedFrameDecoder
。如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题
为了提高handler的复用率,同时又避免出现一些并发问题,Netty中原生的handler中用@Sharable
注解来标明,该handler能否在多个channel中共享。
只有带有该注解,才能通过对象的方式被共享,否则无法被共享
这需要根据自定义的handler的处理逻辑进行分析
我们的MessageCodec本身接收的是LengthFieldBasedFrameDecoder处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加@Sharable注解的
但是实际情况我们并不能添加该注解,会抛出异常信息ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared
因为MessageCodec继承自ByteToMessageCodec,ByteToMessageCodec类的注解如下
这就意味着ByteToMessageCodec不能被多个channel所共享的
如果想要共享,需要怎么办呢?
继承MessageToMessageDecoder即可。该类的目标是:将已经被处理的完整数据再次被处理。传过来的Message如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用@Sharable注解了。实现方式与ByteToMessageCodec类似
@ChannelHandler.Sharable
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
...
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
...
}
}Copy
public interface UserService {
/**
* 登录
* @param username 用户名
* @param password 密码
* @return 登录成功返回 true, 否则返回 false
*/
boolean login(String username, String password);
}Copy
public interface Session {
/**
* 绑定会话
* @param channel 哪个 channel 要绑定会话
* @param username 会话绑定用户
*/
void bind(Channel channel, String username);
/**
* 解绑会话
* @param channel 哪个 channel 要解绑会话
*/
void unbind(Channel channel);
/**
* 获取属性
* @param channel 哪个 channel
* @param name 属性名
* @return 属性值
*/
Object getAttribute(Channel channel, String name);
/**
* 设置属性
* @param channel 哪个 channel
* @param name 属性名
* @param value 属性值
*/
void setAttribute(Channel channel, String name, Object value);
/**
* 根据用户名获取 channel
* @param username 用户名
* @return channel
*/
Channel getChannel(String username);
}Copy
public interface GroupSession {
/**
* 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
* @param name 组名
* @param members 成员
* @return 成功时返回组对象, 失败返回 null
*/
Group createGroup(String name, Set<String> members);
/**
* 加入聊天组
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group joinMember(String name, String member);
/**
* 移除组成员
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeMember(String name, String member);
/**
* 移除聊天组
* @param name 组名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeGroup(String name);
/**
* 获取组成员
* @param name 组名
* @return 成员集合, 如果群不存在或没有成员会返回 empty set
*/
Set<String> getMembers(String name);
/**
* 获取组成员的 channel 集合, 只有在线的 channel 才会返回
* @param name 组名
* @return 成员 channel 集合
*/
List<Channel> getMembersChannel(String name);
/**
* 判断群聊是否一被创建
* @param name 群聊名称
* @return 是否存在
*/
boolean isCreated(String name);
}Copy
public class ChatClient {
static final Logger log = LoggerFactory.getLogger(ChatClient.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageSharableCodec);
}
});
Channel channel = bootstrap.connect().sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}Copy
public class ChatServer {
static final Logger log = LoggerFactory.getLogger(ChatServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageSharableCodec);
}
});
Channel channel = bootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}Copy
客户端添加如下handler,分别处理登录、聊天等操作
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
/**
* 创建连接时执行的处理器,用于执行登陆操作
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 开辟额外线程,用于用户登陆及后续操作
new Thread(()->{
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名");
String username = scanner.next();
System.out.println("请输入密码");
String password = scanner.next();
// 创建包含登录信息的请求体
LoginRequestMessage message = new LoginRequestMessage(username, password);
// 发送到channel中
ctx.writeAndFlush(message);
System.out.println("等待后续操作...");
// 阻塞,直到登陆成功后CountDownLatch被设置为0
try {
waitLogin.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 执行后续操作
if (!loginStatus.get()) {
// 登陆失败,关闭channel并返回
ctx.channel().close();
return;
}
// 登录成功后,执行其他操作
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = scanner.nextLine();
// 获得指令及其参数,并发送对应类型消息
String[] commands = command.split(" ");
switch (commands[0]){
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, commands[1], commands[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username,commands[1], commands[2]));
break;
case "gcreate":
// 分割,获得群员名
String[] members = commands[2].split(",");
Set<String> set = new HashSet<>(Arrays.asList(members));
// 把自己加入到群聊中
set.add(username);
ctx.writeAndFlush(new GroupCreateRequestMessage(commands[1],set));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(commands[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, commands[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, commands[1]));
break;
case "quit":
ctx.channel().close();
return;
default:
System.out.println("指令有误,请重新输入");
continue;
}
}
}, "login channel").start();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("{}", msg);
if (msg instanceof LoginResponseMessage) {
// 如果是登录响应信息
LoginResponseMessage message = (LoginResponseMessage) msg;
boolean isSuccess = message.isSuccess();
// 登录成功,设置登陆标记
if (isSuccess) {
loginStatus.set(true);
}
// 登陆后,唤醒登陆线程
waitLogin.countDown();
}
}
});Copy
服务器添加如下handler,并添加到对应的channel中,负责处理登录请求信息,并作出响应
@ChannelHandler.Sharable // 必须添加该注解
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
// 获得登录信息
String username = msg.getUsername();
String password = msg.getPassword();
// 校验登录信息
boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage message;
if (login) {
message = new LoginResponseMessage(true, "登陆成功");
// 绑定channel与user
SessionFactory.getSession().bind(ctx.channel(), username);
} else {
message = new LoginResponseMessage(false, "登陆失败");
}
ctx.writeAndFlush(message);
}
}Copy
// 该handler处理登录请求
LoginRequestMessageHandler loginRequestMessageHandler = new LoginRequestMessageHandler();
ch.pipeline().addLast(new LoginRequestMessageHandler());Copy
客户端
5665 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - 1314474317, 1, 1, 1, 0, 279
5667 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - message:AbstractResponseMessage{success=true, reason='登陆成功'}
5667 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='登陆成功'}
successCopy
服务器
11919 [nioEventLoopGroup-3-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - 1314474317, 1, 1, 0, 0, 217
11919 [nioEventLoopGroup-3-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - message:LoginRequestMessage{username='Nyima', password='123'}
7946 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x8e7c07f6, L:/127.0.0.1:8080 - R:/127.0.0.1:60572] WRITE: 295B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4e 59 49 4d 01 01 01 00 00 00 00 ff 00 00 01 17 |NYIM............|
|00000010| ac ed 00 05 73 72 00 31 63 6e 2e 6e 79 69 6d 61 |....sr.1cn.nyima|
|00000020| 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes|
|00000030| 73 61 67 65 2e 4c 6f 67 69 6e 52 65 73 70 6f 6e |sage.LoginRespon|
|00000040| 73 65 4d 65 73 73 61 67 65 e2 34 49 24 72 52 f3 |seMessage.4I$rR.|
|00000050| 07 02 00 00 78 72 00 34 63 6e 2e 6e 79 69 6d 61 |....xr.4cn.nyima|
|00000060| 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes|
|00000070| 73 61 67 65 2e 41 62 73 74 72 61 63 74 52 65 73 |sage.AbstractRes|
|00000080| 70 6f 6e 73 65 4d 65 73 73 61 67 65 b3 7e 19 32 |ponseMessage.~.2|
|00000090| 9b 88 4d 7b 02 00 02 5a 00 07 73 75 63 63 65 73 |..M{...Z..succes|
|000000a0| 73 4c 00 06 72 65 61 73 6f 6e 74 00 12 4c 6a 61 |sL..reasont..Lja|
|000000b0| 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 78 |va/lang/String;x|
|000000c0| 72 00 24 63 6e 2e 6e 79 69 6d 61 63 2e 73 74 75 |r.$cn.nyimac.stu|
|000000d0| 64 79 2e 64 61 79 38 2e 6d 65 73 73 61 67 65 2e |dy.day8.message.|
|000000e0| 4d 65 73 73 61 67 65 dd e9 84 b7 21 db 18 52 02 |Message....!..R.|
|000000f0| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType|
|00000100| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.|
|00000110| 00 00 00 00 00 00 00 01 74 00 0c e7 99 bb e9 99 |........t.......|
|00000120| 86 e6 88 90 e5 8a 9f |....... |
+--------+-------------------------------------------------+----------------+Copy
客户端输入send username content
即可发送单聊消息,需要服务器端添加处理ChatRequestMessage的handler
@ChannelHandler.Sharable // 必须添加该注解
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
// 获得user所在的channel
Channel channel = SessionFactory.getSession().getChannel(msg.getTo());
// 如果双方都在线
if (channel != null) {
// 通过接收方与服务器之间的channel发送信息
channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
} else {
// 通过发送方与服务器之间的channel发送消息
ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或离线,发送失败"));
}
}
}Copy
// 该handler处理单聊请求
ChatRequestMessageHandler chatRequestMessageHandler = new ChatRequestMessageHandler();
ch.pipeline().addLast(chatRequestMessageHandler);Copy
运行结果
发送方(zhangsan)
send Nyima helloCopy
接收方(Nyima)
// 收到zhangsan发来的消息
20230 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - ChatResponseMessage{from='zhangsan', content='hello'}Copy
添加处理GroupCreateRequestMessage
的handler
@ChannelHandler.Sharable
public class GroupCreateMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
// 获得要创建的群聊名
String groupName = msg.getGroupName();
// 获得要创建的群聊的成员组
Set<String> members = msg.getMembers();
// 判断该群聊是否创建过,未创建返回null并创建群聊
Group group = GroupSessionFactory.getGroupSession().createGroup(groupName, members);
if (group == null) {
// 发送创建成功消息
GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage(true, groupName + "创建成功");
ctx.writeAndFlush(groupCreateResponseMessage);
// 获得在线群员的channel,给群员发送入群聊消息
List<Channel> membersChannel = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);
groupCreateResponseMessage = new GroupCreateResponseMessage(true, "您已被拉入"+groupName);
// 给每个在线群员发送消息
for(Channel channel : membersChannel) {
channel.writeAndFlush(groupCreateResponseMessage);
}
} else {
// 发送失败消息
GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage(false, groupName + "已存在");
ctx.writeAndFlush(groupCreateResponseMessage);
}
}
}Copy
// 该handler处理创建群聊请求
GroupCreateMessageHandler groupCreateMessageHandler = new GroupCreateMessageHandler();
ch.pipeline().addLast(groupCreateMessageHandler);Copy
运行结果
创建者客户端
// 首次创建
gcreate Netty学习 zhangsan,lisi
31649 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='Netty学习创建成功'}
15244 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='您已被拉入Netty学习'}
// 再次创建
gcreate Netty学习 zhangsan,lisi
40771 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='Netty学习已存在'}Copy
群员客户端
28788 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='您已被拉入Netty学习'}Copy
@ChannelHandler.Sharable
public class GroupChatMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
// 判断群聊是否存在
boolean isCreated = groupSession.isCreated(groupName);
if (isCreated) {
// 给群员发送信息
List<Channel> membersChannel = groupSession.getMembersChannel(groupName);
for(Channel channel : membersChannel) {
channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
}
} else {
ctx.writeAndFlush(new GroupChatResponseMessage(false, "群聊不存在"));
}
}
}Copy
// 该handler处理群聊聊天
GroupChatMessageHandler groupChatMessageHandler = new GroupChatMessageHandler();
ch.pipeline().addLast(groupChatMessageHandler);Copy
运行结果
发送方(群聊存在)
gsend Netty学习 你们好
45408 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupChatResponseMessage{from='zhangsan', content='你们好'}Copy
接收方
48082 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupChatResponseMessage{from='zhangsan', content='你们好'}Copy
发送方(群聊不存在)
gsend Spring学习 你们好
25140 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='群聊不存在'}Copy
@ChannelHandler.Sharable
public class GroupJoinMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
GroupSession groupSession = GroupSessionFactory.getGroupSession();
// 判断该用户是否在群聊中
Set<String> members = groupSession.getMembers(msg.getGroupName());
boolean joinFlag = false;
// 群聊存在且用户未加入,才能加入
if (!members.contains(msg.getUsername()) && groupSession.isCreated(msg.getGroupName())) {
joinFlag = true;
}
if (joinFlag) {
// 加入群聊
groupSession.joinMember(msg.getGroupName(), msg.getUsername());
ctx.writeAndFlush(new GroupJoinResponseMessage(true,"加入"+msg.getGroupName()+"成功"));
} else {
ctx.writeAndFlush(new GroupJoinResponseMessage(false, "加入失败,群聊未存在或您已加入该群聊"));
}
}
}Copy
// 该handler处理加入群聊
GroupJoinMessageHandler groupJoinMessageHandler = new GroupJoinMessageHandler();
ch.pipeline().addLast(groupJoinMessageHandler);Copy
运行结果
正常加入群聊
94921 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='加入Netty学习成功'}Copy
加入不能存在或已加入的群聊
44025 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='加入失败,群聊未存在或您已加入该群聊'}Copy
@ChannelHandler.Sharable
public class GroupQuitMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
GroupSession groupSession = GroupSessionFactory.getGroupSession();
String groupName = msg.getGroupName();
Set<String> members = groupSession.getMembers(groupName);
String username = msg.getUsername();
// 判断用户是否在群聊中以及群聊是否存在
boolean joinFlag = false;
if (groupSession.isCreated(groupName) && members.contains(username)) {
// 可以退出
joinFlag = true;
}
if (joinFlag) {
// 退出成功
groupSession.removeMember(groupName, username);
ctx.writeAndFlush(new GroupQuitResponseMessage(true, "退出"+groupName+"成功"));
} else {
// 退出失败
ctx.writeAndFlush(new GroupQuitResponseMessage(false, "群聊不存在或您未加入该群,退出"+groupName+"失败"));
}
}
}Copy
// 该handler处理退出群聊
GroupQuitMessageHandler groupQuitMessageHandler = new GroupQuitMessageHandler();
ch.pipeline().addLast(groupQuitMessageHandler);Copy
运行结果
正常退出
32282 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='退出Netty学习成功'}Copy
退出不存在或未加入的群聊
67404 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='群聊不存在或您未加入该群,退出Netty失败'}Copy
@ChannelHandler.Sharable
public class GroupMembersMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
ctx.writeAndFlush(new GroupMembersResponseMessage(GroupSessionFactory.getGroupSession().getMembers(msg.getGroupName())));
}
}Copy
// 该handler处理查看成员
GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler();
ch.pipeline().addLast(groupMembersMessageHandler);Copy
运行结果
46557 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupMembersResponseMessage{members=[zhangsan, Nyima]}Copy
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
/**
* 断开连接时触发 Inactive事件
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 解绑
SessionFactory.getSession().unbind(ctx.channel());
}
/**
* 异常退出,需要解绑
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 解绑
SessionFactory.getSession().unbind(ctx.channel());
}
}Copy
// 该handler处理退出聊天室
ch.pipeline().addLast(quitHandler);
GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler();Copy
退出时,客户端会关闭channel并返回
case "quit":
// 关闭channel并返回
ctx.channel().close();
return;Copy
原因
问题
可以添加IdleStateHandler
对空闲时间进行检测,通过构造函数可以传入三个参数
当指定时间内未发生读或写事件时,会触发特定事件
READER_IDLE
WRITE_IDLE
ALL_IDEL
想要处理这些事件,需要自定义事件处理函数
服务器端代码
// 用于空闲连接的检测,5s内未读到数据,会触发READ_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// 添加双向处理器,负责处理READER_IDLE事件
ch.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 获得事件
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
// 断开连接
ctx.channel().close();
}
}
});Copy
使用IdleStateHandler
进行空闲检测
使用双向处理器
ChannelDuplexHandler
对入站与出站事件进行处理
IdleStateHandler
中的事件为特殊事件,需要实现ChannelDuplexHandler
的userEventTriggered
方法,判断事件类型并自定义处理方式,来对事件进行处理为避免因非网络等原因引发的READ_IDLE事件,比如网络情况良好,只是用户本身没有输入数据,这时发生READ_IDLE事件,直接让服务器断开连接是不可取的
为避免此类情况,需要在客户端向服务器发送心跳包,发送频率要小于服务器设置的IdleTimeSeconds
,一般设置为其值的一半
客户端代码
// 发送心跳包,让服务器知道客户端在线
// 3s未发生WRITER_IDLE,就像服务器发送心跳包
// 该值为服务器端设置的READER_IDLE触发时间的一半左右
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
ch.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
// 发送心跳包
ctx.writeAndFlush(new PingMessage());
}
}
});Copy
public interface Serializer {
/**
* 序列化
* @param object 被序列化的对象
* @param <T> 被序列化对象类型
* @return 序列化后的字节数组
*/
<T> byte[] serialize(T object);
/**
* 反序列化
* @param clazz 反序列化的目标类的Class对象
* @param bytes 被反序列化的字节数组
* @param <T> 反序列化目标类
* @return 反序列化后的对象
*/
<T> T deserialize(Class<T> clazz, byte[] bytes);
}Copy
public enum SerializerAlgorithm implements Serializer {
// Java的序列化和反序列化
Java {
@Override
public <T> byte[] serialize(T object) {
// 序列化后的字节数组
byte[] bytes = null;
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(object);
bytes = bos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return bytes;
}
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
T target = null;
System.out.println(Arrays.toString(bytes));
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis)) {
target = (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
// 返回反序列化后的对象
return target;
}
}
// Json的序列化和反序列化
Json {
@Override
public <T> byte[] serialize(T object) {
String s = new Gson().toJson(object);
System.out.println(s);
// 指定字符集,获得字节数组
return s.getBytes(StandardCharsets.UTF_8);
}
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
String s = new String(bytes, StandardCharsets.UTF_8);
System.out.println(s);
// 此处的clazz为具体类型的Class对象,而不是父类Message的
return new Gson().fromJson(s, clazz);
}
}
}Copy
编码
// 获得序列化后的msg
// 使用指定的序列化方式
SerializerAlgorithm[] values = SerializerAlgorithm.values();
// 获得序列化后的对象
byte[] bytes = values[out.getByte(5)-1].serialize(msg);Copy
解码
// 获得反序列化方式
SerializerAlgorithm[] values = SerializerAlgorithm.values();
// 通过指定方式进行反序列化
// 需要通过Message的方法获得具体的消息类型
Message message = values[seqType-1].deserialize(Message.getMessageClass(messageType), bytes);Copy
public class TestParam {
public static void main(String[] args) {
// SocketChannel 5s内未建立连接就抛出异常
new Bootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
// ServerSocketChannel 5s内未建立连接就抛出异常
new ServerBootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000);
// SocketChannel 5s内未建立连接就抛出异常
new ServerBootstrap().childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
}
}Copy
客户端通过 Bootstrap.option
函数来配置参数,配置参数作用于 SocketChannel
服务器通过
ServerBootstrap
来配置参数,但是对于不同的 Channel 需要选择不同的方法
option
来配置 ServerSocketChannel 上的参数childOption
来配置 SocketChannel 上的参数客户端中连接服务器的线程是 NIO 线程,抛出异常的是主线程。这是如何做到超时判断以及线程通信的呢?
AbstractNioChannel.AbstractNioUnsafe.connect
方法中
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
...
// Schedule connect timeout.
// 设置超时时间,通过option方法传入的CONNECT_TIMEOUT_MILLIS参数进行设置
int connectTimeoutMillis = config().getConnectTimeoutMillis();
// 如果超时时间大于0
if (connectTimeoutMillis > 0) {
// 创建一个定时任务,延时connectTimeoutMillis(设置的超时时间时间)后执行
// schedule(Runnable command, long delay, TimeUnit unit)
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
// 判断是否建立连接,Promise进行NIO线程与主线程之间的通信
// 如果超时,则通过tryFailure方法将异常放入Promise中
// 在主线程中抛出
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
...
}Copy
超时的判断主要是通过 Eventloop 的 schedule 方法和 Promise 共同实现的
connectTimeoutMillis
秒后执行该方法ConnectTimeoutException
异常,并将异常通过 Pormise 传给主线程并抛出该参数是 ServerSocketChannel 的参数
第一次握手时,因为客户端与服务器之间的连接还未完全建立,连接会被放入半连接队列中
当完成三次握手以后,连接会被放入全连接队列中
服务器处理Accept事件是在TCP三次握手,也就是建立连接之后。服务器会从全连接队列中获取连接并进行处理
在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 linux 2.2 之后,分别用下面两个参数来控制
syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略在Netty中,SO_BACKLOG
主要用于设置全连接队列的大小。当处理Accept的速率小于连接建立的速率时,全连接队列中堆积的连接数大于SO_BACKLOG
设置的值是,便会抛出异常
设置方式如下
// 设置全连接队列,大小为2
new ServerBootstrap().option(ChannelOption.SO_BACKLOG, 2);Copy
backlog参数在NioSocketChannel.doBind
方法被使用
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}Copy
其中backlog被保存在了DefaultServerSocketChannelConfig
配置类中
private volatile int backlog = NetUtil.SOMAXCONN;Copy
具体的赋值操作如下
SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
@Override
public Integer run() {
// Determine the default somaxconn (server socket backlog) value of the platform.
// The known defaults:
// - Windows NT Server 4.0+: 200
// - Linux and Mac OS X: 128
int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
File file = new File("/proc/sys/net/core/somaxconn");
BufferedReader in = null;
try {
// file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
// try / catch block.
// See https://github.com/netty/netty/issues/4936
if (file.exists()) {
in = new BufferedReader(new FileReader(file));
// 将somaxconn设置为Linux配置文件中设置的值
somaxconn = Integer.parseInt(in.readLine());
if (logger.isDebugEnabled()) {
logger.debug("{}: {}", file, somaxconn);
}
} else {
...
}
...
}
// 返回backlog的值
return somaxconn;
}
}Copy
backlog的值会根据操作系统的不同,来
选择不同的默认值
如果配置文件/proc/sys/net/core/somaxconn
存在,会读取配置文件中的值,并将backlog的值设置为配置文件中指定的
// 选择ALLOCATOR参数,设置SocketChannel中分配的ByteBuf类型
// 第二个参数需要传入一个ByteBufAllocator,用于指定生成的 ByteBuf 的类型
new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator());Copy
ByteBufAllocator类型
池化并使用直接内存
// true表示使用直接内存
new PooledByteBufAllocator(true);Copy
池化并使用堆内存
// false表示使用堆内存
new PooledByteBufAllocator(false);Copy
非池化并使用直接内存
// ture表示使用直接内存
new UnpooledByteBufAllocator(true);Copy
非池化并使用堆内存
// false表示使用堆内存
new UnpooledByteBufAllocator(false);Copy
在聊天室代码的基础上进行一定的改进
Message中添加如下代码
public abstract class Message implements Serializable {
...
// 添加RPC消息类型
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
static {
// 将消息类型放入消息类对象Map中
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}Copy
RPC请求消息
public class RpcRequestMessage extends Message {
/**
* 调用的接口全限定名,服务端根据它找到实现
*/
private String interfaceName;
/**
* 调用接口中的方法名
*/
private String methodName;
/**
* 方法返回类型
*/
private Class<?> returnType;
/**
* 方法参数类型数组
*/
private Class[] parameterTypes;
/**
* 方法参数值数组
*/
private Object[] parameterValue;
public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
public String getInterfaceName() {
return interfaceName;
}
public String getMethodName() {
return methodName;
}
public Class<?> getReturnType() {
return returnType;
}
public Class[] getParameterTypes() {
return parameterTypes;
}
public Object[] getParameterValue() {
return parameterValue;
}
@Override
public String toString() {
return "RpcRequestMessage{" +
"interfaceName='" + interfaceName + '\'' +
", methodName='" + methodName + '\'' +
", returnType=" + returnType +
", parameterTypes=" + Arrays.toString(parameterTypes) +
", parameterValue=" + Arrays.toString(parameterValue) +
'}';
}
}Copy
想要远程调用一个方法,必须知道以下五个信息
RPC响应消息
public class RpcResponseMessage extends Message {
/**
* 返回值
*/
private Object returnValue;
/**
* 异常值
*/
private Exception exceptionValue;
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}
public void setReturnValue(Object returnValue) {
this.returnValue = returnValue;
}
public void setExceptionValue(Exception exceptionValue) {
this.exceptionValue = exceptionValue;
}
public Object getReturnValue() {
return returnValue;
}
public Exception getExceptionValue() {
return exceptionValue;
}
@Override
public String toString() {
return "RpcResponseMessage{" +
"returnValue=" + returnValue +
", exceptionValue=" + exceptionValue +
'}';
}
}Copy
响应消息中只需要获取返回结果和异常值
服务器
public class RPCServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
// PRC 请求消息处理器
RpcRequestMessageHandler rpcRequestMessageHandler = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageSharableCodec);
ch.pipeline().addLast(rpcRequestMessageHandler);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}Copy
服务器中添加了处理RPCRequest消息的handler
客户端
public class RPCClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
// PRC 请求消息处理器
RpcResponseMessageHandler rpcResponseMessageHandler = new RpcResponseMessageHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageSharableCodec);
ch.pipeline().addLast(rpcResponseMessageHandler);
}
});
Channel channel = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}Copy
通过接口Class获取实例对象的Factory
public class ServicesFactory {
static HashMap<Class<?>, Object> map = new HashMap<>(16);
public static Object getInstance(Class<?> interfaceClass) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
// 根据Class创建实例
try {
Class<?> clazz = Class.forName("cn.nyimac.study.day8.server.service.HelloService");
Object instance = Class.forName("cn.nyimac.study.day8.server.service.HelloServiceImpl").newInstance();
// 放入 InterfaceClass -> InstanceObject 的映射
map.put(clazz, instance);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
return map.get(interfaceClass);
}
}Copy
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage rpcMessage) {
RpcResponseMessage rpcResponseMessage = new RpcResponseMessage();
try {
// 设置返回值的属性
rpcResponseMessage.setSequenceId(rpcMessage.getSequenceId());
// 返回一个实例
HelloService service = (HelloService) ServicesFactory.getInstance(Class.forName(rpcMessage.getInterfaceName()));
// 通过反射调用方法,并获取返回值
Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParameterTypes());
// 获得返回值
Object invoke = method.invoke(service, rpcMessage.getParameterValue());
// 设置返回值
rpcResponseMessage.setReturnValue(invoke);
} catch (Exception e) {
e.printStackTrace();
// 设置异常
rpcResponseMessage.setExceptionValue(e);
}
}
// 向channel中写入Message
ctx.writeAndFlush(rpcResponseMessage);
}Copy
远程调用方法主要是通过反射实现的,大致步骤如下
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
static final Logger log = LoggerFactory.getLogger(ChatServer.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
System.out.println((String)msg.getReturnValue());
}
}Copy
public class RPCClient {
public static void main(String[] args) {
...
// 创建请求并发送
RpcRequestMessage message = new RpcRequestMessage(1,
"cn.nyimac.study.day8.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"Nyima"});
channel.writeAndFlush(message);
...
}
}Copy
运行结果
客户端
1606 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.server.ChatServer - RpcResponseMessage{returnValue=你好,Nyima, exceptionValue=null}Copy
public class RPCClientManager {
/**
* 产生SequenceId
*/
private static AtomicInteger sequenceId = new AtomicInteger(0);
private static volatile Channel channel = null;
private static final Object lock = new Object();
public static void main(String[] args) {
// 创建代理对象
HelloService service = (HelloService) getProxy(HelloService.class);
// 通过代理对象执行方法
System.out.println(service.sayHello("Nyima"));
System.out.println(service.sayHello("Hulu"));
}
/**
* 单例模式创建Channel
*/
public static Channel getChannel() {
if (channel == null) {
synchronized (lock) {
if (channel == null) {
init();
}
}
}
return channel;
}
/**
* 使用代理模式,帮助我们创建请求消息并发送
*/
public static Object getProxy(Class<?> serviceClass) {
Class<?>[] classes = new Class<?>[]{serviceClass};
// 使用JDK代理,创建代理对象
Object o = Proxy.newProxyInstance(serviceClass.getClassLoader(), classes, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 创建请求消息
int id = sequenceId.getAndIncrement();
RpcRequestMessage message = new RpcRequestMessage(id, serviceClass.getName(),
method.getName(), method.getReturnType(),
method.getParameterTypes(),
args);
// 发送消息
getChannel().writeAndFlush(message);
// 创建Promise,用于获取NIO线程中的返回结果,获取的过程是异步的
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
// 将Promise放入Map中
RpcResponseMessageHandler.promiseMap.put(id, promise);
// 等待被放入Promise中结果
promise.await();
if (promise.isSuccess()) {
// 调用方法成功,返回方法执行结果
return promise.getNow();
} else {
// 调用方法失败,抛出异常
throw new RuntimeException(promise.cause());
}
}
});
return o;
}
private static void init() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
// PRC 请求消息处理器
RpcResponseMessageHandler rpcResponseMessageHandler = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageSharableCodec);
ch.pipeline().addLast(rpcResponseMessageHandler);
}
});
try {
channel = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync().channel();
// 异步关闭 group,避免Channel被阻塞
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}Copy
获得Channel
init
方法中,当连接断开时,通过addListener
方法异步关闭group远程调用方法
RpcRequestMessage
的创建与发送过程通过JDK的动态代理来完成远程调用方法返回值获取
调用方法的是主线程,处理返回结果的是NIO线程(RpcResponseMessageHandler)。要在不同线程中进行返回值的传递,需要用到Promise
在RpcResponseMessageHandler
中创建一个Map
主线程的代理类将RpcResponseMessage发送给服务器后,需要创建Promise对象,并将其放入到RpcResponseMessageHandler的Map中。需要使用await等待结果被放入Promise中。获取结果后,根据结果类型(判断是否成功)来返回结果或抛出异常
// 创建Promise,用于获取NIO线程中的返回结果,获取的过程是异步的
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
// 将Promise放入Map中
RpcResponseMessageHandler.promiseMap.put(id, promise);
// 等待被放入Promise中结果
promise.await();
if (promise.isSuccess()) {
// 调用方法成功,返回方法执行结果
return promise.getNow();
} else {
// 调用方法失败,抛出异常
throw new RuntimeException(promise.cause());
}Copy
NIO线程负责通过SequenceId**获取并移除(remove)**对应的Promise,然后根据RpcResponseMessage中的结果,向Promise中放入不同的值
promise.setSuccess(returnValue)
放入方法返回值promise.setFailure(exception)
放入异常信息// 将返回结果放入对应的Promise中,并移除Map中的Promise
Promise<Object> promise = promiseMap.remove(msg.getSequenceId());
Object returnValue = msg.getReturnValue();
Exception exception = msg.getExceptionValue();
if (promise != null) {
if (exception != null) {
// 返回结果中有异常信息
promise.setFailure(exception);
} else {
// 方法正常执行,没有异常
promise.setSuccess(returnValue);
}
}Copy
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
static final Logger log = LoggerFactory.getLogger(ChatServer.class);
/**
* 用于存放Promise的集合,Promise用于主线程与NIO线程之间传递返回值
*/
public static Map<Integer, Promise<Object>> promiseMap = new ConcurrentHashMap<>(16);
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
// 将返回结果放入对应的Promise中,并移除Map中的Promise
Promise<Object> promise = promiseMap.remove(msg.getSequenceId());
Object returnValue = msg.getReturnValue();
Exception exception = msg.getExceptionValue();
if (promise != null) {
if (exception != null) {
// 返回结果中有异常信息
promise.setFailure(exception);
} else {
// 方法正常执行,没有异常
promise.setSuccess(returnValue);
}
}
// 拿到返回结果并打印
log.debug("{}", msg);
}
}Copy
Netty启动流程可以简化成如下代码
// netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open();
// 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();
// 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 启动 nio boss 线程执行接下来的操作
//注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
// head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);Copy
NioServerSocketChannel
,该Channel作为附件添加到ServerSocketChannel
中ServerSocketChannel
,将其设置为非阻塞模式,并注册到Selector中,此时未关注事件,但是添加了附件NioServerSocketChannel
interestOps
设置感兴趣的事件选择器Selector的创建是在NioEventloopGroup中完成的。NioServerSocketChannel与ServerSocketChannel的创建,ServerSocketChannel注册到Selector中以及绑定操作都是由bind
方法完成的
所以服务器启动的入口便是io.netty.bootstrap.ServerBootstrap.bind
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}Copy
真正完成初始化、注册以及绑定的方法是io.netty.bootstrap.AbstractBootstrap.doBind
dobind方法在主线程中执行
private ChannelFuture doBind(final SocketAddress localAddress) {
// 负责NioServerSocketChannel和ServerSocketChannel的创建
// ServerSocketChannel的注册工作
// init由main线程完成,regisetr由NIO线程完成
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 因为register操作是异步的
// 所以要判断主线程执行到这里时,register操作是否已经执行完毕
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 执行doBind0绑定操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
// 如果register操作还没执行完,就会到这个分支中来
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 添加监听器,NIO线程异步进行doBind0操作
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}Copy
initAndRegister()
和doBind0(regFuture, channel, localAddress, promise)
代码
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}Copy
Channel channel = null;
try {
// 通过反射初始化NioServerSocketChannel
channel = channelFactory.newChannel();
init(channel);
}Copy
newChannel方法
@Override
public T newChannel() {
try {
// 通过反射调用NioServerSocketChannel的构造方法
// 创建NioServerSocketChannel对象
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}Copy
NioServerSocketChannel构造方法
public NioServerSocketChannel() {
// 创建了ServerSocketChannel实例
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}Copy
newSocket方法
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// ServerSocketChannel.open方法:
// SelectorProvider.provider().openServerSocketChannel()
// 所以此处相当于ServerSocketChannel.open()
// 创建了ServerSocketChannel实例
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a server socket.", e);
}
}Copy
init方法
@Override
void init(Channel channel) {
...
// NioSocketChannl的Pipeline
ChannelPipeline p = channel.pipeline();
...
// 向Pipeline中添加了一个handler,该handler等待被调用
p.addLast(new ChannelInitializer<Channel>() {
@Override
// register之后才调用该方法
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// 创建handler并加入到pipeline中
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加新的handler,在发生Accept事件后建立连接
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}Copy
init主要完成了以下三个操作
创建NioServerSocketChannel
通过NioServerSocketChannel的构造器,创建了ServerSocketChannel
由
initChannel
方法向NioServerSocketChannel中添加了两个handler,
添加操作在register之后被执行
init执行完毕后,便执行ChannelFuture regFuture = config().group().register(channel)
操作
该方法最终调用的是promise.channel().unsafe().register(this, promise)
方法
promise.channel().unsafe().register(this, promise)
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 获取EventLoop
AbstractChannel.this.eventLoop = eventLoop;
// 此处完成了由 主线程 到 NIO线程 的切换
// eventLoop.inEventLoop()用于判断当前线程是否为NIO线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 向NIO线程中添加任务
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 该方法中会执行doRegister
// 执行真正的注册操作
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}Copy
register0方法
private void register0(ChannelPromise promise) {
try {
...
// 执行真正的注册操作
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 调用init中的initChannel方法
pipeline.invokeHandlerAddedIfNeeded();
...
} catch (Throwable t) {
...
}
}Copy
doRegister方法
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// javaChannel()即为ServerSocketChannel
// eventLoop().unwrappedSelector()获取eventLoop中的Selector
// this为NIOServerSocketChannel,作为附件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}Copy
回调initChannel
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 添加新任务,任务负责添加handler
// 该handler负责发生Accepet事件后建立连接
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}Copy
Register主要完成了以下三个操作
完成了主线程到NIO的线程切换
eventLoop.inEventLoop()
进行线程判断,判断当前线程是否为NIO线程调用doRegister方法
// javaChannel()即为ServerSocketChannel
// eventLoop().unwrappedSelector()获取eventLoop中的Selector
// this为NIOServerSocketChannel,作为附件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);Copy
通过invokeHandlerAddedIfNeeded
调用init中的initChannel
方法
initChannel方法主要创建了
两个handler
在doRegister
和invokeHandlerAddedIfNeeded
操作中的完成后,会调用safeSetSuccess(promise)
方法,向Promise中设置执行成功的结果。此时doBind
方法中由initAndRegister
返回的ChannelFuture对象regFuture便会由NIO线程异步执行doBind0绑定操作
// initAndRegister为异步方法,会返回ChannelFuture对象
final ChannelFuture regFuture = initAndRegister();
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
// 如果没有异常,则执行绑定操作
doBind0(regFuture, channel, localAddress, promise);
}
}
});Copy
doBind0最底层调用的是ServerSocketChannel的bind方法
NioServerSocketChannel.doBind方法
通过该方法,绑定了对应的端口
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 调用ServerSocketChannel的bind方法,绑定端口
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}Copy
在绑定端口操作完成后,会判断各种所有初始化操作是否已经完成,若完成,则会添加ServerSocketChannel感兴趣的事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}Copy
最终在AbstractNioChannel.doBeginRead
方法中,会添加ServerSocketChannel添加Accept事件
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// 如果ServerSocketChannel没有关注Accept事件
if ((interestOps & readInterestOp) == 0) {
// 则让其关注Accepet事件
// readInterestOp 取值是 16
// 在 NioServerSocketChannel 创建时初始化
selectionKey.interestOps(interestOps | readInterestOp);
}
}Copy
注意:此处设置interestOps时使用的方法,避免覆盖关注的其他事件
首先获取Channel所有感兴趣的事件
final int interestOps = selectionKey.interestOps();Copy
然后再设置其感兴趣的事件
selectionKey.interestOps(interestOps | readInterestOp);Copy
各个事件对应的值
通过上述步骤,完成了
NioEventLoop的重要组成部分有三个
Selector
public final class NioEventLoop extends SingleThreadEventLoop {
...
// selector中的selectedKeys是基于数组的
// unwrappedSelector中的selectedKeys是基于HashSet的
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
...
}Copy
Thread与TaskQueue
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
// 任务队列
private final Queue<Runnable> taskQueue;
// 线程
private volatile Thread thread;
}Copy
Selector是在NioEventLoop的构造方法中被创建的
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
...
// 初始化selector,初始化过程在openSelector中
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 此处等同于 Selector.open()方法
// 创建了unwrappedSelector对象
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
}Copy
NioEventLoop的构造方法中,调用了openSelector()
方法, 该方法会返回一个
SelectorTuple对象,该方法是创建Selector的核心方法。openSelector()
方法内部调用了
unwrappedSelector = provider.openSelector();Copy
获得了Selector对象unwrappedSelector
后面会通过反射,修改unwrappedSelector
中SelectedKeys的实现,然后通过SelectedSelectionKeySetSelector
方法获得selector。最后通过SelectorTuple的构造方法,将该Selector的值赋给SelectorTuple类中的selector与unwrappedSelector
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
/**
* 一般调用的是这个构造方法
*/
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}Copy
再通过NioEventLoop的构造方法,将SelectorTuple中的Selector赋值给NioEventLoop中的Selector
NioEventLoop中有selector和unwrappedSelector两个Selector,它们的区别主要在于SelectedKeys的数据结构
这样做的主要目的是,数组的遍历效率要高于HashSet
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
...
// 获得基于数组的selectedKeySet实现
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 通过反射拿到unwrappedSelector中的selectedKeys属性
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
...
// 暴力反射,修改私有属性
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
// 替换为基于数组的selectedKeys实现
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
selectedKeys = selectedKeySet;
// 调用构造函数,创建unwrappedSelector与selector
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}Copy
获得数组实现SelectedKeys的Selector的原理是反射,主要步骤如下
获得基于数组的selectedKeySet实现
// 获得基于数组的selectedKeySet实现
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}Copy
通过反射拿到unwrappedSelector中的SelectedKeySet并将其替换为selectedKeySet
通过Selector的构造方法获得selector
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)Copy
通过SelectorTuple的构造方法获得拥有两种Selector的SelectorTuple对象,并返回给NioEventLoop
// 调用构造函数,创建unwrappedSelector与selector
return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));Copy
NioEventLoop中的线程,在首次执行任务时,才会被创建,且只会被创建一次
测试代码
public class TestNioEventLoop {
public static void main(String[] args) {
EventLoop eventLoop = new NioEventLoopGroup().next();
// 使用NioEventLoop执行任务
eventLoop.execute(()->{
System.out.println("hello");
});
}
}Copy
进入execute
执行任务
@Override
public void execute(Runnable task) {
// 检测传入的任务是否为空,为空会抛出NullPointerException
ObjectUtil.checkNotNull(task, "task");
// 执行任务
// 此处判断了任务是否为懒加载任务,wakesUpForTask的返回值只会为true
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}Copy
进入上述代码的execute
方法
private void execute(Runnable task, boolean immediate) {
// 判断当前线程是否为NIO线程
// 判断方法为 return thread == this.thread;
// this.thread即为NIO线程,首次执行任务时,其为null
boolean inEventLoop = inEventLoop();
// 向任务队列taskQueue中添加任务
addTask(task);
// 当前线程不是NIO线程,则进入if语句
if (!inEventLoop) {
// 启动NIO线程的核心方法
startThread();
...
}
// 有任务需要被执行时,唤醒阻塞的NIO线程
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}Copy
进入startThread
方法
private void startThread() {
// 查看NIO线程状态是否为未启动
// 该if代码块只会执行一次
// state一开始的值就是ST_NOT_STARTED
// private volatile int state = ST_NOT_STARTED;
if (state == ST_NOT_STARTED) {
// 通过原子属性更新器将状态更新为启动(ST_STARTED)
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 执行启动线程
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}Copy
进入doStartThread
,真正创建NIO线程并执行任务
private void doStartThread() {
assert thread == null;
// 创建NIO线程并执行任务
executor.execute(new Runnable() {
@Override
public void run() {
// thread即为NIO线程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 执行内部run方法
SingleThreadEventExecutor.this.run();
success = true;
}
...
});
}Copy
通过SingleThreadEventExecutor.this.run()
执行传入的任务(task)
该run方法是NioEvnetLoop的run方法
@Override
protected void run() {
int selectCnt = 0;
// 死循环,不断地从任务队列中获取各种任务来执行
for (;;) {
// 执行各种任务
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
}
}
}Copy
NioEvnetLoop需要IO事件、普通任务以及定时任务,任务在run方法的for循环中
@Override
protected void run() {
int selectCnt = 0;
// 死循环,不断地从任务队列中获取各种任务来执行
for (;;) {
// 执行各种任务
...
}
}Copy
中被执行,但该循环不会空转,执行到某些代码时,会被阻塞
run方法中有SELECT分支
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// 执行select方法
strategy = select(curDeadlineNanos);
}
}
...Copy
会执行NioEvnetLoop的select
方法,该方法内部会根据情况,执行selector的有参和无参的select方法
private int select(long deadlineNanos) throws IOException {
// 如果没有指定阻塞事件,就调用select()
if (deadlineNanos == NONE) {
return selector.select();
}
// 否则调用select(timeoutMillis),指定时间内未发生事件就停止阻塞
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}Copy
但需要注意的是,select
方法是会阻塞线程的,当没有IO事件,但有其他任务需要执行时,需要唤醒线程
唤醒是通过execute最后的if代码块来完成的
// 有任务需要被执行时,唤醒阻塞的NIO线程
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}Copy
NioEventLoop.wakeup
唤醒被selector.select方法阻塞的NIO线程
@Override
protected void wakeup(boolean inEventLoop) {
// 只有当其他线程给当前NIO线程提交任务时(如执行execute),才会被唤醒
// 通过AtomicLong进行更新,保证每次只能有一个线程唤醒成功
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
// 唤醒被selector.select方法阻塞的NIO线程
selector.wakeup();
}
}Copy
唤醒时需要进行两个判断
判断提交任务的
是否为NIO线程
通过AtomicLong保证有多个线程同时提交任务时,只有一个线程能够唤醒NIO线程
run方法的switch语句有多条分支,具体执行分支的代码由strategy变量控制
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
...
}Copy
strategy的值由calculateStrategy
方法确定
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// selectSupplier.get() 底层是 selector.selectNow();
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}Copy
该方法会根据hasTaks变量判断任务队列中是否有任务
若有任务,则通过selectSupplier获得strategy的值
get方法会selectNow方法,顺便拿到IO事件
private final IntSupplier selectNowSupplier = new IntSupplier() {
public int get() throws Exception {
return NioEventLoop.this.selectNow();
}
};
int selectNow() throws IOException {
return this.selector.selectNow();
}Copy
若没有任务,就会进入SELECT分支
也就说,当任务队列中没有任务时,才会进入SELECT分支,让NIO线程阻塞,而不是空转。若有任务,则会通过get
方法调用selector.selectNow
方法,顺便拿到IO事件
Java NIO空轮询BUG也就是JavaNIO在Linux系统下的epoll空轮询问题
在NioEventLoop中,因为run方法中存在一个死循环,需要通过selector.select方法来阻塞线程。但是select方法因为BUG,可能无法阻塞线程,导致循环一直执行,使得CPU负载升高
@Override
protected void run() {
...
for(;;){
...
// 可能发生空轮询,无法阻塞NIO线程
strategy = select(curDeadlineNanos);
...
if(...) {
...
} else if (unexpectedSelectorWakeup(selectCnt) ){
// 通过unexpectedSelectorWakeup方法中的rebuildSelector重建selector
// 并将selectCnt重置为0
selectCnt = 0;
}
}
}Copy
Netty中通过selectCnt
变量来检测select
方法是否发生空轮询BUG
若发生空轮询BUG,那么selectCnt的值会增长是十分迅速。当selectCnt
的值大于等于SELECTOR_AUTO_REBUILD_THRESHOLD
(默认512)时,Netty则判断其出现了空轮询BUG,进行如下处理
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);
// 重建selector,将原selector的配置信息传给新selector
// 再用新selector覆盖旧selector
rebuildSelector();
return true;
}Copy
通过rebuildSelector
方法重建selector,将原selector的配置信息传给新selector,再用新selector覆盖旧selector。同时将selectCnt的值设置为0
NioEventLoop可以处理IO事件和其他任务。不同的操作所耗费的时间是不同的,想要控制NioEventLoop处理IO事件花费时间占执行所有操作的总时间的比例,需要通过ioRatio来控制
NioEventLoop.run方法
// 处理IO事件时间比例,默认为50%
final int ioRatio = this.ioRatio;
// 如果IO事件时间比例设置为100%
if (ioRatio == 100) {
try {
// 如果需要去处理IO事件
if (strategy > 0) {
// 先处理IO事件
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
// 剩下的时间都去处理普通任务和定时任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) { // 如果需要去处理IO事件
// 记录处理IO事件前的时间
final long ioStartTime = System.nanoTime();
try {
// 去处理IO事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// ioTime为处理IO事件耗费的事件
final long ioTime = System.nanoTime() - ioStartTime;
// 计算出处理其他任务的事件
// 超过设定的时间后,将会停止任务的执行,会在下一次循环中再继续执行
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else { // 没有IO事件需要处理
// This will run the minimum number of tasks
// 直接处理普通和定时任务
ranTasks = runAllTasks(0);
}Copy
通过ioRatio控制各个任务执行的过程如下
判断ioRatio是否为100
若是,判断是否需要处理IO事件(strategy>0)
若否(或IO事件已经处理完毕),接下来去执行所有的普通任务和定时任务,直到所有任务都被处理完
// 没有指定执行任务的时间
ranTasks = runAllTasks();Copy
若ioRatio不为100
先去处理IO事件,记录处理IO事件所花费的事件保存在ioTime中
接下来去处理其他任务,根据ioTime与ioRatio计算执行其他任务可用的时间
// 比如ioTime为10s,ioRatio为50
// 那么通过 10*(100-50)/50=10 计算出其他任务可用的时间为 10s
// 处理IO事件占用的事件总比例为50%
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);Copy
执行其他任务一旦超过可用时间,则会停止执行,在下一次循环中再继续执行
若没有IO事件需要处理,则去执行最少数量的普通任务和定时任务
// 运行最少数量的任务
ranTasks = runAllTasks(0);Copy
IO事件是通过NioEventLoop.processSelectedKeys()
方法处理的
private void processSelectedKeys() {
// 如果selectedKeys是基于数组的
// 一般情况下都走这个分支
if (selectedKeys != null) {
// 处理各种IO事件
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}Copy
processSelectedKeysOptimized方法
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
// 拿到SelectionKeyec
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
// 获取SelectionKey上的附件,即NioServerSocketChannel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 处理事件,传入附件NioServerSocketChannel
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}Copy
该方法中通过fori的方法,遍历基于数组的SelectedKey,通过
final SelectionKey k = selectedKeys.keys[i];Copy
获取到SelectionKey,然后获取其再Register时添加的附件NioServerSocketChannel
// 获取SelectionKey上的附件,即NioServerSocketChannel
final Object a = k.attachment();Copy
如果附件继承自AbstractNioChannel,则会调用
// 处理事件,传入附件NioServerSocketChannel
processSelectedKey(k, (AbstractNioChannel) a);Copy
去处理各个事件
真正处理各种事件的方法processSelectedKey
获取SelectionKey的事件,然后进行相应处理
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}Copy
NIO中处理Accept事件主要有以下六步
代码如下
// 阻塞直到事件发生
selector.select();
Iterator<SelectionKey> iter = selector.selectionKeys().iterator();
while (iter.hasNext()) {
// 拿到一个事件
SelectionKey key = iter.next();
// 如果是 accept 事件
if (key.isAcceptable()) {
// 执行accept,获得SocketChannel
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
// 将SocketChannel注册到selector中,并关注read事件
channel.register(selector, SelectionKey.OP_READ);
}
// ...
}Copy
其中前三步,在NioEventLoop剖析中已经分析过了,所以接下来主要分析后三步
发生Accept事件后,会执行NioEventLoop.run
方法的如下if分支
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}Copy
NioMessageUnsafe.read
方法
public void read() {
...
try {
try {
do {
// doReadMessages中执行了accept获得了SocketChannel
// 并创建NioSocketChannel作为消息放入readBuf
// readBuf是一个ArrayList用来缓存消息
// private final List<Object> readBuf = new ArrayList<Object>();
int localRead = doReadMessages(readBuf);
...
// localRead值为1,就一条消息,即接收一个客户端连接
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发read事件,让pipeline上的handler处理
// ServerBootstrapAcceptor.channelRead
pipeline.fireChannelRead(readBuf.get(i));
}
...
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}Copy
NioSocketChannel.doReadMessages
方法
该方法中处理accpet事件,获得SocketChannel,同时创建了NioSocketChannel,作为消息放在了readBuf中
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 处理accpet事件,获得SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 创建了NioSocketChannel,作为消息放在了readBuf中
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
...
}
return 0;
}Copy
ServerBootstrapAcceptor.channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这时的msg是NioSocketChannel
final Channel child = (Channel) msg;
// NioSocketChannel添加childHandler,即初始化器
child.pipeline().addLast(childHandler);
// 设置选项
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 注册 NioSocketChannel到nio worker线程,接下来的处理也移交至nio worker线程
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}Copy
通过AbstractUnsafe.register
方法,将SocketChannel注册到了Selector中,过程与启动流程中的Register过程类似
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 这行代码完成的是nio boss -> nio worker线程的切换
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 真正的注册操作
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}Copy
AbstractChannel.AbstractUnsafe.register0
private void register0(ChannelPromise promise) {
try {
...
// 该方法将SocketChannel注册到Selector中
doRegister();
// 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
pipeline.invokeHandlerAddedIfNeeded();
// 执行后就是 head -> logging handler -> my handler -> tail
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
// 触发pipeline上active事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}Copy
AbstractNioChannel.doRegister
将SocketChannel注册到Selector中
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将Selector注册到Selector中
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}Copy
HeadContext.channelActive
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 触发read(NioSocketChannel这里read只是为了触发channel的事件注册,还未涉及数据读取)
readIfIsAutoRead();
}Copy
AbstractNioChannel.doBeginRead
,通过该方法,SocketChannel关注了read事件
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 这时候 interestOps是0
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 关注read事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}Copy
read事件的处理也是在
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}Copy
分支中,通过unsafe.read()
方法处理的,不过此处调用的方法在AbstractNioByteChannel.NioByteUnsafe类中
@Override
public final void read() {
// 获得Channel的配置
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 根据配置创建ByteBufAllocator(池化非池化、直接非直接内存)
final ByteBufAllocator allocator = config.getAllocator();
// 用来分配 byteBuf,确定单次读取大小
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 创建ByteBuf
byteBuf = allocHandle.allocate(allocator);
// 读取内容,放入ByteBUf中
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 触发read 事件,让pipeline上的handler处理
// 这时是处理NioSocketChannel上的handler
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
}
// 是否要继续循环
while (allocHandle.continueReading());
allocHandle.readComplete();
// 触发 read complete事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}Copy
DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle.continueReading(io.netty.util.UncheckedBooleanSupplier)
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return
// 一般为true
config.isAutoRead() &&
// respectMaybeMoreData默认为true
// maybeMoreDataSupplier的逻辑是如果预期读取字节与实际读取字节相等,返回true
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
// 小于最大次数,maxMessagePerRead默认16
totalMessages < maxMessagePerRead &&
// 实际读到了数据
totalBytesRead > 0;
}Copy
entLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 这行代码完成的是nio boss -> nio worker线程的切换
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 真正的注册操作
register0(promise);
}
});
} catch (Throwable t) {
…
}
}
}Copy
AbstractChannel.AbstractUnsafe.register0
private void register0(ChannelPromise promise) {
try {
...
// 该方法将SocketChannel注册到Selector中
doRegister();
// 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
pipeline.invokeHandlerAddedIfNeeded();
// 执行后就是 head -> logging handler -> my handler -> tail
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
// 触发pipeline上active事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}Copy
`AbstractNioChannel.doRegister`将SocketChannel注册到Selector中
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;😉 {
try {
// 将Selector注册到Selector中
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
…
}
}
}Copy
HeadContext.channelActive
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 触发read(NioSocketChannel这里read只是为了触发channel的事件注册,还未涉及数据读取)
readIfIsAutoRead();
}Copy
`AbstractNioChannel.doBeginRead`,通过该方法,SocketChannel关注了read事件
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 这时候 interestOps是0
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 关注read事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}Copy
## 4、Read剖析
read事件的处理也是在
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}Copy
分支中,通过`unsafe.read()`方法处理的,**不过此处调用的方法在AbstractNioByteChannel.NioByteUnsafe类中**
@Override
public final void read() {
// 获得Channel的配置
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 根据配置创建ByteBufAllocator(池化非池化、直接非直接内存)
final ByteBufAllocator allocator = config.getAllocator();
// 用来分配 byteBuf,确定单次读取大小
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 创建ByteBuf
byteBuf = allocHandle.allocate(allocator);
// 读取内容,放入ByteBUf中
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 触发read 事件,让pipeline上的handler处理
// 这时是处理NioSocketChannel上的handler
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
}
// 是否要继续循环
while (allocHandle.continueReading());
allocHandle.readComplete();
// 触发 read complete事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}Copy
DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle.continueReading(io.netty.util.UncheckedBooleanSupplier)
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return
// 一般为true
config.isAutoRead() &&
// respectMaybeMoreData默认为true
// maybeMoreDataSupplier的逻辑是如果预期读取字节与实际读取字节相等,返回true
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
// 小于最大次数,maxMessagePerRead默认16
totalMessages < maxMessagePerRead &&
// 实际读到了数据
totalBytesRead > 0;
}Copy