【中间件】brpc_基础_TimerThread
文章目录
- TimerThread
- 1 简介
- 2 主要设计点
- 2.1 数据结构:分层时间轮(Hierarchical Timing Wheel)
- 2.2 线程模型
- 2.3 任务管理
- 3 关键代码分析
- 3.1 类定义(`timer_thread.h`)
- 3.2 时间轮初始化(`timer_thread.cpp`)
- 3.3 任务调度(`timer_thread.cpp`)
- 3.4 线程主循环(`timer_thread.cpp`)
- 4 性能优化
- 5 关键参数与配置
- 6 应用场景
- 7 总结
TimerThread
1 简介
BRPC 中用于 高效管理定时任务 的组件,负责调度延时任务(如 RPC 超时处理、周期性任务)。
设计目标:支持高并发、低延迟的定时任务管理,
适用场景:大规模分布式系统中的超时控制和任务调度。
2 主要设计点
2.1 数据结构:分层时间轮(Hierarchical Timing Wheel)
- 时间轮层级:BRPC 使用 多层时间轮(例如秒级、毫秒级),每层时间轮覆盖不同的时间精度和范围,减少任务迁移频率。
- 时间槽(Bucket):每个时间轮分为多个槽位,任务按到期时间分布到对应槽位中。
- 任务迁移:高层时间轮的任务在到期后迁移到低层时间轮,逐步细化调度精度。
2.2 线程模型
- 独立线程:
TimerThread
运行在单独的线程中,通过bthread
异步唤醒 或usleep
轮询 检查任务到期。 - 低延迟唤醒:使用
butex
或epoll
实现高效休眠唤醒机制,减少 CPU 空转。
2.3 任务管理
- 任务结构:
struct TimerTask {int64_t expiration_ms; // 到期时间(绝对时间戳)void (*fn)(void*); // 回调函数void* arg; // 回调参数TimerTask* next; // 链表指针 };
- 任务插入:根据到期时间计算所属时间轮层级和槽位,插入对应链表。
- 任务执行:到期槽位的任务链表被批量取出并执行回调。
3 关键代码分析
3.1 类定义(timer_thread.h
)
class TimerThread {
public:TimerThread();~TimerThread();// 启动定时器线程int start(const TimerThreadOptions* options);// 提交定时任务(绝对时间 expiration_ms)void schedule(void (*fn)(void*), void* arg, int64_t expiration_ms);// 停止线程void stop();private:// 时间轮层级定义struct Wheel {int64_t tick_ms; // 本层时间轮每个槽位的时间跨度int num_slots; // 槽位数量TimerTask** buckets; // 槽位任务链表int64_t current_tick; // 当前指向的槽位};std::vector<Wheel> _wheels;// 线程控制bthread_t _tid;std::atomic<bool> _stop;// 任务队列同步butil::Mutex _mutex;butil::ConditionVariable _cond;
};
3.2 时间轮初始化(timer_thread.cpp
)
int TimerThread::start(const TimerThreadOptions* options) {// 初始化多层时间轮(例如:毫秒级、秒级)_wheels.emplace_back(Wheel{1, 64}); // 1ms per tick, 64 slots (64ms range)_wheels.emplace_back(Wheel{64, 64}); // 64ms per tick, 64 slots (~4s range)_wheels.emplace_back(Wheel{4096, 64}); // 4096ms per tick, 64 slots (~4min range)// 启动线程return bthread_start_background(&_tid, nullptr, TimerThread::run, this);
}
3.3 任务调度(timer_thread.cpp
)
void TimerThread::schedule(void (*fn)(void*), void* arg, int64_t expiration_ms) {TimerTask* task = new TimerTask{expiration_ms, fn, arg, nullptr};butil::MutexLockGuard lock(_mutex);// 计算任务所属的时间轮层级和槽位int level = find_wheel_level(expiration_ms);int slot = calculate_slot(expiration_ms, level);// 插入对应槽位链表task->next = _wheels[level].buckets[slot];_wheels[level].buckets[slot] = task;
}
3.4 线程主循环(timer_thread.cpp
)
void* TimerThread::run(void* arg) {TimerThread* t = static_cast<TimerThread*>(arg);while (!t->_stop.load(std::memory_order_relaxed)) {int64_t now_ms = butil::gettimeofday_ms();// 处理所有到期任务for (auto& wheel : t->_wheels) {int64_t current_tick = now_ms / wheel.tick_ms;int slot = current_tick % wheel.num_slots;TimerTask* head = wheel.buckets[slot];while (head != nullptr) {TimerTask* next = head->next;if (head->expiration_ms <= now_ms) {head->fn(head->arg); // 执行回调delete head;} else {// 重新插入更精确的时间轮t->reschedule(head);}head = next;}wheel.buckets[slot] = nullptr;}// 休眠至下一个检查点usleep(next_check_interval_ms * 1000);}return nullptr;
}
4 性能优化
- 分层时间轮:减少任务迁移次数,提升插入和删除效率。
- 批量处理:每次处理一个槽位的所有任务,减少锁竞争。
- 惰性删除:任务未到期时重新插入更合适的时间轮,避免重复计算。
- 无锁任务队列:使用
bthread
的原子操作减少同步开销。
5 关键参数与配置
- 时间轮层级:通过
TimerThreadOptions
可配置层级数量和每层参数。 - 检查间隔:动态计算下一个最近任务的到期时间,优化休眠时长。
- 线程优先级:通过
bthread_attr_t
设置定时器线程的调度优先级。
6 应用场景
- RPC 超时控制:为每个 RPC 请求设置超时任务,超时后取消请求。
- 心跳检测:周期性发送心跳包,维护连接活性。
- 缓存刷新:定时刷新本地缓存,保证数据一致性。
7 总结
TimerThread
通过分层时间轮和高效线程调度机制,实现了高吞吐、低延迟的定时任务管理。其核心优势包括:
- 高效任务插入:平均时间复杂度 O(1)。
- 低调度开销:通过分层减少任务迁移。
- 线程安全:结合
bthread
的原子操作和锁机制。
改进方向:
- 支持高精度定时:如微秒级定时(需调整时间轮层级)。
- 任务取消接口:允许动态取消已提交的任务。
- 资源限制:防止恶意任务耗尽内存。