Netty核心NioEventLoop源码解析

发布时间:2023年12月18日

简介

NioEventLoopNetty基于Java NIO(new I/O)包实现的一个重要组件,它具备以下几个重要的特点:

  1. 非阻塞式IO模型:通过非阻塞的IO模型,实现单个线程处理大量并发连接。
  2. 事件驱动:基于事件驱动模型,管理和执行各种网络操作,例如:数据读写、连接管理、定时任务等,实现高效异步处理。
  3. 事件队列:NioEventLoop内部包含一个事件队列,它会并按照收到的事件类型分发到对应的处理器上。
  4. IO多路复用:通过操作系统底层所提供的选择器(Selector)监听多个通道多个通道(即各个接入的连接),使得我们无需为每一个连接创建一个线程,不仅节约系统资源开销,还提升了系统的吞吐量。

如下图所示,Netty对于EventLoop的设计仍然是遵循Reactor模型的,通过非阻塞式的事件驱动模式,让EventLoop中的Selector完成通道的注册、事件的轮询、任务的分发。

在这里插入图片描述

所以本篇文章就会基于源码分析的方式,来了解NioEventLoop的创建、启动、执行这几个过程。

NioEventLoop源码分析

示例代码

在正式分析源码之前,笔者这里给出一段服务端配置的代码示例,作为调试的入口,读者可以根据笔者的讲解自行调试阅读。

public final class Server {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
                    .handler(new ServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new AuthHandler());
                            //..

                        }
                    });

            // option() 方法 用于给服务端Channel设置一些TCP参数 SO_BACKLOG 表示系统用于临时存放已完成三次握手的请求的队列的最大长度
            b.option(ChannelOption.SO_BACKLOG, 1024);

            // attr() 方法 用于给 NioServerSocketChannel 维护一个 Map,通常也用不到这个方法
            b.attr(AttributeKey.newInstance("serverName"), "nettyServer");

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }


    private static class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            System.out.println("channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            System.out.println("handlerAdded");
        }


    }

}

其中,上述代码中有两段重要的代码,即本文解析的关键:

 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 EventLoopGroup workerGroup = new NioEventLoopGroup();

Netty的线程模型是典型的Reactor,上述声明的bossGroup和workerGroup可以理解为下图的mainReactor和subReactor,Netty为了提升IO处理效率,用mainReactor处理客户端连接请求,一旦客户端和服务端建立连接之后,后续的IO读写请求一律交由subReactor处理,需要注意的是,考虑到处理的读写请求可能涉及长时间的IO逻辑,使用过程中我们也可以将subReactor的业务处理提交到到自定义的ThreadPool中进行处理。

在这里插入图片描述

自此我们对Netty的线程模型有了一个比较初步的了解,接下来我们就开始对NioEventLoop进行剖析。

NioEventLoop创建

先来了解一下NioEventLoop的创建,这里我们以这段代码作为入口了解一下bossGroup 的创建:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

于是我们步入NioEventLoopGroup方法,于是我们代码来到了NioEventLoopGroup对构造方法的自调用。

public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

再次步入,还是自调用,入参分别是线程数、一个空的执行器,这里不难猜出这个executor是一个和线程池有关的变量,还有一个provider这里我们可以大抵猜测一个是和Selector有关的东西。

public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }

经过重重步进,我们的代码来到了MultithreadEventLoopGroup,由此我们可以得出上述入参的含义:

  1. 线程数:这里传参时会做个判断,如果nThreads 为0则传DEFAULT_EVENT_LOOP_THREADS,该数的值为CPU核心数的两倍,读者可以自行阅读源码了解。
  2. executor:这里我们传入null,后续会帮我们创建好。
  3. 可变参数args,这里面记录了我们上文传入的SelectorProvider.provider()
 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

再次不断的步进,代码来到了MultithreadEventExecutorGroup,这里就是我们的核心代码段了,逻辑比较长,大抵分为以下几步:

  1. 判断nThreads是否为有效值,若无效则直接抛异常,反之进入步骤2。
  2. 判断executor是否为空,若为空则初始化一个ThreadPerTaskExecutor
  3. 初始化EventExecutor,即NioEventLoop数组。
  4. 初始化EventExecutor数组中的每一个元素,即对数组每一个存一个NioEventLoop线程。
  5. 如果EventLoop线程创建失败则优雅关闭线程组。
  6. EventExecutor创建线程选择器。
  7. 设置Future处理异步回调。

核心步骤笔者在下文已贴出注释,读者可自行阅读,后文笔者会对每一个步骤展开讲解。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
         //判断nThreads是否为有效值,若无效则直接抛异常
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
		//如果executor 为空,则进行初始化,我们传入的executor 确实为null所以,这里会进行一次初始化工作
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
		
		//初始化EventExecutor,即NioEventLoop数组
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
            //初始化EventExecutor数组中的每一个元素,即对数组每一个存一个NioEventLoop线程
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
            	//如果EventLoop线程创建失败则优雅关闭线程组
                if (!success) {
                	
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
		//初始化选择器
        chooser = chooserFactory.newChooser(children);
		//设置Future处理异步回调
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        //略
    }
初始化executor

我们先来说说初始化executor这一步,因为我们传入的executor为空,所以在这一步会进行创建,可以看到入参为newDefaultThreadFactory方法创建一个默认的线程工厂,我们步入查看一下其内部执行逻辑。

步入代码我们来到MultithreadEventLoopGroup,其内部的代码如下所示,从参数可知它传入了某个类的类型,以及线程优先级,我们可以步入查看一下。

@Override
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
    }

继续步入我们来到了DefaultThreadFactory的构造方法,可以看到传了3个参数,自调用构造方法:

public DefaultThreadFactory(Class<?> poolType, int priority) {
        this(poolType, false, priority);
    }

查看构造方法,由此我们可知上述参数的含义:

  1. 基于类类型poolType获取线程池的名字,这里内部涉及一些字符串转换不是本文重点,读者可自行参阅。
  2. 守护线程设置false,确保JVM 会在终止之前等待任何用户线程完成其任务。
  3. 线程的优先级设置为最高级。
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
        this(toPoolName(poolType), daemon, priority);
    }

最终我们的代码来到DefaultThreadFactory的构造方法,将上述的参数存到DefaultThreadFactory的成员变量中完成创建。


//直接调用下方的方法
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
        this(poolName, daemon, priority, System.getSecurityManager() == null ?
                Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
    }



 public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        //略
		//设置线程中线程的前缀
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        //设置为非守护线程
        this.daemon = daemon;
        //优先级设置为最高级
        this.priority = priority;
        //设置线程组
        this.threadGroup = threadGroup;
    }

自此我们将下述代码newDefaultThreadFactory创建讲解完成,而ThreadPerTaskExecutor构造方法就会拿着newDefaultThreadFactory这个工厂给成员变量threadFactory赋值,后续每个NioEventLoop对应的线程的创建都是基于executor 中的线程工厂进行创建。

if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
 }
初始化EventLoop数组

接下来就是完成children 的创建和初始化工作,它是一个NioEventLoop数组,通过遍历的方式调用newChild完成NioEventLoop创建,而在创建过程中如果失败的话,会通过遍历childrenshutdownGracefully关闭执行器,并通过awaitTermination等待执行器完成所有工作后关闭。

children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

查看newChild方法,步入源码会来到NioEventLoopGroupnewChild,它会传入大量的入参,它们依次是:

  1. 当前类,this对象。
  2. 线程池即执行器executor
  3. SelectorProviderJava NIO 的一个类,它提供了创建和管理 Selector 对象的功能。
  4. SelectStrategyFactoryJava NIO提供的一个工厂类,用于创建SelectStrategy类,SelectStrategy定义选择期间应该使用的选择策略。
  5. RejectedExecutionHandler即线程池的拒绝策略。
@Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

基于这些参数,后续的工作分为两大部分的创建,先来看看外层这一部分:

  1. 设置selectorProvider
  2. 基于selectorProvider完成selector创建
  3. selectStrategy 选择策略的初始化。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        //略
        provider = selectorProvider;
        selector = openSelector();
        selectStrategy = strategy;
    }

再来看看super做了些什么:

  1. 调用super构造完成EventLoop数组设置。
  2. 设置executor
  3. 任务队列taskQueue 创建,后续非EventLoop线程的任务都会丢到这个队列中。
  4. 拒绝策略初始化。
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        //调用`super`构造完成`EventLoop`数组设置。
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        //执行器赋值。
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        //任务队列taskQueue 创建,后续非EventLoop线程的任务都会丢到这个队列中
        taskQueue = newTaskQueue(this.maxPendingTasks);
        //拒绝策略初始化
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

自此我们将初始化EventLoop数组步骤也讲完了,来小结一下它的整体步骤:

  1. 遍历NioEventLoop数组调用newChild方法。
  2. 基于newChild对为数组每个索引位置设置EventLoop
  3. 每个EventLoop都会记录executorSelectorProvider、以及Selector选择时用到选择策略对象SelectStrategy、还有线程对应的拒绝策略RejectedExecutionHandler,这里我们只要知道EventLoop的成员属性有哪些,具体会在后文的工作流程中展开。
选择器初始化

接下来就是选择器的初始化工作,chooserFactory是入参传入的DefaultEventExecutorChooserFactory单例对象。而children即是上一步初始化好的NioEventLoop数组。

chooser = chooserFactory.newChooser(children);

步入newChooser方法可以看到,它会将我们上文所创建的EventLoop线程数组作为参数传入,并判断该方法是否是2的幂次方,如果是则创建PowerOfTowEventExecutorChooser并返回,反之创建GenericEventExecutorChooser返回。

@Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
    	//如果是2的幂次方则返回PowerOfTowEventExecutorChooser
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
        	//反之返回GenericEventExecutorChooser
            return new GenericEventExecutorChooser(executors);
        }
    }

先来看看PowerOfTowEventExecutorChooser ,可以看到它对于执行器executors的选择算法是通过原子类自增和executors.length - 1进行&运算

private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
		//索引计算原子类
        private final AtomicInteger idx = new AtomicInteger();
        //NioEventLoop数组
        private final EventExecutor[] executors;

        //获取NioEventLoop的算法
        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

这种算法通过位运算的方式提升计算效率,那么是否存在索引越界问题呢?假设数组长度为8,那么实际进行与运算的值就是7,笔者分别带入索引0、5、8,进行与运算时,真正参与的二进制永远是和永远是6以内的进制,得出的结果分别是0、5、0,永远不会越界,并且运算性能还能得到保证。

在这里插入图片描述

同理非2的次幂则用无法使用位运算,所以GenericEventExecutorChooser 是直接进行取模运算再取绝对值即可。

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
terminationFuture回调注册

最后就是terminationFuture回调注册了,确保linkEventExecutorGroup管理的所有EventExecutor都已终止时,并及时通知回调该方法。

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

自此我们将NioEventLoop创建的创建流程全部梳理完毕,小结一下整体步骤:

  1. 创建并初始化EventLoop数组。
  2. EventLoop选择器初始化。
  3. terminationFuture回调注册

NioEventLoop启动流程

接下来我们就来了解一下NioEventLoop的启动流程,我们在这段代码上打个断点:

    ChannelFuture f = b.bind(8888).sync();

不断步入bind源码可以来到了AbstractBootstrapbind方法,可以看到该方法会调用一个叫doBind的方法。

public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

步入doBind可以看到它channel的创建和初始化方法initAndRegister

private ChannelFuture doBind(final SocketAddress localAddress) {
		//初始化和注册channel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        
    }

initAndRegister内部在完成channel创建之后就开始注册channel,而register方法就是我们调用的要讲解的核心。

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
        	//创建并初始化channel
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
           //略
        }
		//注册channel
        ChannelFuture regFuture = config().group().register(channel);
       //略
    }

可以看到这里会通过上文中初始化好的NioEventLoop数组并通过EventLoop选择器EventExecutorChoosernext方法得到一个的NioEventLoop然后调用register

@Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

于是我们就来到了最核心的部分,可以看到这样一段逻辑:

  1. 如果当前线程是eventLoop则调用直接调用register0,反之进入步骤2。
  2. 反之通过eventLoopexecuteregister0eventLoop中执行。
@Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            //略
            
            AbstractChannel.this.eventLoop = eventLoop;
			//如果当前线程是`eventLoop`则调用直接调用`register0`
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                //通过eventLoop的execute让register0在eventLoop中执行
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                   //略
                }
            }
        }

execute逻辑比较简单:

  1. 判断任务是否为空,若为空则抛异常,反之执行步骤2。
  2. 判断当前线程是否是是eventLoop ,如果是则直接添加任务,反之执行步骤3。
  3. 启动一个eventLoop线程必将任务提交到task队列中。

因为我们当前执行的线程是main线程,所以走了第二个分支,启动一个eventLoop

@Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        //是eventLoop则直接添加任务
        if (inEventLoop) {
            addTask(task);
        } else {
        	//因为当前线程不是eventLoop,先为eventLoop分配一个线程,然后再提交任务
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        //略
    }

因为我们的线程不是eventLoop,于是走到第二个分支调用startThread方法,于是我们来到了SingleThreadEventExecutor,该方法会先判断当前线程执行状态是否未启动然后通过CAS的方式修改状态为启动,然后调用doStartThread真正开始线程的创建工作。

 private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

doStartThread方法整体逻辑如下:

  1. 通过executor(即我们上文创建的ThreadPerTaskExecutor这个线程工厂创建一个线程并执行run方法。
  2. run方法将当前这个创建好的线程赋值到eventLoopthread成员变量。
  3. 更新执行时间。
  4. 调用SingleThreadEventExecutor.this.run();开始轮询并处理各种读写任务。
private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    //略
            }
        });
    }

我们直接看execute方法做了些什么,步入源码来到ThreadPerTaskExecutorexecute,代码通过我们上文的线程工厂创建一个线程,然后调用start方法将其启动,并执行上文的任务。

 @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

newThread逻辑比较简单了,基于线程设置的线程池名称以及原子类自增创建一个特殊名字的线程,然后设置是否是守护线程以及优先级,优先级上文设置线程池时已经赋值为了为10,所以这里的setPriority也为10。

@Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
        try {
        	//设置是否守护线程
            if (t.isDaemon()) {
                if (!daemon) {
                    t.setDaemon(false);
                }
            } else {
                if (daemon) {
                    t.setDaemon(true);
                }
            }
			//设置优先级
            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }

自此NioEventLoop线程的创建和启动流程也讲解完成了。

NioEventLoop执行逻辑

整体步骤

上文提及NioEventLoop执行逻辑即我们上一步所提交的任务中的这段代码:

 SingleThreadEventExecutor.this.run();

步入run方法,我们来到了NioEventLoop的逻辑中,它会执行这样一段for循环:

  1. 先调用hasTasks判断是否有任务,如果有任务则selectStrategy调用calculateStrategy时会直接通过NioEventLoop.this.selectNow()进行非阻塞事件轮询,如果返回 -2则走到SelectStrategy.CONTINUE进入下一次循环,反之进入步骤2。
  2. 来到步骤2则说明hasTasks返回falsecalculateStrategy会返回-1,于是走到SelectStrategy.SELECT进行阻塞式事件轮询。
  3. 获取ioRatio的值,如果ioRatio 为100,则先执行IO操作,然后在finally模块执行taskQueue中的非IO操作,反之执行步骤4。
  4. 来到步骤4说明ioRatio为50则先执行IO任务,然后按照同等IO任务的执行时间继续执行非IO任务。
 @Override
    protected void run() {
        for (;;) {
            try {
            //先调用hasTasks判断是否有任务,如果有任务则selectStrategy调用calculateStrategy时会直接通过NioEventLoop.this.selectNow()进行非阻塞事件轮询
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                     //基于阻塞式IO轮询获取事件
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                //默认情况下ioRatio 为50
                final int ioRatio = this.ioRatio;
                //如果ioRatio 为100,则先执行IO操作,然后在finally模块执行taskQueue中的非IO操作
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        runAllTasks();
                    }
                } else {
                //先调用processSelectedKeys执行IO任务,然后基于IO任务执行时间调用runAllTasks执行剩下的非IO任务
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
           //略
        }
    }
阻塞式select

接下来我们对核心步骤进行讲解,当代码走到SelectStrategy.SELECT,代码会走到NioEventLoopselect方法,开始一段for循环:

  1. 先计算超时时间timeoutMillis ,如果小于等于则说明还未超时,再判断selectCnt 是否为0,若为0则说是第一次进行select操作,则进行非阻塞的select操作后将标记selectCnt 为1,跳出循环,反之进入步骤2。
  2. 判断是否存在task,且通过CAS将wakenUp状态设置为true之后进行非阻塞式select,标记selectCnt 为1后跳出循环,反之进入步骤3。
  3. 来到步骤3则说明上述操作没有得到任何可以轮询到任何事件,便基于超时事件进行一次阻塞式select,并对selectCnt进行一次自增,只要选择到了任务或者执行任务时eventLoop已经被唤醒,或者wakeUp唤醒成功,亦或者task队列或者定时任务队列有任务,则结束循环,进行事件处理,反之进入步骤4。
  4. 判断time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos,若为true则说明本次进行了一次长时间的空轮询操作,标记selectCnt为1,反之进入步骤5。
  5. 如果selectCnt 大于SELECTOR_AUTO_REBUILD_THRESHOLD则说明空轮询进行了512次,则调用rebuildSelector将所有的channel注册到新的selector上,从而避免空轮询问题。
private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
            //计算超时事件
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                //如果未超时且第一次轮询则进行一次非阻塞式循环,然后退出循环
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                //如果存在任务,则基于CAS将wakenUp设置为true成功,则进行非阻塞轮询并退出循环去处理这些任务
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

				//如下注释所示,轮询上述步骤轮询到了任务、或者当前wakeup被用户唤醒、或当前队列中还有任务则退出循环去处理
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
               //略

                long time = System.nanoTime();
                //判断time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos,若为true则说明本次进行了一次长时间的空轮询操作,标记selectCnt为1
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        //如果selectCnt 大于SELECTOR_AUTO_REBUILD_THRESHOLD则说明空轮询进行了512次,则调用rebuildSelector将所有的channel注册到新的selector上
                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
           //略
        }
    }

自此我们将SelectStrategy.SELECT这个分支的流程讲解完成,读者可以参考上述的核心步骤讲解概述自行阅读代码。

轮询事件处理processSelectedKeys

我们再来说说processSelectedKeys会对轮询到的事件进行处理,如果selectedKeys 不为空,则直接调用processSelectedKeysOptimized,我们都知道selectedKeys 在之前的步骤中创建NioEventLoop时调用openSelector就已经完成了创建,所以自然不为空,于是我们直接走到第一个分支。

private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

processSelectedKeysOptimized方法会遍历selectedKeys,然后通过processSelectedKey进行处理。

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
		//遍历并处理这些key
        for (int i = 0;; i ++) {
        //拿到当前的SelectionKey 
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
           //将当前位置数组置空,以便后续进行GC
            selectedKeys[i] = null;
			//拿到附加在这个SelectionKey的对象
            final Object a = k.attachment();
			//如果是channel则说明当前这个事件是这个channel的事件,直接调用processSelectedKey处理这个channel的事件
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            //略
        }
    }

最终代码来到了processSelectedKey,可以看到改方法对应每个事件都有特定的处理逻辑,而对应事件有:

  1. Connect:连接事件(TCP 连接), 对应于SelectionKey.OP_CONNECT值为8。
  2. Accept:客户端连接请求事件, 对应于SelectionKey.OP_ACCEPT值为16。
  3. Read: 读事件, 对应于SelectionKey.OP_READ,表示 buffer 可读值为1。
  4. Write:写事件, 对应于SelectionKey.OP_WRITE,表示 buffer 可写值为4。

整体处理流程详见下面代码注释:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        //如果当前的SelectionKey 为无效key,则直接关闭当前channel
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
               
                return;
            }
         
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
         //如果是connect事件,则通过位运算将connect事件的数值删除并通过unsafe完成连接
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
              
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

           //如果是写事件则调用unsafe类的forceFlush进行写数据强制刷新到底层
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

            //无论是读即Read事件或者accept即连接事件都是读事件都调用 unsafe.read()进行处理
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

自此processSelectedKey的核心流程也讲解完成了,它的整体流程比较简单:

  1. 如果selectedKeys初始化好不为空,则直接处理selectedKeys中轮询到的事件,反之调用selector进行事件轮询再进入步骤2。
  2. 遍历轮询到的keys,如果key为空直接结束循环,反之进入步骤3。
  3. 判断当前key绑定的对象是否是一个channel,如果是则说明这个事件是channel的读写请求,直接调用processSelectedKey,按照事件类型调用unsafe类进行处理,反之进入步骤4。
  4. 来到步骤4说明这个任务是一个NioTask,任务会在Selector选择SelectableChannel完成时调用。

NioEventLoop常见面试题

Netty在默认情况下起多少个线程

先说说答案,默认情况下线程数为CPU核心数目的两倍,我们可以在NioEventLoopGroup的构造方法看到,在默认情况下NioEventLoopGroup的线程数会传0,调用来到MultithreadEventLoopGroup时,按照三元表达式,线程数就会设置为DEFAULT_EVENT_LOOP_THREADS

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
      //默认情况下,nThreads为0,所以最终会创建DEFAULT_EVENT_LOOP_THREADS即CPU核心数两倍的线程
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

查看DEFAULT_EVENT_LOOP_THREADS 的声明和初始化,可以印证默认情况下线程数为CPU核心数的两倍。

	private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
    	//默认情况下DEFAULT_EVENT_LOOP_THREADS设置为CPU核心数的两倍
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

      //略
    }

NioEventLoop会在什么时候启动

NioEventLoop会在channel注册时启动,我们可以查看AbstractBootstrapinitAndRegister方法,可以看到一个EventLoopGroupregister方法的调用。

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
        //channel创建和初始化
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
           //略
        }
		//channel注册
        ChannelFuture regFuture = config().group().register(channel);
       //略
        return regFuture;
    }

步入register方法可以看到,因为我们当前执行的线程不是eventLoop,所以会调用eventLoopexecute方法将NioEventLoop线程启动。

 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
           //略
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                //因为当前执行线程不是eventLoop,所以调用execute启动eventLoop线程完成register0
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                  //略
                }
            }
        }

步入其内部就可以看到一个startThread的调用为eventLoop分配工作线程。

 @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        //因为调用execute不是eventLoop线程,所以调用startThread方法启动一个eventLoop线程
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        //略
    }

它最终会通过doStartThread方法基于executor创建一个线程,然后执行run方法,将当前创建的eventLoop线程赋值给eventLoop,并调用SingleThreadEventExecutor.this.run();方法(注意:这里的this就是eventLoop)将NioEventLoop启动,便开始进行常规的事件轮询、IO任务处理、队列任务处理等常规步骤了。

private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                //将NioEventLoop启动,开始进行常规的事件轮询、IO任务处理、队列任务处理
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                   
                   //略
                }
            }
        });
    }

Netty如何解决jdk空轮询问题的

这个问题是JDK NIO epoll模型的经典bug,尽管官方说在JDK1.618修复了这个问题,但是在JDK7仍然存在问题,问题的原因即在没有事件且没有wakeup时,Selector不断进行空轮询,导致CPU100%。
而Netty解决这个问题的方式很简单:

  1. 记录本轮执行起始时间,以及超时时限。
  2. Selector轮询结束记录结束时间。
  3. 如果本月没有轮询到任务且如果结束时间减去超时时限小于起始时间,则自增selectCnt。
  4. selectCnt达到512次(默认值为512),则重新创建Selector,调用rebuildSelector将所有的channel注册到新的selector

对应核心代码在NioEventLoopselect方法上。

//当事件轮询次数超过SELECTOR_AUTO_REBUILD_THRESHOLD(默认值为512),调用rebuildSelector完成
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    //略
					
					//将所有的channel注册到新的selector上
                    rebuildSelector();
                    selector = this.selector;

                    // 进行非阻塞式select和重置计数selectCnt 
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

步入rebuildSelector可以看到对应重新注册的源码,核心步骤为:

  1. 创建一个新的Selector
  2. 将原有事件从旧Selector中取消。
  3. 注册可用事件到新的Selector上,并激活。
  4. NioEventLoopselector 设置为新的selector
public void rebuildSelector() {
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector();
                }
            });
            return;
        }

        final Selector oldSelector = selector;
        final Selector newSelector;

        if (oldSelector == null) {
            return;
        }

        try {
        	//创建一个新的Selector
            newSelector = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (;;) {
            try {
            	//遍历旧的Selector中注册的事件
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                    try {
                        if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                            continue;
                        }

                        int interestOps = key.interestOps();
                        key.cancel();
                        //将原有事件从旧Selector中取消并注册到新的Selector上
                        SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                        //如果a是channel则将其和新的Selector上的key进行绑定,确保收到相关事件时能够及时通知到这个channel
                        if (a instanceof AbstractNioChannel) {
                            // Update SelectionKey
                            ((AbstractNioChannel) a).selectionKey = newKey;
                        }
                        nChannels ++;
                    } catch (Exception e) {
                        //略
                    }
                }
            } catch (ConcurrentModificationException e) {
                 //略
        }
		//将NioEventLoop的selector 设置为新的selector 
        selector = newSelector;

        //略
    }

Netty如何保证异步串行无锁化

首先是保证任务的由统一的线程管理和以及任务线程安全的添加,它的执行步骤大致为:

  1. Netty在提交任务时会判断提交这个任务的线程是否是EventLoop线程,如果不是EventLoop则提到到MpscChunkedArrayQueue这个队列中,然后启动当前EventLoop执行。反之进入步骤2。
  2. 如果当前调用这个任务的是EventLoop线程,它会直接将任务存到MpscChunkedArrayQueue队列中,并把任务wakeup
  3. EventLoop线程中的thread线程,会串行的从队列中取出任务执行,由此完成异步串行无锁化。

我们可以从SingleThreadEventExecutorexecute看到,对于任务的管理都会调用一个addTask完成任务的同理管理,我们不妨步入查看。

@Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
		
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

addTask内部则是直接调用offerTask将任务提交到MpscChunkedArrayQueue中,MpscChunkedArrayQueue从名字即可看出是个多生产者单消费者的队列(Mp(producer)s(single)c(consumer))

 protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

从源码中可以看出MpscChunkedArrayQueue的添加操作都是基于CAS的,多个生产者可以同时将元素添加到队列中。每个生产者使用自旋(spin)方式尝试获取可用的位置,并将元素放入合适的位置。所以可以保证多线程添加时的线程安全:

public boolean offer(E e) {
        if (null == e) {
            throw new NullPointerException();
        } else {
            while(true) {
                while(true) {
                //获得限制生产者线程的写入速率lvProducerLimit,以及当前生产者写入元素的索引位置lvProducerIndex
                    long offset = this.lvProducerLimit();
                    long pIndex = this.lvProducerIndex();
                 	//略
                 	
						//通过CAS操作更新下次存放元素的索引位置,如果成功则基于pIndex获得实际内存偏移量将元素存到数组中
                        if (this.casProducerIndex(pIndex, pIndex + 2L)) {
                            offset = modifiedCalcElementOffset(pIndex, mask);
                            UnsafeRefArrayAccess.soElement(buffer, offset, e);
                            return true;
                        }
                    }
                }
            }
        }
    }

于是eventLoop的中唯一的线程就不用依靠上锁的方式直接到MpscChunkedArrayQueue队列中取任务,完成任务的执行。

参考文献

Netty底层源码解析-NioEventLoop原理分析:https://github.com/coderbruis/JavaSourceCodeLearning/blob/master/note/Netty/Netty底层源码解析-NioEventLoop原理分析.md

netty4核心源码分析第四篇一NioEventLoopGroup创建详解:https://blog.51cto.com/u_11108174/5952318

Netty源码分析 (一)----- NioEventLoopGroup:https://www.cnblogs.com/java-chen-hao/p/11453562.html

一文秒懂 Java 守护线程 ( Daemon Thread ):https://www.twle.cn/c/yufei/javatm/javatm-basic-daemon-thread.html

Netty源码------NioEventLoop源码详解:https://blog.csdn.net/qqq3117004957/article/details/106458358

Netty中的异步串行无锁化:https://zhuanlan.zhihu.com/p/91097888

文章来源:https://blog.csdn.net/shark_chili3007/article/details/121461362
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。