正式进入Netty核心组件:Reactor线程模型
看完本文应该明白这些问题:
1.为什么channel的操作是线程安全的?
2.NioEventLoop都干了啥?
3.如何解决jdk nio的空轮询的bug?
主流程
在开始之前先贴张图
关于NioEventLoop,EventLoopGroup,Selector之间的关系如图1所示。
EventLoopGroup:NioEventLoop:Thread:Selector = 1:n:n:n
下面开始分析源码,还记得在上篇文章提到过,当服务端执行channel的注册时会执行这行代码channel.eventLoop().execute(xxx)
,而这行代码是NioEventLoop的入口。
1 | //SingleThreadEventExecutor.class |
这里先判断当前线程是否在EventLoop绑定的线程中(就是简单地进行Thread的比对),在服务端刚启动的时候代码会执行到else部分,我们看下startThread方法
1 | //SingleThreadEventExecutor.class |
这里也是Netty高性能之无锁化的一个体现,通过CAS保证即使在多线程竞争的时候也是线程安全。
1 | //SingleThreadEventExecutor.class |
这里可以看到线程(我们称为Reactor线程)是由run方法创建的,这里就是简单地将当前线程设置到SingleThreadEventExecutor中,值得注意的是这个当前线程指的是由线程池调度分配给执行run方法的线程。继续跟进run方法,这时候到了一个及其核心的地方。
1 |
|
It’s time to show the 2rd picture.
如图2所示,reactor线程一直在循环做三件事情
- select
- process selected keys
- run tasks
select
1 | select(wakenUp.getAndSet(false)); |
在进行select阻塞操作之前,先将wakenUp置为false。下面那段代码Netty源码注解写的天花乱坠,但我还是没理解。。。
这里留个问题吧,会有多个线程调用selector.select()操作吗?如果有这种场景就能理解了。
接着去看看select方法,这个方法很长,这里逐步去分析
1 | private void select(boolean oldWakenUp) throws IOException { |
这段代码,还没进入真正的select操作,首先记录了当前时间以及定时任务中到期时间最近任务的执行时间,接着代码进入了无限循环中,首先判断如果当前时间已经过了最近任务执行时间0.5ms以上,如果这时候还没执行过select or selectNow操作这时候执行一次立刻返回的selectNow操作,之后退出无限循环。
1 | if (hasTasks() && wakenUp.compareAndSet(false, true)) { |
hasTask方法判断TaskQueue是否为空,如果不为空且CAS操作成功,那么立即执行一次selectNow并跳出无限循环。
如果不满足上述条件,代码来到了selector.select(timeout)方法。这是jdk的nio中的方法,带有超时时间,而且当其他线程调用selector.wakeUp时候会唤醒阻塞。执行完select操作后会对selectCnt+1。
自问自答
这里之前有些疑惑,为何要+0.5ms ?
后来想了下,如果没有这个0.5ms,那么select操作在没有定时任务的情况,可能会立刻停止阻塞甚至没有捕获任何事件。
再考虑另外一个极端情况,定时任务的delayNanos(currentTimeNanos)相当大,那么select操作岂不是要阻塞很久,那整个netty可能都玩不转了?
这里看了下源码,当select操作阻塞的时候是可以通过wakeup操作唤醒的,而添加任务的代码如下,这里是会唤醒select操作的
1 | if (!addTaskWakesUp && wakesUpForTask(task)) { |
1 | if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { |
这里注解写的很清晰,当轮询到IO事件(select something)、被用户所唤醒、旧的wakenUp为true、任务队列有任务、定时任务要被执行,这时候要执行break操作,退出无限循环。
1 | long time = System.nanoTime(); |
这段代码就是专门为了解决jdk nio bug而写的。简单查了下这个bug,就是说当selector.select阻塞的时候,即使没有轮询到io事件也没有其他线程调用wakeup也会退出阻塞,将会无法退出循环导致cpu使用率一直是100%。
Ps:啥事儿没发生还不阻塞让cpu空转 不断执行无意义的循环 异常途径提前结束阻塞达到一定次数视为触发了bug
代码最开始就是检测上次的select操作是否阻塞了timeoutMills这么长,如果是那就能够说明没触发那个bug,所以将selectCnt置为1。相反阻塞时间不超过timeoutMills的次数达到了SELECTOR_AUTO_REBUILD_THRESHOLD次,那么这时候视为触发了bug,解决的方法很简单就是rebuild一个selector,将attachment(也就是channel),interestOps一起转移到新selector上。
简单总结下select操作
1)先去判断是否有快到期的定时任务,如果有那么selectNow并break
2)判断taskQueue中是否有任务,如果有那么selectNow()并break
3)执行selector.select(timeout),这里timeout表示定时任务的到期时间
4)最后通过巧妙的方式绕过了jdk的bug
processSelectedKeys
1 | private void processSelectedKeys() { |
这里Netty对selectedKeys进行了优化,原生的是一个hashSet,这里将其存到了数组中,方便访问与遍历。
1 | private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { |
遍历selectedKeys,找到key和对应的channel,之后传入到processSelectedKey(k, (AbstractNioChannel) a);
中处理,最后判断是否需要重新select,如果需要那么执行selectAgain()
先来看processSelectedKey
1 | private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { |
可以看到这段代码就是根据轮询到的不同IO事件执行不同的操作。unsafe.read()
会在后面的文章中分析。
接着判断needsToSelectAgain
,对于每个NioEventLoop而言,每隔256个channel从selector上移除的时候,就标记 needsToSelectAgain 为true,执行一次『洗牌』。这里就不进行具体的分析了。
runAllTasks
1 | protected boolean runAllTasks(long timeoutNanos) { |
(1)将到期的定时任务从一个优先队列(PriorityQueue)转移到MPSC(多生产者单消费者,Thread-Safe BlockingQueue)队列中
(2)从taskQueue中获取一个task
(3)计算本次任务循环的截止时间,因为需要留时间给select操作和process操作
(4)进入循环
(5)执行任务,如果被执行的任务到了64个(nanotime的获取是一个相对昂贵的操作,所以不是每次循环都去判断),此时去判断是否已经timeout,如果是的话那么直接break
(6)如果不符合上述要求,那么继续从taskQueue中拉取任务并执行
总结,Netty中的任务机制是其线程安全的一大保证。当非reactor线程执行一些并发操作时,都会把这次操作抽象为一个任务并将任务放到reactor线程的任务队列中等待reactor线程去执行。比如:channel的write操作、定时任务的优先队列任务的添加等操作都会被封装为一次普通的task。