Linux信号量(32)
文章目录
- 前言
- 一、POSIX 信号量
- 信号量的基础知识
- 信号量的基本操作
- 二、基于环形队列实现生产者消费者模型
- 环形队列
- 单生产单消费模型
- 多生产多消费模型
- 总结
前言
加油,加油!!!
一、POSIX 信号量
信号量的基础知识
互斥、同步 不只能通过 互斥锁、条件变量 实现,还能通过 信号量 sem、互斥锁 实现(出自 POSIX 标准)
「信号量」 的本质就是一个 计数器,能够更细粒度的对临界资源进行管理。
- 申请到资源,计数器 --(P 操作)
- 释放完资源,计数器 ++(V 操作)
「信号量」 的 PV 操作都是原子的,假设将 「信号量」 的值设为 1,用来表示 「生产者消费者模型」 中 阻塞队列 _queue 的使用情况
- 当 sem 值为 1 时,线程可以进行 「生产 / 消费」,sem–
- 当 sem 值为 0 时,线程无法进行 「生产 / 消费」,只能阻塞等待
此时的 「信号量」 只有两种状态:1、0,可以实现类似 互斥锁 的效果,即实现 线程互斥,像这种只有两种状态的信号量称为 「二元信号量」
「信号量」 不止可以用于 互斥,它的主要目的是 描述临界资源中的资源数目,比如我们可以把 阻塞队列 切割成 N 份,初始化 「信号量」 的值为 N,当某一份资源就绪时,sem–,资源被释放后,sem++,如此一来可以像 条件变量 一样实现 同步
- 当 sem == N 时,阻塞队列已经空了,消费者无法消费
- 当 sem == 0 时,阻塞队列已经满了,生产者无法生产
用来实现 互斥、同步 的信号量称为 「多元信号量」
综上所述,在使用 「多元信号量」 访问资源时,需要先申请 「信号量」,只有申请成功了才能进行资源访问,否则会进入阻塞等待,即当前资源不可用
在实现 互斥、同步 时,该如何选择?
结合业务场景进行分析,如果待操作的共享资源是一个整体,比较适合使用 互斥锁+条件变量 的方案,但如果共享资源是多份资源,使用 信号量 就比较方便
其实 「信号量」 的工作机制类似于 买电影票,是一种 预订机制,只要你买到票了,即使你晚点到达电影院,你的位置也始终可用,买到票的本质是将对应的座位进行了预订
对于 「信号量」 的第一层理解:申请信号量实际是一种资源预订机制
只要申请 「信号量」 成功了,就一定可以访问临界资源
如果将 「信号量」 实际带入我们之前写的 「生产者消费者模型」 代码中,是不需要进行资源条件判断的,因为 「信号量」本身就已经是资源的计数器了
对于 「信号量」 的第二层理解:使用信号量时,就已经把资源条件判断转化成了信号量的申请行为
// 生产数据(入队)
void Push(const T& inData)
{// 申请信号量 P操作// ..._queue.push(inData);// ...// 释放信号量 V操作
}
信号量的基本操作
有了之前 互斥锁、条件变量 的使用基础,信号量 的接口学习是释放简单的,依旧是只有四个接口:初始化、销毁、申请、释放
初始化信号量
#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);
参数1:需要初始化的信号量,sem_t 实际就是一个联合体,里面包含了一个 char 数组,以及一个 long int 成员
typedef union
{char __size[__SIZEOF_SEM_T];long int __align;
} sem_t;
参数2:表示当前信号量的共享状态,传递 0 表示线程间共享,传递 非0 表示进程间共享
参数3:信号量的初始值,可以设置为双元或多元信号量
返回值:初始化成功返回 0,失败返回 -1,并设置错误码
销毁信号量
#include <semaphore.h>int sem_destroy(sem_t *sem);
参数:待销毁的信号量
返回值:成功 0,失败 -1, 并设置错误码
申请信号量(等待信号量)
#include <semaphore.h>int sem_wait(sem_t *sem);int sem_trywait(sem_t *sem);int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
主要使用 sem_wait
参数:表示从哪个信号量中申请
返回值:成功返回 0,失败返回 -1,并设置错误码
其他两种申请方式分别是:尝试申请,如果没有申请到资源,就会放弃申请;每隔一段时间进行申请,即 timeout
释放信号量(发布信号量)
#include <semaphore.h>int sem_post(sem_t *sem);
参数:将资源释放到哪个信号量中
返回值:成功返回 0,失败返回 -1,并设置错误码
这些函数其实还是蛮一目了然的,是干啥有啥用都很清楚,所以事不宜迟我们直接开始上手用信号量实现生产者消费者模型!!!
二、基于环形队列实现生产者消费者模型
环形队列
「生产者消费者模型」 中的交易场所是可更换的,不仅可以使用 阻塞队列,还可以使用 环形队列,所谓的 环形队列 并非 队列,而是用数组模拟实现的 “队列”, 并且它的 判空、判满 比较特殊
如何让 环形队列 “转” 起来?
答案是取模,很简单的
在我之前讲解数据结构的时候,我曾经讲过一个环形队列 判空、判满 的方法:
多开一个空间,head、tail 位于同一块空间中时,表示当前队列为空
当待生产的数据落在 head 指向的空间时,就表示已满
但是今天我们参考阻塞队列,搞一个计数器,当计数器的值为 0 时,表示当前为空,当计数器的值为容量时,表示队列为满
我前面说了,因为 「信号量」 本身就是一个天然的计数器,所以在这里我们也天然使用第二个策略
在 环形队列 中,生产者 和 消费者 关心的资源不一样:生产者只关心是否有空间放数据,消费者只关心是否能从空间中取到数据
除非两者相遇,其他情况下生产者、消费者可以并发运行(同时访问环形队列)
两者错位时正常进行生产消费就好了,但两者相遇时需要特殊处理,也就是处理 空、满 两种情况,这就是 环形队列 的运转模式
以下是DS大人给出的一个比喻,有助于大家理解并掌握
简而言之,这个模型的运作模式就是:
- 环形队列为空时:消费者阻塞,只能由生产者进行生产,生产完商品后,消费者可以消费商品
- 环形队列为满时:生产者阻塞,只能由消费者进行消费,消费完商品后,生产者可以生产商品
- 其他情况:生产者、消费者并发运行,各干各的事,互不影响
所以我们可以使用 「信号量」 标识资源的使用情况,但生产者和消费者关注的资源并不相同,所以需要使用两个 「信号量」 来进行操作
- 生产者信号量:标识当前有多少可用空间
- 消费者信号量:标识当前有多少数据
如果说搞两个 条件变量 是 阻塞队列 的精髓,那么搞两个 信号量 就是 环形队列 的精髓,显然,刚开始的时候,生产者信号量初始值为环形队列的大小,消费者信号量初始值为 0
无论是生产者还是消费者,只有申请到自己的 「信号量」 资源后,才进行 生产 / 消费
比如上图中的 pro_sem 就表示 生产者还可以进行 3 次生产,con_sem 表示 消费者还可以消费 5 次
具体生产者消费者我们还可以参照下面代码来理解:
// 生产者
void Producer()
{// 申请信号量(空位 - 1)sem_wait(&pro_sem);// 生产商品// ...// 释放信号量(商品 + 1)sem_post(&con_sem);
}// 消费者
void Consumer()
{// 申请信号量(商品 - 1)sem_wait(&con_sem);// 消费商品// ...// 释放信号量(空位 + 1)sem_post(&pro_sem);
}
生产者和消费者指向同一个位置时保证线程安全,其他情况保证并发度
单生产单消费模型
我们起手先创建一个环形队列头文件
#pragma once#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <vector>#define NUM 8template<class T>
class RingQueue
{
private://P操作void P(sem_t& s){sem_wait(&s);}//V操作void V(sem_t& s){sem_post(&s);}
public:RingQueue(int cap = NUM): _cap(cap), _p_pos(0), _c_pos(0){_q.resize(_cap);sem_init(&_blank_sem, 0, _cap); //blank_sem初始值设置为环形队列的容量sem_init(&_data_sem, 0, 0); //data_sem初始值设置为0}~RingQueue(){sem_destroy(&_blank_sem);sem_destroy(&_data_sem);}//向环形队列插入数据(生产者调用)void Push(const T& data){P(_blank_sem); //生产者关注空间资源_q[_p_pos] = data;V(_data_sem); //生产//更新下一次生产的位置_p_pos++;_p_pos %= _cap;}//从环形队列获取数据(消费者调用)void Pop(T& data){P(_data_sem); //消费者关注数据资源data = _q[_c_pos];V(_blank_sem);//更新下一次消费的位置_c_pos++;_c_pos %= _cap;}
private:std::vector<T> _q; //环形队列int _cap; //环形队列的容量上限int _p_pos; //生产位置int _c_pos; //消费位置sem_t _blank_sem; //描述空间资源sem_t _data_sem; //描述数据资源
};
有一些小细节如下:
- 生产者的信号量初始值为 DEF_CAP
- 消费者的信号量初始值为 0
- 生产者、消费者的起始下标都为 0
在没有 互斥锁 的情况下,是如何 确保生产者与消费者间的互斥关系的?
通过两个 信号量,当两个 信号量 都不为 0 时,双方可以并发操作,这是 环形队列 最大的特点;当 生产者信号量为 0 时,生产者陷入阻塞等待,等待消费者消费;同理当 消费者信号量为 0 时,消费者也会阻塞住,在这里阻塞就是 互斥 的体现。当对方完成 生产 / 消费 后,自己会解除阻塞状态,而这就是 同步
现在我们再来创建 .cpp 文件
#include "test57.hpp"void* Producer(void* arg)
{RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){sleep(1);int data = rand() % 100 + 1;rq->Push(data);std::cout << "Producer: " << data << std::endl;}
}void* Consumer(void* arg)
{RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){sleep(1);int data = 0;rq->Pop(data);std::cout << "Consumer: " << data << std::endl;}
}int main()
{srand((unsigned int)time(nullptr));pthread_t producer, consumer;RingQueue<int>* rq = new RingQueue<int>;pthread_create(&producer, nullptr, Producer, rq);pthread_create(&consumer, nullptr, Consumer, rq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete rq;return 0;
}
结果如下,生产者每隔一秒生产一次、消费者每隔一秒消费一次
剩下的 生产者每隔一秒生产一次 和 消费者每隔一秒消费一次,就交给大家自己去尝试了!
多生产多消费模型
接下来可以实现 多生产多消费场景 中的 CP 模型了,多生产多消费无非就是增加了 消费者与消费者、生产者与生产者 间的 互斥 关系,加锁就行了,现在问题是加几把锁?
答案是 两把,因为当前的 生产者和消费者 关注的资源不一样,一个关注剩余空间,另一个关注是否有商品,一把锁是无法锁住两份不同资源的,所以需要给 生产者、消费者 各配一把锁
阻塞队列 中为什么只需要一把锁?
因为阻塞队列中的共享资源是一整个队列,生产者和消费者访问的是同一份资源,所以一把锁就够了
#pragma once#include <vector>
#include <mutex>
#include <semaphore.h>#define DEF_CAP 10template<class T>
class RingQueue
{
public:RingQueue(size_t cap = DEF_CAP):_cap(cap), _pro_step(0), _con_step(0){_queue.resize(_cap);// 初始化信号量sem_init(&_pro_sem, 0, _cap);sem_init(&_con_sem, 0, 0);// 初始化互斥锁pthread_mutex_init(&_pro_mtx, nullptr);pthread_mutex_init(&_con_mtx, nullptr);}~RingQueue(){// 销毁信号量sem_destroy(&_pro_sem);sem_destroy(&_con_sem);// 销毁互斥锁pthread_mutex_destroy(&_pro_mtx);pthread_mutex_destroy(&_con_mtx);}// 生产商品void Push(const T &inData){// 申请信号量P(&_pro_sem);Lock(&_pro_mtx);// 生产_queue[_pro_step++] = inData;_pro_step %= _cap;UnLock(&_pro_mtx);// 释放信号量V(&_con_sem);}// 消费商品void Pop(T *outData){// 申请信号量P(&_con_sem);Lock(&_con_mtx);// 消费*outData = _queue[_con_step++];_con_step %= _cap;UnLock(&_con_mtx);// 释放信号量V(&_pro_sem);}private:void P(sem_t *sem){sem_wait(sem);}void V(sem_t *sem){sem_post(sem);}void Lock(pthread_mutex_t *lock){pthread_mutex_lock(lock);}void UnLock(pthread_mutex_t *lock){pthread_mutex_unlock(lock);}private:std::vector<T> _queue;size_t _cap;sem_t _pro_sem;sem_t _con_sem;size_t _pro_step; // 生产者下标size_t _con_step; // 消费者下标pthread_mutex_t _pro_mtx;pthread_mutex_t _con_mtx;
};
细节: 加锁行为放在信号量申请成功之后,可以提高并发度
以下又是DS大人给的比喻来帮助你理解~
原因在于,信号量的操作是原子的,这就好比一群学生在进行座位编排,可以先放一个学生进入教室,再给他确定座位;也可以先给每个人确定好自己的座位(一人一座),然后排队进入教室,对号入座即可。先申请 「信号量」 相当于先确定座位,避免进入教室(加锁)后还得选座位
加锁意味着串行化,一定会降低效率,但因为 「信号量」 的操作是原子的,可以确保线程安全,也就不需要加锁保护;也就是可以并发申请 「信号量」,再串行化访问临界资源
// ...int main()
{// 种种子srand((size_t)time(nullptr));// 创建一个阻塞队列RingQueue<int>* rq = new RingQueue<int>;// 创建多个线程(生产者、消费者)pthread_t pro[10], con[20];for(int i = 0; i < 10; i++)pthread_create(pro + i, nullptr, Producer, rq);for(int i = 0; i < 20; i++)pthread_create(con + i, nullptr, Consumer, rq);for(int i = 0; i < 10; i++)pthread_join(pro[i], nullptr);for(int i = 0; i < 20; i++)pthread_join(con[i], nullptr);delete rq;return 0;
}
对比阻塞队列,我们还要思考以下环形队列的意义在哪里
对缓冲区的操作对于计算机说就是小 case,需要关注的点在于 获取数据和消费数据,这是比较耗费时间的,阻塞队列 至多支持获取 一次数据获取 或 一次数据消费,在代码中的具体体现就是 所有线程都在使用一把锁,并且每次只能 push、pop 一个数据;而 环形队列 就不一样了,生产者、消费者 可以通过 条件变量 知晓数据获取、数据消费次数,并且由于数据获取、消费操作没有加锁,支持并发,因此效率十分高
环形队列 中允许 N 个生产者线程一起进行数据获取,也允许 N 个消费者线程一起进行数据消费,简单任务处理感知不明显,但复杂任务就不一样了,这就有点像同时下载多份资源,是可以提高效率的
不过存在即合理,阻塞队列肯定也有其合理的地方,有其更适合的地方,具体问题就具体分析!!!
总结
加油加油!!!再撑住一会儿~