当前位置: 首页 > web >正文

【中间件】brpc_基础_TimerThread

文章目录

  • TimerThread
    • 1 简介
    • 2 主要设计点
      • 2.1 数据结构:分层时间轮(Hierarchical Timing Wheel)
      • 2.2 线程模型
      • 2.3 任务管理
    • 3 关键代码分析
      • 3.1 类定义(`timer_thread.h`)
      • 3.2 时间轮初始化(`timer_thread.cpp`)
      • 3.3 任务调度(`timer_thread.cpp`)
      • 3.4 线程主循环(`timer_thread.cpp`)
    • 4 性能优化
    • 5 关键参数与配置
    • 6 应用场景
    • 7 总结

TimerThread

1 简介

BRPC 中用于 高效管理定时任务 的组件,负责调度延时任务(如 RPC 超时处理、周期性任务)。

设计目标:支持高并发、低延迟的定时任务管理,

适用场景:大规模分布式系统中的超时控制和任务调度。


2 主要设计点

2.1 数据结构:分层时间轮(Hierarchical Timing Wheel)

  • 时间轮层级:BRPC 使用 多层时间轮(例如秒级、毫秒级),每层时间轮覆盖不同的时间精度和范围,减少任务迁移频率。
  • 时间槽(Bucket):每个时间轮分为多个槽位,任务按到期时间分布到对应槽位中。
  • 任务迁移:高层时间轮的任务在到期后迁移到低层时间轮,逐步细化调度精度。

2.2 线程模型

  • 独立线程TimerThread 运行在单独的线程中,通过 bthread 异步唤醒usleep 轮询 检查任务到期。
  • 低延迟唤醒:使用 butexepoll 实现高效休眠唤醒机制,减少 CPU 空转。

2.3 任务管理

  • 任务结构
    struct TimerTask {int64_t expiration_ms;  // 到期时间(绝对时间戳)void (*fn)(void*);      // 回调函数void* arg;              // 回调参数TimerTask* next;        // 链表指针
    };
    
  • 任务插入:根据到期时间计算所属时间轮层级和槽位,插入对应链表。
  • 任务执行:到期槽位的任务链表被批量取出并执行回调。

3 关键代码分析

3.1 类定义(timer_thread.h

class TimerThread {
public:TimerThread();~TimerThread();// 启动定时器线程int start(const TimerThreadOptions* options);// 提交定时任务(绝对时间 expiration_ms)void schedule(void (*fn)(void*), void* arg, int64_t expiration_ms);// 停止线程void stop();private:// 时间轮层级定义struct Wheel {int64_t tick_ms;       // 本层时间轮每个槽位的时间跨度int num_slots;         // 槽位数量TimerTask** buckets;   // 槽位任务链表int64_t current_tick;  // 当前指向的槽位};std::vector<Wheel> _wheels;// 线程控制bthread_t _tid;std::atomic<bool> _stop;// 任务队列同步butil::Mutex _mutex;butil::ConditionVariable _cond;
};

3.2 时间轮初始化(timer_thread.cpp

int TimerThread::start(const TimerThreadOptions* options) {// 初始化多层时间轮(例如:毫秒级、秒级)_wheels.emplace_back(Wheel{1, 64});    // 1ms per tick, 64 slots (64ms range)_wheels.emplace_back(Wheel{64, 64});   // 64ms per tick, 64 slots (~4s range)_wheels.emplace_back(Wheel{4096, 64}); // 4096ms per tick, 64 slots (~4min range)// 启动线程return bthread_start_background(&_tid, nullptr, TimerThread::run, this);
}

3.3 任务调度(timer_thread.cpp

void TimerThread::schedule(void (*fn)(void*), void* arg, int64_t expiration_ms) {TimerTask* task = new TimerTask{expiration_ms, fn, arg, nullptr};butil::MutexLockGuard lock(_mutex);// 计算任务所属的时间轮层级和槽位int level = find_wheel_level(expiration_ms);int slot = calculate_slot(expiration_ms, level);// 插入对应槽位链表task->next = _wheels[level].buckets[slot];_wheels[level].buckets[slot] = task;
}

3.4 线程主循环(timer_thread.cpp

void* TimerThread::run(void* arg) {TimerThread* t = static_cast<TimerThread*>(arg);while (!t->_stop.load(std::memory_order_relaxed)) {int64_t now_ms = butil::gettimeofday_ms();// 处理所有到期任务for (auto& wheel : t->_wheels) {int64_t current_tick = now_ms / wheel.tick_ms;int slot = current_tick % wheel.num_slots;TimerTask* head = wheel.buckets[slot];while (head != nullptr) {TimerTask* next = head->next;if (head->expiration_ms <= now_ms) {head->fn(head->arg);  // 执行回调delete head;} else {// 重新插入更精确的时间轮t->reschedule(head);}head = next;}wheel.buckets[slot] = nullptr;}// 休眠至下一个检查点usleep(next_check_interval_ms * 1000);}return nullptr;
}

4 性能优化

  • 分层时间轮:减少任务迁移次数,提升插入和删除效率。
  • 批量处理:每次处理一个槽位的所有任务,减少锁竞争。
  • 惰性删除:任务未到期时重新插入更合适的时间轮,避免重复计算。
  • 无锁任务队列:使用 bthread 的原子操作减少同步开销。

5 关键参数与配置

  • 时间轮层级:通过 TimerThreadOptions 可配置层级数量和每层参数。
  • 检查间隔:动态计算下一个最近任务的到期时间,优化休眠时长。
  • 线程优先级:通过 bthread_attr_t 设置定时器线程的调度优先级。

6 应用场景

  1. RPC 超时控制:为每个 RPC 请求设置超时任务,超时后取消请求。
  2. 心跳检测:周期性发送心跳包,维护连接活性。
  3. 缓存刷新:定时刷新本地缓存,保证数据一致性。

7 总结

TimerThread 通过分层时间轮和高效线程调度机制,实现了高吞吐、低延迟的定时任务管理。其核心优势包括:

  • 高效任务插入:平均时间复杂度 O(1)。
  • 低调度开销:通过分层减少任务迁移。
  • 线程安全:结合 bthread 的原子操作和锁机制。

改进方向

  • 支持高精度定时:如微秒级定时(需调整时间轮层级)。
  • 任务取消接口:允许动态取消已提交的任务。
  • 资源限制:防止恶意任务耗尽内存。
http://www.xdnf.cn/news/4013.html

相关文章:

  • 五一假期作业
  • springboot单体项目的执行流程
  • LFU算法解析
  • 【PostgreSQL数据分析实战:从数据清洗到可视化全流程】4.5 清洗流程自动化(存储过程/定时任务)
  • 【中间件】brpc_基础_单例
  • FreeRTOS学习系列·二值信号量
  • Linux查询日志常用命令
  • 解锁现代健康密码:科学养生新主张
  • 基于PLC的换热器温度控制系统设计
  • 状态模式(State Pattern)
  • 电子商务商家后台运营专员模板
  • C++ 中二级指针的正确释放方法
  • 【KWDB 创作者计划】_Ubuntu 22.04系统KWDB数据库安装部署使用教程
  • Qt中的UIC
  • Amazon Bedrock Converse API:开启对话式AI新体验
  • Qt开发:容器组控件的介绍和使用
  • 20、数据可视化:魔镜报表——React 19 图表集成
  • 408考研逐题详解:2009年第8题
  • Java后端程序员学习前端之CSS
  • Python matplotlib 成功使用SimHei 中文字体
  • 详解RabbitMQ工作模式之发布订阅模式
  • 基于C++实现的深度学习(cnn/svm)分类器Demo
  • Baklib知识中台:智能服务架构新实践
  • 【算法学习】递归、搜索与回溯算法(一)
  • python函数复习(形参实参,收集参数,关键字参数)
  • uniapp中用canvas绘制简单柱形图,小容量,不用插件——简单使用canvas
  • QT 在圆的边界画出圆
  • IP属地是我的定位吗?——解析两者区别
  • Python异步编程入门:从同步到异步的思维转变
  • VBA信息获取与处理专题五:VBA利用CDO发送电子邮件