宏观概览
1 | HashedWheelTimer |
这是HashedWheelTimer的组成部分,这里有三个重要的类:
1)HashedWheelBucket,表示哈希桶,是任务链的容器;包含一些添加、移除timeout的方法
2)HashedWheelTimeout,持有task,本身是链式结构;包含一些cancel,expire方法
3)Worker,实现Runnable,是真正干活的线程,它的run方法执行了对应tick的任务,并且让哈希轮转起来
方法:
1)createWheel:初始化哈希轮,主要就是初始化HashedWheelBucket[] wheel
2)newTimeout:新建一个延时任务,该方法会调用start方法让哈希轮动起来
3)normalizeTicksPerWheel:令哈希轮数组的数为2的n次方,方便取余运算
4)start:让哈希轮开始运转
5)stop:中断worker线程并返回未处理的任务
属性:
1)long startTime:哈希轮开始运转的时间,nanotime形式
这里简单拓展一下nanoTime相关的知识,我们看到jdk给出的关于System.nanotime的注解是
1 | This method can only be used to measure elapsed time and is not related to any |
这个方法只能测量流逝的时间,和真正的时间并无对应关系,所以无法相互转换。这个返回值表示的是相对某个随意的时间点而言的纳秒值,而这个时间点还有可能是未来,所以可能返回的纳秒值是个负数。在同一个jvm中调用该方法的时间锚点(某个随意的时间点)都是同一个。
2)long tickDuration: 指针每次运转的间隔时间
3)Queue
4)HashedWheelBucket[] wheel: 哈希轮实体,是一个bucket数组
5)Runnable worker: 真正执行timeOut延时任务的线程
6)workerState: worker线程的状态
深入细节
初始化工作
从哈希轮的构造函数开始
1 | public HashedWheelTimer( |
大体的流程是,先进行参数的校验,接着创建哈希轮,再初始化thread,最后进行泄露检测相关的工作。这里重点关注createWheel方法
1 | private static HashedWheelBucket[] createWheel(int ticksPerWheel) { |
先调用了normalizeTicksPerWheel(ticksPerWheel)
这个方法很简单,就是将ticksPerWheel二次方化,比如传7那么这里将返回8;
接着对wheel进行了初始化,至此初始化工作就结束了,接着看看哈希轮如何运转的。
哈希轮的运转
哈希轮的使用套路一般是,先new一个哈希轮,接着调用下面这个方法就ok了,所以这里就是哈希轮运转的入口
1 | public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { |
这个方法先是进行了必要参数的校验,接着调用了start方法,最后new了一个timeOut并放到了Queue<HashedWheelTimeout>timeouts中。
那么为啥在这里调用start方法?
这也是netty将优化做到丧心病狂的体现,因为如果没有任何任务,哈希轮『空转』是没有任何意义的,所以至少有一个timeout,哈希轮才转起来。
我们接下来就先看看start方法
1 | public void start() { |
首先通过CAS更新了workerState的状态(无锁化的体现),接着异步调用了workerThread.run()
,我们先不去看这个方法,先看后续的流程,代码最后是一个while循环,循环结束的条件是startTime!=0,这里用这个条件表示start成功,接下来具体到worker中再看。
1 | public void run() { |
按照上面代码的顺序,逐一去看每个方法
1 | //Worker.class |
下面去看expireTimeouts
1 | public void expireTimeouts(long deadline) { |
终于到了,任务执行的地方了
1 | //HashedWheelTimeout.class |
最后再来看看,stop方法
1 | public Set<Timeout> stop() { |
至此,Netty中大名鼎鼎的哈希轮就分析完毕了,还是有很多值得借鉴的地方的~
最最后,简单看下Thread.interrupt()方法,其实这个方法是没啥用的一个方法,为啥这么说呢。
举个例子,有线程t1和线程t2,如果t1调用t2.interrupt()方法,t2线程不会立即中断并且根本无视t1线程的这次调用(甚至有点想吃黄焖鸡米饭)
只有一种情况可以让其中断,比如t2调用了sleep方法,这时候如果t1调用t2.interrupt(),t2的sleep方法会抛出异常,注意如果此时t2还未执行到sleep方法,那么当其执行到的时候亦会抛出异常。
总结
大体分为两个步骤
一、new HashWheelTimer(xx)
- createWheel,初始化数组
- threadFactory.newThread(worker),初始化workerThread
二、timer.newTimeout(xx)
- 执行start方法让哈希轮开转(只有第一个任务会执行成功,其他直接return)
- 执行start方法后,计算任务的预计执行时间
- 初始化当前哈希轮任务
- 将任务存放到timeout queue中,等待调度
话分两边,再来看看哈希轮转起来之后干了啥
- 先是初始化starttime(=哈希轮开始运转的nanotime)
- sleep until 执行下次tick移动
- 处理被cancel的任务
- 分配两次tick移动之间,放到timeout queue中的任务到指定的bucket中
- 处理当前tick指向的bucket中到期的任务
- 不断执行上述过程,直到哈希轮状态不为STARTED