linux 条件变量与生产消费者模型
目录
1.为什么需要条件变量(它解决了什么问题?)
2.条件变量的接口说明
2.1 初始化和销毁
2.2 等待(Waiting) - 核心接口
2.3 限时等待(Timed Wait)
2.4 唤醒(Signaling)
3. 生产消费者模型(CP模型)
3.1. 为什么(Why):为什么要用生产者-消费者模型?
3.2 是什么(What):模型的核心组成是什么?
3.3 怎么做(How):如何用代码实现?(C++ with pthreads)
3.3.1 基于blockingQueue实现
3.3.2 基于环形队列的生产消费模型
1.为什么需要条件变量(它解决了什么问题?)
在之前我们引入了互斥锁和信号量,确保在并发编程时,只有一个执行流可以访问共享资源。但是我们使用加锁操作也带来很多问题,比如使用互斥锁可能导致线程饥饿,消耗CPU资源等诸多问题
在最基本的层面上,线程同步是为了解决由并发执行导致的不确定性问题,从而保证程序的正确性(Correctness)。
-
维护数据一致性(Data Consistency)
-
核心问题:多个线程交错执行非原子操作(如
counter++
),会导致最终结果依赖于不可控的执行顺序,从而产生错误。 -
同步的作用:通过互斥锁等机制,将一系列操作包装成一个原子性(Atomic) 的临界区(Critical Section)。使得这些操作要么全做,要么不做,对外看起来就像瞬间完成的一样,避免了交错执行,从而保证了共享数据处于一致的状态。
-
-
维护操作顺序(Ordering of Operations)
-
核心问题:线程A必须在线程B完成某项任务之后才能开始它的工作。例如,消费者线程必须等待生产者线程生产出数据后才能消费。
-
同步的作用:通过条件变量、信号量等机制,允许线程主动等待(阻塞)或通知他人,从而协调线程间操作的先后顺序,确保逻辑上的依赖性得到满足。
-
这些都是线程同步(通过加锁操作)所解决的一下问题。互斥锁(Mutex)解决了互斥访问的问题,即“一次只有一个线程能进”。但它无法解决条件等待的问题,即“如果条件不满足,我该怎么等?等的时候锁怎么办?”
条件变量(Condition Variable)就是实现这种“等待-唤醒”机制的工具。 它允许线程在某个条件不成立时高效地休眠,并在条件可能成立时被其他线程唤醒。
总结:同步是为了用性能换取正确性。一个没有正确性的程序,性能再好也毫无意义。因此,同步是并发编程的“必需品”,那么条件变量是为了解决在加锁情况下避免对CPU资源的过度浪费。
同步的根本目的:牺牲一部分性能,确保程序的正确性和确定性。
同步的必要性:正确性是程序的底线,是不可妥协的。
条件变量的核心价值:它是一种“聪明”的同步机制,在保证正确性的同时,极大地优化了性能,解决了“忙等待”这个性能杀手。
2.条件变量的接口说明
2.1 初始化和销毁
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
cond
: 指向要初始化的条件变量的指针。
attr
: 条件变量属性,通常传NULL
使用默认属性。返回值: 成功返回0,失败返回错误码。
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
对于全局或静态条件变量,可以用宏直接初始化:
销毁
int pthread_cond_destroy(pthread_cond_t *cond);
用于销毁一个不再使用的条件变量,释放其资源。
2.2 等待(Waiting) - 核心接口
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
cond
: 要等待的条件变量。
mutex
: 一个已经由当前线程锁定的互斥锁。这是一个原子操作,它做了三件事:
解锁:释放绑定的互斥锁
mutex
,让其他线程有机会修改共享条件和获取锁。阻塞:将当前线程挂起,投入等待队列,让出CPU。
重新加锁:当被唤醒后,在从函数返回之前,会重新获取互斥锁
mutex
。为什么必须和互斥锁一起用?
检查条件(如buffer is empty
)和进入等待状态(wait
)必须是原子操作。否则,可能会发生:
线程A检查条件,发现不满足(缓冲区空),准备等待。
就在它调用
wait
之前,调度器切换到了线程B(生产者)。线程B生产了数据,发送了
signal
。但此时线程A还没在等待,这个 signal 丢失了。线程A恢复执行,调用
wait()
,但可能再也没有人来唤醒它了 -> 永久休眠。
使用互斥锁可以确保在检查条件和进入等待的整个过程中,其他线程无法修改条件
2.3 限时等待(Timed Wait)
int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
abstime
: 指定一个绝对时间(从1970年1月1日UTC时间开始的秒和纳秒),表示等待的最后期限。如果在
abstime
指定的时间前被唤醒,返回0。如果超时时间到仍未唤醒,返回
ETIMEDOUT
。用于避免线程因丢失信号或其他原因而无限期等待
2.4 唤醒(Signaling)
int pthread_cond_signal(pthread_cond_t *cond); 唤醒至少一个线程
发送一个信号给条件变量
cond
,唤醒至少一个正在等待该条件的线程(具体唤醒哪个取决于调度策略)。如果当前没有线程在等待,这个信号就什么也不做(即不会累积)。
效率较高,在只需唤醒一个消费者就能处理事件时使用(例如,缓冲区中只有一件商品)。
int pthread_cond_broadcast(pthread_cond_t *cond); 唤醒所有线程
发送一个信号,唤醒所有正在等待该条件变量的线程。
开销比
signal
大,通常在多个线程都能处理当前状态时使用(例如,缓冲区从满变为非满,所有生产者线程都可以被唤醒;或者资源可用性发生重大变化)
3. 生产消费者模型(CP模型)
3.1. 为什么(Why):为什么要用生产者-消费者模型?
核心目的:解耦(Decoupling)和平衡效率(Balancing Efficiency)。
想象一个现实生活中的例子:一个面包店。
-
没有模型(耦合紧密):顾客(消费者)必须站在面包师(生产者)旁边,每做好一个面包,面包师就亲手交给顾客。面包师快,顾客就得等着接;顾客没来,面包师做好了也没地方放,只能等着。效率极低,双方互相依赖、互相阻塞。
-
使用模型(解耦):在面包师和顾客之间引入一个货架(缓冲区)。
-
面包师只管生产面包,做好后就放到货架上。
-
顾客只管从货架上取走面包。
-
双方不需要知道对方的存在,也不需要直接交互。他们只和货架打交道。
-
这样做的好处是巨大的:
-
解耦(Decoupling):生产者和消费者之间不直接通信,而是通过缓冲区进行通信。这使得它们可以独立地、并发地执行,修改一方的代码不会直接影响另一方。
-
平衡速度差异(Speed Matching):生产者可能有时快有时慢(如处理网络请求),消费者也是如此。缓冲区就像一个“水库”,可以削峰填谷,平衡双方的速度差异,避免快的等待慢的。
-
提高整体吞吐量(Throughput):当生产者爆发时,它可以快速生产并填满缓冲区,然后去做别的事(或等待)。消费者可以持续地从缓冲区消费。反之亦然。这最大限度地利用了系统资源。
在计算机中的典型应用:
-
消息队列(如Kafka, RabbitMQ):应用程序(生产者)产生消息放入队列,其他服务(消费者)从队列取出处理。
-
线程池/任务队列:主线程(生产者)提交任务到队列,工作线程(消费者)从队列获取并执行任务。
-
数据处理管道:一个阶段处理完数据放入缓冲区,下一个阶段从缓冲区读取。
-
I/O 和计算的重叠:一个线程从磁盘读数据(生产)到缓冲区,另一个线程同时处理缓冲区中的数据(消费)。
3.2 是什么(What):模型的核心组成是什么?
生产者-消费者模型有三个核心组成部分:
-
生产者(Producer(s)):
-
职责:生成数据或任务单元。
-
行为:将生成的数据放入一个共享的缓冲区中。
-
问题:如果缓冲区满了,它必须等待。
-
-
消费者(Consumer(s)):
-
职责:处理数据或任务单元。
-
行为:从共享缓冲区中取出数据并进行处理。
-
问题:如果缓冲区空了,它必须等待。
-
-
缓冲区(Buffer):
-
职责:作为生产者和消费者之间的通信通道。
-
形态:通常是一个有大小限制的队列(FIFO)。它屏蔽了生产者和消费者的细节。
-
关键:对缓冲区的所有操作(放入和取出)都必须是线程安全的。
-
这个模型引出了两个关键的同步条件:
-
缓冲区空 -> 消费者必须等待,生产者放入后通知消费者。
-
缓冲区满 -> 生产者必须等待,消费者取出后通知生产者。
注:321原则,三种关系(生产者和消费者之间的同步与互斥),两个角色(生产者和消费者),一个交易场所(存放数据的容器)。
3.3 怎么做(How):如何用代码实现?(C++ with pthreads)
3.3.1 基于blockingQueue实现
BlockingQueue类模版
#ifndef BLOCKING_QUEUE_H #define BLOCKING_QUEUE_H#include <queue> #include <mutex> #include <condition_variable> #include <iostream>template <typename T> class BlockingQueue { public:explicit BlockingQueue(size_t max_size) : max_size_(max_size) {}// 放入数据,如果队列满则阻塞void put(const T& item) {std::unique_lock<std::mutex> lock(mutex_);// 等待队列不满not_full_.wait(lock, [this]() { return queue_.size() < max_size_; });queue_.push(item);std::cout << "Produced: " << item << " (Size: " << queue_.size() << ")" << std::endl;// 通知消费者队列不空(唤醒消费者)not_empty_.notify_one();}// 取出数据,如果队列空则阻塞T take() {std::unique_lock<std::mutex> lock(mutex_);// 等待队列不空not_empty_.wait(lock, [this]() { return !queue_.empty(); });T item = queue_.front();queue_.pop();std::cout << "Consumed: " << item << " (Size: " << queue_.size() << ")" << std::endl;// 通知生产者队列不满not_full_.notify_one();return item;}// 非阻塞尝试放入bool try_put(const T& item) {std::unique_lock<std::mutex> lock(mutex_);if (queue_.size() >= max_size_) {return false;}queue_.push(item);std::cout << "Produced: " << item << " (Size: " << queue_.size() << ")" << std::endl;not_empty_.notify_one();return true;}// 非阻塞尝试取出bool try_take(T& item) {std::unique_lock<std::mutex> lock(mutex_);if (queue_.empty()) {return false;}item = queue_.front();queue_.pop();std::cout << "Consumed: " << item << " (Size: " << queue_.size() << ")" << std::endl;not_full_.notify_one();return true;}size_t size() const {std::unique_lock<std::mutex> lock(mutex_);return queue_.size();}bool empty() const {std::unique_lock<std::mutex> lock(mutex_);return queue_.empty();}bool full() const {std::unique_lock<std::mutex> lock(mutex_);return queue_.size() >= max_size_;}private:std::queue<T> queue_;mutable std::mutex mutex_;std::condition_variable not_empty_;std::condition_variable not_full_;size_t max_size_; };
模拟生产与消费的过程
#include <iostream> #include <thread> #include <vector> #include <chrono> #include <random>// 随机数生成器 int get_random_number(int min, int max) {static std::random_device rd;static std::mt19937 gen(rd());std::uniform_int_distribution<> distrib(min, max);return distrib(gen); }// 生产者函数 void producer(BlockingQueue<int>& queue, int id) {int item = 0;while (true) {// 模拟生产耗时std::this_thread::sleep_for(std::chrono::milliseconds(get_random_number(100, 500)));// 生产一个产品int product = id * 1000 + item++;queue.put(product);// 生产一定数量后退出if (item > 10) break;}std::cout << "Producer " << id << " finished." << std::endl; }// 消费者函数 void consumer(BlockingQueue<int>& queue, int id) {int count = 0;while (true) {// 模拟消费耗时std::this_thread::sleep_for(std::chrono::milliseconds(get_random_number(200, 800)));// 消费一个产品int product = queue.take();count++;// 处理产品(这里只是打印)std::cout << "Consumer " << id << " processed: " << product << std::endl;// 消费一定数量后退出if (count > 15) break;}std::cout << "Consumer " << id << " finished." << std::endl; }int main() {const size_t BUFFER_SIZE = 5;const int NUM_PRODUCERS = 2;const int NUM_CONSUMERS = 3;BlockingQueue<int> queue(BUFFER_SIZE);std::vector<std::thread> producers;std::vector<std::thread> consumers;// 创建生产者线程for (int i = 0; i < NUM_PRODUCERS; ++i) {producers.emplace_back(producer, std::ref(queue), i + 1);}// 创建消费者线程for (int i = 0; i < NUM_CONSUMERS; ++i) {consumers.emplace_back(consumer, std::ref(queue), i + 1);}// 等待所有生产者完成for (auto& p : producers) {p.join();}// 等待所有消费者完成(可能需要额外机制确保消费者也能退出)// 这里简单等待,实际应用中可能需要优雅退出机制for (auto& c : consumers) {c.join();}std::cout << "All threads finished. Final queue size: " << queue.size() << std::endl;return 0; }
3.3.2 基于环形队列的生产消费模型
#include <iostream> #include <semaphore.h> #include <pthread.h> #include <unistd.h> #include <vector>#define BUFFER_SIZE 5class RingBufferSem { private:std::vector<int> buffer;int head;int tail;sem_t empty_sem; // 空闲槽位信号量sem_t full_sem; // 已填充槽位信号量sem_t mutex; // 互斥信号量public:RingBufferSem(int size = BUFFER_SIZE) : buffer(size), head(0), tail(0) {// 初始化信号量sem_init(&empty_sem, 0, size); // 初始有size个空闲槽位sem_init(&full_sem, 0, 0); // 初始没有已填充槽位sem_init(&mutex, 0, 1); // 二进制信号量,用于互斥}~RingBufferSem() {sem_destroy(&empty_sem);sem_destroy(&full_sem);sem_destroy(&mutex);}void produce(int item) {// 等待空闲槽位sem_wait(&empty_sem);// 获取互斥锁访问缓冲区sem_wait(&mutex);// 生产物品buffer[tail] = item;tail = (tail + 1) % buffer.size();std::cout << "Produced: " << item << " (head: " << head << ", tail: " << tail << ")" << std::endl;// 释放互斥锁sem_post(&mutex);// 增加已填充槽位计数sem_post(&full_sem);}int consume() {// 等待已填充槽位sem_wait(&full_sem);// 获取互斥锁访问缓冲区sem_wait(&mutex);// 消费物品int item = buffer[head];head = (head + 1) % buffer.size();std::cout << "Consumed: " << item << " (head: " << head << ", tail: " << tail << ")" << std::endl;// 释放互斥锁sem_post(&mutex);// 增加空闲槽位计数sem_post(&empty_sem);return item;} };// 生产者线程函数 void* producer_sem(void* arg) {RingBufferSem* rb = static_cast<RingBufferSem*>(arg);int item = 0;while (true) {rb->produce(item++);usleep(300000); // 生产间隔0.3秒}return nullptr; }// 消费者线程函数 void* consumer_sem(void* arg) {RingBufferSem* rb = static_cast<RingBufferSem*>(arg);while (true) {rb->consume();usleep(500000); // 消费间隔0.5秒(消费比生产慢)}return nullptr; }// 多生产者示例 void* multi_producer(void* arg) {RingBufferSem* rb = static_cast<RingBufferSem*>(arg);int thread_id = *(static_cast<int*>(arg));int item = thread_id * 1000;while (true) {rb->produce(item++);usleep(400000 + (rand() % 200000)); // 随机间隔}return nullptr; }int main() {RingBufferSem rb;pthread_t prod_thread, cons_thread;pthread_t prod_thread2; // 第二个生产者std::cout << "=== 单生产者单消费者示例 ===" << std::endl;pthread_create(&prod_thread, nullptr, producer_sem, &rb);pthread_create(&cons_thread, nullptr, consumer_sem, &rb);// 运行5秒后结束示例sleep(5);pthread_cancel(prod_thread);pthread_cancel(cons_thread);pthread_join(prod_thread, nullptr);pthread_join(cons_thread, nullptr);std::cout << "\n=== 多生产者单消费者示例 ===" << std::endl;int id1 = 1, id2 = 2;pthread_create(&prod_thread, nullptr, [](void* arg) -> void* {RingBufferSem* rb = static_cast<RingBufferSem*>(arg);int item = 1000;while (true) {rb->produce(item++);usleep(350000);}return nullptr;}, &rb);pthread_create(&prod_thread2, nullptr, [](void* arg) -> void* {RingBufferSem* rb = static_cast<RingBufferSem*>(arg);int item = 2000;while (true) {rb->produce(item++);usleep(450000);}return nullptr;}, &rb);pthread_create(&cons_thread, nullptr, consumer_sem, &rb);// 运行5秒sleep(5);pthread_cancel(prod_thread);pthread_cancel(prod_thread2);pthread_cancel(cons_thread);pthread_join(prod_thread, nullptr);pthread_join(prod_thread2, nullptr);pthread_join(cons_thread, nullptr);return 0; }
通过信号量就可以完美解决这些问题。