当前位置: 首页 > news >正文

Linux多线程(十三)之【POSIX信号量基于环形队列的生产消费模型】

文章目录

      • POSIX信号量
        • 基于环形队列的生产消费模型

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。

POSIX可以用于线程间同步。

上面的生产消费模型中的队列q是被当做整体使用的。

但是共享资源也可以被看作多份!

信号量的本质是一把计数器,这把计数器的本质是临界资源的数量。

(把资源是否就绪放在了临界区之外)

申请到了临界资源,在PV操作期间,不需要判断资源是否就绪。

(因为申请信号量时,其实就已经间接的做判断了)

因为只要申请成功了信号量,就一定会有对应的资源。

初始化信号量

#include <semaphore.h> 
int sem_init(sem_t *sem, int pshared, unsigned int value); 
参数: pshared:0表示线程间共享,非零表示进程间共享 value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性

image-20250425234659138

数组模拟环形队列

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,

所以可以通过加计数器或者标记位来判断满或者空。

另外也可以预留一个空的位置,作为满的状态。

image-20250425234719962

image-20250511171403892

生产者-消费者的例子是基于queue的,其空间可以动态分配,

现在基于固定大小的环形队列重写这个程序

(POSIX信号量):

我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程

问题:多生产者、多消费者加锁加在哪里?

//1
P(pspace_sem_);
lock(p_mutex_);
//2
lock(p_mutex_);
P(pspace_sem_);

选1

技术上:信号量申请不需要加锁,因为是原子的。

逻辑上:方案2要申请锁成功了才能申请信号量。(时间是串行的)

​ 方案1申请到了信号量的线程就可以竞争锁了,

​ 在持有信号量和锁的线程运行临界区代码时,其他线程就可以先去申请信号量。

​ 运行完代码释放锁,其他获得信号量的线程就可以立马获得锁,立马运行代码。

​ 可以让申请信号量的时间和申请锁的时间变成并行的。

Task.hpp

/** @Author: lll 1280388921@qq.com* @Date: 2025-05-08 23:06:06* @LastEditors: lll 1280388921@qq.com* @LastEditTime: 2025-05-12 23:11:35* @FilePath: /lesson39/2.blockqueue/Task.hpp* @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE*/
#pragma once
#include <iostream>
#include<string>
using namespace std;string opers="+-*/%";enum
{Div_zero = 1,Mod_zero,Unknown
};class Task
{
public:Task(){}Task(int x, int y, char op): a(x), b(y), op_(op), ret(0), exitcode(0){}void Run(){switch (op_){case '+':ret = a + b;break;case '-':ret = a - b;break;case '*':ret = a * b;break;case '/':{if (b == 0)exitcode = Div_zero;elseret = a / b;}break;case '%':{if (b == 0)exitcode = Mod_zero;elseret = a % b;}break;default:exitcode=Unknown;break;}}string GetTask(){string r=to_string(a);r+=op_;r+=to_string(b);r+="=???";return r;}string Getret(){string r=to_string(a);r+=op_;r+=to_string(b);r+="=";r+=to_string(ret);r+=" [ exitcode: ";r+=to_string(exitcode);r+=" ]";return r;}void operator()(){Run();}~Task() {}private:int a;int b;char op_;int ret;int exitcode;
};

Ringqueue.hpp

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>using namespace std;const static int defaultnum = 5;template <class T>
class Ringqueue
{void P(sem_t &sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}void lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:Ringqueue(int cap = defaultnum): ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0){sem_init(&cdata_sem_, 0, 0);sem_init(&pspace_sem_, 0, cap_);pthread_mutex_init(&c_mutex_, nullptr);pthread_mutex_init(&p_mutex_, nullptr);}void Push(const T &in){P(pspace_sem_);lock(p_mutex_);ringqueue_[p_step_++] = in;p_step_ %= cap_;unlock(p_mutex_);V(cdata_sem_);}void Pop(T *out){P(cdata_sem_);lock(c_mutex_);*out = ringqueue_[c_step_++];c_step_ %= cap_;unlock(c_mutex_);V(pspace_sem_);}~Ringqueue(){sem_destroy(&cdata_sem_);sem_destroy(&pspace_sem_);pthread_mutex_destroy(&c_mutex_);pthread_mutex_destroy(&p_mutex_);}private:vector<T> ringqueue_;int cap_;int c_step_; // 消费者下标int p_step_; // 生产者下标sem_t cdata_sem_;  // 数据资源sem_t pspace_sem_; // 空间资源pthread_mutex_t c_mutex_;pthread_mutex_t p_mutex_;
};

main.cc

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>
#include <ctime>
#include "Ringqueue.hpp"
#include "Task.hpp"using namespace std;struct ThreadData
{Ringqueue<Task> *rq_;string tname;
};void *Productor(void *args)
{ThreadData *td = static_cast<ThreadData *>(args);Ringqueue<Task> *rq = td->rq_;string name = td->tname;int len = opers.size();while (1){int data1 = rand() % 10 + 1;usleep(10);int data2 = rand() % 5;char op = opers[rand() % len];Task t(data1, data2, op);rq->Push(t);cout << "Productor task done: " << t.GetTask() << " ,name: " << name << endl;// sleep(1);}return nullptr;
}void *Consumer(void *args)
{ThreadData *td = static_cast<ThreadData *>(args);Ringqueue<Task> *rq = td->rq_;string name = td->tname;while (1){Task t; // 记得补一个默认构造rq->Pop(&t);t();cout << "Consumer get task , result: " << t.Getret() << " ,name: " << name << endl;sleep(1);}return nullptr;
}int main()
{srand(time(nullptr) ^ getpid());Ringqueue<Task> *rq = new Ringqueue<Task>();pthread_t c[1], p[1];for (int i = 0; i < 1; i++){ThreadData *td = new ThreadData();td->rq_ = rq;td->tname = "Consumer - " + to_string(i + 1);pthread_create(p + i, nullptr, Consumer, td);}for (int i = 0; i < 1; i++){ThreadData *td = new ThreadData();td->rq_ = rq;td->tname = "Productor - " + to_string(i + 1);pthread_create(c + i, nullptr, Productor, td);}for (int i = 0; i < 1; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 1; i++){pthread_join(p[i], nullptr);}return 0;
}

image-20250512231530427

http://www.xdnf.cn/news/1078957.html

相关文章:

  • OpenCV CUDA模块设备层-----在 GPU上高效地执行两个uint类型值的最小值比较函数vmin2()
  • LeetCode 317 最短距离选址问题详解(Swift 实现 + BFS 多源遍历)
  • 高边驱动 低边驱动
  • 多模态AI Agent技术栈解析:视觉-语言-决策融合的算法原理与实践
  • Kafka生态整合深度解析:构建现代化数据架构的核心枢纽
  • JA3指纹在Web服务器或WAF中集成方案
  • 专题:2025即时零售与各类人群消费行为洞察报告|附400+份报告PDF、原数据表汇总下载
  • MacOS Safari 如何打开F12 开发者工具 Developer Tools
  • 打造一个可维护、可复用的前端权限控制方案(含完整Demo)
  • 请求未达服务端?iOS端HTTPS链路异常的多工具抓包排查记录
  • 【CSS揭秘】笔记
  • 网络基础(3)
  • HTML初学者第二天
  • 利用tcp转发搭建私有云笔记
  • Chart.js 安装使用教程
  • 【强化学习】深度解析 GRPO:从原理到实践的全攻略
  • 怎样理解:source ~/.bash_profile
  • vscode vim插件示例json意义
  • 电子电气架构 --- SOVD功能简单介绍
  • 如何系统性评估运维自动化覆盖率:方法与关注重点
  • Spark流水线数据探查组件
  • 【字节跳动】数据挖掘面试题0002:从转发数据中求原视频用户以及转发的最长深度和二叉排序树指定值
  • 计算机视觉的新浪潮:扩散模型(Diffusion Models)技术剖析与应用前景
  • 六、软件操作手册
  • 【Python】进阶 - 数据结构与算法
  • Python 高光谱分析工具(PyHAT)
  • Python 数据分析:numpy,说人话,说说数组维度。听故事学知识点怎么这么容易?
  • vue中的toRef
  • C#上位机串口接口
  • docker常见命令