源码解析-时间轮[HashedWheelTimer]
源码解析-时间轮[HashedWheelTimer]
- 一、介绍
- 二、架构
- 2.1 结构
- 2.2 整体执行流程
- 三、对比
- 四、源码分析
- 4.1 初始化 - 构造函数
- 4.1.1 注意点 &小结
- 4.2 调用task 方法
- 4.2.1 注意点& 小结
- 4.3 WorkerThread的运行核心逻辑.
- 4.3.1 run 逻辑
- 4.3.2 waitForNextTick 获取下一个deadline 逻辑
- 4.3.3 取消task processCancelledTasks 逻辑
- 4.3.4 transferTimeoutsToBuckets 将task 放到bucket 逻辑
- 4.3.5 注意点& 小结
- 4.4 HashedWheelTimeout的运行逻辑.
- 4.4.1 cancel 取消逻辑
- 4.4.2 remove 取消逻辑
- 4.4.3 expire 真正的运行逻辑
- 4.5 HashedWheelBucket的运行逻辑.
- 4.5.1 addTimeout 添加task 到bucket
- 4.5.2 remove bucket中移除task
- 4.5.3 remove expireTimeouts task
- 五、总结
一、介绍
**时间轮 **: 是一种 高效, 低内存 开销的定时器 来管理海量,短周期的定时任务,比如心跳检测, 最著名的就是 Netty 库中的 io.netty.util.HashedWheelTimer.
本文深入剖析 Netty 中 HashedWheelTimer 的实现原理,介绍其核心数据结构与工作流程,包括时间轮的设计、任务调度机制及定时任务的执行过程.
本文的 Netty-common.jar 版本是4.1.121.Final.jar, 不同版本源码有一定的区别.
二、架构
2.1 结构
如图:
HashedWheelTimer代表时间轮,是一个==环形队列,==底层使用数组实现
每个槽存储一个HashedWheelBucket. 这个bucket就是存储任务列表的容器,里面有一个的双向链表HashedWheelTimeout.
每一个HashedWheelTimeout里面有一个timer(TimerTask)代表任务
2.2 整体执行流程
三、对比
Timer: Java源码分析 – 任务调度 Timer
维度 | Timer | ScheduledThreadPoolExecutor | HashedWheelTimer |
---|---|---|---|
实现机制 | 单线程,基于 wait/notify 机制调度任务 | 基于 DelayQueue(优先队列 + 堆排序),线程池消费延时任务 | 时间轮算法(环形数组 + 槽位 + 链表),任务分桶存放,tick 推进调度 |
时间复杂度 | 添加/取消任务 O(n),需要遍历或重排 | 添加/取消任务 O(logN),堆排序维护 | 添加/取消任务 O(1),执行时根据槽位遍历 |
精度 | 毫秒级,但误差较大 | 纳秒级(较高精度) | 毫秒级(取决于 tickDuration,非绝对精准) |
并发能力 | 单线程,无法并发执行多个任务 | 多线程执行任务,支持高并发 | 单独一个 worker 线程推进 tick,低开销,高并发场景下仍能处理海量定时任务 |
可扩展性 | 不可扩展,线程模型固定 | 可配置核心线程数,适合任务较少但要求精度高的场景 | wheel 大小可调,适合定时任务量大的场景 |
任务数上限 | 任务多时调度效率急剧下降 | 任务数太多时堆操作成本高,性能下降 | 可轻松支撑数十万甚至上百万任务(低内存占用) |
适用场景 | 简单定时任务(如 demo、小工具) | 少量高精度定时任务(如调度器、定时统计、定时任务框架) | 大量定时任务(如 Netty 定时心跳、延迟队列、网络超时管理) |
局限性 | 单线程瓶颈,任务执行慢可能阻塞后续任务 | 堆操作开销大,任务量过多性能急剧下降 | 精度有限,不适合严格实时性任务;只适合延迟/超时类任务,不适合周期性复杂调度 |
一句话总结:
- Timer → 简单轻量的单线程定时器,只适合小 demo 或低并发场景
- ScheduledThreadPoolExecutor → 任务量不大但要求较高精度的定时任务调度(调度中心、后台任务执行)
- HashedWheelTimer → 高并发、大量定时任务,性能优先,精度要求不高(Netty、心跳、超时检测)
四、源码分析
4.1 初始化 - 构造函数
构造函数比较多,我们就以参数最多的分析, 参数解析:
- threadFactory: 用来创建一个Thread 后续运行 task. 这里可以自定义[指定优先级,是否守护线程], 如果不传默认:Executors.defaultThreadFactory()
- tickDuration: 每一个tick(刻度) 的时间间隔, 即时间轮推进一个槽位所需要的时间. tickDuration 越小,时间精度越高,但 CPU 消耗越大.
- unit: 时间单位
- ticksPerWheel: 槽位数量 . 决定了时间轮一圈所能表示的最大时间范围。总范围 =
tickDuration * ticksPerWheel
- leakDetection: 是否需要check 内存泄漏
- maxPendingTimeouts: queue 里面可以放入的最大等待任务 数量
- taskExecutor: 任务到期执行器. 默认就是使用 workerThread 执行,可以传入线程池异步执行,workerThread 执行耗时任务时,会阻塞时间轮.
public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts, Executor taskExecutor) {**//check 参数**checkNotNull(threadFactory, "threadFactory");checkNotNull(unit, "unit");checkPositive(tickDuration, "tickDuration");checkPositive(ticksPerWheel, "ticksPerWheel");this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");// 将传入的值转化位2的幂次方并初始化(50->64)wheel = createWheel(ticksPerWheel);//mask就是掩码,用来取模%, 极大提升性能mask = wheel.length - 1;// Convert tickDuration to nanos.long duration = unit.toNanos(tickDuration);// Prevent overflow.if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}workerThread = threadFactory.newThread(worker);leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;this.maxPendingTimeouts = maxPendingTimeouts;//这里有一个检查机制,就是单机的时候,不建议创建很多实例,其实一个就可以// 如果超过最大值,会给出告警提示if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}
4.1.1 注意点 &小结
-
ticksPerWheel 槽位数量都是 2的幂次方, 如果我设置50, 其实时 64. 默认是512.
-
mask = wheel.length - 1; :
用来取模 %, 极大提升性能
使用场景是只能对2的幂次方取模.
1. 普通做法: 9 % 8 = 12. & 9: 10018: 1000mask: 01119 & 7 = 1001 & 0111 = 1
- check 是否创建了过多的实例,只需要一个实例即可
4.2 调用task 方法
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {//check 参数checkNotNull(task, "task");checkNotNull(unit, "unit");//pending task 数量加1long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();//check 是否达到最大值if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}//开始启动workerThreadstart();//计算deadline,便于在====**下一个**====tick时 放到对应的bucket 并且按时执行long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// Guard against overflow.if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}//将task 封装成 HashedWheelTimeout 放到队列HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}
public void start() {//如果没有启动workerThread,那就启动,如果启动了,忽略.switch (WORKER_STATE_UPDATER.get(this)) {case WORKER_STATE_INIT:if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}// 等==workerThread== 完全启动好,双重检查,确保完全启动好while (startTime == 0) {try {startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}}
4.2.1 注意点& 小结
- 每次加入一个task ,都会调用start(), 如果启动了,就跳过.
- 每次task 计算deadline , ==** 都是在下一个tick 运行时加入到bucket** ==,所以这里是有一定的误差的.
4.3 WorkerThread的运行核心逻辑.
4.3.1 run 逻辑
public void run() {// 初始化startTime,这里用的是相对时间,比较精确startTime = System.nanoTime();if (startTime == 0) {// 因为相对时间,可能出现0,极小概率,我们需要确保不是0,如果是0,重置为1startTime = 1;}//通知其他阻塞的线程可以运行了startTimeInitialized.countDown();do {//等到下一个tick 的时间点,返回当前tick 的截止时间final long deadline = waitForNextTick();//一般不会小于等于0,小于等于0 说明定时器停止if (deadline > 0) {//计算当前的tick对应的槽位下标int idx = (int) (tick & mask);//移除取消了的任务processCancelledTasks();//根据idx,获取要处理的bucketHashedWheelBucket bucket =wheel[idx];//将task加到对应的bucket 里面transferTimeoutsToBuckets();// 处理当前bucket 里面满足的任务bucket.expireTimeouts(deadline);//tick加1 ,指向下一个buckettick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// 运行到这里说明 被 stop()了, 将未处理的task放到 unprocessedTimeouts里面for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}//同样将 Queue(还没有进bucket 的task) 放到unprocessedTimeouts里面for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}//再次处理取消的taskprocessCancelledTasks();}
4.3.2 waitForNextTick 获取下一个deadline 逻辑
这里就是时间轮的节拍器,确保workerThread 按照 tickDuration间隔推进
主要逻辑:
- 获取到下一个tick 还差多少时间
- 进行sleep
private long waitForNextTick() {//下一个tick应该达到的时间点(相对于startTime)long deadline = tickDuration * (tick + 1);for (;;) {//从时间轮启动到目前为止的相对时间final long currentTime = System.nanoTime() - startTime;//获取到下一个tick 还差多少时间,向上取整long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;// 判断时间是否已经超时,如果 sleepTimeMs <= 0,说明时间已经到点(或过点), 直接返回当前时间。//case1: 正常推进//case2: 如果上一个某个任务耗时特别多,超过了一个tickDuration 的时间,那就会出现超时现象,即 sleepTimeMs <= 0if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}//windows 机器特殊处理一下//See https://github.com/netty/netty/issues/356if (PlatformDependent.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;if (sleepTimeMs == 0) {sleepTimeMs = 1;}}try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {//出现异常,返回 Long.MIN_VALUE,让外围停止if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}
4.3.3 取消task processCancelledTasks 逻辑
这里的主要逻辑就是:
从 cancel Queue 里面的队列 逐个取出来,每一个都是HashedWheelTimeout对象
调用HashedWheelTimeout的remove 方法.
直到Queue为空
private void processCancelledTasks() {for (;;) {HashedWheelTimeout timeout = cancelledTimeouts.poll();if (timeout == null) {// all processedbreak;}try {timeout.remove();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while process a cancellation task", t);}}}}
4.3.4 transferTimeoutsToBuckets 将task 放到bucket 逻辑
主要流程:
每次tick ,只处理最多100000个task ,不然会影响性能.
private void transferTimeoutsToBuckets() {// 每次tick只处理最多100000, 不然容易卡在这里,没有时间去运行其他的for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// 全部处理完成,结束break;}//cancelled 的跳过if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {continue;}//任务从时间轮启动到到期需要经历的总 tick 数long calculated = timeout.deadline / tickDuration;//计算在第多少圈开始执行, ====这里可能会出现负数timeout.remainingRounds = (calculated - tick) / wheel.length;// 确保不会分配到已经运行过的bucket 上final long ticks = Math.max(calculated, tick);// 取模计算indexint stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];// 加到bucket 里面bucket.addTimeout(timeout);}}
4.3.5 注意点& 小结
这一章就是时间罗的核心逻辑, 涵盖了run , 加到bucket ,取消的重要逻辑.
4.4 HashedWheelTimeout的运行逻辑.
4.4.1 cancel 取消逻辑
这里的主要逻辑就是:
- 先把状态置为CANCElLED
- 加入cancel 队列,在下一个tick 时会清除掉.
public boolean cancel() {// 设置状态为CANCELLEDif (!compareAndSetState(ST_INIT, ST_CANCELLED)) {return false;}//添加到队列,下一个tick 将会删除timer.cancelledTimeouts.add(this);return true;}
4.4.2 remove 取消逻辑
主要逻辑:
- 找到所在的bucket
- 从bucket 里面移除
- 等待队列的数量减一
void remove() {//获取task 所在的bucketHashedWheelBucket bucket = this.bucket;if (bucket != null) {//bucket 里面的链表结构,移除taskbucket.remove(this);}//总数减一timer.pendingTimeouts.decrementAndGet();}
4.4.3 expire 真正的运行逻辑
这里就是执行逻辑,或者用线程池执行,或者workerThread 执行
public void expire() {if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}try {//将自己从bucket 里面移除, 在运行.remove();timer.taskExecutor.execute(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()+ " for execution.", t);}}}
4.5 HashedWheelBucket的运行逻辑.
HashedWheelBucket 是一个类似链表的 数据存储结构,便于轻松移除中间的数据.
HashedWheelTimeout 自身充当节点,因此不需要额外的对象创建
4.5.1 addTimeout 添加task 到bucket
public void addTimeout(HashedWheelTimeout timeout) {assert timeout.bucket == null;// 将timeout 里面的bucket 设置当前bucket,后续知道自己属于哪个buckettimeout.bucket = this;// 如果表头为null, 就将表头,表尾都指向timeout, 这是添加第一个节点时的特殊情况if (head == null) {head = tail = timeout;} else {// 表尾指向timeout, timeout的前置指针指向tail,同时tail 下移一位tail.next = timeout;timeout.prev = tail;tail = timeout;}}
4.5.2 remove bucket中移除task
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {HashedWheelTimeout next = timeout.next;// 如果前面的节点不为null, 将前面的节点指向下一个节点if (timeout.prev != null) {timeout.prev.next = next;}// 如果后面的节点不为null, 将后面的节点的前置节点指向前一个if (timeout.next != null) {timeout.next.prev = timeout.prev;}if (timeout == head) {// 如果timeout 既是头节点,也是tail 节点,说明只有一个节点,需要置空if (timeout == tail) {tail = null;head = null;} else {// 否者头节点下移一位head = next;}} else if (timeout == tail) {// 尾节点前移一位tail = timeout.prev;}// 清理引用,便于GCtimeout.prev = null;timeout.next = null;timeout.bucket = null;return next;}
4.5.3 remove expireTimeouts task
public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;// 从表头开始处理所有的 taskwhile (timeout != null) {HashedWheelTimeout next = timeout.next;== // remainingRounds <=0 表示已经过期了,或者到了最后一圈==if (timeout.remainingRounds <= 0) {== // 如果任务的绝对到期时间 <= 当前 tick 的 deadline,说明可以执行了==if (timeout.deadline <= deadline) {timeout.expire();} else {//正常情况下应该不会发生throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}// 任务没有被取消,圈数减一} else if (!timeout.isCancelled()) {timeout.remainingRounds --;}继续判断下一个tasktimeout = next;}}
五、总结
HashedWheelTimer 源码清晰简单,特定场景下,是一个比较好用的工具.