第4章Netty第二节入门案例+channel,future,promise介绍

发布时间:2023年12月21日

需求

开发一个简单的服务器端和客户端

  • 客户端向服务器端发送 hello, world
  • 服务器仅接收,不返回
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

服务端

new ServerBootstrap()
    .group(new NioEventLoopGroup()) // 1
    .channel(NioServerSocketChannel.class) // 2
    .childHandler(new ChannelInitializer<NioSocketChannel>() { // 3
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new StringDecoder()); // 5
            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                    System.out.println(msg);
                }
            });
        }
    })
    .bind(8080); // 4

代码解读:

  1. 1处创建NIOEventLoopGroup,可以简单理解为线程池+selector
  2. 2处选择服务socket实现类,其中NIOServerSocketChannel表示基于NIO的服务端实现。其他还有
    在这里插入图片描述
  3. 3处,为啥方法骄傲childHandler,是接下来处理器都是给socketChannel用的,而不是给ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端socketChannel建立连接后,执行initChannel以便添加更多的处理器。
  4. 4 处,ServerSocketChannel 绑定的监听端口
  5. 5处,socketChannel的处理器,解码ByteBuf=>String
  6. 6处,socketChannel的业务处理器,使用上一个处理器的处理结果

客户端

new Bootstrap()
    .group(new NioEventLoopGroup()) // 1
    .channel(NioSocketChannel.class) // 2
    .handler(new ChannelInitializer<Channel>() { // 3
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder()); // 8
        }
    })
    .connect("127.0.0.1", 8080) // 4
    .sync() // 5
    .channel() // 6
    .writeAndFlush(new Date() + ": hello world!"); // 7

代码解读:
1.1处,创建NIOEventLoopGroup,同server
2. 2处, 选择客户的socket实现类,NIOSocketChannel表示基于NIO的客户端实现类。其他实现还有
在这里插入图片描述

  1. 3处,添加socketChannel的处理器,ChannelInitializer处理器(仅执行一次),它的作用是待客户端socketChannel建立连接后,执行initChannel以便添加跟多的处理器。
  2. 4 处,指定要连接的服务器和端口
  3. 5处,Netty中很多方法都是异步的,如connect,这时需要使用sync方法等待connect建立连接完毕。
  4. 6处,获取channel对象,它即为通道抽象,可以进行数据读写。
  5. 7处,写入消息并清空缓冲区
  6. 8处,建立连接后,消息会经过通道handler处理,这里是将string=>ByteBuf发出。
  7. 数据经过网络传输,到达服务端,服务端5和6处的handlerx先后被触发走完一个流程。

流程梳理

在这里插入图片描述
注意:

  • 把channel理解为数据的通道
  • 把msg理解为流动的数据,最开始输入的是ByteBuf,但经过pipeLine的加工,会变成其他类型的对象,最后输出又变成ByteBuf
  • 把handler理解为数据的处理工序
    • 工序有多道,合在一起就是pipeline.pipeline负责发布事件(读,读取完成。。。)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)
    • handler分Inbound和outbound两类
  • 把eventloop理解为处理数据的工人
    • 工人可以管理多个channel的io操作。并且一旦工人负责了某个channel就要负责到底(绑定)
    • 工人既可以执行io操作,也可以进行任务处理。每位工人有任务队列,队列里可以堆放多个channel待处理的任务,把任务分为普通任务和定时任务。
    • 工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每一道工序执行不同的工人。

组件-EventLoop

事件循环对象 EventLoop

本质上是一个单线程执行器同时维护了一个selector,里面有run方法处理channel上源源不断的io事件。
继承关系:

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor
    • 提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
    • 提供了parent方法来看看自己属于哪个EventLoopGroup

事件循环组 EventLoopGroup

是一组EventLoop,channel一般会调用EventLoopGroup的register方法来绑定一个EventLoop,后续这个channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)
继承自 netty 自己的 EventExecutorGroup

  • 实现了 Iterable 接口提供遍历 EventLoop 的能力
  • 另有 next 方法获取集合中下一个 EventLoop
    以一个简单的实现为例:
// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());

输出

io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98

也可以使用 for 循环

DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
for (EventExecutor eventLoop : group) {
    System.out.println(eventLoop);
}

输出

io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6

优雅关闭

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

演示NioEventLoop处理io事件

服务端两个Nio worker工人

new ServerBootstrap()
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                    if (byteBuf != null) {
                        byte[] buf = new byte[16];
                        ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
                        log.debug(new String(buf));
                    }
                }
            });
        }
    }).bind(8080).sync();

客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)

public static void main(String[] args) throws InterruptedException {
    Channel channel = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    System.out.println("init...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            })
            .channel(NioSocketChannel.class).connect("localhost", 8080)
            .sync()
            .channel();

    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
    Thread.sleep(2000);
    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));

最后输出

22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi           
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi           
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu        
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu         

可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定
在这里插入图片描述
再增加两个非 nio 工人

DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
new ServerBootstrap()
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch)  {
            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            ch.pipeline().addLast(normalWorkers,"myhandler",
              new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                    if (byteBuf != null) {
                        byte[] buf = new byte[16];
                        ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
                        log.debug(new String(buf));
                    }
                }
            });
        }
    }).bind(8080).sync();

客户端代码不变,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)

输出

22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] REGISTERED
22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] ACTIVE
22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7a 68 61 6e 67 73 61 6e                         |zhangsan        |
+--------+-------------------------------------------------+----------------+
22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE
22:19:48 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan        
22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7a 68 61 6e 67 73 61 6e                         |zhangsan        |
+--------+-------------------------------------------------+----------------+
22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE
22:19:50 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan        
22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] REGISTERED
22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] ACTIVE
22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6c 69 73 69                                     |lisi            |
+--------+-------------------------------------------------+----------------+
22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE
22:20:25 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi            
22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6c 69 73 69                                     |lisi            |
+--------+-------------------------------------------------+----------------+
22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE
22:20:27 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi            
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] REGISTERED
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] ACTIVE
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 61 6e 67 77 75                               |wangwu          |
+--------+-------------------------------------------------+----------------+
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE
22:20:38 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu          
22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 61 6e 67 77 75                               |wangwu          |
+--------+-------------------------------------------------+----------------+
22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE
22:20:40 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu          

可以看到,nio 工人和 非 nio 工人也分别绑定了 channel(LoggingHandler 由 nio 工人执行,而我们自己的 handler 由非 nio 工人执行)

在这里插入图片描述

handler 执行中如何换人?

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
    EventExecutor executor = next.executor();
    
    // 是,直接调用
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } 
    // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
    else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);// 用新的线程去执行
            }
        });
    }
}

在这里插入图片描述

  • 如果两个handler绑定的是同一个线程,那么在当前线程中直接调用
  • 否则,提交给一个新的线程调用。

演示 NioEventLoop 处理普通任务

NioEventLoop 除了可以处理 io 事件,同样可以向它提交普通任务

 // 1. 创建事件循环组
EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
         // 3. 执行普通任务。submit和execute都可以。将这个任务加到事件循环组中的一个线程去执行不会造成阻塞。实现了异步
group.next().submit(() -> {
     try {
         Thread.sleep(1000);
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
     log.debug("ok");
 });
log.debug("main");

输出

19:52:29 [DEBUG] [main] c.i.n.c.TestEventLoop - main
19:52:30 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok

可以用来执行耗时较长的任务,不会被阻塞。

演示 NioEventLoop 处理定时任务

 // 1. 创建事件循环组
 EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
 // 4. 执行定时任务
 group.next().scheduleAtFixedRate(() -> {
     log.debug("ok");
 }, 0, 1, TimeUnit.SECONDS);

 log.debug("main");
19:53:32 [DEBUG] [main] c.i.n.c.TestEventLoop - main
19:53:32 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
19:53:33 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
19:53:34 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok

可以用来执行定时任务。

组件-Channel

channel的主要方法

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入到缓冲区中,但没有将数据发出。
  • writeAndFlush() 方法将数据写入缓存去,并立刻将缓冲区数据刷出。会立即将数据发出。

ChannelFuture

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    // 异步非阻塞的。异步:main线程发起调用,真正执行连接是另一个线程-Nio线程。如果没有sync,是无阻塞的,会直接向下执行。连接都还没建立好,信息不知道发送到哪里去。
    .connect("127.0.0.1", 8080); // 1

channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");

说明:
1处返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
注意:connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象。加了sync才能变成同步,一直阻塞直到建立连接。然后执行后面的才能正确发送数据。
实验如下:

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);

System.out.println(channelFuture.channel()); // 1
channelFuture.sync(); // 2 会阻塞直到返回结果
System.out.println(channelFuture.channel()); // 3
  • 执行到 1 时,连接未建立,打印 [id: 0x2e1884dd]
  • 执行到 2 时,sync 方法是同步等待连接建立完成
  • 执行到 3 时,连接肯定建立了,打印 [id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]
    除了用 sync 方法可以让异步操作同步以外,还可以使用回调的方式:
ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.addListener((ChannelFutureListener) future -> {
    System.out.println(future.channel()); // 2 连接建立好了会调用该方法
});
// 或者用下面的方法
/*
// 使用 addListener(回调对象) 方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    // 在 nio 线程连接建立好之后,会调用 operationComplete
    public void operationComplete(ChannelFuture future) throws Exception {
        Channel channel = future.channel();
        log.debug("{}", channel);
        channel.writeAndFlush("hello, world");
    }
});
*/

核心:channelFuture.addListener()。注意带有future,promise的类型都可以和addListener方法配套使用的。
说明:这种方法不同于上面的同步的方法,同步的方法是有main线程完成打印的操作。而这里main线程直接将任务交给其他线程以异步的方式执行,当连接建立完成后自动完成打印操作。

CloseFuture

用法基本同上面的channelFuture。closeFuture是用来处理关闭之后的操作的,一般用来关闭EventLoopGroup。

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        Channel channel = channelFuture.sync().channel();
        log.debug("{}", channel);
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    channel.close(); // close 异步操作 1s 之后
//                    log.debug("处理关闭之后的操作"); // 不能在这里善后
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();

        // 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
        ChannelFuture closeFuture = channel.closeFuture();
        // 1.同步关闭的方法
        /*log.debug("waiting close...");
        closeFuture.sync();
        log.debug("处理关闭之后的操作");*/
        // 2.异步处理的方法
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("处理关闭之后的操作");
                group.shutdownGracefully();
            }
        });
    }
}

组件-Future&Promise

在异步处理的时候,经常用到这两个接口。

netty中的Future和JDK中的Future同名。Netty的Future是继承自JDK的Future。Promise是对Netty中的Future进行拓展(Extend)。

  • JDK中的Future只能同步等待任务结束(或成功或失败)才能得到结果。
  • netty Future可以同步等待任务结束得到结果,也可以异步(addListener)得到结果。但都要等任务结束
  • netty Promise不仅有netty Future的功能,而且可以「主动创建」一个Promise,作为两个线程间传递结果的容器。
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果

JDK Future

只能通过同步等待任务结束,future.get()会阻塞,直到拿到结果。

  // 1. 线程池
        ExecutorService service = Executors.newFixedThreadPool(2);
        // 2. 提交任务
        Future<Integer> future = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算");
                Thread.sleep(1000);
                return 50;
            }
        });
        // 3. 主线程通过 future 来获取结果
        log.debug("等待结果");
        log.debug("结果是 {}", future.get());
22:29:50 [DEBUG] [pool-1-thread-1] c.i.n.c.TestJdkFuture - 执行计算
22:29:50 [DEBUG] [main] c.i.n.c.TestJdkFuture - 等待结果
22:29:51 [DEBUG] [main] c.i.n.c.TestJdkFuture - 结果是 50

netty Future

可以通过同步(get方法会阻塞)也可以通过异步(addListener不会阻塞,交给另外的线程处理)获得结果

 NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算");
                Thread.sleep(1000);
                return 70;
            }
        });
        
//        log.debug("等待结果");
//        log.debug("结果是 {}", future.get()); // 1. 同步的方式
        //2. 异步的方式
        future.addListener(new GenericFutureListener<Future<? super Integer>>(){
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                log.debug("接收结果:{}", future.getNow());
            }
        });

上面异步的方式输出结果:

22:34:24 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestNettyFuture - 执行计算
22:34:25 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestNettyFuture - 接收结果:70

netty Promise(常用的)

Promise 继承自(extend)netty的Future。不仅有Future的功能,而且能主动创建,作为线程 间存放数据的容器

 // 1. 准备 EventLoop 对象
        EventLoop eventLoop = new NioEventLoopGroup().next();
        // 2. 可以主动创建 promise, 结果容器
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
        new Thread(() -> {
            // 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果
            log.debug("开始计算...");
            try {
                int i = 1 / 0;
                Thread.sleep(1000);
                promise.setSuccess(80);
            } catch (Exception e) {
                e.printStackTrace();
                promise.setFailure(e);
            }

        }).start();
        // 4. 接收结果的线程
        log.debug("等待结果...");
        log.debug("结果是: {}", promise.get());
22:37:03 [DEBUG] [main] c.i.n.c.TestNettyPromise - 等待结果...
22:37:03 [DEBUG] [Thread-0] c.i.n.c.TestNettyPromise - 开始计算...
22:37:04 [DEBUG] [main] c.i.n.c.TestNettyPromise - 结果是: 80

后续netty一般都用Promise

文章来源:https://blog.csdn.net/qq_44850917/article/details/135133847
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。