NioEventLoop
是Netty
基于Java NIO(new I/O)
包实现的一个重要组件,它具备以下几个重要的特点:
NioEventLoop
内部包含一个事件队列,它会并按照收到的事件类型分发到对应的处理器上。(Selector)
监听多个通道多个通道(即各个接入的连接),使得我们无需为每一个连接创建一个线程,不仅节约系统资源开销,还提升了系统的吞吐量。如下图所示,Netty
对于EventLoop
的设计仍然是遵循Reactor
模型的,通过非阻塞式的事件驱动模式,让EventLoop
中的Selector
完成通道的注册、事件的轮询、任务的分发。
所以本篇文章就会基于源码分析的方式,来了解NioEventLoop
的创建、启动、执行这几个过程。
在正式分析源码之前,笔者这里给出一段服务端配置的代码示例,作为调试的入口,读者可以根据笔者的讲解自行调试阅读。
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new AuthHandler());
//..
}
});
// option() 方法 用于给服务端Channel设置一些TCP参数 SO_BACKLOG 表示系统用于临时存放已完成三次握手的请求的队列的最大长度
b.option(ChannelOption.SO_BACKLOG, 1024);
// attr() 方法 用于给 NioServerSocketChannel 维护一个 Map,通常也用不到这个方法
b.attr(AttributeKey.newInstance("serverName"), "nettyServer");
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("handlerAdded");
}
}
}
其中,上述代码中有两段重要的代码,即本文解析的关键:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Netty的线程模型是典型的Reactor,上述声明的bossGroup和workerGroup可以理解为下图的mainReactor和subReactor,Netty为了提升IO处理效率,用mainReactor处理客户端连接请求,一旦客户端和服务端建立连接之后,后续的IO读写请求一律交由subReactor处理,需要注意的是,考虑到处理的读写请求可能涉及长时间的IO逻辑,使用过程中我们也可以将subReactor的业务处理提交到到自定义的ThreadPool中进行处理。
自此我们对Netty的线程模型有了一个比较初步的了解,接下来我们就开始对NioEventLoop进行剖析。
先来了解一下NioEventLoop
的创建,这里我们以这段代码作为入口了解一下bossGroup
的创建:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
于是我们步入NioEventLoopGroup
方法,于是我们代码来到了NioEventLoopGroup
对构造方法的自调用。
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
再次步入,还是自调用,入参分别是线程数、一个空的执行器,这里不难猜出这个executor
是一个和线程池有关的变量,还有一个provider
这里我们可以大抵猜测一个是和Selector
有关的东西。
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
经过重重步进,我们的代码来到了MultithreadEventLoopGroup
,由此我们可以得出上述入参的含义:
nThreads
为0则传DEFAULT_EVENT_LOOP_THREADS
,该数的值为CPU
核心数的两倍,读者可以自行阅读源码了解。executor
:这里我们传入null,后续会帮我们创建好。args
,这里面记录了我们上文传入的SelectorProvider.provider()
。 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
再次不断的步进,代码来到了MultithreadEventExecutorGroup
,这里就是我们的核心代码段了,逻辑比较长,大抵分为以下几步:
nThreads
是否为有效值,若无效则直接抛异常,反之进入步骤2。executor
是否为空,若为空则初始化一个ThreadPerTaskExecutor
。EventExecutor
,即NioEventLoop
数组。EventExecutor
数组中的每一个元素,即对数组每一个存一个NioEventLoop
线程。EventLoop
线程创建失败则优雅关闭线程组。EventExecutor
创建线程选择器。Future
处理异步回调。核心步骤笔者在下文已贴出注释,读者可自行阅读,后文笔者会对每一个步骤展开讲解。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//判断nThreads是否为有效值,若无效则直接抛异常
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//如果executor 为空,则进行初始化,我们传入的executor 确实为null所以,这里会进行一次初始化工作
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//初始化EventExecutor,即NioEventLoop数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//初始化EventExecutor数组中的每一个元素,即对数组每一个存一个NioEventLoop线程
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//如果EventLoop线程创建失败则优雅关闭线程组
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//初始化选择器
chooser = chooserFactory.newChooser(children);
//设置Future处理异步回调
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
//略
}
我们先来说说初始化executor
这一步,因为我们传入的executor
为空,所以在这一步会进行创建,可以看到入参为newDefaultThreadFactory
方法创建一个默认的线程工厂,我们步入查看一下其内部执行逻辑。
步入代码我们来到MultithreadEventLoopGroup
,其内部的代码如下所示,从参数可知它传入了某个类的类型,以及线程优先级,我们可以步入查看一下。
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
继续步入我们来到了DefaultThreadFactory
的构造方法,可以看到传了3个参数,自调用构造方法:
public DefaultThreadFactory(Class<?> poolType, int priority) {
this(poolType, false, priority);
}
查看构造方法,由此我们可知上述参数的含义:
poolType
获取线程池的名字,这里内部涉及一些字符串转换不是本文重点,读者可自行参阅。false
,确保JVM
会在终止之前等待任何用户线程完成其任务。public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
最终我们的代码来到DefaultThreadFactory
的构造方法,将上述的参数存到DefaultThreadFactory
的成员变量中完成创建。
//直接调用下方的方法
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, System.getSecurityManager() == null ?
Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
//略
//设置线程中线程的前缀
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
//设置为非守护线程
this.daemon = daemon;
//优先级设置为最高级
this.priority = priority;
//设置线程组
this.threadGroup = threadGroup;
}
自此我们将下述代码newDefaultThreadFactory
创建讲解完成,而ThreadPerTaskExecutor
构造方法就会拿着newDefaultThreadFactory
这个工厂给成员变量threadFactory
赋值,后续每个NioEventLoop
对应的线程的创建都是基于executor
中的线程工厂进行创建。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
接下来就是完成children
的创建和初始化工作,它是一个NioEventLoop
数组,通过遍历的方式调用newChild
完成NioEventLoop
创建,而在创建过程中如果失败的话,会通过遍历children
的shutdownGracefully
关闭执行器,并通过awaitTermination
等待执行器完成所有工作后关闭。
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
查看newChild
方法,步入源码会来到NioEventLoopGroup
的newChild
,它会传入大量的入参,它们依次是:
this
对象。executor
。SelectorProvider
即 Java NIO
的一个类,它提供了创建和管理 Selector
对象的功能。SelectStrategyFactory
是Java NIO
提供的一个工厂类,用于创建SelectStrategy
类,SelectStrategy
定义选择期间应该使用的选择策略。RejectedExecutionHandler
即线程池的拒绝策略。@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
基于这些参数,后续的工作分为两大部分的创建,先来看看外层这一部分:
selectorProvider
。selectorProvider
完成selector
创建selectStrategy
选择策略的初始化。NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
//略
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}
再来看看super
做了些什么:
super
构造完成EventLoop
数组设置。executor
。taskQueue
创建,后续非EventLoop
线程的任务都会丢到这个队列中。protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
//调用`super`构造完成`EventLoop`数组设置。
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
//执行器赋值。
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//任务队列taskQueue 创建,后续非EventLoop线程的任务都会丢到这个队列中
taskQueue = newTaskQueue(this.maxPendingTasks);
//拒绝策略初始化
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
自此我们将初始化EventLoop
数组步骤也讲完了,来小结一下它的整体步骤:
NioEventLoop
数组调用newChild
方法。newChild
对为数组每个索引位置设置EventLoop
。EventLoop
都会记录executor
、SelectorProvider
、以及Selector
选择时用到选择策略对象SelectStrategy
、还有线程对应的拒绝策略RejectedExecutionHandler
,这里我们只要知道EventLoop
的成员属性有哪些,具体会在后文的工作流程中展开。接下来就是选择器的初始化工作,chooserFactory
是入参传入的DefaultEventExecutorChooserFactory
单例对象。而children
即是上一步初始化好的NioEventLoop
数组。
chooser = chooserFactory.newChooser(children);
步入newChooser
方法可以看到,它会将我们上文所创建的EventLoop
线程数组作为参数传入,并判断该方法是否是2的幂次方,如果是则创建PowerOfTowEventExecutorChooser
并返回,反之创建GenericEventExecutorChooser
返回。
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
//如果是2的幂次方则返回PowerOfTowEventExecutorChooser
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
//反之返回GenericEventExecutorChooser
return new GenericEventExecutorChooser(executors);
}
}
先来看看PowerOfTowEventExecutorChooser
,可以看到它对于执行器executors
的选择算法是通过原子类自增和executors.length - 1
进行&运算
。
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
//索引计算原子类
private final AtomicInteger idx = new AtomicInteger();
//NioEventLoop数组
private final EventExecutor[] executors;
//获取NioEventLoop的算法
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
这种算法通过位运算的方式提升计算效率,那么是否存在索引越界问题呢?假设数组长度为8,那么实际进行与运算的值就是7,笔者分别带入索引0、5、8,进行与运算时,真正参与的二进制永远是和永远是6以内的进制,得出的结果分别是0、5、0,永远不会越界,并且运算性能还能得到保证。
同理非2的次幂则用无法使用位运算,所以GenericEventExecutorChooser
是直接进行取模运算再取绝对值即可。
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
最后就是terminationFuture
回调注册了,确保linkEventExecutorGroup
管理的所有EventExecutor
都已终止时,并及时通知回调该方法。
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
自此我们将NioEventLoop
创建的创建流程全部梳理完毕,小结一下整体步骤:
EventLoop
数组。EventLoop
选择器初始化。terminationFuture
回调注册接下来我们就来了解一下NioEventLoop
的启动流程,我们在这段代码上打个断点:
ChannelFuture f = b.bind(8888).sync();
不断步入bind
源码可以来到了AbstractBootstrap
的bind
方法,可以看到该方法会调用一个叫doBind
的方法。
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
步入doBind
可以看到它channel
的创建和初始化方法initAndRegister
。
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化和注册channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
}
initAndRegister
内部在完成channel
创建之后就开始注册channel
,而register
方法就是我们调用的要讲解的核心。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//创建并初始化channel
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
//略
}
//注册channel
ChannelFuture regFuture = config().group().register(channel);
//略
}
可以看到这里会通过上文中初始化好的NioEventLoop
数组并通过EventLoop
选择器EventExecutorChooser
的next
方法得到一个的NioEventLoop
然后调用register
。
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
于是我们就来到了最核心的部分,可以看到这样一段逻辑:
eventLoop
则调用直接调用register0
,反之进入步骤2。eventLoop
的execute
让register0
在eventLoop
中执行。@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//略
AbstractChannel.this.eventLoop = eventLoop;
//如果当前线程是`eventLoop`则调用直接调用`register0`
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//通过eventLoop的execute让register0在eventLoop中执行
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//略
}
}
}
execute
逻辑比较简单:
eventLoop
,如果是则直接添加任务,反之执行步骤3。eventLoop
线程必将任务提交到task
队列中。因为我们当前执行的线程是main
线程,所以走了第二个分支,启动一个eventLoop
。
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
//是eventLoop则直接添加任务
if (inEventLoop) {
addTask(task);
} else {
//因为当前线程不是eventLoop,先为eventLoop分配一个线程,然后再提交任务
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
//略
}
因为我们的线程不是eventLoop
,于是走到第二个分支调用startThread
方法,于是我们来到了SingleThreadEventExecutor
,该方法会先判断当前线程执行状态是否未启动然后通过CAS
的方式修改状态为启动,然后调用doStartThread
真正开始线程的创建工作。
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
doStartThread
方法整体逻辑如下:
executor
(即我们上文创建的ThreadPerTaskExecutor
这个线程工厂创建一个线程并执行run
方法。run
方法将当前这个创建好的线程赋值到eventLoop
的thread
成员变量。SingleThreadEventExecutor.this.run();
开始轮询并处理各种读写任务。private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//略
}
});
}
我们直接看execute
方法做了些什么,步入源码来到ThreadPerTaskExecutor
的execute
,代码通过我们上文的线程工厂创建一个线程,然后调用start
方法将其启动,并执行上文的任务。
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
而newThread
逻辑比较简单了,基于线程设置的线程池名称以及原子类自增创建一个特殊名字的线程,然后设置是否是守护线程以及优先级,优先级上文设置线程池时已经赋值为了为10,所以这里的setPriority也为10。
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
try {
//设置是否守护线程
if (t.isDaemon()) {
if (!daemon) {
t.setDaemon(false);
}
} else {
if (daemon) {
t.setDaemon(true);
}
}
//设置优先级
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
自此NioEventLoop
线程的创建和启动流程也讲解完成了。
上文提及NioEventLoop
执行逻辑即我们上一步所提交的任务中的这段代码:
SingleThreadEventExecutor.this.run();
步入run
方法,我们来到了NioEventLoop
的逻辑中,它会执行这样一段for
循环:
hasTasks
判断是否有任务,如果有任务则selectStrategy
调用calculateStrategy
时会直接通过NioEventLoop.this.selectNow()
进行非阻塞事件轮询,如果返回 -2则走到SelectStrategy.CONTINUE
进入下一次循环,反之进入步骤2。hasTasks
返回false
,calculateStrategy
会返回-1,于是走到SelectStrategy.SELECT
进行阻塞式事件轮询。ioRatio
的值,如果ioRatio
为100,则先执行IO操作,然后在finally模块执行taskQueue中的非IO操作,反之执行步骤4。ioRatio
为50则先执行IO任务,然后按照同等IO任务的执行时间继续执行非IO任务。 @Override
protected void run() {
for (;;) {
try {
//先调用hasTasks判断是否有任务,如果有任务则selectStrategy调用calculateStrategy时会直接通过NioEventLoop.this.selectNow()进行非阻塞事件轮询
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
//基于阻塞式IO轮询获取事件
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
//默认情况下ioRatio 为50
final int ioRatio = this.ioRatio;
//如果ioRatio 为100,则先执行IO操作,然后在finally模块执行taskQueue中的非IO操作
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
//先调用processSelectedKeys执行IO任务,然后基于IO任务执行时间调用runAllTasks执行剩下的非IO任务
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
//略
}
}
接下来我们对核心步骤进行讲解,当代码走到SelectStrategy.SELECT
,代码会走到NioEventLoop
的select
方法,开始一段for循环:
timeoutMillis
,如果小于等于则说明还未超时,再判断selectCnt
是否为0,若为0则说是第一次进行select
操作,则进行非阻塞的select
操作后将标记selectCnt
为1,跳出循环,反之进入步骤2。task
,且通过CAS将wakenUp
状态设置为true之后进行非阻塞式select,标记selectCnt
为1后跳出循环,反之进入步骤3。select
,并对selectCnt
进行一次自增,只要选择到了任务或者执行任务时eventLoop
已经被唤醒,或者wakeUp
唤醒成功,亦或者task
队列或者定时任务队列有任务,则结束循环,进行事件处理,反之进入步骤4。time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
,若为true
则说明本次进行了一次长时间的空轮询操作,标记selectCnt
为1,反之进入步骤5。selectCnt
大于SELECTOR_AUTO_REBUILD_THRESHOLD
则说明空轮询进行了512次,则调用rebuildSelector
将所有的channel
注册到新的selector
上,从而避免空轮询问题。private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
//计算超时事件
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
//如果未超时且第一次轮询则进行一次非阻塞式循环,然后退出循环
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
//如果存在任务,则基于CAS将wakenUp设置为true成功,则进行非阻塞轮询并退出循环去处理这些任务
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
//如下注释所示,轮询上述步骤轮询到了任务、或者当前wakeup被用户唤醒、或当前队列中还有任务则退出循环去处理
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
//略
long time = System.nanoTime();
//判断time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos,若为true则说明本次进行了一次长时间的空轮询操作,标记selectCnt为1
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//如果selectCnt 大于SELECTOR_AUTO_REBUILD_THRESHOLD则说明空轮询进行了512次,则调用rebuildSelector将所有的channel注册到新的selector上
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
//略
}
}
自此我们将SelectStrategy.SELECT
这个分支的流程讲解完成,读者可以参考上述的核心步骤讲解概述自行阅读代码。
我们再来说说processSelectedKeys
会对轮询到的事件进行处理,如果selectedKeys
不为空,则直接调用processSelectedKeysOptimized
,我们都知道selectedKeys
在之前的步骤中创建NioEventLoop
时调用openSelector
就已经完成了创建,所以自然不为空,于是我们直接走到第一个分支。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
processSelectedKeysOptimized
方法会遍历selectedKeys
,然后通过processSelectedKey
进行处理。
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
//遍历并处理这些key
for (int i = 0;; i ++) {
//拿到当前的SelectionKey
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
//将当前位置数组置空,以便后续进行GC
selectedKeys[i] = null;
//拿到附加在这个SelectionKey的对象
final Object a = k.attachment();
//如果是channel则说明当前这个事件是这个channel的事件,直接调用processSelectedKey处理这个channel的事件
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
//略
}
}
最终代码来到了processSelectedKey
,可以看到改方法对应每个事件都有特定的处理逻辑,而对应事件有:
Connect
:连接事件(TCP 连接), 对应于SelectionKey.OP_CONNECT
值为8。Accept
:客户端连接请求事件, 对应于SelectionKey.OP_ACCEPT
值为16。Read
: 读事件, 对应于SelectionKey.OP_READ
,表示 buffer 可读值为1。Write
:写事件, 对应于SelectionKey.OP_WRITE
,表示 buffer 可写值为4。整体处理流程详见下面代码注释:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//如果当前的SelectionKey 为无效key,则直接关闭当前channel
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
//如果是connect事件,则通过位运算将connect事件的数值删除并通过unsafe完成连接
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//如果是写事件则调用unsafe类的forceFlush进行写数据强制刷新到底层
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//无论是读即Read事件或者accept即连接事件都是读事件都调用 unsafe.read()进行处理
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
自此processSelectedKey
的核心流程也讲解完成了,它的整体流程比较简单:
selectedKeys
初始化好不为空,则直接处理selectedKeys中轮询到的事件,反之调用selector进行事件轮询再进入步骤2。keys
,如果key
为空直接结束循环,反之进入步骤3。channel
,如果是则说明这个事件是channel
的读写请求,直接调用processSelectedKey
,按照事件类型调用unsafe
类进行处理,反之进入步骤4。NioTask
,任务会在Selector
选择SelectableChannel
完成时调用。先说说答案,默认情况下线程数为CPU
核心数目的两倍,我们可以在NioEventLoopGroup
的构造方法看到,在默认情况下NioEventLoopGroup
的线程数会传0,调用来到MultithreadEventLoopGroup
时,按照三元表达式,线程数就会设置为DEFAULT_EVENT_LOOP_THREADS
。
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//默认情况下,nThreads为0,所以最终会创建DEFAULT_EVENT_LOOP_THREADS即CPU核心数两倍的线程
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
查看DEFAULT_EVENT_LOOP_THREADS
的声明和初始化,可以印证默认情况下线程数为CPU
核心数的两倍。
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
//默认情况下DEFAULT_EVENT_LOOP_THREADS设置为CPU核心数的两倍
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
//略
}
NioEventLoop
会在channel
注册时启动,我们可以查看AbstractBootstrap
的initAndRegister
方法,可以看到一个EventLoopGroup
对register
方法的调用。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//channel创建和初始化
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
//略
}
//channel注册
ChannelFuture regFuture = config().group().register(channel);
//略
return regFuture;
}
步入register
方法可以看到,因为我们当前执行的线程不是eventLoop
,所以会调用eventLoop
的execute
方法将NioEventLoop
线程启动。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//略
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//因为当前执行线程不是eventLoop,所以调用execute启动eventLoop线程完成register0
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//略
}
}
}
步入其内部就可以看到一个startThread
的调用为eventLoop
分配工作线程。
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
//因为调用execute不是eventLoop线程,所以调用startThread方法启动一个eventLoop线程
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
//略
}
它最终会通过doStartThread
方法基于executor创建一个线程,然后执行run方法,将当前创建的eventLoop线程赋值给eventLoop,并调用SingleThreadEventExecutor.this.run();
方法(注意:这里的this
就是eventLoop
)将NioEventLoop
启动,便开始进行常规的事件轮询、IO任务处理、队列任务处理等常规步骤了。
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//将NioEventLoop启动,开始进行常规的事件轮询、IO任务处理、队列任务处理
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//略
}
}
});
}
这个问题是JDK NIO epoll
模型的经典bug,尽管官方说在JDK1.6
的18
修复了这个问题,但是在JDK7
仍然存在问题,问题的原因即在没有事件且没有wakeup
时,Selector
不断进行空轮询,导致CPU100%。
而Netty解决这个问题的方式很简单:
Selector
轮询结束记录结束时间。selectCnt
达到512次(默认值为512)
,则重新创建Selector
,调用rebuildSelector
将所有的channel
注册到新的selector
上对应核心代码在NioEventLoop
的select
方法上。
//当事件轮询次数超过SELECTOR_AUTO_REBUILD_THRESHOLD(默认值为512),调用rebuildSelector完成
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//略
//将所有的channel注册到新的selector上
rebuildSelector();
selector = this.selector;
// 进行非阻塞式select和重置计数selectCnt
selector.selectNow();
selectCnt = 1;
break;
}
步入rebuildSelector
可以看到对应重新注册的源码,核心步骤为:
Selector
。Selector
中取消。Selector
上,并激活。NioEventLoop
的selector
设置为新的selector
。public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
//创建一个新的Selector
newSelector = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
//遍历旧的Selector中注册的事件
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
//将原有事件从旧Selector中取消并注册到新的Selector上
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
//如果a是channel则将其和新的Selector上的key进行绑定,确保收到相关事件时能够及时通知到这个channel
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
//略
}
}
} catch (ConcurrentModificationException e) {
//略
}
//将NioEventLoop的selector 设置为新的selector
selector = newSelector;
//略
}
首先是保证任务的由统一的线程管理和以及任务线程安全的添加,它的执行步骤大致为:
Netty
在提交任务时会判断提交这个任务的线程是否是EventLoop
线程,如果不是EventLoop
则提到到MpscChunkedArrayQueue
这个队列中,然后启动当前EventLoop
执行。反之进入步骤2。MpscChunkedArrayQueue
队列中,并把任务wakeup
。EventLoop
线程中的thread
线程,会串行的从队列中取出任务执行,由此完成异步串行无锁化。我们可以从SingleThreadEventExecutor
的execute
看到,对于任务的管理都会调用一个addTask
完成任务的同理管理,我们不妨步入查看。
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
而addTask
内部则是直接调用offerTask
将任务提交到MpscChunkedArrayQueue
中,MpscChunkedArrayQueue
从名字即可看出是个多生产者单消费者的队列(Mp(producer)s(single)c(consumer))
。
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
从源码中可以看出MpscChunkedArrayQueue
的添加操作都是基于CAS
的,多个生产者可以同时将元素添加到队列中。每个生产者使用自旋(spin)
方式尝试获取可用的位置,并将元素放入合适的位置。所以可以保证多线程添加时的线程安全:
public boolean offer(E e) {
if (null == e) {
throw new NullPointerException();
} else {
while(true) {
while(true) {
//获得限制生产者线程的写入速率lvProducerLimit,以及当前生产者写入元素的索引位置lvProducerIndex
long offset = this.lvProducerLimit();
long pIndex = this.lvProducerIndex();
//略
//通过CAS操作更新下次存放元素的索引位置,如果成功则基于pIndex获得实际内存偏移量将元素存到数组中
if (this.casProducerIndex(pIndex, pIndex + 2L)) {
offset = modifiedCalcElementOffset(pIndex, mask);
UnsafeRefArrayAccess.soElement(buffer, offset, e);
return true;
}
}
}
}
}
}
于是eventLoop
的中唯一的线程就不用依靠上锁的方式直接到MpscChunkedArrayQueue
队列中取任务,完成任务的执行。
Netty底层源码解析-NioEventLoop原理分析:https://github.com/coderbruis/JavaSourceCodeLearning/blob/master/note/Netty/Netty底层源码解析-NioEventLoop原理分析.md
netty4核心源码分析第四篇一NioEventLoopGroup创建详解:https://blog.51cto.com/u_11108174/5952318
Netty源码分析 (一)----- NioEventLoopGroup:https://www.cnblogs.com/java-chen-hao/p/11453562.html
一文秒懂 Java 守护线程 ( Daemon Thread ):https://www.twle.cn/c/yufei/javatm/javatm-basic-daemon-thread.html
Netty源码------NioEventLoop源码详解:https://blog.csdn.net/qqq3117004957/article/details/106458358
Netty中的异步串行无锁化:https://zhuanlan.zhihu.com/p/91097888