线程(三) linux 同步
目录
概念补充
条件变量
操作
例:多线程抢票
封装
生产者消费者模型
生产者和消费者之间的关系
BlockQueue(阻塞队列)
单生产单消费
信号量
简介
操作
多生产者多消费者RingQueue(环形队列)代码
sem封装
信号量与锁
小知识
概念补充
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件.
条件变量
简单来说,条件变量相当于一个阻塞队列,将线程入队列(等待),满足条件就出队列(唤醒)
操作
全局条件变量
和mutex相似,使用宏初始化一个全局的条件变量
局部条件变量
attr传nullptr即可
等待
cond为条件变量,mutex为锁
唤醒
signal为唤醒一个线程
broadcast为唤醒所有线程
例:多线程抢票
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;int tickets = 100;
void* func(void* args)
{std::string name(static_cast<char*>(args));while(true){pthread_mutex_lock(&lock);pthread_cond_wait(&cond,&lock);if(tickets==0) {pthread_mutex_unlock(&lock);break;}std::cout<<name<<" "<<--tickets<<std::endl;pthread_mutex_unlock(&lock);}return (void*)0;
}int main()
{pthread_t t1,t2,t3;pthread_create(&t1,nullptr,func,(void*)"t1");pthread_create(&t2,nullptr,func,(void*)"t2");pthread_create(&t3,nullptr,func,(void*)"t3");while(true){pthread_cond_signal(&cond);sleep(0.3);if(tickets==0){pthread_cond_broadcast(&cond);break;}}pthread_join(t1,nullptr);pthread_join(t2,nullptr);pthread_join(t3,nullptr);return 0;
}
每个线程执行到wait时会阻塞住,
为什么wait要传锁,
因为wait会让线程等待,如果线程申请到了锁等待,那么其他线程就无法申请到锁,所以wait会先释放锁再等待,等被唤醒后先申请锁再向下执行,保证互斥.
封装
#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"class Cond
{pthread_cond_t _cond;
public: Cond(){pthread_cond_init(&_cond,nullptr);}void Wait(LockGuard& lock){int n = pthread_cond_wait(&_cond,lock.Get());(void)n;}void Notifyone(){int n = pthread_cond_signal(&_cond);(void)n;}void Notifyall(){int n = pthread_cond_broadcast(&_cond);(void)n;}~Cond(){pthread_cond_destroy(&_cond);}
};
生产者消费者模型
和生活中的超市类似,就是让不同线程作为生产产品者和消费产品者,临界区作为厂家和消费者的中转站超市.
为什么存在这样的模型:
1.减少生产和消费过程中产生的成本
2.支持生产和消费的忙闲不均
3.维护松耦合关系
减少成本问题: 比如一个方便面厂家,生产方便面都是成批生产,而消费者一般只会按个按箱购买,如果两者直接交易,厂家生产一袋方便面,消费者购买,这样厂家的成本会很高;如果厂家生产一批,消费者也买不了一批产品,消费者成本会很高.
支持忙闲不均和松耦合关系问题: 还是上面的例子,厂家可以生产一批放到超市,然后等消费者慢慢消费,生产和消费没有直接关系
生产者和消费者之间的关系
生产者之间 : 互斥关系
消费者之间 : 互斥关系
生产者与消费者之间 : 互斥+同步关系
BlockQueue(阻塞队列)
在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
//<BlockQueue.hpp>
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "mutex.hpp"
#include "cond.hpp"//上面封装的条件变量const static u_int32_t CAP = 5;template<typename T>
class BlockQueue
{std::queue<T> _q;u_int32_t _cap;pthread_mutex_t _lock;Cond _c_cond;//消费者Cond _p_cond;//生产者unsigned int _c_wait_num;//当前消费者等待的个数unsigned int _p_wait_num;//当前生产者等待的个数 为了防止唤醒次数过多或唤醒无意义bool isFull(){return _q.size()>=_cap;}bool isEmpty(){return _q.empty();}public: BlockQueue(u_int32_t cap = CAP) :_cap(cap){}void Enqueue(const T& data){{LockGuard lock(&_lock);while(isFull()) //while循环防止wait调用失败和伪唤醒问题{_p_wait_num++;_p_cond.Wait(lock);_p_wait_num--;}_q.push(data);if(_c_wait_num>0)_c_cond.Notifyone();}}void Pop(T* retval){{LockGuard lock(&_lock);while(isEmpty()) //while循环防止wait调用失败和伪唤醒问题{_c_wait_num++;_c_cond.Wait(lock);_c_wait_num--;}*retval = _q.front();_q.pop();if(_p_wait_num>0)_p_cond.Notifyone();}}~BlockQueue(){}};
单生产单消费
#include "blockqueue.hpp"
#include <iostream>
#include <string>
#include <unistd.h>struct Data
{BlockQueue<int>* bq;std::string name;
};void *consumer(void* args)
{Data* data = static_cast<Data*>(args);int retval = 0;while(true){data->bq->Pop(&retval);std::cout<<data->name<<"消费了 " <<retval<<std::endl; //sleep(1);} return (void*)0;
}void *productor(void* args)
{Data* data = static_cast<Data*>(args);int num = 1;while(true){data->bq->Enqueue(num);std::cout<<data->name<<"生产了 "<<num++<<std::endl;sleep(1);}return (void*)0;
}int main()
{BlockQueue<int>* bq = new BlockQueue<int>();pthread_t c,p;Data ctd = {bq,"消费者"};pthread_create(&c,nullptr,consumer,(void*)&ctd);Data ptd = {bq,"生产者"};pthread_create(&p,nullptr,productor,(void*)&ptd);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
对于多生产和多消费,BlockQueue.hpp也支持
信号量
简介
信号量针对多线程并发访问一块资源中的不同部分.
本质上,就是一个描述资源数量的计数器
操作
sem_init 初始化信号量
pshared 用来表示线程使用还是进程使用,0表示线程间使用
value 信号量的初始值
sem_destroy 销毁信号量
sem_wait 信号量--
sem_post 信号量++
多生产者多消费者RingQueue(环形队列)代码
生产者关注空余空间资源,消费者关注数据资源
sem封装 <sem.hpp>
#pragma once
#include <iostream>
#include <semaphore.h>#define NUM 5class Sem
{sem_t _sem;int _initnum;
public:Sem(int num = NUM):_initnum(num){sem_init(&_sem,0,_initnum);}void P(){int n = sem_wait(&_sem);(void)n;}void V(){int n = sem_post(&_sem);(void)n;}~Sem(){sem_destroy(&_sem);}
};
<mutex.hpp>
#pragma once
#include <iostream>
#include <unistd.h>class LockGuard
{pthread_mutex_t* _lock;public:LockGuard(pthread_mutex_t* lock):_lock(lock){pthread_mutex_lock(_lock);} pthread_mutex_t* Get(){return _lock;}~LockGuard(){pthread_mutex_unlock(_lock);}
};
<cond.hpp>
#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"class Cond
{pthread_cond_t _cond;
public: Cond(){pthread_cond_init(&_cond,nullptr);}void Wait(LockGuard& lock){int n = pthread_cond_wait(&_cond,lock.Get());(void)n;}void Notifyone(){int n = pthread_cond_signal(&_cond);(void)n;}void Notifyall(){int n = pthread_cond_broadcast(&_cond);(void)n;}~Cond(){pthread_cond_destroy(&_cond);}
};
<RingQueue.hpp>
#pragma once
#include <iostream>
#include "sem.hpp"
#include <vector>
#include "mutex.hpp"#define CAP 10template<typename T>
class RingQueue
{std::vector<T> _rq;int _cap;Sem _data_sem;Sem _space_sem;int _c_step;int _p_step;pthread_mutex_t _p_lock;pthread_mutex_t _c_lock;
public:RingQueue(int cap = CAP):_cap(cap) ,_rq(cap) ,_data_sem(0) ,_space_sem(cap),_c_step(0),_p_step(0){pthread_mutex_init(&_p_lock,nullptr);pthread_mutex_init(&_c_lock,nullptr);}void Enqueue(const T& data){ _space_sem.P();{LockGuard lock(&_p_lock);_rq[_c_step++] = data;_c_step%=_cap;}_data_sem.V();}void Pop(T* data){_data_sem.P();{LockGuard lock(&_c_lock);*data = _rq[_p_step++];_p_step%=_cap;}_space_sem.V();}~RingQueue(){pthread_mutex_destroy(&_p_lock);pthread_mutex_destroy(&_c_lock);}
};
<RingQueue.cpp>
#include <iostream>
#include "sem.hpp"
#include "mutex.hpp"
#include "cond.hpp"
#include <pthread.h>
#include "RingQueue.hpp"RingQueue<int> rq(10);void* consumer(void* args)
{while(true){int data = 0;rq.Pop(&data);std::cout<<"消费 "<<data<<std::endl;}return (void*)0;
}void* productor(void* args)
{int data = 1;while(true){rq.Enqueue(data);std::cout<<"生产 "<<data++<<std::endl;}return (void*)0;
}int main()
{pthread_t c[2],p[2];pthread_create(c,nullptr,consumer,nullptr);pthread_create(c+1,nullptr,consumer,nullptr);pthread_create(p,nullptr,productor,nullptr);pthread_create(p+1,nullptr,productor,nullptr);pthread_join(c[0],nullptr);pthread_join(c[1],nullptr);pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);return 0;
}
信号量与锁
为什么信号量不需要加锁,信号量为空或为满时,信号量完成了同步和互斥动作,信号量不为空和不为满时,生产者和消费者访问不同位置,也不需要加锁
二元信号量(信号量初始值为1) :就是锁
锁:认为资源只有一份,申请锁相当于信号量P操作(--),释放锁相当于信号量V操作(++),所以锁是信号量的一种特殊情况
小知识
1.pthread开头的函数一般返回值为0表示成功
2.生产和消费还可以传递函数,用来给线程派发任务
3.生产消费模型高效在哪里:生产者生产数据和消费者处理数据是并发的,两者是松耦合关系,可以忙闲不均,这里效率高,而二者向阻塞队列中进行放入和拿出属于同步和互斥,效率不高