【中间件】brpc之工作窃取队列
文章目录
- BRPC Work Stealing Queue
- 1 核心功能
- 2 关键数据结构
- 2.1 队列结构
- 2.2 内存布局优化
- 3 核心操作
- 3.1 本地线程操作(非线程安全)
- 3.2 窃取操作(线程安全)
- 4 设计亮点
- 4.1 无锁原子操作
- 4.2 环形缓冲区优化
- 4.3 线程角色分离
- 5 性能优化
- 6 应用场景
- 7 潜在问题与改进
- 7.1 ABA 问题
- 7.2 容量限制
- 7.3 伪共享
- 8 代码示例(简化版)
- 9 总结
BRPC Work Stealing Queue
1 核心功能
无锁工作窃取队列(Lock-Free Work Stealing Queue),用于在 BRPC 的 bthread
用户态线程库中支持高效的任务调度。
设计目标:
- 高并发性能:允许线程本地快速存取任务,减少锁竞争。
- 负载均衡:空闲线程可“窃取”其他线程队列中的任务,提升 CPU 利用率。
2 关键数据结构
2.1 队列结构
template <typename T>
class WorkStealingQueue {
private:std::atomic<size_t> _bottom; // 本地线程操作的队尾std::atomic<size_t> _top; // 其他线程窃取的队头T* _array; // 环形缓冲区size_t _capacity; // 队列容量
};
- 环形缓冲区:通过
_array
存储任务,容量固定(通常为 2 的幂次,便于位运算优化)。 - 原子指针:
_bottom
和_top
使用原子操作保证线程安全。
2.2 内存布局优化
- 缓存行对齐:
_bottom
和_top
可能被放置在不同缓存行,避免伪共享(False Sharing)。alignas(64) std::atomic<size_t> _bottom; alignas(64) std::atomic<size_t> _top;
3 核心操作
3.1 本地线程操作(非线程安全)
push(T item)
:向队尾插入任务。bool push(T item) {size_t b = _bottom.load(std::memory_order_relaxed);size_t t = _top.load(std::memory_order_acquire);if (b - t >= _capacity) return false; // 队列满_array[b % _capacity] = item;_bottom.store(b + 1, std::memory_order_release);return true; }
pop(T* item)
:从队尾取出任务(本地线程独占)。bool pop(T* item) {size_t b = _bottom.load(std::memory_order_relaxed) - 1;_bottom.store(b, std::memory_order_relaxed);std::atomic_thread_fence(std::memory_order_seq_cst);size_t t = _top.load(std::memory_order_relaxed);if (t <= b) { // 队列非空*item = _array[b % _capacity];if (t != b) return true; // 队列仍有元素// 最后一个元素,检查竞争if (_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {_bottom.store(b + 1, std::memory_order_relaxed);return true;}_bottom.store(b + 1, std::memory_order_relaxed);return false;}_bottom.store(b + 1, std::memory_order_relaxed);return false; }
3.2 窃取操作(线程安全)
steal(T* item)
:从队头窃取任务(多线程并发安全)。bool steal(T* item) {size_t t = _top.load(std::memory_order_acquire);std::atomic_thread_fence(std::memory_order_seq_cst);size_t b = _bottom.load(std::memory_order_acquire);if (t >= b) return false; // 队列空*item = _array[t % _capacity];if (!_top.compare_exchange_strong(t, t + 1,std::memory_order_seq_cst,std::memory_order_relaxed)) {return false; // 竞争失败}return true; }
4 设计亮点
4.1 无锁原子操作
compare_exchange_strong
:用于更新_top
指针,确保窃取操作的原子性。- 内存序控制:
push
使用release
保证写入可见性。steal
使用acquire
和seq_cst
确保读取顺序。
4.2 环形缓冲区优化
- 位运算代替取模:若容量为 2 的幂次,
b % _capacity
可简化为b & (_capacity - 1)
,提升性能。 - 快速失败检查:队列满/空时直接返回,避免无效操作。
4.3 线程角色分离
- 本地线程(Owner):仅操作
_bottom
,无需原子同步(relaxed
内存序)。 - 窃取线程(Thief):操作
_top
,需严格同步(acquire
/release
)。
5 性能优化
- 低竞争场景高效:本地线程的
push
/pop
几乎无原子开销。 - 批量窃取:可扩展为一次窃取多个任务,减少竞争频率。
- 动态扩容:当前实现容量固定,实际工程中可结合
std::vector
实现动态扩容。
6 应用场景
bthread
调度器:每个 Worker 线程维护一个任务队列,空闲线程窃取任务。- 并行计算:如 Fork-Join 模型中的任务分发。
- 事件驱动框架:处理异步 I/O 任务的负载均衡。
7 潜在问题与改进
7.1 ABA 问题
- 风险:若
_top
指针被多次修改后回到原值,compare_exchange_strong
可能错误成功。 - 解决方案:使用带版本号的指针(如
(top << 16) | version
)。
7.2 容量限制
- 固定容量:队列满时需由调用方处理(如丢弃任务或动态扩容)。
- 改进方案:实现动态扩容(需原子地替换
_array
和_capacity
)。
7.3 伪共享
- 缓存行竞争:
_bottom
和_top
需对齐到不同缓存行(代码中可能已通过alignas
实现)。
8 代码示例(简化版)
template <typename T>
class WorkStealingQueue {
public:WorkStealingQueue(size_t capacity) : _bottom(0), _top(0), _capacity(next_pow2(capacity)) {_array = new T[_capacity];}bool push(T item) {size_t b = _bottom.load(std::memory_order_relaxed);if (b - _top.load(std::memory_order_acquire) >= _capacity) return false;_array[b & (_capacity - 1)] = item;_bottom.store(b + 1, std::memory_order_release);return true;}bool steal(T* item) {size_t t = _top.load(std::memory_order_acquire);size_t b = _bottom.load(std::memory_order_acquire);if (t >= b) return false;*item = _array[t & (_capacity - 1)];return _top.compare_exchange_strong(t, t + 1,std::memory_order_seq_cst,std::memory_order_relaxed);}
};
9 总结
work_stealing_queue.h
是 BRPC 高性能任务调度的核心组件,通过无锁环形缓冲区和原子操作实现高效的任务存取与窃取。其设计充分优化了本地线程操作的性能,同时通过精细的内存序控制保证窃取操作的线程安全。适用于高并发场景下的负载均衡,但需注意容量限制和 ABA 问题的潜在风险。