当前位置: 首页 > ops >正文

源码解析-时间轮[HashedWheelTimer]

源码解析-时间轮[HashedWheelTimer]

    • 一、介绍
    • 二、架构
      • 2.1 结构
      • 2.2 整体执行流程
    • 三、对比
    • 四、源码分析
      • 4.1 初始化 - 构造函数
        • 4.1.1 注意点 &小结
      • 4.2 调用task 方法
        • 4.2.1 注意点& 小结
      • 4.3 WorkerThread的运行核心逻辑.
        • 4.3.1 run 逻辑
        • 4.3.2 waitForNextTick 获取下一个deadline 逻辑
        • 4.3.3 取消task processCancelledTasks 逻辑
        • 4.3.4 transferTimeoutsToBuckets 将task 放到bucket 逻辑
        • 4.3.5 注意点& 小结
      • 4.4 HashedWheelTimeout的运行逻辑.
        • 4.4.1 cancel 取消逻辑
        • 4.4.2 remove 取消逻辑
        • 4.4.3 expire 真正的运行逻辑
      • 4.5 HashedWheelBucket的运行逻辑.
        • 4.5.1 addTimeout 添加task 到bucket
        • 4.5.2 remove bucket中移除task
        • 4.5.3 remove expireTimeouts task
    • 五、总结

一、介绍

**时间轮 **: 是一种 高效, 低内存 开销的定时器 来管理海量,短周期的定时任务,比如心跳检测, 最著名的就是 Netty 库中的 io.netty.util.HashedWheelTimer.
本文深入剖析 Netty 中 HashedWheelTimer 的实现原理,介绍其核心数据结构与工作流程,包括时间轮的设计、任务调度机制及定时任务的执行过程.
本文的 Netty-common.jar 版本是4.1.121.Final.jar, 不同版本源码有一定的区别.

二、架构

2.1 结构

如图:
HashedWheelTimer代表时间轮,是一个==环形队列,==底层使用数组实现
每个槽存储一个HashedWheelBucket. 这个bucket就是存储任务列表的容器,里面有一个的双向链表HashedWheelTimeout.
每一个HashedWheelTimeout里面有一个timer(TimerTask)代表任务
在这里插入图片描述

2.2 整体执行流程

在这里插入图片描述

三、对比

Timer: Java源码分析 – 任务调度 Timer

维度TimerScheduledThreadPoolExecutorHashedWheelTimer
实现机制单线程,基于 wait/notify 机制调度任务基于 DelayQueue(优先队列 + 堆排序),线程池消费延时任务时间轮算法(环形数组 + 槽位 + 链表),任务分桶存放,tick 推进调度
时间复杂度添加/取消任务 O(n),需要遍历或重排添加/取消任务 O(logN),堆排序维护添加/取消任务 O(1),执行时根据槽位遍历
精度毫秒级,但误差较大纳秒级(较高精度)毫秒级(取决于 tickDuration,非绝对精准)
并发能力单线程,无法并发执行多个任务多线程执行任务,支持高并发单独一个 worker 线程推进 tick,低开销,高并发场景下仍能处理海量定时任务
可扩展性不可扩展,线程模型固定可配置核心线程数,适合任务较少但要求精度高的场景wheel 大小可调,适合定时任务量大的场景
任务数上限任务多时调度效率急剧下降任务数太多时堆操作成本高,性能下降可轻松支撑数十万甚至上百万任务(低内存占用)
适用场景简单定时任务(如 demo、小工具)少量高精度定时任务(如调度器、定时统计、定时任务框架)大量定时任务(如 Netty 定时心跳、延迟队列、网络超时管理)
局限性单线程瓶颈,任务执行慢可能阻塞后续任务堆操作开销大,任务量过多性能急剧下降精度有限,不适合严格实时性任务;只适合延迟/超时类任务,不适合周期性复杂调度

一句话总结:

  • Timer → 简单轻量的单线程定时器,只适合小 demo 或低并发场景
  • ScheduledThreadPoolExecutor → 任务量不大但要求较高精度的定时任务调度(调度中心、后台任务执行)
  • HashedWheelTimer → 高并发、大量定时任务,性能优先,精度要求不高(Netty、心跳、超时检测)

四、源码分析

4.1 初始化 - 构造函数

在这里插入图片描述
构造函数比较多,我们就以参数最多的分析, 参数解析:

  • threadFactory: 用来创建一个Thread 后续运行 task. 这里可以自定义[指定优先级,是否守护线程], 如果不传默认:Executors.defaultThreadFactory()
  • tickDuration: 每一个tick(刻度) 的时间间隔, 即时间轮推进一个槽位所需要的时间. tickDuration 越小,时间精度越高,但 CPU 消耗越大.
  • unit: 时间单位
  • ticksPerWheel: 槽位数量 . 决定了时间轮一圈所能表示的最大时间范围。总范围 = tickDuration * ticksPerWheel
  • leakDetection: 是否需要check 内存泄漏
  • maxPendingTimeouts: queue 里面可以放入的最大等待任务 数量
  • taskExecutor: 任务到期执行器. 默认就是使用 workerThread 执行,可以传入线程池异步执行,workerThread 执行耗时任务时,会阻塞时间轮.
public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts, Executor taskExecutor) {**//check 参数**checkNotNull(threadFactory, "threadFactory");checkNotNull(unit, "unit");checkPositive(tickDuration, "tickDuration");checkPositive(ticksPerWheel, "ticksPerWheel");this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");// 将传入的值转化位2的幂次方并初始化(50->64)wheel = createWheel(ticksPerWheel);//mask就是掩码,用来取模%, 极大提升性能mask = wheel.length - 1;// Convert tickDuration to nanos.long duration = unit.toNanos(tickDuration);// Prevent overflow.if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}workerThread = threadFactory.newThread(worker);leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;this.maxPendingTimeouts = maxPendingTimeouts;//这里有一个检查机制,就是单机的时候,不建议创建很多实例,其实一个就可以// 如果超过最大值,会给出告警提示if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}
4.1.1 注意点 &小结
  1. ticksPerWheel 槽位数量都是 2的幂次方, 如果我设置50, 其实时 64. 默认是512.

  2. mask = wheel.length - 1; :用来取模 %, 极大提升性能 使用场景是只能对2的幂次方取模.

 1. 普通做法: 9 % 8  = 12. & 9: 10018: 1000mask: 01119 & 7 = 1001 & 0111 = 1 
  1. check 是否创建了过多的实例,只需要一个实例即可

4.2 调用task 方法

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {//check 参数checkNotNull(task, "task");checkNotNull(unit, "unit");//pending task 数量加1long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();//check 是否达到最大值if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}//开始启动workerThreadstart();//计算deadline,便于在====**下一个**====tick时 放到对应的bucket 并且按时执行long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// Guard against overflow.if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}//将task 封装成 HashedWheelTimeout 放到队列HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}  
 public void start() {//如果没有启动workerThread,那就启动,如果启动了,忽略.switch (WORKER_STATE_UPDATER.get(this)) {case WORKER_STATE_INIT:if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}// 等==workerThread== 完全启动好,双重检查,确保完全启动好while (startTime == 0) {try {startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}}
4.2.1 注意点& 小结
  1. 每次加入一个task ,都会调用start(), 如果启动了,就跳过.
  2. 每次task 计算deadline , ==** 都是在下一个tick 运行时加入到bucket** ==,所以这里是有一定的误差的.

4.3 WorkerThread的运行核心逻辑.

4.3.1 run 逻辑
        public void run() {// 初始化startTime,这里用的是相对时间,比较精确startTime = System.nanoTime();if (startTime == 0) {// 因为相对时间,可能出现0,极小概率,我们需要确保不是0,如果是0,重置为1startTime = 1;}//通知其他阻塞的线程可以运行了startTimeInitialized.countDown();do {//等到下一个tick 的时间点,返回当前tick 的截止时间final long deadline = waitForNextTick();//一般不会小于等于0,小于等于0 说明定时器停止if (deadline > 0) {//计算当前的tick对应的槽位下标int idx = (int) (tick & mask);//移除取消了的任务processCancelledTasks();//根据idx,获取要处理的bucketHashedWheelBucket bucket =wheel[idx];//将task加到对应的bucket 里面transferTimeoutsToBuckets();// 处理当前bucket 里面满足的任务bucket.expireTimeouts(deadline);//tick加1 ,指向下一个buckettick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// 运行到这里说明 被 stop()了, 将未处理的task放到 unprocessedTimeouts里面for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}//同样将 Queue(还没有进bucket 的task) 放到unprocessedTimeouts里面for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}//再次处理取消的taskprocessCancelledTasks();}
4.3.2 waitForNextTick 获取下一个deadline 逻辑

这里就是时间轮的节拍器,确保workerThread 按照 tickDuration间隔推进
主要逻辑:

  1. 获取到下一个tick 还差多少时间
  2. 进行sleep
private long waitForNextTick() {//下一个tick应该达到的时间点(相对于startTime)long deadline = tickDuration * (tick + 1);for (;;) {//从时间轮启动到目前为止的相对时间final long currentTime = System.nanoTime() - startTime;//获取到下一个tick 还差多少时间,向上取整long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;// 判断时间是否已经超时,如果 sleepTimeMs <= 0,说明时间已经到点(或过点), 直接返回当前时间。//case1: 正常推进//case2: 如果上一个某个任务耗时特别多,超过了一个tickDuration 的时间,那就会出现超时现象,即 sleepTimeMs <= 0if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}//windows 机器特殊处理一下//See https://github.com/netty/netty/issues/356if (PlatformDependent.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;if (sleepTimeMs == 0) {sleepTimeMs = 1;}}try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {//出现异常,返回 Long.MIN_VALUE,让外围停止if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}
4.3.3 取消task processCancelledTasks 逻辑

这里的主要逻辑就是:
从 cancel Queue 里面的队列 逐个取出来,每一个都是HashedWheelTimeout对象
调用HashedWheelTimeout的remove 方法.
直到Queue为空

        private void processCancelledTasks() {for (;;) {HashedWheelTimeout timeout = cancelledTimeouts.poll();if (timeout == null) {// all processedbreak;}try {timeout.remove();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while process a cancellation task", t);}}}}
4.3.4 transferTimeoutsToBuckets 将task 放到bucket 逻辑

主要流程:
每次tick ,只处理最多100000个task ,不然会影响性能.

private void transferTimeoutsToBuckets() {// 每次tick只处理最多100000, 不然容易卡在这里,没有时间去运行其他的for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// 全部处理完成,结束break;}//cancelled 的跳过if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {continue;}//任务从时间轮启动到到期需要经历的总 tick 数long calculated = timeout.deadline / tickDuration;//计算在第多少圈开始执行, ====这里可能会出现负数timeout.remainingRounds = (calculated - tick) / wheel.length;// 确保不会分配到已经运行过的bucket 上final long ticks = Math.max(calculated, tick);// 取模计算indexint stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];// 加到bucket 里面bucket.addTimeout(timeout);}}
4.3.5 注意点& 小结

这一章就是时间罗的核心逻辑, 涵盖了run , 加到bucket ,取消的重要逻辑.

4.4 HashedWheelTimeout的运行逻辑.

4.4.1 cancel 取消逻辑

这里的主要逻辑就是:

  • 先把状态置为CANCElLED
  • 加入cancel 队列,在下一个tick 时会清除掉.
        public boolean cancel() {// 设置状态为CANCELLEDif (!compareAndSetState(ST_INIT, ST_CANCELLED)) {return false;}//添加到队列,下一个tick 将会删除timer.cancelledTimeouts.add(this);return true;}
4.4.2 remove 取消逻辑

主要逻辑:

  • 找到所在的bucket
  • 从bucket 里面移除
  • 等待队列的数量减一
        void remove() {//获取task 所在的bucketHashedWheelBucket bucket = this.bucket;if (bucket != null) {//bucket 里面的链表结构,移除taskbucket.remove(this);}//总数减一timer.pendingTimeouts.decrementAndGet();}
4.4.3 expire 真正的运行逻辑

这里就是执行逻辑,或者用线程池执行,或者workerThread 执行

        public void expire() {if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}try {//将自己从bucket 里面移除, 在运行.remove();timer.taskExecutor.execute(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()+ " for execution.", t);}}}

4.5 HashedWheelBucket的运行逻辑.

HashedWheelBucket 是一个类似链表的 数据存储结构,便于轻松移除中间的数据.
HashedWheelTimeout 自身充当节点,因此不需要额外的对象创建

4.5.1 addTimeout 添加task 到bucket
        public void addTimeout(HashedWheelTimeout timeout) {assert timeout.bucket == null;// 将timeout 里面的bucket 设置当前bucket,后续知道自己属于哪个buckettimeout.bucket = this;// 如果表头为null, 就将表头,表尾都指向timeout, 这是添加第一个节点时的特殊情况if (head == null) {head = tail = timeout;} else {// 表尾指向timeout, timeout的前置指针指向tail,同时tail 下移一位tail.next = timeout;timeout.prev = tail;tail = timeout;}}
4.5.2 remove bucket中移除task
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {HashedWheelTimeout next = timeout.next;// 如果前面的节点不为null, 将前面的节点指向下一个节点if (timeout.prev != null) {timeout.prev.next = next;}// 如果后面的节点不为null, 将后面的节点的前置节点指向前一个if (timeout.next != null) {timeout.next.prev = timeout.prev;}if (timeout == head) {// 如果timeout 既是头节点,也是tail 节点,说明只有一个节点,需要置空if (timeout == tail) {tail = null;head = null;} else {// 否者头节点下移一位head = next;}} else if (timeout == tail) {// 尾节点前移一位tail = timeout.prev;}// 清理引用,便于GCtimeout.prev = null;timeout.next = null;timeout.bucket = null;return next;}
4.5.3 remove expireTimeouts task
public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;// 从表头开始处理所有的 taskwhile (timeout != null) {HashedWheelTimeout next = timeout.next;== // remainingRounds <=0 表示已经过期了,或者到了最后一圈==if (timeout.remainingRounds <= 0) {== // 如果任务的绝对到期时间 <= 当前 tick 的 deadline,说明可以执行了==if (timeout.deadline <= deadline) {timeout.expire();} else {//正常情况下应该不会发生throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}// 任务没有被取消,圈数减一} else if (!timeout.isCancelled()) {timeout.remainingRounds --;}继续判断下一个tasktimeout = next;}}

五、总结

HashedWheelTimer 源码清晰简单,特定场景下,是一个比较好用的工具.

http://www.xdnf.cn/news/19204.html

相关文章:

  • 项目管理方法如何选择
  • Python实现京东商品数据自动化采集的实用指南
  • 水库/油箱/化工罐区...无线液位控制系统如何实现远程监控?
  • C++ constexpr:编译时计算的高效秘籍
  • 动态规划--Day05--最大子数组和--53. 最大子数组和,2606. 找到最大开销的子字符串,1749. 任意子数组和的绝对值的最大值
  • 音视频学习(六十):H264中的PPS
  • 基于Kubernetes Operator的自动化运维平台设计与实践
  • Ethan开发者创新项目日报 | 2025-08-30
  • OpenGeode 综合介绍(基于 GitHub 仓库)
  • pikachu之XSS
  • JavaWeb前端06(ElementPlus快速构建网页)
  • JavaScript 一些进阶知识点与注意事项
  • Python可视化与交互-matplotlib库
  • 仓颉编程语言青少年基础教程:程序基本结构和语言特点
  • 【leetcode】112. 路径总和
  • # `std::basic_istream`总结
  • 基于 MyBatis-Plus 拦截器实现“结账后禁止修改”的优雅方案
  • 数据库的CURD
  • 【C++】红黑树(详解)
  • 【点云工具】CloudCompare学习记录,自用分享
  • Java对接Redis全攻略:Jedis/SpringData/Redisson三剑客对决
  • 机器人控制器开发(底层模块)——rk3588s 的 CAN 配置
  • CSS学习与心得分享
  • 码农特供版《消费者权益保护法》逆向工程指北——附源码级注释与异常处理方案
  • 轻量化模型-知识蒸馏1
  • Carrier Aggregation Enabled MIMO-OFDM Integrated Sensing and Communication
  • Spring Cache实现简化缓存功能开发
  • 内网穿透系列十二:一款基于 HTTP 传输和 SSH 加密保护的内网穿透工具 Chisel ,具备抗干扰、稳定、安全特性
  • 聊一聊 .NET 的 AssemblyLoadContext 可插拔程序集
  • HarmonyOS AppStorage:跨组件状态管理的高效解决方案