当前位置: 首页 > news >正文

C++并发编程-23. 线程间切分任务的方法

按数量划分

在这里插入图片描述

按递归划分

在这里插入图片描述

在这里插入图片描述

#include <thread>
#include <list>
#include "thread_safe_stack.h"
#include <future>
#include <memory>
template<typename T>
struct sorter  //1  
{struct chunk_to_sort{std::list<T> data;std::promise<std::list<T> > promise;};thread_safe_stack<chunk_to_sort> chunks;    //⇽-- - 2std::vector<std::thread> threads;   // ⇽-- - 3unsigned const max_thread_count;std::atomic<bool> end_of_data;sorter() :max_thread_count(std::thread::hardware_concurrency() - 1),end_of_data(false){}~sorter()    //⇽-- - 4{end_of_data = true;     //⇽-- - 5for (unsigned i = 0; i < threads.size(); ++i){threads[i].join();    //⇽-- - 6}}void try_sort_chunk(){std::shared_ptr<chunk_to_sort> chunk = chunks.try_pop();    //⇽-- - 7if (chunk){sort_chunk(chunk);    //⇽-- - 8}}std::list<T> do_sort(std::list<T>& chunk_data)    //⇽-- - 9{if (chunk_data.empty()){return chunk_data;}std::list<T> result;result.splice(result.begin(),chunk_data,chunk_data.begin());T const& partition_val = *result.begin();typename std::list<T>::iterator divide_point =  //⇽-- - 10std::partition(chunk_data.begin(),chunk_data.end(),[&](T const& val) {return val < partition_val; });chunk_to_sort new_lower_chunk;new_lower_chunk.data.splice(new_lower_chunk.data.end(),chunk_data,chunk_data.begin(),divide_point);std::future<std::list<T> > new_lower =new_lower_chunk.promise.get_future();chunks.push(std::move(new_lower_chunk));   // ⇽-- - 11if (threads.size() < max_thread_count)    // ⇽-- - 12{threads.push_back(std::thread(&sorter<T>::sort_thread,this));}std::list<T> new_higher(do_sort(chunk_data));result.splice(result.end(),new_higher);while (new_lower.wait_for(std::chrono::seconds(0)) !=std::future_status::ready)    //⇽-- - 13{try_sort_chunk();   // ⇽-- - 14}result.splice(result.begin(),new_lower.get());return result;}void sort_chunk(std::shared_ptr<chunk_to_sort > const& chunk){chunk->promise.set_value(do_sort(chunk->data));    //⇽-- - 15}void sort_thread(){while (!end_of_data)    //⇽-- - 16{try_sort_chunk();    // ⇽-- - 17//交出时间片std::this_thread::yield();    //⇽-- - 18}}
};

我们实现一个函数调用上面的封装快速排序

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)    //⇽-- - 19
{if (input.empty()){return input;}sorter<T> s;return s.do_sort(input);    //⇽-- - 20
}

本例中,parallel_quick_sort()函数(19处)把绝大部分功能委托给sorter类(1处),后者通过栈容器管理待排序的数据段(2处),并集中管控多个线程以并发执行任务(3处),从而以便捷的操作方式给出了代码实现。

本例中,主要工作由成员函数do_sort()负责(9处),它借标准库的std::partition()函数完成数据分段(10处)。

do_sort()将新划分出来的数据段压入栈容器(11处),但没有为每个数据段都专门生成新线程,而仅当仍存在空闲的处理器时(12处)才生成新线程。

因为划分出的前半部分数据可能会由别的线程处理,所以我们需要等待它完成排序而进入就绪状态(13处)。

如果当前线程是整个程序中仅有的线程,或者其他线程都正忙于别的任务,那么这一等待行为则需妥善处理,在当前线程的等待期间,我们让它试着从栈容器取出数据进行处理(14处)。

try_sort_chunk()先从栈容器弹出一段数据(7处)并对其进行排序(8处),再把结果存入附属该段的promise中(15处),使之准备就绪,以待提取。

向栈容器压入数据段与取出相关结果相互对应,两项操作均由同一个线程先后执行(11和12处)。

只要标志end_of_data没有成立(16处),各线程便反复循环,尝试对栈内数据段进行排序17。

每个线程在两次检测标志之间进行让步(18处),好让别的线程有机会向栈容器添加数据段。这段代码由sorter类的析构函数汇合各个线程(4处)。

do_sort()将在全部数据段都完成排序后返回(即便许多工作线程仍在运行),主线程进而从parallel_quick_sort()的调用返回20,并销毁sorter对象。其析构函数将设置标志end_of_data成立(5处),然后等待全部线程结束(6处)。标志的成立使得线程函数内的循环终止(16处)。

按工作类别划分

在这里插入图片描述
CRTP(Curiously Recurring Template Pattern,奇特递归模板模式)
在这里插入图片描述


template <typename Derived>
class Shape {
public:void draw() {// 将 this 转换为派生类指针,调用具体实现static_cast<Derived*>(this)->draw();}double area() {return static_cast<Derived*>(this)->area();}
};class Circle : public Shape<Circle> {
public:void draw() {std::cout << "Drawing a circle\n";}double area() {return 3.14159 * 10 * 10;}
};class Rectangle : public Shape<Rectangle> {
public:void draw() {std::cout << "Drawing a rectangle\n";}double area() {return 10 * 20;}
};

模板单例类包含了原子变量_bstop控制线程是否停止

包含了_que用来存储要处理的信息,这是一个线程安全的队列。

_thread是要处理任务的线程。

线程安全队列我们之前有实现过,但是还需要稍微改进下以满足接受外部停止的通知。

我们给ThreadSafeQue添加一个原子变量_bstop表示线程停止的标记

在需要停止等待的时候我们调用如下通知函数
在这里插入图片描述
在这里插入图片描述
比如我们要实现一个ClassA 处理A类任务,可以这么做


#include "ActorSingle.h"
#include "ClassB.h"
struct MsgClassA {std::string name;friend std::ostream& operator << (std::ostream& os, const MsgClassA& ca) {os << ca.name;return os;}
};
class ClassA : public ActorSingle<ClassA, MsgClassA> {friend class ActorSingle<ClassA, MsgClassA>;
public:~ClassA() {_bstop = true;_que.NotifyStop();_thread.join();std::cout << "ClassA destruct " << std::endl;}void DealMsg(std::shared_ptr<MsgClassA> data) {std::cout << "class A deal msg is " << *data << std::endl;MsgClassB msga;msga.name = "llfc";ClassB::Inst().PostMsg(msga);}
private:ClassA(){_thread = std::thread([this]() {for (; (_bstop.load() == false);) {std::shared_ptr<MsgClassA> data = _que.WaitAndPop();if (data == nullptr) {continue;}DealMsg(data);}std::cout << "ClassA thread exit " << std::endl;});}
};

在这里插入图片描述
在这里插入图片描述

http://www.xdnf.cn/news/1452295.html

相关文章:

  • `void 0` 与 `undefined` 深度解析
  • mysql安装(压缩包方式8.0及以上)
  • 2026届IC秋招联芸科技IC面经(完整面试题)
  • 从零开始学大模型之大语言模型
  • 大模型部署全攻略:Docker+FastAPI+Nginx搭建高可用AI服务
  • MindMeister AI版:AI思维导图工具高效生成框架,解决结构卡壳与逻辑优化难题
  • 十一、容器化 vs 虚拟化-K8s-Kustomize
  • Spark中的堆外和堆内内存以及内部行数据表示UnsafeRow
  • S 3.3深度学习--卷积神经网络--代码
  • (A题|烟幕干扰弹的投放策略)2025年高教杯全国大学生数学建模国赛解题思路|完整代码论文集合
  • 【mmcv自己理解】
  • “全结构化录入+牙位可视化标记”人工智能化python编程路径探析
  • 新电脑硬盘如何分区?3个必知技巧避免“空间浪费症”!
  • 如何监控员工的电脑?7款实用的员工电脑管理软件,探索高效管理捷径!
  • cursor+python轻松实现电脑监控
  • 【嵌入式DIY实例-ESP32篇】-倾斜弹跳球游戏
  • 小程序缓存数据字典
  • Android 项目:画图白板APP开发(三)——笔锋(多 Path 叠加)
  • 当液态玻璃计划遭遇反叛者:一场 iOS 26 界面的暗战
  • 用 Rust + Actix-Web 打造“Hello, WebSocket!”——从握手到回声,只需 50 行代码
  • Energy期刊论文学习——基于集成学习模型的多源域迁移学习方法用于小样本实车数据锂离子电池SOC估计
  • 邮件如何防泄密?这10个电子邮件安全解决方案真的好用,快收藏
  • Windows+Docker一键部署CozeStudio私有化,保姆级
  • 15、Docker构建前端镜像并运行
  • 计算机大数据毕业设计推荐:基于Spark的新能源汽车保有量可视化分析系统
  • 配置阿里云 YUM 源指南
  • IPV6之DHCPv6服务器和中继代理和前缀代理服务器客户端
  • 高并发商城 商品为了防止超卖,都做了哪些努力?
  • PostgreSQL18-FDW连接的 SCRAM 直通身份验证
  • 当便捷遇上复杂,低代码的路该怎么走?