当前位置: 首页 > backend >正文

【C/C++】如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦

文章目录

  • 如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦?
    • 1 假设场景设计
    • 2 Codes
    • 3 流程图
    • 4 优劣势
    • 5 风险可能

如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦?


1 假设场景设计

  • Producer(生产者):生成任务并推送到队列中。
  • TaskQueue(主题/被观察者):任务队列,同时也是一个“可被观察”的对象,它在收到新任务后,会主动通知观察者(消费者)
  • Consumer(观察者):注册到队列中,当有新任务时被通知,并从队列中拉取任务。

避免了消费者主动等待(如传统条件变量 wait),改用回调通知


2 Codes

#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <functional>
#include <memory>
#include <atomic>// ========== Observer 接口 ==========
class Observer {
public:virtual void onNotified() = 0;virtual ~Observer() = default;
};// ========== 主题(被观察者) ==========
class TaskQueue {
public:void addObserver(std::shared_ptr<Observer> obs) {std::lock_guard<std::mutex> lock(observerMutex_);observers_.push_back(obs);}void pushTask(int task) {{std::lock_guard<std::mutex> lock(queueMutex_);queue_.push(task);}notifyObservers();}bool popTask(int& task) {std::lock_guard<std::mutex> lock(queueMutex_);if (queue_.empty()) return false;task = queue_.front();queue_.pop();return true;}bool hasTask() {std::lock_guard<std::mutex> lock(queueMutex_);return !queue_.empty();}private:void notifyObservers() {std::lock_guard<std::mutex> lock(observerMutex_);for (auto& obs : observers_) {if (obs) obs->onNotified();  // 回调通知}}private:std::queue<int> queue_;std::mutex queueMutex_;std::vector<std::shared_ptr<Observer>> observers_;std::mutex observerMutex_;
};// ========== 消费者(观察者) ==========
class Consumer : public Observer, public std::enable_shared_from_this<Consumer> {
public:Consumer(std::shared_ptr<TaskQueue> queue, int id): queue_(queue), id_(id), stopFlag_(false) {}void start() {thread_ = std::thread([self = shared_from_this()] {self->run();});}void stop() {stopFlag_ = true;cv_.notify_all();  // 所有线程都唤醒}void onNotified() override {cv_.notify_one();  // 唤醒 run 中等待的线程}private:void run() {while (true) {std::unique_lock<std::mutex> lock(cvMutex_);cv_.wait(lock, [this]() {return stopFlag_ || queue_->hasTask(); });if (stopFlag_ && !queue_->hasTask()) break; int task;while (queue_->popTask(task)) {std::cout << "[Consumer " << id_ << "] Consumed task: " << task << std::endl;}}}private:std::shared_ptr<TaskQueue> queue_;int id_;std::thread thread_;std::atomic<bool> stopFlag_;std::condition_variable cv_;std::mutex cvMutex_;
};// ========== 生产者 ==========
void producer(std::shared_ptr<TaskQueue> queue) {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(200));std::cout << "[Producer] Produced task: " << i << std::endl;queue->pushTask(i);}
}int main() {auto queue = std::make_shared<TaskQueue>();// 启动两个消费者auto consumer1 = std::make_shared<Consumer>(queue, 1);auto consumer2 = std::make_shared<Consumer>(queue, 2);queue->addObserver(consumer1);queue->addObserver(consumer2);consumer1->start();consumer2->start();// 启动生产者线程std::thread prodThread(producer, queue);prodThread.join();std::this_thread::sleep_for(std::chrono::seconds(1));consumer1->stop();consumer2->stop();return 0;
}

输出

[Producer] Produced task: 0
[Consumer 2] Consumed task: 0
[Producer] Produced task: 1
[Consumer 2] Consumed task: 1
[Producer] Produced task: 2
[Consumer 2] Consumed task: 2
[Producer] Produced task: 3
[Consumer 2] Consumed task: 3
[Producer] Produced task: 4
[Consumer 2] Consumed task: 4
[Producer] Produced task: 5
[Consumer 2] Consumed task: 5
[Producer] Produced task: 6
[Consumer 2] Consumed task: 6
[Producer] Produced task: 7
[Consumer 2] Consumed task: 7
[Producer] Produced task: 8
[Consumer 2] Consumed task: 8
[Producer] Produced task: 9
[Consumer 2] Consumed task: 9

关键代码解读:
Consumer 类中的 onNotified()run() 方法是如何配合实现消费者监听通知的 ==》背后即“观察者 + 条件变量”的事件驱动机制
TaskQueue::notifyObservers() 调用Consumer::onNotified,唤醒等待的 Consumer::run() 线程。

二者配合流程详解

  1. run() 是消费者线程主循环(由 start() 启动)

    • 每个 Consumer 启动后会在一个独立线程中运行 run() 方法;
    • 它使用 cv_.wait(lock) 进入 阻塞等待状态,直到被通知(由 notify_one() 唤醒);
    • 唤醒后尝试从 TaskQueuepopTask(),直到队列为空;
    • 然后再次进入等待。
  2. onNotified() 是“被观察者”的回调通知函数

    • TaskQueue::notifyObservers() 被调用(例如 pushTask() 中调用)时,会遍历注册的观察者;
    • 每个观察者(即 Consumer)都会被调用 onNotified()
    • onNotified() 会调用 cv_.notify_one(),唤醒 run() 中正在等待的线程。

3 流程图

[Producer]↓ pushTask()
[TaskQueue]↓ notifyObservers()
[Consumer]↓ onNotified()→ cv_.notify_one()↓
[run() loop]→ cv_.wait() 被唤醒↓→ popTask()↓→ 处理任务

4 优劣势

编码可能遇到的问题原因/应对
cv.wait() 可能虚假唤醒可用 cv.wait(lock, condition) 代替裸 wait(),避免无任务时误唤醒。
多个消费者抢任务多个消费者被唤醒时要竞争 queue_ 锁,可通过加任务标签或调度器来分配。
重复唤醒开销大若任务频繁到达,建议合并通知、或按“任务计数”通知。
优点描述
解耦消费者不需要主动轮询,事件驱动机制带来良好模块化。
可扩展支持多个消费者动态注册,符合微服务或事件分发模型。
降低等待利用通知机制唤醒消费者,避免空轮询带来的 CPU 消耗。
灵活性可轻松拓展为异步观察者队列、支持任务优先级、过滤等机制。

5 风险可能

  • 若消费者数量多,且频繁 wakeup,可能存在“惊群效应”。
  • 可以通过线程绑定负载均衡策略来优化通知粒度。
  • 可扩展为事件过滤、类型区分(如不同类型的消费者响应不同事件)。
http://www.xdnf.cn/news/9090.html

相关文章:

  • [TriCore] 01.QEMU 虚拟化 TriCore 架构中的寄存器 指令
  • 小红书文章内容提取免费API接口教程
  • java基础(面向对象进阶高级)内部类
  • leetcode hot100刷题日记——17.搜索插入位置
  • Linux中logger命令的使用方法详解
  • 嵌入式开发STM32 -- 江协科技笔记
  • window 显示驱动开发-呈现开销改进(二)
  • c++进阶——智能指针
  • maven中的grpc编译插件protobuf-maven-plugin详解
  • SQL进阶之旅 Day 4:子查询与临时表优化
  • C/C++语言中成双成对出现的都有哪些?
  • STM32程序运行不了,仿真功能也异常,连断点和复位都异常了
  • 网络流学习笔记(基础)
  • Beckhoff PLC 功能块 FB_CTRL_ACTUAL_VALUE_FILTER (模拟量滤波)
  • vSphere 7.0 client 提示HTTP状态 500- 内部服务器错误
  • GROUP BY SQL
  • 【动态规划】子数组系列(一)
  • 【备战秋招】C++音视频开发经典面试题整理
  • 学校住宿管理系统——仙盟创梦IDE
  • OpenGL Chan视频学习-7 How I Deal with Shaders in OpenGL
  • 0基础学习Linux之揭开朦胧一面:环境基础开发工具
  • java8函数式接口(函数式接口的匿名实现类作为某些方法的入参)
  • 2025年5月系统架构设计师考试真题回忆版
  • 7.安卓逆向2-frida hook技术-介绍
  • 重学计算机网络之命令整理
  • 数据加密技术:守护网络通信安全的基石
  • ceph 报错 full ratio(s) out of order
  • Elasticsearch数据同步方案
  • VS Code设置Dev Containers: Reopen in Container
  • MongoDB基础知识(浅显)