当前位置: 首页 > news >正文

linux 条件变量与生产消费者模型

目录

1.为什么需要条件变量(它解决了什么问题?)

2.条件变量的接口说明

2.1 初始化和销毁

2.2 等待(Waiting) - 核心接口

2.3 限时等待(Timed Wait)

2.4 唤醒(Signaling)

3. 生产消费者模型(CP模型)

3.1. 为什么(Why):为什么要用生产者-消费者模型?

3.2 是什么(What):模型的核心组成是什么?

3.3 怎么做(How):如何用代码实现?(C++ with pthreads)

3.3.1 基于blockingQueue实现

3.3.2 基于环形队列的生产消费模型


1.为什么需要条件变量(它解决了什么问题?)

在之前我们引入了互斥锁和信号量,确保在并发编程时,只有一个执行流可以访问共享资源。但是我们使用加锁操作也带来很多问题,比如使用互斥锁可能导致线程饥饿,消耗CPU资源等诸多问题

在最基本的层面上,线程同步是为了解决由并发执行导致的不确定性问题,从而保证程序的正确性(Correctness)

  1. 维护数据一致性(Data Consistency)

    • 核心问题:多个线程交错执行非原子操作(如 counter++),会导致最终结果依赖于不可控的执行顺序,从而产生错误。

    • 同步的作用:通过互斥锁等机制,将一系列操作包装成一个原子性(Atomic) 的临界区(Critical Section)。使得这些操作要么全做,要么不做,对外看起来就像瞬间完成的一样,避免了交错执行,从而保证了共享数据处于一致的状态。

  2. 维护操作顺序(Ordering of Operations)

    • 核心问题:线程A必须在线程B完成某项任务之后才能开始它的工作。例如,消费者线程必须等待生产者线程生产出数据后才能消费。

    • 同步的作用:通过条件变量、信号量等机制,允许线程主动等待(阻塞)或通知他人,从而协调线程间操作的先后顺序,确保逻辑上的依赖性得到满足。

这些都是线程同步(通过加锁操作)所解决的一下问题。互斥锁(Mutex)解决了互斥访问的问题,即“一次只有一个线程能进”。但它无法解决条件等待的问题,即“如果条件不满足,我该怎么等?等的时候锁怎么办?”

条件变量(Condition Variable)就是实现这种“等待-唤醒”机制的工具。 它允许线程在某个条件不成立时高效地休眠,并在条件可能成立时被其他线程唤醒。

总结:同步是为了用性能换取正确性。一个没有正确性的程序,性能再好也毫无意义。因此,同步是并发编程的“必需品”,那么条件变量是为了解决在加锁情况下避免对CPU资源的过度浪费。

  1. 同步的根本目的:牺牲一部分性能,确保程序的正确性确定性

  2. 同步的必要性:正确性是程序的底线,是不可妥协的。

  3. 条件变量的核心价值:它是一种“聪明”的同步机制,在保证正确性的同时,极大地优化了性能,解决了“忙等待”这个性能杀手。

2.条件变量的接口说明

2.1 初始化和销毁

初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);

  • cond: 指向要初始化的条件变量的指针。

  • attr: 条件变量属性,通常传 NULL 使用默认属性。

  • 返回值: 成功返回0,失败返回错误码。

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

对于全局或静态条件变量,可以用宏直接初始化:

销毁

int pthread_cond_destroy(pthread_cond_t *cond);

  • 用于销毁一个不再使用的条件变量,释放其资源。

2.2 等待(Waiting) - 核心接口

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
  • cond: 要等待的条件变量。

  • mutex一个已经由当前线程锁定的互斥锁

  • 这是一个原子操作,它做了三件事

    1. 解锁:释放绑定的互斥锁 mutex,让其他线程有机会修改共享条件和获取锁。

    2. 阻塞:将当前线程挂起,投入等待队列,让出CPU。

    3. 重新加锁:当被唤醒后,在从函数返回之前,会重新获取互斥锁 mutex

为什么必须和互斥锁一起用?
检查条件(如 buffer is empty)和进入等待状态(wait)必须是原子操作。否则,可能会发生:

  1. 线程A检查条件,发现不满足(缓冲区空),准备等待

  2. 就在它调用 wait 之前,调度器切换到了线程B(生产者)。

  3. 线程B生产了数据,发送了 signal。但此时线程A还没在等待,这个 signal 丢失了

  4. 线程A恢复执行,调用 wait(),但可能再也没有人来唤醒它了 -> 永久休眠
    使用互斥锁可以确保在检查条件和进入等待的整个过程中,其他线程无法修改条件

2.3 限时等待(Timed Wait)

int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
  • abstime: 指定一个绝对时间(从1970年1月1日UTC时间开始的秒和纳秒),表示等待的最后期限。

  • 如果在 abstime 指定的时间前被唤醒,返回0。

  • 如果超时时间到仍未唤醒,返回 ETIMEDOUT

  • 用于避免线程因丢失信号或其他原因而无限期等待

2.4 唤醒(Signaling)

int pthread_cond_signal(pthread_cond_t *cond);
唤醒至少一个线程
  • 发送一个信号给条件变量 cond,唤醒至少一个正在等待该条件的线程(具体唤醒哪个取决于调度策略)。

  • 如果当前没有线程在等待,这个信号就什么也不做(即不会累积)。

  • 效率较高,在只需唤醒一个消费者就能处理事件时使用(例如,缓冲区中只有一件商品)。

    int pthread_cond_broadcast(pthread_cond_t *cond);
    唤醒所有线程

  • 发送一个信号,唤醒所有正在等待该条件变量的线程。

  • 开销比 signal 大,通常在多个线程都能处理当前状态时使用(例如,缓冲区从满变为非满,所有生产者线程都可以被唤醒;或者资源可用性发生重大变化)

3. 生产消费者模型(CP模型)

3.1. 为什么(Why):为什么要用生产者-消费者模型?

核心目的:解耦(Decoupling)和平衡效率(Balancing Efficiency)。

想象一个现实生活中的例子:一个面包店。

  • 没有模型(耦合紧密):顾客(消费者)必须站在面包师(生产者)旁边,每做好一个面包,面包师就亲手交给顾客。面包师快,顾客就得等着接;顾客没来,面包师做好了也没地方放,只能等着。效率极低,双方互相依赖、互相阻塞。

  • 使用模型(解耦):在面包师和顾客之间引入一个货架(缓冲区)

    • 面包师只管生产面包,做好后就放到货架上。

    • 顾客只管从货架上取走面包。

    • 双方不需要知道对方的存在,也不需要直接交互。他们只和货架打交道。

这样做的好处是巨大的:

  1. 解耦(Decoupling):生产者和消费者之间不直接通信,而是通过缓冲区进行通信。这使得它们可以独立地、并发地执行,修改一方的代码不会直接影响另一方。

  2. 平衡速度差异(Speed Matching):生产者可能有时快有时慢(如处理网络请求),消费者也是如此。缓冲区就像一个“水库”,可以削峰填谷,平衡双方的速度差异,避免快的等待慢的。

  3. 提高整体吞吐量(Throughput):当生产者爆发时,它可以快速生产并填满缓冲区,然后去做别的事(或等待)。消费者可以持续地从缓冲区消费。反之亦然。这最大限度地利用了系统资源。

在计算机中的典型应用:

  • 消息队列(如Kafka, RabbitMQ):应用程序(生产者)产生消息放入队列,其他服务(消费者)从队列取出处理。

  • 线程池/任务队列:主线程(生产者)提交任务到队列,工作线程(消费者)从队列获取并执行任务。

  • 数据处理管道:一个阶段处理完数据放入缓冲区,下一个阶段从缓冲区读取。

  • I/O 和计算的重叠:一个线程从磁盘读数据(生产)到缓冲区,另一个线程同时处理缓冲区中的数据(消费)。

3.2 是什么(What):模型的核心组成是什么?

生产者-消费者模型有三个核心组成部分:

  1. 生产者(Producer(s))

    • 职责:生成数据或任务单元。

    • 行为:将生成的数据放入一个共享的缓冲区中。

    • 问题:如果缓冲区满了,它必须等待

  2. 消费者(Consumer(s))

    • 职责:处理数据或任务单元。

    • 行为:从共享缓冲区中取出数据并进行处理。

    • 问题:如果缓冲区空了,它必须等待

  3. 缓冲区(Buffer)

    • 职责:作为生产者和消费者之间的通信通道。

    • 形态:通常是一个有大小限制的队列(FIFO)。它屏蔽了生产者和消费者的细节。

    • 关键:对缓冲区的所有操作(放入和取出)都必须是线程安全的。

这个模型引出了两个关键的同步条件:

  • 缓冲区空 -> 消费者必须等待,生产者放入后通知消费者。

  • 缓冲区满 -> 生产者必须等待,消费者取出后通知生产者。

注:321原则,三种关系(生产者和消费者之间的同步与互斥),两个角色(生产者和消费者),一个交易场所(存放数据的容器)。

3.3 怎么做(How):如何用代码实现?(C++ with pthreads)

3.3.1 基于blockingQueue实现
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进行操作时会被阻塞)

BlockingQueue类模版

#ifndef BLOCKING_QUEUE_H
#define BLOCKING_QUEUE_H#include <queue>
#include <mutex>
#include <condition_variable>
#include <iostream>template <typename T>
class BlockingQueue {
public:explicit BlockingQueue(size_t max_size) : max_size_(max_size) {}// 放入数据,如果队列满则阻塞void put(const T& item) {std::unique_lock<std::mutex> lock(mutex_);// 等待队列不满not_full_.wait(lock, [this]() { return queue_.size() < max_size_; });queue_.push(item);std::cout << "Produced: " << item << " (Size: " << queue_.size() << ")" << std::endl;// 通知消费者队列不空(唤醒消费者)not_empty_.notify_one();}// 取出数据,如果队列空则阻塞T take() {std::unique_lock<std::mutex> lock(mutex_);// 等待队列不空not_empty_.wait(lock, [this]() { return !queue_.empty(); });T item = queue_.front();queue_.pop();std::cout << "Consumed: " << item << " (Size: " << queue_.size() << ")" << std::endl;// 通知生产者队列不满not_full_.notify_one();return item;}// 非阻塞尝试放入bool try_put(const T& item) {std::unique_lock<std::mutex> lock(mutex_);if (queue_.size() >= max_size_) {return false;}queue_.push(item);std::cout << "Produced: " << item << " (Size: " << queue_.size() << ")" << std::endl;not_empty_.notify_one();return true;}// 非阻塞尝试取出bool try_take(T& item) {std::unique_lock<std::mutex> lock(mutex_);if (queue_.empty()) {return false;}item = queue_.front();queue_.pop();std::cout << "Consumed: " << item << " (Size: " << queue_.size() << ")" << std::endl;not_full_.notify_one();return true;}size_t size() const {std::unique_lock<std::mutex> lock(mutex_);return queue_.size();}bool empty() const {std::unique_lock<std::mutex> lock(mutex_);return queue_.empty();}bool full() const {std::unique_lock<std::mutex> lock(mutex_);return queue_.size() >= max_size_;}private:std::queue<T> queue_;mutable std::mutex mutex_;std::condition_variable not_empty_;std::condition_variable not_full_;size_t max_size_;
};

模拟生产与消费的过程


#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <random>// 随机数生成器
int get_random_number(int min, int max) {static std::random_device rd;static std::mt19937 gen(rd());std::uniform_int_distribution<> distrib(min, max);return distrib(gen);
}// 生产者函数
void producer(BlockingQueue<int>& queue, int id) {int item = 0;while (true) {// 模拟生产耗时std::this_thread::sleep_for(std::chrono::milliseconds(get_random_number(100, 500)));// 生产一个产品int product = id * 1000 + item++;queue.put(product);// 生产一定数量后退出if (item > 10) break;}std::cout << "Producer " << id << " finished." << std::endl;
}// 消费者函数
void consumer(BlockingQueue<int>& queue, int id) {int count = 0;while (true) {// 模拟消费耗时std::this_thread::sleep_for(std::chrono::milliseconds(get_random_number(200, 800)));// 消费一个产品int product = queue.take();count++;// 处理产品(这里只是打印)std::cout << "Consumer " << id << " processed: " << product << std::endl;// 消费一定数量后退出if (count > 15) break;}std::cout << "Consumer " << id << " finished." << std::endl;
}int main() {const size_t BUFFER_SIZE = 5;const int NUM_PRODUCERS = 2;const int NUM_CONSUMERS = 3;BlockingQueue<int> queue(BUFFER_SIZE);std::vector<std::thread> producers;std::vector<std::thread> consumers;// 创建生产者线程for (int i = 0; i < NUM_PRODUCERS; ++i) {producers.emplace_back(producer, std::ref(queue), i + 1);}// 创建消费者线程for (int i = 0; i < NUM_CONSUMERS; ++i) {consumers.emplace_back(consumer, std::ref(queue), i + 1);}// 等待所有生产者完成for (auto& p : producers) {p.join();}// 等待所有消费者完成(可能需要额外机制确保消费者也能退出)// 这里简单等待,实际应用中可能需要优雅退出机制for (auto& c : consumers) {c.join();}std::cout << "All threads finished. Final queue size: " << queue.size() << std::endl;return 0;
}
3.3.2 基于环形队列的生产消费模型

#include <iostream>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
#include <vector>#define BUFFER_SIZE 5class RingBufferSem {
private:std::vector<int> buffer;int head;int tail;sem_t empty_sem;  // 空闲槽位信号量sem_t full_sem;   // 已填充槽位信号量sem_t mutex;      // 互斥信号量public:RingBufferSem(int size = BUFFER_SIZE) : buffer(size), head(0), tail(0) {// 初始化信号量sem_init(&empty_sem, 0, size);  // 初始有size个空闲槽位sem_init(&full_sem, 0, 0);      // 初始没有已填充槽位sem_init(&mutex, 0, 1);         // 二进制信号量,用于互斥}~RingBufferSem() {sem_destroy(&empty_sem);sem_destroy(&full_sem);sem_destroy(&mutex);}void produce(int item) {// 等待空闲槽位sem_wait(&empty_sem);// 获取互斥锁访问缓冲区sem_wait(&mutex);// 生产物品buffer[tail] = item;tail = (tail + 1) % buffer.size();std::cout << "Produced: " << item << " (head: " << head << ", tail: " << tail << ")" << std::endl;// 释放互斥锁sem_post(&mutex);// 增加已填充槽位计数sem_post(&full_sem);}int consume() {// 等待已填充槽位sem_wait(&full_sem);// 获取互斥锁访问缓冲区sem_wait(&mutex);// 消费物品int item = buffer[head];head = (head + 1) % buffer.size();std::cout << "Consumed: " << item << " (head: " << head << ", tail: " << tail << ")" << std::endl;// 释放互斥锁sem_post(&mutex);// 增加空闲槽位计数sem_post(&empty_sem);return item;}
};// 生产者线程函数
void* producer_sem(void* arg) {RingBufferSem* rb = static_cast<RingBufferSem*>(arg);int item = 0;while (true) {rb->produce(item++);usleep(300000); // 生产间隔0.3秒}return nullptr;
}// 消费者线程函数
void* consumer_sem(void* arg) {RingBufferSem* rb = static_cast<RingBufferSem*>(arg);while (true) {rb->consume();usleep(500000); // 消费间隔0.5秒(消费比生产慢)}return nullptr;
}// 多生产者示例
void* multi_producer(void* arg) {RingBufferSem* rb = static_cast<RingBufferSem*>(arg);int thread_id = *(static_cast<int*>(arg));int item = thread_id * 1000;while (true) {rb->produce(item++);usleep(400000 + (rand() % 200000)); // 随机间隔}return nullptr;
}int main() {RingBufferSem rb;pthread_t prod_thread, cons_thread;pthread_t prod_thread2; // 第二个生产者std::cout << "=== 单生产者单消费者示例 ===" << std::endl;pthread_create(&prod_thread, nullptr, producer_sem, &rb);pthread_create(&cons_thread, nullptr, consumer_sem, &rb);// 运行5秒后结束示例sleep(5);pthread_cancel(prod_thread);pthread_cancel(cons_thread);pthread_join(prod_thread, nullptr);pthread_join(cons_thread, nullptr);std::cout << "\n=== 多生产者单消费者示例 ===" << std::endl;int id1 = 1, id2 = 2;pthread_create(&prod_thread, nullptr, [](void* arg) -> void* {RingBufferSem* rb = static_cast<RingBufferSem*>(arg);int item = 1000;while (true) {rb->produce(item++);usleep(350000);}return nullptr;}, &rb);pthread_create(&prod_thread2, nullptr, [](void* arg) -> void* {RingBufferSem* rb = static_cast<RingBufferSem*>(arg);int item = 2000;while (true) {rb->produce(item++);usleep(450000);}return nullptr;}, &rb);pthread_create(&cons_thread, nullptr, consumer_sem, &rb);// 运行5秒sleep(5);pthread_cancel(prod_thread);pthread_cancel(prod_thread2);pthread_cancel(cons_thread);pthread_join(prod_thread, nullptr);pthread_join(prod_thread2, nullptr);pthread_join(cons_thread, nullptr);return 0;
}

通过信号量就可以完美解决这些问题。

http://www.xdnf.cn/news/1395433.html

相关文章:

  • 玳瑁的嵌入式日记D29-0829(进程间通信)
  • Python OpenCV图像处理与深度学习:Python OpenCV开发环境搭建与入门
  • 基于能量方法的纳维-斯托克斯方程高阶范数有界性理论推导-陈墨仙
  • STM32CubeMX + HAL 库:基于 I²C 通信的 AHT20 高精度温湿度测量实验
  • 【系列03】端侧AI:构建与部署高效的本地化AI模型 第2章:端侧AI硬件入门
  • 134-细粒度多尺度符号熵和鲸鱼优化算法的滚动轴承故障诊断技术MSVM
  • Redis搭建哨兵模式一主两从三哨兵
  • 线程安全及死锁问题
  • 【好题推荐】运算符的构造运用
  • 光伏发多少电才够用?匹配家庭用电需求
  • #医疗AI时代的生物医学Go编程:高性能计算与精准医疗的案例分析(五)
  • Linux内核进程管理子系统有什么第三十八回 —— 进程主结构详解(34)
  • JUC并发编程09 - 内存(01) - JMM/cache
  • 嵌入式Linux设备树驱动开发 - dtsof驱动
  • Unity DateTime 相关
  • 处理器(CPU/MPU)的双发射是什么?
  • 命令扩展与重定向
  • 可解释人工智能XAI
  • 【机器学习深度学习】Embedding 与 RAG:让 AI 更“聪明”的秘密
  • leetcode 191 位1的个数
  • 【0422】SMgrRelationData 中 md_num_open_segs 和 md_seg_fds 数组为什么是 4 个元素? 第四个元素表示什么?
  • Ubuntu磁盘分区重新挂载读写指南
  • 不一样的发票管理模式-发票识别+发票查验接口
  • ContextMenuManager for Win:优化右键菜单,解决用户痛点
  • lxml库如何使用
  • ElasticSearch对比Solr
  • C语言————操作符详解
  • TypeScript的Type
  • MySQL 中如果发生死锁应该如何解决?
  • 每日算法题【二叉树】:对称二叉树、二叉树的前中后序遍历