netty是一个异步、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。
Netty解决了TCP传输问题,如黏包、半包问题,解决了epoll空轮询导致CPU100%的问题。并且Netty对API进行增强,使之更易用。如FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer等NIO API进行增强。
- 线程问题:在单元测试中,当线程启动后,系统会立即执行下一行代码,直到所有代码执行完毕。然后,main函数会调用system.exit(0)来结束整个程序。这可能会导致你的服务器在还没有接收到任何数据的情况下就被关闭了。
- 输入/输出流:单元测试通常不会与标准输入/输出流进行交互,这可能会影响到你的服务器的运行
public static void main(String[] args) {
System.out.println("server starter connection......");
// 服务器端的启动器 负责组装Netty组件 启动服务器
new ServerBootstrap()
// 循环处理事件,通过group EventLoopGroup等事件,
// 可能会有BossEventLoopGroup WorkerEventLoopGroup(selector, thread)
.group(new NioEventLoopGroup())
// 选择channel的实现方式,可以是OIO BIO等channel实现方式
.channel(NioServerSocketChannel.class)
// 将事件的处理进行分工,类似Boss负责连接 Worker负责读写事件等操作
// ChannelInitializer和客户端连接后,对数据读写的通道
.childHandler(new ChannelInitializer<NioSocketChannel>() {
// 添加具体的handler
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 将接收到的字符串进行解码 将ByteBuf转为字符串
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 自定义handler
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
}).bind(8080);
System.out.println("server connection ok...");
}
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
// 添加处理器,会在连接建立后进行调用
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 将发送的数据进行编码
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
// 向服务器发送数据
.writeAndFlush("hello world");
System.out.println("client connection.....");
}
客户端中的
sync
方法会阻塞,等待连接建立,当连接建立好后会获取到客户端和服务端建立连接的SocketChannel
对象,通过这个对象向服务器进行读写数据。客户端收发数据都会调用handler中的方法,将channel的数据通过StringEncoder
进行编码来发送。然后服务器通过EventLoopGroup
来接收读事件。将接收到的读事件调用服务器的childHandler
来进行处理,通过StringDecoder
来进行解码,然后将解码的数据交给ChannelInboundHandlerAdapter
自定义处理来进行处理。
1、channel可以理解为数据通道
2、把msg理解为流动的数据,最开始输入的是ByteBuf,但经过pipeline的加工,会变成其他类型对象,最后输出又变成ByteBuf
3、将handler理解为数据的加工工序
3.1、工序有多道,合在一起就是pipeline,pipeline负责发布事件(读,读取完成)传播到每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)
3.2、handler分为Inbound(入站)和Outbound(出站)两类
4、eventLoop理解为处理数据的工人
4.1、工人可以管理多个channel的io操作,并且一旦工人负责某个channel,就要负责到底(绑定)
4.2、工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列中可以堆放多个channel的待处理任务,任务分为普通任务、定时任务。
4.3、工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人
EventLoop本质是一个单线程执行器(同时维护一个
selector
,执行器意味着可以向该对象提交一些任务包括定时任务),里面有run方法处理channel上源源不断的IO事件EventLoop继承事件如下:
1、继承自j.u.c.ScheduledExecutorService因此包含了线程池的所有方法
2、继承自netty自己的OrderedEventExecutor
2.1、提供了
boolean inEventLoop(Thread thread)
方法判断一个线程是否属于此EventLoop2.2、提供了
parent
方法来看看自己属于哪个EventLoopGroup3、EventLoopGroup是一组EventLoop,channel一般会调用EventLoopGroup的
register
方法来绑定其中一个EventLoop,后续这个channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)3.1、EventLoopGroup继承自EventExecutorGroup,实现了
iterable
接口提供遍历EventLoop的能力,同时有next
方法获取集合下一个EventLoop
NioEventLoopGroup
处理io事件,既能提交普通任务也能提交定时任务,默认使用电脑的cpu核心数*2
DefaultEventLoopGroup
DefaultEventLoopGroup() 只能处理普通任务和定时任务,不能处理io事件
public static void main(String[] args) {
// 处理io事件,既能提交普通任务也能提交定时任务,默认使用电脑的cpu核心数*2
// 有多少个线程就有多少个EventLoop
EventLoopGroup group = new NioEventLoopGroup(2);
// 提交执行普通任务
group.next().submit(() -> {
log.debug("event loop task submit....");
});
// 定时任务 参数2:初始延迟事件 参数3:间隔事件,参数4:事件单位
group.next().scheduleAtFixedRate(() -> {
log.debug("scheduled tasks ....");
}, 1L, 1L, TimeUnit.SECONDS);
log.debug("main thread exe....");
}
public static void main(String[] args) {
// 只处理普通任务以及定时任务
EventLoopGroup group = new DefaultEventLoop();
new ServerBootstrap()
// 细分:boss 只负责ServerSocketChannel的accept事件, worker只负责SocketChannel上的读写事件
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter() {
@Override// Object msg其实就是ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
// 将消息传递给下一个handler
ctx.fireChannelRead(msg);
}
// 绑定DefaultEventLoop
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
@Override// Object msg其实就是ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
}).bind(8080);
}
当第一个NioEventLoopGroup请求连接建立成功后,会由第二个NioEventLoopGroup去处理(执行childHandler中的逻辑)
关键代码
io.netty.channel.AbstractChannelHandlerContext
的invokeChannelRead
方法
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获取下一个handler的EventLoop(EventLoop继承EventExecutor)
EventExecutor executor = next.executor();
// 如果下一个handler和当前的handler属于同一个线程
// 就直接调用
if (executor.inEventLoop()) {
// 使用当前handler的线程来调用invokeChannelRead
// 寻找下一个handler
next.invokeChannelRead(m);
} else {
// 如果不是同一个线程就将执行任务代码交给
// 下一个handler线程处理
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
如果一个channel来建立连接成功后,会从
head
的headler调用invokeChannelRead
去寻找下一个handler的read事件
close可以用来关闭channel(异步操作)
closeFuture()用来处理channel的关闭
sync方法作用是同步等待channel关闭
addListener方法是异步等待channel关闭
pipeline方法添加处理器
write方法将数据写入(将数据不会立马发送到服务器或者客户端,而且缓存起来,当调用flush或者数据填满缓冲区就发送出去)
writeAndFlush方法将数据写入并刷出
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
// 添加处理器,会在连接建立后进行调用
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 将发送的数据进行编码
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
.writeAndFlush("hello world");
connect
方法是异步非阻塞的,main线程发起调用,这种执行连接的是NioEventLoopGroup
线程。sync
阻塞当前线程直到NioEventLoopGroup
建立连接完毕才往下执行。
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
// 添加处理器,会在连接建立后进行调用
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 将发送的数据进行编码
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
// 使用addListener异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
log.debug("{}", channel);
channel.writeAndFlush("hello, world");
}
});
operationComplete
方法在NioEventLoopGroup
建立好连接后会调用,并在NioEventLoopGroup
线程中执行,不会阻塞主线程。
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
// 添加处理器,会在连接建立后进行调用
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 将发送的数据进行编码
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
String str = null;
Scanner input = null;
input = new Scanner(System.in);
do {
str = input.nextLine();
System.out.println("input value is:\t" + str);
channel.writeAndFlush(str);
} while (!input.hasNext("exit"));
channel.close();
ChannelFuture closeFuture = channel.closeFuture();
// 同步关闭结果
closeFuture.sync();
System.out.println("channel 关闭后操作");
异步处理Close关闭问题
channel.close();
ChannelFuture closeFuture = channel.closeFuture();
// 异步关闭结果
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("channel 关闭后操作");
// 在channel关闭后关闭NioSocketChannel线程
group.shutdownGracefully();
}
});
在异步处理时,经常使用到两个接口
首先说明netty中的Future与jdk中的Future同名,但是是两个接口,netty的Future继承自jdk的Future,而Promise又对netty Future进行扩展
- jdk Future只能同步等待任务结束(或成功,或失败)才能得到结果
- netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 | jdk Future | netty Future |
---|---|---|
cancel | 取消任务 | |
isCaneled | 任务是否取消 | |
isDone | 任务是否完成,不能区分任务成功失败 | |
get | 获取任务结果,阻塞等待 | |
getNow | 获取任务结果,非阻塞,还未产生结果时返回null | |
await | 等待任务结束,如果任务失败,不会抛异常,而且通过isSuccess判断 | |
sync | 等待任务结束,如果任务失败,抛出异常 | |
isSuccess | 判断任务是否成功 | |
cause | 获取失败信息,非阻塞,如果没有失败,返回null | |
addLinstener | 添加回调,异步接收结果 |
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
Future<Integer> submit = threadPool.submit(() -> {
log.debug("执行计算");
Thread.sleep(1000);
return 50;
});
log.debug("等待结果:");
// 主线程通过Future获取结果
Integer integer = submit.get();
log.debug("获取结果: {}",integer);
NioEventLoopGroup group = new NioEventLoopGroup();
// 获取一个EventGroup
EventLoop eventLoop = group.next();
Future<Integer> submit = eventLoop.submit(() -> {
log.debug("进行计算");
Thread.sleep(1000);
return 50;
});
log.debug("等待结果:");
Integer result = submit.get();
log.debug("获取结果: {}", result);
DefaultPromise<Integer> promise = new DefaultPromise<>(new NioEventLoopGroup().next());
new Thread(() -> {
try {
log.debug("开始计算");
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
promise.setSuccess(50);
}).start();
log.debug("获取结果: {}", promise.get());
ChannelHandler用来处理Channel上的各种事件,分为入站、出站两种。所有ChannelHandler被在一起就是Pipeline
- 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工
每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler就是流水线上的各个工序,而ByteBuf就是原材料。经过很多工序的加工最终成为产品。
EmbeddedChannel用来模拟测试入站,出站执行顺序
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("h1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("h2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel embeddedChannel = new EmbeddedChannel(h1, h2, h3, h4);
// 模拟入站操作
embeddedChannel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
// 模拟出站操作
embeddedChannel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
public static void log(ByteBuf buf) {
int length = buf.readableBytes();
int rows= length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder sb = new StringBuilder(rows * 80 * 20)
.append("read index:")
.append(buf.readerIndex())
.append(" write index:")
.append(buf.writerIndex())
.append(" capacity:")
.append(buf.capacity())
.append(NEWLINE);
appendPrettyHexDump(sb, buf);
System.out.println(sb.toString());
}
可以使用如下代码来创建池化基于堆内存的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以通过如下代码来创建池化基于直接内存的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
池化的最大意义在于可以重用ByteBuf,优点如下:
池化功能开启,可以通过如下系统变量设置
-Dio.netty.allocator.type={unpooled|pooled}
Netty 4.1以后,非Android平台默认启用池化实现,Android平台默认使用非池化实现。在4.1之前,池化功能还不成熟,默认使用的是非池化实现
ByteBuf组成由capacity(ByteBuf容量,默认256)、max capacity(最大容量,值为整数的最大值),读写指针组成。
零拷贝体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的read、write指针。
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j' });
log(buf);
// 切片
ByteBuf buf1 = buf.slice(0, 5);
ByteBuf buf2 = buf.slice(5, 5);
log(buf1);
log(buf2);
}
public static void log(ByteBuf buf) {
int length = buf.readableBytes();
int rows= length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder sb = new StringBuilder(rows * 80 * 20)
.append("read index:")
.append(buf.readerIndex())
.append(" write index:")
.append(buf.writerIndex())
.append(" capacity:")
.append(buf.capacity())
.append(NEWLINE);
appendPrettyHexDump(sb, buf);
System.out.println(sb.toString());
}
修改切片的数据,原数据也会改变。同时不允许切片后的切片进行添加数据,原有的ByteBuf进行release后其他的切片不能再使用。若其他切片需要使用可以使用retain将引用计数加1,就不会因为release导致切片不可以,但需要自己手动释放切片的内存(调用release)
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[] {'a', 'b', 'c', 'd'});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[] {'e', 'f', 'g', 'h'});
CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
// 使用带boolean参数的方法来改变读写指针
buf3.addComponents(true, buf1, buf2);
TestSlice.log(buf3);
CompositeByteBuf避免了内存的复制
由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收
Netty采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口
Unpooled是一个工具类,提供非池化的ByteBuf的创建、组合、复制等操作。
与零拷贝相关的方法有wrappedBuffer方法可以用来包装ByteBuf
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[] {'a', 'b', 'c', 'd'});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[] {'e', 'f', 'g', 'h'});
ByteBuf buf = Unpooled.wrappedBuffer(buf1, buf2);
TestSlice.log(buf);