【Linux笔记】——线程同步条件变量与生产者消费者模型的实现
🔥个人主页🔥:孤寂大仙V
🌈收录专栏🌈:Linux
🌹往期回顾🌹:【Linux笔记】——线程互斥与互斥锁的封装
🔖流水不争,争的是滔滔不息
- 一、线程同步的简介
- 二、条件变量
- 三、条件变量函数
- 四、条件变量的封装
- 五、生产者消费者模型
- 基于BlockingQueue的生产者消费者模型
一、线程同步的简介
定义
线程同步是指在多线程环境中,通过某种机制确保多个线程在访问共享资源时能够有序、协调地进行,以避免数据不一致或竞争条件的问题。线程同步的主要目的是控制线程的执行顺序,确保共享资源在某一时刻只能被一个线程访问或修改。
必要性
在多线程编程中,多个线程可能会同时访问和修改共享资源(如变量、文件、数据库等)。如果没有适当的同步机制,可能会导致以下问题:
- 数据竞争:多个线程同时修改同一数据,导致数据状态不一致。
- 死锁:多个线程相互等待对方释放资源,导致程序无法继续执行。
- 资源争用:多个线程竞争同一资源,导致性能下降或程序崩溃。
为实现线程同步这里引入条件变量
二、条件变量
条件变量是一种用于线程同步的机制,通常与互斥锁(mutex)结合使用,用于在多线程环境中实现线程间的协调。条件变量的主要作用是允许线程在某些条件不满足时进入等待状态,并在条件满足时被唤醒继续执行。
在多线程并发中,光有互斥锁只能保证临界区的互斥访问,但有些情况线程即使拿到了锁,也无法立刻执行任务,比如:线程A 拿到锁准备从任务队列里取任务,结果发现队列是空的 ,此时线程A该怎么办?它不能一直占着锁,否则别的线程(比如生产任务的线程B)就无法加锁,也就无法往队列中添加任务了。
条件变量的作用:在资源条件未满足时,挂起当前线程的执行(释放锁)并进入等待状态,直到条件满足被唤醒,再继续执行。
假如苹果是共享资源,不能被多个线程同时访问(临界资源),盘子是共享区(临界区),锁是控制访问盘子的唯一通道谁持有锁谁能动苹果,线程们是一群想吃苹果的人,条件变量是通知机制,用来告诉排队的人可以去拿锁。
有一堆线程想去盘子里拿到苹果。只有拿到锁得的人才能查看到盘子里是否有苹果。如果拿到锁发现没有苹果,就把锁放回去,然后进入等待队列。条件变量上场,等有线程放了苹果病通知,就会“敲钟”,让等待队列里的线程醒来。被唤醒的线程们就会重新竞争锁,拿到锁的线程再次检查苹果是否存在,存在就吃掉,不存在就继续等。
条件变量是一个“协调者”,帮你把拿不到锁的线程有序地放到队列里睡觉,然后在条件满足时“敲钟”叫醒他们,大家再去抢锁判断是否可以操作资源。
三、条件变量函数
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t
*restrict attr);
参数:cond:要初始化的条件变量,attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict
mutex);
参数:cond:要在这个条件变量上等待,mutex:互斥量
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
实例
#include <iostream>
#include <vector>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <string>const int NUM=5;
int cnt=1000;using namespace std;pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER; //定义全局锁
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER; //定义全局信号量void* pthreadrun(void* args)
{string name=static_cast<const char*>(args);while(true){ pthread_mutex_lock(&glock); //上锁pthread_cond_wait(&gcond,&glock); //等待队列std::cout << name << " 计算: " << cnt << std::endl;cnt--;pthread_mutex_unlock(&glock); //解锁}
}int main()
{vector<pthread_t> pthreads;for(int i=0;i<NUM;i++){pthread_t tid;char* name=new char [64];snprintf(name,64,"thread-%d", i);int n=pthread_create(&tid,nullptr,pthreadrun,name);if(n==0){cout<<"创建新线程成功"<<endl;}pthreads.push_back(tid);sleep(1);}sleep(3);while(true){std::cout << "唤醒所有线程... " << std::endl;pthread_cond_broadcast(&gcond);sleep(1);}for(auto &id: pthreads){int n=pthread_join(id,nullptr);if(n==0){cout<<"进程回收成功"<<endl;}}
}
我们注意到等待队列是在临界区内,加锁和解锁之间写的
while(true){ pthread_mutex_lock(&glock); //上锁pthread_cond_wait(&gcond,&glock); //等待队列std::cout << name << " 计算: " << cnt << std::endl;cnt--;pthread_mutex_unlock(&glock); //解锁}
上面苹果的那个比喻,我们不是先申请锁然后去临界区内申请资源吗,没资源释放锁进入等待队列。流程:线程先申请锁成功,判断资源是否可用如果不可用就要自动释放锁进入等待队列(注意这里的的释放锁并进入等待队列是pthread_cond_wait提供的自动的),等唤醒时重新自动上锁,再重新判断资源状态。如果拿到资源执行完正常的释放锁。
注意:
pthread_cond_wait 中的“释放” → 是为了让别的线程能进入临界区补充资源。
最后的 pthread_mutex_unlock → 是整个线程工作完之后的“正常释放锁”。
四、条件变量的封装
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"using namespace MutexModule;
using namespace std;namespace CondModule
{class Cond{public:Cond(){pthread_cond_init(&_cond,nullptr);}void Wait(Mutex& mutex) //阻塞队列进行等待{int n=pthread_cond_wait(&_cond,mutex.Get());}void Signal() //唤醒条件变量下等待的进程{int n=pthread_cond_signal(&_cond);}void Broadcast() //唤醒条件变量下所有等待的进程{ int n=pthread_cond_broadcast(&_cond);}~Cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};
}
直接pthread库封装成类就完事了。
五、生产者消费者模型
生产者消费者模型是一种经典的并发编程模型,用于解决多线程环境下的资源分配和同步问题。该模型涉及两类线程:生产者线程和消费者线程。生产者负责生成数据并将其放入共享缓冲区,而消费者则从缓冲区中取出数据进行处理。
生产者消费者模式就是通过⼀个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于⼀个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型:三种要素,生产者、消费者、一个交易场所。中间的"交易场所就是一快"内存"空间"。
三种关系:
生产者之间:竞争关系,互斥关系。
消费者和消费者之间:互斥关系。
生产者和消费者之间:互斥和同步
两种角色:
生产者角色和消费者角色(线程承担)。
一个交易场所:
以特定结构构成的一种"内存空间"。
生产者消费者模型的优势:
生产过程和消费过程解耦,支持忙闲不均,提高效率(不体现在“交易场所”上,而是未来获取任务和处理任务)。
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是⼀种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
源码代码地址:生产者消费者模型源码
Blockqueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "Cond.hpp"using namespace std;
using namespace CondModule;
using namespace MutexModule;const int num = 5;template <typename T>
class Blockqueue
{
public:Blockqueue(int cap = num): _cap(cap), _csleep_num(0), _psleep_num(0)//{ }void Equeue(const T &in) // 生产者写入数据{LockGuard lockguard(_mutex); // RAII自动上锁解锁while (Qfull()) // 如果阻塞队列是满的{_psleep_num++;_full_cond.Wait(_mutex); // 生产者线程进入等待队列_psleep_num--;}_q.push(in);if (_csleep_num > 0){_empty_cond.Signal(); // 唤醒消费者cout<<"唤醒消费者"<<endl;}}T pop() // 消费者读数据{LockGuard lockguard(_mutex);while (Qempty()) // 如果阻塞队列是空的{_csleep_num++;_empty_cond.Wait(_mutex);//消费者线程进入等待队列_csleep_num--;}T data=_q.front();_q.pop();if(_psleep_num >0){_full_cond.Signal(); //唤醒生产者cout<<"唤醒生产者"<<endl;}return data;}~Blockqueue(){}bool Qfull(){return _q.size() >= num;}bool Qempty(){return _q.empty();}private:queue<T> _q;int _cap;Mutex _mutex;Cond _full_cond;Cond _empty_cond;int _psleep_num;int _csleep_num;
};
void Equeue(const T &in) // 生产者写入数据{LockGuard lockguard(_mutex); // RAII自动上锁解锁while (Qfull()) // 如果阻塞队列是满的{_psleep_num++;_full_cond.Wait(_mutex); // 生产者线程进入等待队列_psleep_num--;}_q.push(in);if (_csleep_num > 0){_empty_cond.Signal(); // 唤醒消费者cout << "唤醒消费者" << endl;}}T pop() // 消费者读数据{LockGuard lockguard(_mutex);while (Qempty()) // 如果阻塞队列是空的{_csleep_num++;_empty_cond.Wait(_mutex); // 消费者线程进入等待队列_csleep_num--;}T data = _q.front();_q.pop();if (_psleep_num > 0){_full_cond.Signal(); // 唤醒生产者cout << "唤醒生产者" << endl;}return data;}
生产者写入数据与消费者读数据放一起聊
生产者写入数据,无可厚非先上锁,判断如果阻塞队列是满的,那么我们就要用条件变量让生产者线程进行等待队列。但是这里一定要注意生产者线程写入和消费者线程读是并发执行的,如果不计数就可能在某些场景中浪费唤醒操作,甚至唤醒失败。当消费者读数据的时候,读完数据要唤醒生产者继续写数据,这时候如果写成
if(true)
{_full_cond.Signal();
}
想想这样粒度不够细,会不会白敲钟,造成效率浪费。计数后
if (_psleep_num > 0)
{_full_cond.Signal(); // 唤醒生产者cout << "唤醒生产者" << endl;}
这样有设计确实有生产者在等,才敲钟,是不是更精细,效率更高。
接着上面生产者线程进入等待队列后,顺一下流程。生产者往阻塞队列中写入数据,这时候判断消费者线程是否阻塞,唤醒消费者(这里看到可能有点懵,这么设计就是源于生产者消费者模型的并发执行的)。消费者读数据跟上述生产者写数据思路一样。
这段代码最绕的就是这个计数了,_psleep_num记录当前有多少个生产者线程"等待着空位",_csleep_num记录有多少个消费者线程在“等着有数据”。这样设计就是为了避免无意义的唤醒,提高效率,避免无意义的浪费。
main.cc
#include "Blockqueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;void *consumer(void *args) // 消费者
{Blockqueue<int> *bq = static_cast< Blockqueue<int> *>(args);while(true){sleep(20);int data=bq->pop();cout<<"消费者消费数据->"<<data<<endl;}
}void *productor(void *args) // 生产者
{Blockqueue<int> *bq = static_cast< Blockqueue<int> *>(args);int data=1;while (true){sleep(1);cout<<"生产者写入数据-> "<<data<<endl;bq->Equeue(data);data++;}
}int main()
{Blockqueue<int> *bq = new Blockqueue<int>();pthread_t c, p;pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
创建阻塞队列对象,创建线程,对了这里写的是单线程,消费者调用pop()让消费者读数据,生产者调用Equeue()让消费者写数据。
运行结果
代码地址:生产者消费者模型源码