当前位置: 首页 > news >正文

【中间件】bthread_基础_TaskControl

TaskControl

  • 1 Definition
  • 2 Introduce
      • **核心职责**
  • 3 成员解析
    • **3.1 数据结构与线程管理**
    • **3.2 任务调度与负载均衡**
    • **3.3 线程停放与唤醒(ParkingLot)**
    • **3.4 统计与监控**
  • 4 **工作流程**
  • 5 **设计亮点**
  • 6 **使用场景示例**
  • 7 **总结**
  • 8 学习过程中的疑问
    • 8.1 init函数为什么不在构造函数中调用

1 Definition

class TaskControl {friend class TaskGroup; // 友元类friend void wait_for_butex(void*); // 友元函数
#ifdef BRPC_BTHREAD_TRACERfriend bthread_t init_for_pthread_stack_trace(); // 友元函数
#endif // BRPC_BTHREAD_TRACERpublic:TaskControl();~TaskControl();// Must be called before using. `nconcurrency' is # of worker pthreads.int init(int nconcurrency);// Create a TaskGroup in this control.TaskGroup* create_group(bthread_tag_t tag);// Steal a task from a "random" group.bool steal_task(bthread_t* tid, size_t* seed, size_t offset);// Tell other groups that `n' tasks was just added to caller's runqueuevoid signal_task(int num_task, bthread_tag_t tag);// Stop and join worker threads in TaskControl.void stop_and_join();// Get # of worker threads.int concurrency() const { return _concurrency.load(butil::memory_order_acquire); }int concurrency(bthread_tag_t tag) const { return _tagged_ngroup[tag].load(butil::memory_order_acquire); }void print_rq_sizes(std::ostream& os);double get_cumulated_worker_time();double get_cumulated_worker_time_with_tag(bthread_tag_t tag);int64_t get_cumulated_switch_count();int64_t get_cumulated_signal_count();// [Not thread safe] Add more worker threads.// Return the number of workers actually added, which may be less than |num|int add_workers(int num, bthread_tag_t tag);// Choose one TaskGroup (randomly right now).// If this method is called after init(), it never returns NULL.TaskGroup* choose_one_group(bthread_tag_t tag);#ifdef BRPC_BTHREAD_TRACER// A stacktrace of bthread can be helpful in debugging.void stack_trace(std::ostream& os, bthread_t tid);std::string stack_trace(bthread_t tid);
#endif // BRPC_BTHREAD_TRACERprivate:typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;static const int PARKING_LOT_NUM = 4;typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;// Add/Remove a TaskGroup.// Returns 0 on success, -1 otherwise.int _add_group(TaskGroup*, bthread_tag_t tag);int _destroy_group(TaskGroup*);// Tag groupTaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; }// Tag ngroupbutil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; }// Tag parking slotTaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; }static void delete_task_group(void* arg);static void* worker_thread(void* task_control);template <typename F>void for_each_task_group(F const& f);bvar::LatencyRecorder& exposed_pending_time();bvar::LatencyRecorder* create_exposed_pending_time();bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag);bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag);std::vector<butil::atomic<size_t>> _tagged_ngroup;std::vector<TaggedGroups> _tagged_groups;butil::Mutex _modify_group_mutex;butil::atomic<bool> _init;  // if not init, bvar will case coredumpbool _stop;butil::atomic<int> _concurrency;std::vector<pthread_t> _workers;butil::atomic<int> _next_worker_id;bvar::Adder<int64_t> _nworkers;butil::Mutex _pending_time_mutex;butil::atomic<bvar::LatencyRecorder*> _pending_time;bvar::PassiveStatus<double> _cumulated_worker_time;bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second;bvar::PassiveStatus<int64_t> _cumulated_switch_count;bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second;bvar::PassiveStatus<int64_t> _cumulated_signal_count;bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second;bvar::PassiveStatus<std::string> _status;bvar::Adder<int64_t> _nbthreads;std::vector<bvar::Adder<int64_t>*> _tagged_nworkers;std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;std::vector<TaggedParkingLot> _pl;#ifdef BRPC_BTHREAD_TRACERTaskTracer _task_tracer;
#endif // BRPC_BTHREAD_TRACER};

2 Introduce

TaskControl作为任务调度控制中心,管理多个任务组(TaskGroup)并协调工作线程的高效运作,适用于BRPC的bthread协程库:

核心职责

  1. 任务组管理:创建、销毁任务组,支持按标签(bthread_tag_t)分类管理。
  2. 线程池调度:动态调整工作线程数量,实现任务窃取(Work Stealing)以平衡负载。
  3. 同步与唤醒:通过停放区(ParkingLot)管理线程的休眠与唤醒。
  4. 性能监控:集成统计模块(bvar)跟踪任务处理时间、切换次数等指标。

3 成员解析

3.1 数据结构与线程管理

  • _tagged_groups
    类型:std::vector<TaggedGroups>
    作用:按标签存储任务组指针数组,每个标签对应一个TaggedGroups(固定大小为BTHREAD_MAX_CONCURRENCY的数组)。
    示例:标签可用于区分不同业务优先级或租户的任务。

  • _tagged_ngroup
    类型:std::vector<butil::atomic<size_t>>
    作用:记录每个标签下的任务组数量,原子操作保证线程安全。

  • _workers
    类型:std::vector<pthread_t>
    作用:保存所有工作线程的ID,用于线程生命周期管理(启动、停止、回收)。

  • _concurrency
    类型:butil::atomic<int>
    作用:总工作线程数,支持原子读写,动态调整并发度。

3.2 任务调度与负载均衡

  • steal_task(bthread_t* tid, size_t* seed, size_t offset)
    作用:从其他任务组窃取任务,避免工作线程空闲。
    实现:

    • 使用seed随机选择目标组,结合offset避免多个线程竞争同一队列。
    • 窃取成功返回true,任务ID存入tid
  • signal_task(int num_task, bthread_tag_t tag)
    作用:通知任务组有新任务加入,触发唤醒机制。
    场景:当任务被添加到队列时,调用此方法唤醒可能休眠的线程。

  • choose_one_group(bthread_tag_t tag)
    作用:根据标签选择一个任务组,用于任务分发或负载均衡。
    策略:可能采用轮询或随机算法选择组,确保任务均匀分配。

3.3 线程停放与唤醒(ParkingLot)

  • _pl
    类型:std::vector<TaggedParkingLot>
    作用:按标签管理的停放区数组,每个标签对应PARKING_LOT_NUM个停放区。
    机制:
    • 工作线程无任务时进入停放区等待,减少CPU空转。
    • 新任务到达时,通过停放区唤醒线程,降低延迟。

3.4 统计与监控

  • bvar集成
    关键指标:

    • _cumulated_worker_time:累计任务处理时间。
    • _cumulated_switch_count:上下文切换次数。
    • _signal_per_second:每秒任务唤醒次数。
      作用:通过BRPC的bvar库暴露性能指标,方便实时监控与调优。
  • 标签化统计
    成员如_tagged_nworkers_tagged_cumulated_worker_time等,按标签细分统计,支持多维分析。

4 工作流程

  1. 初始化

    • 调用init(nconcurrency)创建指定数量工作线程,每个线程执行worker_thread函数。
    • 工作线程通过choose_one_group选择任务组,执行任务循环。
  2. 任务执行

    • 线程从本地任务组获取任务,若队列为空,尝试从其他组窃取(steal_task)。
    • 无任务可执行时,进入停放区(ParkingLot)休眠。
  3. 任务通知

    • 添加新任务时,调用signal_task递增信号计数器,唤醒停放区线程。
  4. 动态扩缩容

    • add_workers动态增加指定标签的工作线程,适应负载变化。
  5. 停止与清理

    • stop_and_join设置_stop标志,通知所有线程退出,并回收资源。

5 设计亮点

  • 标签化分组
    支持多维度任务分类,适用于混合部署场景(如不同服务优先级)。
  • 任务窃取
    避免工作线程闲置,提升CPU利用率,降低任务处理延迟。
  • 高效同步
    结合原子操作与停放区,减少锁竞争,保证高吞吐量。
  • 精细化监控
    通过bvar提供详尽的运行时指标,助力性能分析与优化。

6 使用场景示例

高并发服务

  • Web服务器接收请求后,封装为bthread任务,按业务类型打标签。
  • TaskControl根据标签分发任务到不同组,保证关键业务优先调度。
  • 工作线程动态扩展应对流量高峰,空闲时自动缩减节省资源。
  • 监控指标实时反馈系统负载,辅助容量规划。

7 总结

TaskControl是BRPC bthread调度系统的核心,通过高效的任务组管理、工作线程调度及细粒度监控,实现了高并发、低延迟的协程任务处理。其设计充分考虑了扩展性、性能与可观测性,是构建高性能C++服务的基石组件。

8 学习过程中的疑问

8.1 init函数为什么不在构造函数中调用

疑问:init函数在注释中声明需要在使用前调用,为什么不能将其放在构造函数中直接调用呢?
回答:

  • 可能原因:

      1. 初始化可能失败,需要错误处理
      • 构造函数没有返回值,若在构造函数中执行可能的失败操作(eg. 创建线程、分配资源),只能通过异常或设置内部状态标记错误。brpc代码风格倾向于避免异常,习惯于返回错误码。参考代码8-1
      1. 需要依赖外部参数
      • 构造时参数可能不完整,TaskControl的初始化需要并发线程数等参数,可能在运行时动态确定,无法在编译器硬编码。
      • 更复杂的重载,如果后续需要扩展初始化参数(eg. 增加timeout/policy等配置),显式init()更容易扩展,而构造函数重载会膨胀。
    • 3 支持对象的复用
      • 销毁后重新初始化,某些场景下,用户可能希望销毁TaskControl后重新初始化(eg. 动态调整线程池的大小)。若初始化逻辑在构造函数中,则需要先析构对象再重新构造,而显式init/stop_and_join允许复用同一对象。参考代码8-2
    • 4 明确的二段式生命周期
      • 分离资源分配与初始化,二段式设计(构造+init())将对喜爱嗯的内存分配和资源初始化解耦:
        • 构造阶段:仅进行内存布局、简单成员初始化;
        • 初始化阶段:执行重量级操作(eg. 创建线程、连接资源);
      • 更符合RAII的变体模式,尤其是在需要延迟初始化时。
    • 5 避免隐藏的副作用
      • 隐式初始化可能引入意外行为,若构造函数自动初始化,用户可能在不知情的情况下触发资源分配(eg. 线程创建)。显式init()强制用户主动控制初始化时机,避免副作用。
    • 6 与brpc其他组件的设计一致性
      • 统一风格,brpc很多组件(eg. Channel / Server)均采用类似的二段式模式(先构造,再调用init() / start()),保持代码风格统一,降低用户学习成本。
  • 什么情况下应在构造函数中初始化?

    • 轻量级且无失败可能的操作,eg. 设置默认参数、初始化原子计数器等。
    • 强制一次性初始化,若对象必须在构造时完全初始化,且不允许重新初始化。
  • 两种方式对比,见代码8-3

  • 二段式的作用

    • 清晰的错误处理:通过返回int明确传递错误
    • 参数灵活性:允许运行时动态决定初始化参数
    • 对象复用:支持重新初始化而不重新构造
    • 代码一致性:符合BRPC设计惯例

代码8-1:

// 当前使用方法
TaskControl ctl;
if (ctl.init(32) != 0) {// 处理初始化失败
}// 如果放在构造函数中
TaskControl ctl(32);
if (!ctl.is_initialized()) {// 处理错误
}

代码8-2

TaskControl ctl;
ctl.init(16);
ctl.stop_and_join();
ctl.init(32);

代码8-3

// 二段式
class TaskControl {
public:TaskControl();  // 轻量构造~TaskControl();int init(int nconcurrency);  // 显式初始化// ...
};// 使用方式
TaskControl ctl;
if (ctl.init(32) != 0) {LOG(ERROR) << "Failed to initialize TaskControl";return -1;
}// 合并到构造方式
class TaskControl {
public:explicit TaskControl(int nconcurrency);  // 可能抛出异常~TaskControl();bool is_initialized() const;  // 需额外状态检查// ...
};// 使用方式
try {TaskControl ctl(32);
} catch (const std::exception& e) {LOG(ERROR) << "Construction failed: " << e.what();
}
if (!ctl.is_initialized()) {  // 需要额外检查// 处理错误
}
http://www.xdnf.cn/news/241813.html

相关文章:

  • PyTorch 与 TensorFlow:深度学习框架的深度剖析与实战对比
  • 怎么查看数据库容量
  • REST API、FastAPI与Flask API的对比分析
  • cdn服务器连接异常怎么办?cdn连接失败解决方法有哪些?
  • 深入解析 Python 应用日志监控:ELK、Graylog 的实战指南
  • WPF采集欧姆龙PLC、基恩士PLC、西门子PLC、汇川PLC、台达PLC数据
  • 从请求到响应:初探spring web
  • PCA主成分分析法(最大投影方差,最小重构距离,SVD角度)
  • AI数字人系统开发:技术架构、应用场景与未来趋势
  • 【进阶】--函数栈帧的创建和销毁详解
  • TDA4VM SDK J721E (RTOS/Linux) bootloaders梳理笔记
  • RDMA高性能网络通信实践
  • 航空客户价值分析阶段性测验
  • Python数据分析课程实验-1
  • 使用DCI和RTIT技术进行精准调优--看录像
  • C++之特殊类设计及类型转换
  • 【kafka系列】消费者组
  • 系统架构设计师:设计模式——创建型设计模式
  • 解锁 C++26 的未来:从语言标准演进到实战突破
  • Nginx核心功能——nginx代理
  • 【数据结构】- 栈
  • Vue之脚手架与组件化开发
  • 计算机网络——HTTP/IP 协议通俗入门详解
  • 优雅关闭服务:深入理解 SIGINT / SIGTERM 信号处理机制
  • WPF封装常用的TCP、串口、Modbus、MQTT、Webapi、PLC通讯工具类
  • C#扩展方法(Extension Method)
  • Python生活手册-文件二进制:从快递柜到生鲜冷链的数据保鲜术
  • 健康生活新主张:全方位养生指南
  • Linux 命令如何同时支持文件参数与管道输入?
  • 基于数字图像处理的裂缝检测与识别系统(Matlab)