Linux多线程(十三)之【POSIX信号量基于环形队列的生产消费模型】
文章目录
- POSIX信号量
- 基于环形队列的生产消费模型
POSIX信号量
POSIX
信号量和SystemV
信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。
但POSIX
可以用于线程间同步。
上面的生产消费模型中的队列q是被当做整体使用的。
但是共享资源也可以被看作多份!
信号量的本质是一把计数器,这把计数器的本质是临界资源的数量。
(把资源是否就绪放在了临界区之外)
申请到了临界资源,在PV操作期间,不需要判断资源是否就绪。
(因为申请信号量时,其实就已经间接的做判断了)
因为只要申请成功了信号量,就一定会有对应的资源。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数: pshared:0表示线程间共享,非零表示进程间共享 value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,
所以可以通过加计数器或者标记位来判断满或者空。
另外也可以预留一个空的位置,作为满的状态。
生产者-消费者的例子是基于queue的,其空间可以动态分配,
现在基于固定大小的环形队列重写这个程序
(POSIX信号量):
我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
问题:多生产者、多消费者加锁加在哪里?
//1
P(pspace_sem_);
lock(p_mutex_);
//2
lock(p_mutex_);
P(pspace_sem_);
选1
技术上:信号量申请不需要加锁,因为是原子的。
逻辑上:方案2要申请锁成功了才能申请信号量。(时间是串行的)
方案1申请到了信号量的线程就可以竞争锁了,
在持有信号量和锁的线程运行临界区代码时,其他线程就可以先去申请信号量。
运行完代码释放锁,其他获得信号量的线程就可以立马获得锁,立马运行代码。
可以让申请信号量的时间和申请锁的时间变成并行的。
Task.hpp
/** @Author: lll 1280388921@qq.com* @Date: 2025-05-08 23:06:06* @LastEditors: lll 1280388921@qq.com* @LastEditTime: 2025-05-12 23:11:35* @FilePath: /lesson39/2.blockqueue/Task.hpp* @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE*/
#pragma once
#include <iostream>
#include<string>
using namespace std;string opers="+-*/%";enum
{Div_zero = 1,Mod_zero,Unknown
};class Task
{
public:Task(){}Task(int x, int y, char op): a(x), b(y), op_(op), ret(0), exitcode(0){}void Run(){switch (op_){case '+':ret = a + b;break;case '-':ret = a - b;break;case '*':ret = a * b;break;case '/':{if (b == 0)exitcode = Div_zero;elseret = a / b;}break;case '%':{if (b == 0)exitcode = Mod_zero;elseret = a % b;}break;default:exitcode=Unknown;break;}}string GetTask(){string r=to_string(a);r+=op_;r+=to_string(b);r+="=???";return r;}string Getret(){string r=to_string(a);r+=op_;r+=to_string(b);r+="=";r+=to_string(ret);r+=" [ exitcode: ";r+=to_string(exitcode);r+=" ]";return r;}void operator()(){Run();}~Task() {}private:int a;int b;char op_;int ret;int exitcode;
};
Ringqueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>using namespace std;const static int defaultnum = 5;template <class T>
class Ringqueue
{void P(sem_t &sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}void lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:Ringqueue(int cap = defaultnum): ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0){sem_init(&cdata_sem_, 0, 0);sem_init(&pspace_sem_, 0, cap_);pthread_mutex_init(&c_mutex_, nullptr);pthread_mutex_init(&p_mutex_, nullptr);}void Push(const T &in){P(pspace_sem_);lock(p_mutex_);ringqueue_[p_step_++] = in;p_step_ %= cap_;unlock(p_mutex_);V(cdata_sem_);}void Pop(T *out){P(cdata_sem_);lock(c_mutex_);*out = ringqueue_[c_step_++];c_step_ %= cap_;unlock(c_mutex_);V(pspace_sem_);}~Ringqueue(){sem_destroy(&cdata_sem_);sem_destroy(&pspace_sem_);pthread_mutex_destroy(&c_mutex_);pthread_mutex_destroy(&p_mutex_);}private:vector<T> ringqueue_;int cap_;int c_step_; // 消费者下标int p_step_; // 生产者下标sem_t cdata_sem_; // 数据资源sem_t pspace_sem_; // 空间资源pthread_mutex_t c_mutex_;pthread_mutex_t p_mutex_;
};
main.cc
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>
#include <ctime>
#include "Ringqueue.hpp"
#include "Task.hpp"using namespace std;struct ThreadData
{Ringqueue<Task> *rq_;string tname;
};void *Productor(void *args)
{ThreadData *td = static_cast<ThreadData *>(args);Ringqueue<Task> *rq = td->rq_;string name = td->tname;int len = opers.size();while (1){int data1 = rand() % 10 + 1;usleep(10);int data2 = rand() % 5;char op = opers[rand() % len];Task t(data1, data2, op);rq->Push(t);cout << "Productor task done: " << t.GetTask() << " ,name: " << name << endl;// sleep(1);}return nullptr;
}void *Consumer(void *args)
{ThreadData *td = static_cast<ThreadData *>(args);Ringqueue<Task> *rq = td->rq_;string name = td->tname;while (1){Task t; // 记得补一个默认构造rq->Pop(&t);t();cout << "Consumer get task , result: " << t.Getret() << " ,name: " << name << endl;sleep(1);}return nullptr;
}int main()
{srand(time(nullptr) ^ getpid());Ringqueue<Task> *rq = new Ringqueue<Task>();pthread_t c[1], p[1];for (int i = 0; i < 1; i++){ThreadData *td = new ThreadData();td->rq_ = rq;td->tname = "Consumer - " + to_string(i + 1);pthread_create(p + i, nullptr, Consumer, td);}for (int i = 0; i < 1; i++){ThreadData *td = new ThreadData();td->rq_ = rq;td->tname = "Productor - " + to_string(i + 1);pthread_create(c + i, nullptr, Productor, td);}for (int i = 0; i < 1; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 1; i++){pthread_join(p[i], nullptr);}return 0;
}