当前位置: 首页 > news >正文

Linux学习:生产者消费者模型

目录

  • 1. 生产者消费者模型的相关概念
    • 1.1 什么是生产者消费者模型
    • 1.2 生产者消费者模型的优势作用
  • 2. 多线程简单实现生产者消费者模型
    • 2.1 设计方案
    • 2.2 代码实现
      • 2.2.1 线程类
      • 2.2.2 BlockQueue类
      • 2.2.3 任务类
      • 2.2.4 主干代码

1. 生产者消费者模型的相关概念

1.1 什么是生产者消费者模型

在这里插入图片描述
  生产者消费者模型是一种经典的并发编程的设计模式。其由三部分组成分别为生产者消费者共享资源缓冲区

  • 生产者: 生产任务、数据的线程或进程
  • 消费者: 处理任务、数据的线程或进程
  • 共享资源缓冲区: 任务与数据的暂存区,生产者向其中存储任务与数据,消费者从中获取任务与数据。简单来说,共享资源缓冲区,是一段临时保存数据的内存空间,一般使用某种数据结构对象充当(阻塞队列)

  生产者消费者模型中,其充当生产、消费角色的线程/进程,它们之间需要满足特定的关系,具体如下:

角色关系
生产者 vs 生产者互斥 || 同步(互斥,可能同步)
消费者 vs 消费者互斥 || 同步(互斥,可能同步)
生产者 vs 消费者互斥 && 同步

1.2 生产者消费者模型的优势作用

  生产者消费者模型是为了协调生产者与消费者之间协作。具体设计为,生产者生产的数据不再直接交给消费者,而是直接存入共享资源缓冲区。而消费者也不再从生产者手中获取数据,则是转为从共享资源缓冲区中获取存入的历史数据。通过这样的设计方式,让生产与消费的操作解耦合,提高更好的并发度,并且支持生产者、消费者之间的忙先不均
  生产者消费者模型高效与并发度好的原因为,支持生产任务与处理任务或数据的并发。当生产者竞争锁或是生产任务、数据时,消费者可执行自己的任务,或是直接从缓冲区中获取存储的历史任务、数据。消费者执行任务不影响生产者获取、生产任务。

2. 多线程简单实现生产者消费者模型

2.1 设计方案

1. 生产者消费者模型的实体选择:
在这里插入图片描述

  • 生产者:创建一批线程向共享资源缓冲区中生产任务
  • 消费者:创建一批线程从共享资源缓冲区获取任务并处理
  • 共享资源缓冲区:此处使用自定义的阻塞队列(BlockQueue)实现,保证生产者、消费者访问其时互斥且同步

2.阻塞队列(BlockQueue)的实现:

成员变量作用
queue<T>用于存储任务、数据的队列
int _cap队列的容量大小
pthread_mutex_t _mutex访问阻塞队列时控制互斥的锁
pthread_cond_t _productor_cond控制生产者同步的条件变量
pthread_cond_t _consumer_cond控制消费者同步的条件变量
int _productor_wait_num在条件变量处阻塞等待的生产者线程数量
int _consumer_wait_num在条件变量处阻塞等待的消费者线程数量
成员函数作用
void Equeue(T& data)将生产者生产的数据入队列
void Pop(T* data)从队列中获取历史的数据,采用输出型参数的方式
bool IsFull()检测队列是否满了
bool IsEmpty()检测队列是否为空
  • 互斥: 阻塞队列的入队与出队操作都必须是互斥的,即保证无论何时,无论是生产者还是消费者线程都只能有一个线程在访问阻塞队列。
  • 同步: 除此之外,还要保证生产者与消费者之间的同步,即队列中数据存储已慢,则阻塞生产者,队列中没有数据,则阻塞消费者,确保生产与消费的整个过程可以正常进行。
  • 条件变量的优化: 当没有生产者或消费者在条件变量下阻塞等待时,就可以选择不需要再去将对应的条件变量唤醒。

3. 程序的主干逻辑与函数

在这里插入图片描述

2.2 代码实现

2.2.1 线程类

#ifndef THREAD_MODULE
#define THREAD_MODULE
#include <pthread.h>
#include <iostream>
using namespace std;
#include <functional>namespace ThreadModule
{template<typename T>using func_t = function<void(T&)>;template<typename T>class Thread{public:Thread(func_t<T> func, T& data, string name = "none-thread"):_func(func), _data(data), _name(name), _stop(true){}~Thread(){}void Execute(){_func(_data);}static void* threadroutine(void* arg){Thread<T>* ptd = static_cast<Thread<T>*>(arg);ptd->Execute();return nullptr;}bool start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(n){return false; }_stop = false;return true;}void join(){if(!_stop){pthread_join(_tid, nullptr);}}void detach(){if(!_stop){pthread_detach(_tid);}}string name(){return _name;}void stop(){_stop = true;}private:pthread_t _tid;string _name;func_t<T> _func;T& _data;bool _stop;};}#endif

2.2.2 BlockQueue类

#ifndef BLOCK_QUEUE_HPP
#define BLOCK_QUEUE_HPP#include <queue>
#include <pthread.h>template<typename T>
class BlockQueue
{
public:BlockQueue(int cap){_cap = cap;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_consumer_cond, nullptr);pthread_cond_init(&_productor_cond, nullptr);_consumer_wait_num = 0;_productor_wait_num = 0;}bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();}void Enqueue(T& data){pthread_mutex_lock(&_mutex);while(IsFull()){_productor_wait_num++;pthread_cond_wait(&_productor_cond, &_mutex);_productor_wait_num--;}_q.push(data);if(_consumer_wait_num > 0)//当有正在等待的消费者时pthread_cond_signal(&_consumer_cond);//生产了继续消费pthread_mutex_unlock(&_mutex);}void Pop(T* data){pthread_mutex_lock(&_mutex);while(IsEmpty()){_consumer_wait_num++;pthread_cond_wait(&_consumer_cond, &_mutex);_consumer_wait_num--;}*data = _q.front();_q.pop();if(_productor_wait_num > 0)//当有正在阻塞等待的生产者时pthread_cond_signal(&_productor_cond);//消费了继续生产pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumer_cond);pthread_cond_destroy(&_productor_cond);}private:std::queue<T> _q;    //队列存储数据int _cap;            //阻塞队列的容量pthread_mutex_t _mutex;pthread_cond_t _consumer_cond;pthread_cond_t _productor_cond;int _consumer_wait_num;int _productor_wait_num;
};#endif

  条件变量的阻塞判断条件应设为为while,不能设置为if,这是因为pthread_cond_wait可能会出错返回导致继续执行后续代码。可此时唤醒条件并未满足,条件变量并没有被真的唤醒,此种情况被称为伪唤醒if条件判断语句并不能预防此种伪唤醒的错误情况,所以,一般条件变量的阻塞判断条件会被设置为while检测,保证代码的健壮性

2.2.3 任务类

#ifndef TASK_HPP
#define TASK_HPP
#include <functional>
#include <iostream>
using namespace std;//使用仿函数
class Task
{
public://无参构造,用于消费者创建接收数据Task(){}Task(int x, int y):_x(x), _y(y){}~Task(){}void toDebugQuestion(){cout << _x << " + " << _y << " =?" << endl;}void toDebugAnswer(){cout << _x << " + " << _y << " = " << _x + _y << endl;}private:int _x;int _y;int _sum;
};#endif

2.2.4 主干代码

#include "Thread.hpp"
#include "BlockQueue.hpp"
#include <vector>
using namespace ThreadModule;
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "Task.hpp"using blockqueue_t = BlockQueue<Task>;void ProductorRun(blockqueue_t& bq)
{while(true){sleep(1);//生产慢Task t(rand() % 10, rand() % 10);bq.Enqueue(t);t.toDebugQuestion();}
}void ConsumerRun(blockqueue_t& bq)
{Task t;while(true){//sleep(1);//消费慢bq.Pop(&t);t.toDebugAnswer();}
}void StartComm(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, func_t<blockqueue_t> func, int num, string who)
{for(int i = 0; i < num; i++){string name = who + '-' + to_string(i + 1);threads.emplace_back(func, bq, name);threads.back().start();cout << name << " create success..." << endl;}
}void StartProductor(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, int num)
{StartComm(threads, bq, ProductorRun, num, "Productor");
}void StartConsumer(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, int num)
{StartComm(threads, bq, ConsumerRun, num, "Consumer");
}void WaitAllThreads(vector<Thread<blockqueue_t> >& threads)
{for(auto& thread : threads){thread.join();}
}int main()
{srand((size_t)time(nullptr));vector<Thread<blockqueue_t> > threads;blockqueue_t bq(5);StartProductor(threads, bq, 3);StartConsumer(threads, bq, 5);WaitAllThreads(threads);return 0;
}
http://www.xdnf.cn/news/1449109.html

相关文章:

  • 开源 C++ QT Widget 开发(十一)进程间通信--Windows 窗口通信
  • AI 大模型 “内卷” 升级:从参数竞赛到落地实用,行业正在发生哪些关键转变?
  • 2025年经济学专业女性职业发展证书选择与分析
  • SCN随机配置网络时间序列预测Matlab实现
  • @Resource与@Autowired的区别
  • 数据结构——顺序表和单向链表(2)
  • 【Android】【设计模式】抽象工厂模式改造弹窗组件必知必会
  • Wan2.2AllInOne - Wan2.2极速视频生成模型,4步极速生成 ComfyUI工作流 一键整合包下载
  • 深度学习篇---模型组成部分
  • http和https区别是什么
  • Spring Boot 2.7 中资源销毁的先后顺序
  • mysqldump导出远程的数据库表(在java代码中实现)
  • VUE的模版渲染过程
  • FFMPEG H264
  • OpenLayers常用控件 -- 章节一:地图缩放控件详解教程
  • 如何通过level2千档盘口分析挂单意图
  • JavaScript的输出语句
  • 三阶Bezier曲线,已知曲线上一点到曲线起点的距离为L,计算这个点的参数u的方法
  • 专题四_前缀和_一维前缀和
  • 【OC】属性关键字
  • vtk资料整理
  • Linux arm64 PTE contiguous bit
  • linux可以直接用指针操作物理地址吗?
  • torch学习 自用
  • python类的内置属性
  • AI重塑SaaS:从被动工具到智能角色的技术演进路径
  • 【面试题】OOV(未登录词)问题如何解决?
  • Leetcode_202.快乐数_三种方法解决(普通方法解决,哈希表解决,循环链表的性质解决_快慢指针)
  • 简述:普瑞时空数据建库软件(国土变更建库)之一(变更预检查部分规则)
  • PyTorch 中训练语言模型过程