当前位置: 首页 > backend >正文

C++实现线程池(5)计划线程池

五. ScheduledThreadPool 的实现

5.1 需求

在指定的时间间隔或时间点异步地执行任务。

5.2 SyncQueue 的设计和实现

SyncQueue 类代码

#ifndef SYNCQUEUE5_HPP
#define SYNCQUEUE5_HPP#include<vector>
#include<list>
#include<mutex>
#include<condition_variable>
#include<iostream>
#include<chrono>
#include<queue>
#include<functional>using namespace std;
using namespace std::chrono;using Task = std::function<void(void)>;struct PairTask
{size_t first;Task second;
public:PairTask() :first(0), second(nullptr) {}PairTask(const size_t& tp, const Task& s):first(tp), second(s){}operator size_t() const { return first; }~PairTask(){//cout << "destroy PairTask" << endl;}
};class SyncQueue
{
private:std::priority_queue<PairTask,std::vector<PairTask>,std::greater<PairTask>> m_taskQueues;size_t m_maxSize;         // 队列的大小mutable std::mutex m_mutex;std::condition_variable m_notEmpty; //对应于消费者std::condition_variable m_notFull;  //对应于生产者std::condition_variable m_condition_;size_t m_waitTime;                  //任务队列满等待时间 sbool m_needStop; //  true 同步队列停止工作bool IsFull() const{bool full = m_taskQueues.size() >= m_maxSize;if (full){//clog << " m_queue 已经满了,需要等待..." << endl;}return full;}bool IsEmpty() const{bool empty = m_taskQueues.empty();if (empty){// clog << "m_queue 已经空了,需要等待..." << endl;}return empty;}int Add(const PairTask& task){std::unique_lock<std::mutex> locker(m_mutex);bool waitret = m_notFull.wait_for(locker,std::chrono::seconds(m_waitTime),[=] { return m_needStop || !IsFull(); });if (!waitret){return 1;}if (m_needStop){return 2;}m_taskQueues.push(task);m_notEmpty.notify_all();return 0;}public:SyncQueue(int bucketsize = 10, int maxsize = 200, size_t timeout = 1):m_maxSize(maxsize),m_needStop(false),m_waitTime(timeout){}~SyncQueue(){}int Put(const PairTask& task)  // 0 ..m_bucketSize-1{return Add(task);}int Take(PairTask& task){std::unique_lock<std::mutex> locker(m_mutex);while (IsEmpty() && !m_needStop){m_notEmpty.wait(locker);}if (m_needStop)return 2;task = m_taskQueues.top();m_taskQueues.pop();m_notFull.notify_all();return 0;}void Stop(){std::unique_lock<std::mutex> locker(m_mutex);while (!m_needStop && !IsEmpty()){m_notFull.wait(locker);}m_needStop = true;m_notEmpty.notify_all();m_notFull.notify_all();}bool Empty() const{std::unique_lock<std::mutex> locker(m_mutex);return m_taskQueues.empty();}bool Full() const{std::unique_lock<std::mutex> locker(m_mutex);return m_taskQueues.size() >= m_maxSize;}size_t Size() const{std::unique_lock<std::mutex> locker(m_mutex);return m_taskQueues.size();}size_t Count() const{return Size();}
};#endif

5.3 ScheduledThreadPool 的设计和实现

ScheduledThreadPool 类代码

#ifndef SCHEDULEDTHREADPOOL_HPP
#define SCHEDULEDTHREADPOOL_HPP
#include"SyncQueue5.hpp"
#include<functional>
#include<future>
#include<memory>
#include<chrono>using namespace std;
using namespace std::chrono;class ScheduledThreadPool
{
private:SyncQueue m_queue;std::list<std::shared_ptr<std::thread>> m_threadgroup;std::atomic_bool m_running; // false;  // true;std::once_flag m_flag;void Start(int numthreads){m_running = true;for (int i = 0; i < numthreads; ++i){m_threadgroup.push_back(std::make_shared<std::thread>(std::thread(&ScheduledThreadPool::RunInThread, this)));}}void RunInThread(){while (m_running){PairTask task;if (m_queue.Take(task) == 0){task.second();}}}void StopThreadGroup(){m_queue.Stop();m_running = false;for (auto& tha : m_threadgroup){if (tha && tha->joinable()){tha->join();}}m_threadgroup.clear();}public:ScheduledThreadPool(const int qusize = 10, const int numthreads = 8):m_queue(qusize),m_running(false){std::call_once(m_flag, &ScheduledThreadPool::Start, this, numthreads);}~ScheduledThreadPool(){Stop();}void Stop(){std::call_once(m_flag, [this]() { StopThreadGroup(); });}template<class Func, class... Args>void Execute(int interval, Func&& func, Args&&... args){using RetType = decltype(func(args...));auto task = std::make_shared<std::function<void(void)>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));auto tk = PairTask(interval, [task]() { (*task)(); });if (m_queue.Put(tk) != 0){(*task)();}}template<class Func, class... Args>auto Submit(int interval, Func&& func, Args&&... args){using RetType = decltype(func(args...));auto task = std::make_shared<std::packaged_task<RetType(void)>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));std::future<RetType> result = task->get_future();auto tk = PairTask(interval, [task]() { (*task)(); });if (m_queue.Put(tk) != 0){(*task)();}return result;}
};#endif

测试代码

#include"ScheduledThreadPool.hpp"
#include<iostream>using namespace std;void Func(int a)
{cout << "func : " << a << endl;
}int main()
{ScheduledThreadPool mypool(10, 1);mypool.Execute(3, Func, 3);mypool.Execute(5, Func, 5);mypool.Execute(1, Func, 1);mypool.Execute(8, Func, 8);std::this_thread::sleep_for(seconds(2));cout << "main end ..." << endl;return 0;
}

5.4 适用场景

ScheduledThreadPool 适用于以下场景:

  1. 定时任务调度:ScheduledThreadPool 可以用于执行在固定时间间隔内运行的任务。例如,定期备份数据、清理临时文件、定时发送邮件等。
  2. 任务调度器:ScheduledThreadPool 可以用作任务调度器,用于安排并执行后续任务。例如,可以在程序启动后安排一些初始化任务执行,或者在程序结束前执行一些清理任务。
  3. 心跳任务:ScheduledThreadPool 可以用于执行心跳任务,即周期性地检测系统状态或发送心跳信号。例如,检查网络连接、检测硬件设备的状态等。
  4. 定时数据处理:ScheduledThreadPool 可以用于定时处理一些数据任务,例如在特定时间点对数据库进行优化、对日志进行归档等。
http://www.xdnf.cn/news/17082.html

相关文章:

  • python学智能算法(三十四)|SVM-KKT条件回顾
  • KGF75N65KDF-U/H KEC 集成电路IC 工业电机驱动
  • 加密视频流程教程分享
  • 移动商城平台适配:ZKmall开源商城鸿蒙 / 小程序端开发要点
  • Mark两个Redis for windows
  • 【概念学习】深度学习有何不同
  • 当前主流且经过市场验证的开源 BI 系统推荐
  • 【多模态微调】【从0开始】Qwen2-VL + llamafactory
  • C语言高级编程技巧与最佳实践
  • 面向流程和产品的安全档案论证方法
  • Jenkinsfile各指令详解
  • 脑洞大开——AI流程图如何改变思维?
  • C++ - 仿 RabbitMQ 实现消息队列--服务器模块实现
  • 【计算机网络 | 第3篇】物理媒介
  • 【计算机网络】王道考研笔记整理(3)数据链路层
  • 12、Docker Compose 安装 Redis
  • Baumer相机如何通过YoloV8深度学习模型实现农作物水稻病虫害的检测识别(C#代码UI界面版)
  • PHP官方及第三方下载地址全指南(2025最新版)
  • 芯片封装(DIP、SOP、QFP、QFN、BGA、LGA、PGA)
  • 加载量化模型
  • 第十八天:C++进制之间的转换
  • React 表单处理:移动端输入场景下的卡顿问题与防抖优化方案
  • 【文献分享】Machine learning models提供数据和代码
  • 当前就业形势下,软件测试工程师职业发展与自我提升的必要性
  • JSON巴巴 - 专业JSON格式化工具:让任何JSON都能完美格式化
  • 支持多网络协议的测试工具(postman被无视版)
  • Enhancing Long Video Question Answering with Scene-Localized Frame Grouping
  • 从“T+1”到“T+0”:基于SQL构建MES到数据仓库的数据采集通道
  • SassSCSS:让CSS拥有超能力的预处理器
  • LVS-DR模式高性能负载均衡实战