当前位置: 首页 > java >正文

C++:实现线程池

线程池(Thread Pool)是一种多线程处理方式,用于管理和复用多个线程,以提高程序的并发性能并避免频繁创建和销毁线程所带来的开销。

基本概念

线程池维护着若干个已创建好的线程,当有任务需要执行时,就从线程池中取出空闲线程来执行任务,任务执行完成后线程并不被销毁,而是返回线程池中继续等待执行下一个任务。

主要优点

  1. 减少资源消耗:重复利用已创建的线程,避免频繁创建和销毁线程的开销。
  2. 提高响应速度:任务到达时可以立即执行(如果有空闲线程),而不必等待创建新线程。
  3. 提高线程可管理性:线程池可以设置线程的最大并发数,防止系统因为创建过多线程而耗尽资源。
  4. 任务排队机制:可以统一调度和管理任务,比如按顺序执行、设置优先级、延迟执行等。

使用场景

  1. Web 服务器:处理高并发的客户端请求,复用线程,提升效率。
  2. 数据库连接池:减少数据库连接的创建和销毁,提高性能。
  3. 任务并行处理:同时处理多个独立任务,减少总执行时间。
  4. 后台定时任务:定时执行任务或后台处理,避免阻塞主线程。
  5. IO密集型任务:提高系统资源利用率,如网络请求或磁盘读写。

实现

class ThreadPool {
public:// 默认构造函数ThreadPool() = default;// 移动构造函数ThreadPool(ThreadPool&&) = default;// 带线程数参数的构造函数,默认启动 8 个线程explicit ThreadPool(size_t thread_count = 8) : pool_(std::make_shared<Pool>()) {// 启动 thread_count 个线程for (size_t i = 0; i < thread_count; ++i) {std::thread([this]() {// 每个线程执行的任务std::unique_lock<std::mutex> locker(pool_->mtx_);while (true) {// 如果任务队列不为空,则取任务并执行if (!pool_->tasks_.empty()) {// 从任务队列中取出任务并执行auto task = std::move(pool_->tasks_.front());pool_->tasks_.pop();locker.unlock();  // 执行任务时不需要保持锁task();  // 执行任务locker.lock();  // 执行完任务后重新加锁} // 如果任务队列为空且线程池未关闭,则等待新任务else if (!pool_->stop_) {pool_->cond_.wait(locker);  // 等待任务或关闭信号} // 如果线程池已关闭,退出线程else {break;}}}).detach();  // 分离线程,确保线程池的生命周期独立于调用者}}// 析构函数,关闭线程池,确保所有线程停止~ThreadPool() {if (pool_) {// 设置停止标志,通知所有线程退出std::lock_guard<std::mutex> locker(pool_->mtx_);pool_->stop_ = true;}// 唤醒所有等待中的线程,让它们退出pool_->cond_.notify_all();}// 添加任务到任务队列template<class T>void addTask(T&& task) {std::lock_guard<std::mutex> locker(pool_->mtx_);  // 加锁确保线程安全pool_->tasks_.push(std::forward<T>(task));  // 将任务添加到队列pool_->cond_.notify_one();  // 通知一个线程执行任务}private:// 用于管理线程池的内部结构struct Pool {bool stop_;  // 标志线程池是否关闭std::mutex mtx_;  // 保护任务队列的互斥锁std::condition_variable cond_;  // 条件变量,用于等待任务std::queue<std::function<void()>> tasks_;  // 存储任务的队列};// 线程池的共享资源std::shared_ptr<Pool> pool_; 
};

更高级的实现,A simple C++11 Thread Pool implementation

class ThreadPool {
public:// 构造函数,传入线程数量ThreadPool(size_t);// 添加任务到线程池,返回一个 future 对象,用于获取异步任务结果template<class F, class... Args>auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>;// 析构函数,回收线程~ThreadPool();private:// 工作线程集合,用于管理线程对象std::vector<std::thread> workers;// 任务队列,存放待执行的任务(函数对象)std::queue<std::function<void()>> tasks;// 互斥锁,保证任务队列线程安全std::mutex queue_mutex;// 条件变量,用于线程之间通信,通知有新任务或者停止std::condition_variable condition;// 停止标志,标识线程池是否停止接收新任务bool stop;
};// 构造函数:初始化线程池,创建 threads 个线程
inline ThreadPool::ThreadPool(size_t threads): stop(false) // 初始时线程池不停止
{for (size_t i = 0; i < threads; ++i)// 创建线程并加入 workers 容器workers.emplace_back([this] {for (;;) { // 每个线程不断循环取任务执行std::function<void()> task;{// 加锁,保证线程安全访问任务队列std::unique_lock<std::mutex> lock(this->queue_mutex);// 如果任务队列为空且线程池未关闭,则阻塞等待this->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });// 如果线程池关闭且任务队列为空,退出线程if (this->stop && this->tasks.empty())return;// 从任务队列取出一个任务task = std::move(this->tasks.front());this->tasks.pop();}// 执行任务task();}});
}// 添加任务到线程池,并返回 future 获取结果
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>
{// 定义任务返回值类型using return_type = typename std::result_of<F(Args...)>::type;// 将传入的函数和参数绑定,生成 packaged_task(带返回值的可调用对象)auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 获取该任务对应的 future,用于异步获取结果std::future<return_type> res = task->get_future();{// 加锁保护任务队列std::unique_lock<std::mutex> lock(queue_mutex);// 如果线程池已经停止,抛异常if (stop)throw std::runtime_error("enqueue on stopped ThreadPool");// 将任务加入任务队列,任务是一个无参 void() 函数对象tasks.emplace([task]() { (*task)(); });}// 通知一个线程有新任务了condition.notify_one();// 返回 futurereturn res;
}// 析构函数,停止线程池,回收所有线程
inline ThreadPool::~ThreadPool()
{{// 加锁修改停止标志std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}// 唤醒所有阻塞中的线程,让它们检查 stop 状态并退出condition.notify_all();// 等待所有线程结束for (std::thread &worker : workers)worker.join();
}

测试

#include <iostream>
#include "threadpool.h"int main() {ThreadPool pool(4);for (int i = 0; i < 10; ++i) {pool.addTask([i] {std::cout << "Task " << i << " running in thread " << std::this_thread::get_id() << std::endl;});}std::this_thread::sleep_for(std::chrono::seconds(1));  // 简单等一下
}

image.png

http://www.xdnf.cn/news/4223.html

相关文章:

  • 【Spring Boot 注解】@SpringBootApplication
  • 力扣-hot100 (矩阵置零)
  • C++命名空间
  • Windows11下通过Docker安装mysql8.0
  • FPGA----基于ZYNQ 7020实现petalinux文件持久化存储
  • Linux主机时间设置操作指南及时间异常影响
  • LeetCode 解题思路 45(Hot 100)
  • 科普文:丰田凯美瑞三代混动(THS II)技术解析
  • Golang领域Beego框架的中间件开发实战
  • 【Linux】用户与组管理
  • Fastjson 从多层级的JSON数据中获取特定字段的值
  • Transformer中的三种注意力机制
  • 开源模型应用落地-qwen模型小试-Qwen3-8B-推理加速-vLLM-结构化输出(三)
  • Copilot for PPT 可直接用模板创建品牌演示文稿
  • 【Python-Day 10】Python 循环控制流:while 循环详解与 for 循环对比
  • 文件上传/读取/包含漏洞技术说明
  • MySQL中有哪几种锁?
  • 【“星瑞” O6 评测】 — 车辆速度估计
  • 【区块链】Uniswap之滑点(Slippage)
  • Java 检查某个点是否存在于圆扇区内(Check whether a point exists in circle sector or not)
  • springBoot中自定义一个validation注解,实现指定枚举值校验
  • LINUX——例行性工作
  • 私有仓库 Harbor、GitLab
  • K8S使用--dry-run输出资源模版和兼容性测试
  • Django缓存框架API
  • 物理服务器紧急救援:CentOS系统密码重置全流程实战指南
  • 如何添加或删除极狐GitLab 项目成员?
  • JPress安装(Docker)
  • 如何在使用 docker-compose 命令时指定 COMPOSE_PROJECT_NAME ?
  • 概统期末复习--速成