当前位置: 首页 > web >正文

【中间件】brpc之工作窃取队列

文章目录

  • BRPC Work Stealing Queue
    • 1 核心功能
    • 2 关键数据结构
      • 2.1 队列结构
      • 2.2 内存布局优化
    • 3 核心操作
      • 3.1 本地线程操作(非线程安全)
      • 3.2 窃取操作(线程安全)
    • 4 设计亮点
      • 4.1 无锁原子操作
      • 4.2 环形缓冲区优化
      • 4.3 线程角色分离
    • 5 性能优化
    • 6 应用场景
    • 7 潜在问题与改进
      • 7.1 ABA 问题
      • 7.2 容量限制
      • 7.3 伪共享
    • 8 代码示例(简化版)
    • 9 总结

BRPC Work Stealing Queue

1 核心功能

无锁工作窃取队列(Lock-Free Work Stealing Queue),用于在 BRPC 的 bthread 用户态线程库中支持高效的任务调度。

设计目标:

  • 高并发性能:允许线程本地快速存取任务,减少锁竞争。
  • 负载均衡:空闲线程可“窃取”其他线程队列中的任务,提升 CPU 利用率。

2 关键数据结构

2.1 队列结构

template <typename T>
class WorkStealingQueue {
private:std::atomic<size_t> _bottom;  // 本地线程操作的队尾std::atomic<size_t> _top;     // 其他线程窃取的队头T* _array;                    // 环形缓冲区size_t _capacity;             // 队列容量
};
  • 环形缓冲区:通过 _array 存储任务,容量固定(通常为 2 的幂次,便于位运算优化)。
  • 原子指针_bottom_top 使用原子操作保证线程安全。

2.2 内存布局优化

  • 缓存行对齐_bottom_top 可能被放置在不同缓存行,避免伪共享(False Sharing)。
    alignas(64) std::atomic<size_t> _bottom;
    alignas(64) std::atomic<size_t> _top;
    

3 核心操作

3.1 本地线程操作(非线程安全)

  • push(T item):向队尾插入任务。
    bool push(T item) {size_t b = _bottom.load(std::memory_order_relaxed);size_t t = _top.load(std::memory_order_acquire);if (b - t >= _capacity) return false; // 队列满_array[b % _capacity] = item;_bottom.store(b + 1, std::memory_order_release);return true;
    }
    
  • pop(T* item):从队尾取出任务(本地线程独占)。
    bool pop(T* item) {size_t b = _bottom.load(std::memory_order_relaxed) - 1;_bottom.store(b, std::memory_order_relaxed);std::atomic_thread_fence(std::memory_order_seq_cst);size_t t = _top.load(std::memory_order_relaxed);if (t <= b) { // 队列非空*item = _array[b % _capacity];if (t != b) return true; // 队列仍有元素// 最后一个元素,检查竞争if (_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {_bottom.store(b + 1, std::memory_order_relaxed);return true;}_bottom.store(b + 1, std::memory_order_relaxed);return false;}_bottom.store(b + 1, std::memory_order_relaxed);return false;
    }
    

3.2 窃取操作(线程安全)

  • steal(T* item):从队头窃取任务(多线程并发安全)。
    bool steal(T* item) {size_t t = _top.load(std::memory_order_acquire);std::atomic_thread_fence(std::memory_order_seq_cst);size_t b = _bottom.load(std::memory_order_acquire);if (t >= b) return false; // 队列空*item = _array[t % _capacity];if (!_top.compare_exchange_strong(t, t + 1,std::memory_order_seq_cst,std::memory_order_relaxed)) {return false; // 竞争失败}return true;
    }
    

4 设计亮点

4.1 无锁原子操作

  • compare_exchange_strong:用于更新 _top 指针,确保窃取操作的原子性。
  • 内存序控制
    • push 使用 release 保证写入可见性。
    • steal 使用 acquireseq_cst 确保读取顺序。

4.2 环形缓冲区优化

  • 位运算代替取模:若容量为 2 的幂次,b % _capacity 可简化为 b & (_capacity - 1),提升性能。
  • 快速失败检查:队列满/空时直接返回,避免无效操作。

4.3 线程角色分离

  • 本地线程(Owner):仅操作 _bottom,无需原子同步(relaxed 内存序)。
  • 窃取线程(Thief):操作 _top,需严格同步(acquire/release)。

5 性能优化

  • 低竞争场景高效:本地线程的 push/pop 几乎无原子开销。
  • 批量窃取:可扩展为一次窃取多个任务,减少竞争频率。
  • 动态扩容:当前实现容量固定,实际工程中可结合 std::vector 实现动态扩容。

6 应用场景

  • bthread 调度器:每个 Worker 线程维护一个任务队列,空闲线程窃取任务。
  • 并行计算:如 Fork-Join 模型中的任务分发。
  • 事件驱动框架:处理异步 I/O 任务的负载均衡。

7 潜在问题与改进

7.1 ABA 问题

  • 风险:若 _top 指针被多次修改后回到原值,compare_exchange_strong 可能错误成功。
  • 解决方案:使用带版本号的指针(如 (top << 16) | version)。

7.2 容量限制

  • 固定容量:队列满时需由调用方处理(如丢弃任务或动态扩容)。
  • 改进方案:实现动态扩容(需原子地替换 _array_capacity)。

7.3 伪共享

  • 缓存行竞争_bottom_top 需对齐到不同缓存行(代码中可能已通过 alignas 实现)。

8 代码示例(简化版)

template <typename T>
class WorkStealingQueue {
public:WorkStealingQueue(size_t capacity) : _bottom(0), _top(0), _capacity(next_pow2(capacity)) {_array = new T[_capacity];}bool push(T item) {size_t b = _bottom.load(std::memory_order_relaxed);if (b - _top.load(std::memory_order_acquire) >= _capacity) return false;_array[b & (_capacity - 1)] = item;_bottom.store(b + 1, std::memory_order_release);return true;}bool steal(T* item) {size_t t = _top.load(std::memory_order_acquire);size_t b = _bottom.load(std::memory_order_acquire);if (t >= b) return false;*item = _array[t & (_capacity - 1)];return _top.compare_exchange_strong(t, t + 1,std::memory_order_seq_cst,std::memory_order_relaxed);}
};

9 总结

work_stealing_queue.h 是 BRPC 高性能任务调度的核心组件,通过无锁环形缓冲区和原子操作实现高效的任务存取与窃取。其设计充分优化了本地线程操作的性能,同时通过精细的内存序控制保证窃取操作的线程安全。适用于高并发场景下的负载均衡,但需注意容量限制和 ABA 问题的潜在风险。

http://www.xdnf.cn/news/4133.html

相关文章:

  • 车载通信网络安全:挑战与解决方案
  • 小微企业SaaS ERP管理系统,SpringBoot+Vue+ElementUI+UniAPP
  • PDF扫描件交叉合并工具
  • 【背包dp----01背包】例题1------[NOIP2001]装箱问题(简化的01背包)
  • Sublime PrettyJson 快捷键
  • 在 Laravel 12 中实现 WebSocket 通信时进行身份验证
  • ts bug 找不到模块或相应类型的声明,@符有红色波浪线
  • Prometheus实战教程:k8s平台-使用文件服务发现案例
  • Android Retrofit框架分析(三):自动切换回主线程;bulid的过程;create方法+ServiceMethod源码了解
  • 【Azure Redis 缓存】关于Azure Cache for Redis 服务在传输和存储键值对(Key/Value)的加密问题
  • Windows系统修改Docker Desktop(WSL2)内存分配
  • Facebook隐私保护措施的优缺点解析
  • Java面试全栈解析:Spring Boot、Kafka与Redis实战揭秘
  • Jenkins+Newman实现接口自动化测试
  • 蓝桥杯-通电(最小生成树java)
  • Axure : 列表分页、 列表翻页
  • 第1.3讲、什么是 Attention?——从点菜说起 [特殊字符]️
  • FastJSON 使用 `Feature.OrderedField` 修复 `JSONObject` 序列化字段顺序问题
  • 用 GRPO 魔法点亮Text2SQL 的推理之路:让模型“思考”得更像人类
  • AI服务器的作用都有哪些?
  • 【工具使用-数据可视化工具】Apache Superset
  • Cursor 被封解决方案
  • 2、Kafka Replica机制与ISR、HW、LEO、AR、OSR详解
  • .NET 通过回调函数执行 Shellcode启动进程
  • 广州华锐视点邀您参与2025广交会VRAR展【5月10-12日】
  • 快速体验 .NET9 提供的 HybridCache 混合缓存
  • wrod生成pdf。[特殊字符]改背景
  • 基于Piecewise Jerk Speed Optimizer的速度规划算法(附ROS C++/Python仿真)
  • C++多态详解
  • ORCAD打印pdf