当前位置: 首页 > ai >正文

C++学习:六个月从基础到就业——多线程编程:条件变量

C++学习:六个月从基础到就业——多线程编程:条件变量

本文是我C++学习之旅系列的第五十六篇技术文章,也是第四阶段"并发与高级主题"的第三篇,介绍C++11标准中的条件变量(condition variable)及其在线程同步中的应用。查看完整系列目录了解更多内容。

引言

在上一篇文章中,我们介绍了互斥量和锁,它们能够保护共享资源,防止数据竞争。然而,仅靠互斥量无法高效地实现线程间的通信和等待。例如,当一个线程需要等待某个条件满足时,使用互斥量进行"忙等待"会浪费CPU资源。

条件变量(std::condition_variable)解决了这个问题,它允许线程高效地等待特定条件成立,并在条件满足时被其他线程唤醒。条件变量与互斥量配合使用,是实现生产者-消费者模式、线程池等并发模式的关键组件。

目录

  • 多线程编程:条件变量
    • 引言
    • 目录
    • 条件变量基础
      • 条件变量的概念
      • std::condition_variable API
      • wait与notifiy机制
      • 避免虚假唤醒
    • 条件变量使用模式
      • 生产者-消费者模式
      • 多生产者-多消费者
      • 等待条件满足
      • 超时等待
    • std::condition_variable_any
    • 实际应用案例
      • 线程安全消息队列
      • 简单线程池
      • 实现读写锁
    • 常见问题与陷阱
      • 丢失唤醒
      • 条件变量与互斥量的关系
      • 何时使用notify_all而非notify_one
    • C++20中的信号量与锁存器
    • 总结

条件变量基础

条件变量的概念

条件变量是一种同步原语,它用于线程间的通信,具体来说:

  • 一个或多个线程可以等待某个条件成立
  • 当条件成立时,另一个线程可以通知等待的线程继续执行
  • 条件变量避免了轮询(忙等待)带来的CPU资源浪费

条件变量必须与互斥量配合使用,以避免竞态条件。

std::condition_variable API

C++11在<condition_variable>头文件中提供了std::condition_variable类,其主要成员函数包括:

// 等待函数
void wait(std::unique_lock<std::mutex>& lock);
template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);// 超时等待函数
template<class Rep, class Period>
cv_status wait_for(std::unique_lock<std::mutex>& lock,const std::chrono::duration<Rep, Period>& rel_time);
template<class Rep, class Period, class Predicate>
bool wait_for(std::unique_lock<std::mutex>& lock,const std::chrono::duration<Rep, Period>& rel_time,Predicate pred);// 唤醒函数
void notify_one();
void notify_all();

注意:条件变量只能与std::unique_lock<std::mutex>一起使用,而不能与std::lock_guard一起使用,因为等待过程需要临时释放锁。

wait与notifiy机制

条件变量的核心机制是"等待-通知"模式,其工作流程如下:

  1. 等待线程首先获取互斥锁
  2. 检查条件是否满足;如不满足,调用wait进入等待状态
  3. wait内部会原子地释放互斥锁并阻塞线程
  4. 通知线程获取同一互斥锁,修改条件状态,然后调用notify_one()notify_all()
  5. 当条件变量收到通知,等待线程被唤醒,重新获取互斥锁,然后继续执行
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>std::mutex mtx;
std::condition_variable cv;
bool ready = false;void worker() {std::unique_lock<std::mutex> lock(mtx);// 等待直到ready变为truecv.wait(lock, []{ return ready; });// 继续执行,此时互斥锁已被重新获取std::cout << "Worker thread is processing data" << std::endl;
}void setter() {std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟准备工作{std::lock_guard<std::mutex> lock(mtx);ready = true; // 设置条件std::cout << "Data is ready, notifying worker thread" << std::endl;} // 离开作用域,释放锁cv.notify_one(); // 通知一个等待的线程
}int main() {std::thread workerThread(worker);std::thread setterThread(setter);workerThread.join();setterThread.join();return 0;
}

避免虚假唤醒

虚假唤醒(spurious wakeup)是指线程可能在没有收到明确通知的情况下被唤醒。这是操作系统层面的实现细节,C++标准要求程序员处理这种情况。

正确的做法是使用带谓词(predicate)的wait重载版本:

cv.wait(lock, []{ return condition; });// 等价于
while (!condition) {cv.wait(lock);
}

谓词版本会在唤醒后再次检查条件,确保线程只在条件真正满足时才继续执行。

条件变量使用模式

生产者-消费者模式

条件变量最常见的应用是实现生产者-消费者模式:

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>template<typename T>
class ThreadSafeQueue {
private:std::queue<T> queue;mutable std::mutex mutex;std::condition_variable cond;public:void push(T value) {{std::lock_guard<std::mutex> lock(mutex);queue.push(std::move(value));}cond.notify_one(); // 通知一个等待的消费者}T pop() {std::unique_lock<std::mutex> lock(mutex);cond.wait(lock, [this]{ return !queue.empty(); }); // 等待队列非空T value = std::move(queue.front());queue.pop();return value;}bool try_pop(T& value) {std::lock_guard<std::mutex> lock(mutex);if (queue.empty()) {return false;}value = std::move(queue.front());queue.pop();return true;}bool empty() const {std::lock_guard<std::mutex> lock(mutex);return queue.empty();}
};// 使用示例
void producer(ThreadSafeQueue<int>& queue) {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作std::cout << "Producing: " << i << std::endl;queue.push(i);}
}void consumer(ThreadSafeQueue<int>& queue) {for (int i = 0; i < 10; ++i) {int value = queue.pop(); // 等待数据std::cout << "Consuming: " << value << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟处理}
}int main() {ThreadSafeQueue<int> queue;std::thread producerThread(producer, std::ref(queue));std::thread consumerThread(consumer, std::ref(queue));producerThread.join();consumerThread.join();return 0;
}

多生产者-多消费者

对于多个生产者和消费者的情况,我们需要使用notify_all()来通知所有等待的线程:

void push(T value) {{std::lock_guard<std::mutex> lock(mutex);queue.push(std::move(value));}cond.notify_all(); // 通知所有等待的消费者
}

在多消费者模型中,每个消费者应该检查是否真正获取到了数据,因为可能多个消费者同时被唤醒,但队列中只有一个元素。

等待条件满足

条件变量也常用于等待某个条件满足,例如等待任务完成或状态改变:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>class Task {
private:bool completed = false;std::mutex mutex;std::condition_variable cv;public:void perform() {std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟任务执行{std::lock_guard<std::mutex> lock(mutex);completed = true;std::cout << "Task completed" << std::endl;}cv.notify_all(); // 通知所有等待者}void wait_for_completion() {std::unique_lock<std::mutex> lock(mutex);cv.wait(lock, [this]{ return completed; });std::cout << "Task completion confirmed" << std::endl;}
};int main() {Task task;std::thread performer(&Task::perform, &task);std::thread waiter(&Task::wait_for_completion, &task);performer.join();waiter.join();return 0;
}

超时等待

在实际应用中,无限期等待可能不是最佳选择。使用wait_forwait_until可以实现超时等待:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>std::mutex mtx;
std::condition_variable cv;
bool ready = false;bool wait_with_timeout() {std::unique_lock<std::mutex> lock(mtx);// 等待条件,最多等待3秒bool result = cv.wait_for(lock, std::chrono::seconds(3), []{ return ready; });if (result) {std::cout << "Condition satisfied" << std::endl;} else {std::cout << "Timeout occurred" << std::endl;}return result;
}void signal_after(int seconds) {std::this_thread::sleep_for(std::chrono::seconds(seconds));{std::lock_guard<std::mutex> lock(mtx);ready = true;}cv.notify_one();std::cout << "Signal sent" << std::endl;
}int main() {// 测试提前通知std::thread t1(signal_after, 2);  // 2秒后发信号std::thread t2(wait_with_timeout);t1.join();t2.join();// 重置条件ready = false;// 测试超时std::thread t3(signal_after, 5);  // 5秒后发信号(超过超时时间)std::thread t4(wait_with_timeout);t3.join();t4.join();return 0;
}

std::condition_variable_any

标准库还提供了std::condition_variable_any类,它与std::condition_variable类似,但可以与任何满足"可锁定"概念的互斥量一起使用,而不仅限于std::mutex

#include <condition_variable>
#include <mutex>
#include <shared_mutex>std::shared_mutex rwmutex;
std::condition_variable_any cv_any;// 可以使用unique_lock<shared_mutex>
void wait_for_condition() {std::unique_lock<std::shared_mutex> lock(rwmutex);cv_any.wait(lock, []{ return condition; });
}// 甚至可以使用自定义锁类型
class CustomLock {// 实现lock(), unlock(), ...
};void wait_with_custom_lock() {CustomLock lock;cv_any.wait(lock, []{ return condition; });
}

注意:condition_variable_any的灵活性是以性能为代价的,如果只需要与std::mutex一起使用,建议使用std::condition_variable

实际应用案例

线程安全消息队列

以下是一个完整的线程安全消息队列实现,支持超时和关闭操作:

#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <utility>
#include <chrono>template<typename T>
class MessageQueue {
private:std::queue<T> queue_;mutable std::mutex mutex_;std::condition_variable cv_;bool closed_ = false;public:MessageQueue() = default;~MessageQueue() {close();}// 队列不应被复制或移动MessageQueue(const MessageQueue&) = delete;MessageQueue& operator=(const MessageQueue&) = delete;// 将元素推入队列void push(T value) {{std::lock_guard<std::mutex> lock(mutex_);if (closed_) return;queue_.push(std::move(value));}cv_.notify_one();}// 尝试从队列中弹出一个元素,非阻塞bool try_pop(T& value) {std::lock_guard<std::mutex> lock(mutex_);if (queue_.empty() || closed_) return false;value = std::move(queue_.front());queue_.pop();return true;}// 等待并弹出一个元素,如果队列关闭返回falsebool wait_and_pop(T& value) {std::unique_lock<std::mutex> lock(mutex_);cv_.wait(lock, [this]{ return !queue_.empty() || closed_; });if (queue_.empty() || closed_) return false;value = std::move(queue_.front());queue_.pop();return true;}// 带超时的等待弹出template<typename Rep, typename Period>bool wait_and_pop_for(T& value, const std::chrono::duration<Rep, Period>& timeout) {std::unique_lock<std::mutex> lock(mutex_);if (!cv_.wait_for(lock, timeout, [this]{ return !queue_.empty() || closed_; })) {return false; // 超时}if (queue_.empty() || closed_) return false;value = std::move(queue_.front());queue_.pop();return true;}// 检查队列是否为空bool empty() const {std::lock_guard<std::mutex> lock(mutex_);return queue_.empty();}// 获取队列大小size_t size() const {std::lock_guard<std::mutex> lock(mutex_);return queue_.size();}// 关闭队列,唤醒所有等待线程void close() {{std::lock_guard<std::mutex> lock(mutex_);closed_ = true;}cv_.notify_all();}// 检查队列是否已关闭bool is_closed() const {std::lock_guard<std::mutex> lock(mutex_);return closed_;}
};

简单线程池

条件变量在线程池实现中非常有用,可以实现工作线程的睡眠和唤醒:

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <atomic>class ThreadPool {
private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;std::atomic<bool> stop;public:ThreadPool(size_t threads) : stop(false) {for (size_t i = 0; i < threads; ++i) {workers.emplace_back([this] {while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(queue_mutex);// 等待直到有任务或线程池停止condition.wait(lock, [this] { return stop || !tasks.empty(); });// 如果线程池停止且无任务,退出if (stop && tasks.empty()) {return;}// 获取任务task = std::move(tasks.front());tasks.pop();}// 执行任务task();}});}}// 向线程池添加任务template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;// 创建包装任务auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> result = task->get_future();{std::lock_guard<std::mutex> lock(queue_mutex);// 线程池停止后不接受新任务if (stop) {throw std::runtime_error("enqueue on stopped ThreadPool");}// 添加任务到队列tasks.emplace([task]() { (*task)(); });}// 通知一个等待中的线程condition.notify_one();return result;}// 停止线程池~ThreadPool() {{std::lock_guard<std::mutex> lock(queue_mutex);stop = true;}// 通知所有线程停止condition.notify_all();// 等待所有线程结束for (std::thread &worker : workers) {worker.join();}}
};// 使用示例
int main() {ThreadPool pool(4);  // 创建4个工作线程的线程池// 添加一些任务auto result1 = pool.enqueue([](int ms) {std::this_thread::sleep_for(std::chrono::milliseconds(ms));return "Task 1 completed after " + std::to_string(ms) + "ms";}, 1000);auto result2 = pool.enqueue([](int a, int b) {return a + b;}, 10, 20);// 获取结果std::cout << result1.get() << std::endl;std::cout << "Sum is: " << result2.get() << std::endl;return 0;
}

实现读写锁

条件变量也可以用来实现自定义同步原语,例如读写锁:

#include <mutex>
#include <condition_variable>class ReadWriteLock {
private:std::mutex mtx;std::condition_variable cv;unsigned int readers = 0;bool writer = false;public:void read_lock() {std::unique_lock<std::mutex> lock(mtx);// 等待直到没有写者cv.wait(lock, [this] { return !writer; });++readers;}void read_unlock() {std::unique_lock<std::mutex> lock(mtx);--readers;if (readers == 0) {// 如果最后一个读者,通知可能等待的写者cv.notify_one();}}void write_lock() {std::unique_lock<std::mutex> lock(mtx);// 等待直到没有读者和其他写者cv.wait(lock, [this] { return !writer && readers == 0; });writer = true;}void write_unlock() {std::unique_lock<std::mutex> lock(mtx);writer = false;// 通知所有等待的读者和写者cv.notify_all();}
};

常见问题与陷阱

丢失唤醒

条件变量的一个常见问题是"丢失唤醒",即线程A发送通知,但线程B尚未开始等待,导致通知丢失。正确的做法是在发送通知前先修改共享状态:

// 正确方式
{std::lock_guard<std::mutex> lock(mutex);ready = true;  // 先改变状态
}
cv.notify_one();  // 再发送通知// 等待线程
{std::unique_lock<std::mutex> lock(mutex);cv.wait(lock, []{ return ready; });  // 检查状态
}

条件变量与互斥量的关系

条件变量与互斥量必须正确配合使用:

  1. 通知时,通常先获取锁,修改条件,然后释放锁再通知
  2. 等待时,必须先获取锁,检查条件,不满足则调用wait
  3. wait内部会原子地释放锁并阻塞,被唤醒后重新获取锁

何时使用notify_all而非notify_one

notify_one()notify_all()的选择取决于应用场景:

  • 使用notify_one()当:

    • 只有一个线程需要处理任务(如单消费者模式)
    • 多个等待线程执行相同的任务,一个就够了
  • 使用notify_all()当:

    • 所有等待线程都应被唤醒(如屏障同步)
    • 不确定哪个线程应该被唤醒(如多种不同条件)
    • 状态变化可能满足多个线程的等待条件

过度使用notify_all()可能导致不必要的"惊群效应"(thundering herd problem),造成性能下降。

C++20中的信号量与锁存器

C++20引入了更多的同步原语,包括信号量(std::counting_semaphorestd::binary_semaphore)和锁存器(std::latchstd::barrier),它们在某些场景下比条件变量更适合:

#include <semaphore>
#include <thread>
#include <iostream>// 二元信号量实例(C++20)
std::binary_semaphore sem(0); // 初始值为0void worker() {std::cout << "Worker waiting for signal..." << std::endl;sem.acquire(); // 等待信号std::cout << "Worker received signal, continuing..." << std::endl;
}void signaler() {std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "Sending signal..." << std::endl;sem.release(); // 发送信号
}int main() {std::thread t1(worker);std::thread t2(signaler);t1.join();t2.join();return 0;
}

与条件变量相比,信号量和锁存器的优势在于使用更简单、不需要互斥量配合,且某些情况下性能更好。

总结

条件变量是C++11中实现线程同步和通信的强大工具。正确使用条件变量,需要特别注意以下几点:

  1. 总是与互斥量配合使用,保护共享状态
  2. 使用谓词函数处理虚假唤醒
  3. 先修改状态,再发送通知,避免丢失唤醒
  4. 根据实际需求选择notify_one()还是notify_all()

条件变量适用于生产者-消费者模式、线程池、屏障同步等多种并发场景。随着C++20引入信号量和锁存器,我们现在有了更丰富的同步工具选择。

在下一篇文章中,我们将探讨std::futurestd::promise,它们为C++并发编程提供了基于任务的异步模型。


这是我C++学习之旅系列的第五十六篇技术文章。查看完整系列目录了解更多内容。

http://www.xdnf.cn/news/7836.html

相关文章:

  • 诊断仪进行CAN采样点测试的原理
  • 管理会议最佳实践:高效协同与价值最大化
  • ctfhub技能书http协议
  • 2570. 合并两个二维数组 - 求和法
  • RTMP协议解析【三】
  • 【论文复现】——基于NDT与ICP结合的点云配准算法(matlab版)
  • 网页 HTML布局(详解)
  • 精益数据分析(74/126):从愿景到落地的精益开发路径——Rally的全流程管理实践
  • 新能源汽车充电桩资源如何利用资源高效配置?
  • Linux 内核音视频架构(V4L2 )介绍
  • 算法中的数学:欧拉函数
  • 工作流引擎-03-聊一聊什么是流程引擎(Process Engine)?
  • 用户缓冲区
  • JavaScript 函数、方法、限定符
  • 关于Vue自定义组件封装的属性/事件/插槽的透传问题
  • 密码合集(不定期更新)
  • 【VS2017】cpp文件字符编码异常导致编译报错
  • 老牌硬件检测工具的现代应用场景分析
  • 【动手学深度学习】1.3. 各种机器学习问题
  • spring的注入方式都有什么区别
  • 网页表格转换为markdown
  • 仅修改文件名会导致文件的MD5值发生变化吗?
  • 制造业ERP系统选型与实施避坑探讨
  • java加强 -网络编程
  • iframe加载或者切换时候,短暂的白屏频闪问题解决
  • Oracle Enqueue Names
  • MySQL中的重要常见知识点(入门到入土!)
  • QT中信号和事件的区别
  • Panasonic松下焊接机器人节气
  • Web3 领域中的一些专业术语