当前位置: 首页 > news >正文

Linux——简单线程池封装案例

文章目录

  • Linux C++ 线程池技术:从理论到实践的高性能并发编程
    • 引言
    • 一、 线程池的基本概念
      • 1.1 线程池的定义与作用
      • 1.2 线程池的适用场景
      • 1.3 线程池的优势
    • 二、 Linux 线程基础
      • 2.1 POSIX 线程(pthread)简介
      • POSIX 线程库常用函数及详解
        • pthread_create
        • pthread_join
        • pthread_exit
        • pthread_detach
        • pthread_attr_init
        • pthread_attr_destroy
        • pthread_attr_setdetachstate
      • 2.2 线程创建与同步机制
      • 互斥锁(Mutex)
        • pthread_mutex_init
        • pthread_mutex_lock
        • pthread_mutex_unlock
      • 条件变量(Condition Variable)
        • pthread_cond_init
        • pthread_cond_wait
        • pthread_cond_signal
        • pthread_cond_broadcast
      • 2.3 线程安全与竞态条件
    • 三、 C++ 线程池实现的关键组件
      • 3.1 任务队列(Task Queue)的设计
      • 3.2 线程管理(Thread Workers)
      • 3.3 线程同步与任务调度
    • 四、 线程池的核心实现步骤
      • 条件变量封装
      • 自动锁封装
      • 手动锁封装
      • 简单日志封装(方便调试)
      • 线程封装
      • 线程池封装

Linux C++ 线程池技术:从理论到实践的高性能并发编程

引言

在现代多核CPU架构下,并发编程是挖掘硬件性能、提升应用吞吐量的关键技术。然而,频繁地创建和销毁线程会带来显著的性能开销,并可能导致系统资源耗尽。线程池(Thread Pool)作为一种经典的并发编程模式,应运而生。它通过预先创建并管理一组线程,实现了任务提交与任务执行的解耦,是构建高性能、高可靠性C++服务的基石。本文将探讨Linux环境下使用C++实现线程池的核心概念、实现细节。

一、 线程池的基本概念

1.1 线程池的定义与作用

线程池是一种多线程处理形式,其核心思想是预先创建一定数量的线程,放入一个“池子”中管理。当有任务需要执行时,它并不立即创建一个新线程,而是将任务提交到池中的任务队列中,等待一个空闲线程从队列中取出并执行。

它的主要作用可以概括为:

  • 资源复用:避免线程频繁创建和销毁带来的性能损耗(如内存分配、系统调用)。

  • 流量控制:通过队列缓冲,平滑处理突发的大量任务请求,防止系统过载。

  • 统一管理:方便对线程进行统一的监控、管理和调优。

1.2 线程池的适用场景


高并发服务器:如Web服务器、文件服务器、游戏服务器,需要快速响应大量网络请求。

异步任务处理:如后台日志记录、数据批量处理、邮件发送等不阻塞主线程的任务。

并行计算:将一个大型计算任务拆分成多个小任务,由线程池并行处理,加速计算。

1.3 线程池的优势

性能优化:线程复用降低了创建销毁的开销,任务队列减少了调度延迟。

资源管理:限制了系统最大并发线程数,防止因线程过多导致内存耗尽或过度上下文切换。

稳定性提升:对异步任务进行了统一管理,避免了野线程和资源泄漏问题。

二、 Linux 线程基础

2.1 POSIX 线程(pthread)简介

在Linux上,线程通常通过POSIX线程库(pthread)实现。虽然C++11提供了标准的std::thread,但理解其底层基础(通常是pthread)对于调试和深度优化至关重要。

POSIX 线程库常用函数及详解

pthread_create

用于创建一个新的线程。

int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);
  • thread:指向线程标识符的指针。
  • attr:线程属性,通常设为 NULL 使用默认属性。
  • start_routine:线程运行的函数入口。
  • arg:传递给线程函数的参数。
  • 返回值:成功返回 0,失败返回错误码。
pthread_join

等待指定线程终止并回收资源。

int pthread_join(pthread_t thread, void **retval);
  • thread:目标线程标识符。
  • retval:存储线程退出状态的指针,可为 NULL
  • 返回值:成功返回 0,失败返回错误码。
pthread_exit

终止调用线程并返回状态值。

void pthread_exit(void *retval);
  • retval:线程退出状态,可被其他线程通过 pthread_join 获取。
pthread_detach

分离线程,使其终止时自动释放资源。

int pthread_detach(pthread_t thread);
  • thread:目标线程标识符。
  • 返回值:成功返回 0,失败返回错误码。
pthread_attr_init

初始化线程属性对象。

int pthread_attr_init(pthread_attr_t *attr);
  • attr:指向属性对象的指针。
  • 返回值:成功返回 0,失败返回错误码。
pthread_attr_destroy

销毁线程属性对象。

int pthread_attr_destroy(pthread_attr_t *attr);
  • attr:目标属性对象。
  • 返回值:成功返回 0,失败返回错误码。
pthread_attr_setdetachstate

设置线程的分离状态属性。

int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
  • attr:属性对象。
  • detachstatePTHREAD_CREATE_JOINABLE(默认)或 PTHREAD_CREATE_DETACHED
  • 返回值:成功返回 0,失败返回错误码。

2.2 线程创建与同步机制

实现线程池,本质上是管理多个线程协同工作,这离不开线程同步机制。

互斥锁(Mutex)

用于保护共享资源(如任务队列),防止多个线程同时访问导致数据竞争(Data Race)。

pthread_mutex_init

初始化互斥锁。

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
  • mutex:指向互斥锁的指针。
  • attr:锁属性,通常设为 NULL 使用默认属性。
  • 返回值:成功返回 0,失败返回错误码。
pthread_mutex_lock

加锁,若锁已被占用则阻塞。

int pthread_mutex_lock(pthread_mutex_t *mutex);
  • mutex:目标互斥锁。
  • 返回值:成功返回 0,失败返回错误码。
pthread_mutex_unlock

解锁。

int pthread_mutex_unlock(pthread_mutex_t *mutex);
  • mutex:目标互斥锁。
  • 返回值:成功返回 0,失败返回错误码。

C++标准库提供了std::mutex,其底层在Linux通常是pthread_mutex_t。

条件变量(Condition Variable)

用于线程间的通信与同步。当某个条件不满足时(如任务队列为空),线程可以阻塞在条件变量上等待;当条件可能满足时(如新任务到来),通知等待的线程。

pthread_cond_init

初始化条件变量。

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
  • cond:指向条件变量的指针。
  • attr:属性,通常设为 NULL
  • 返回值:成功返回 0,失败返回错误码。
pthread_cond_wait

阻塞等待条件变量被触发。

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
  • cond:目标条件变量。
  • mutex:关联的互斥锁,调用时会自动释放锁。
  • 返回值:成功返回 0,失败返回错误码。
pthread_cond_signal

唤醒一个等待该条件变量的线程。

int pthread_cond_signal(pthread_cond_t *cond);
  • cond:目标条件变量。
  • 返回值:成功返回 0,失败返回错误码。
pthread_cond_broadcast

唤醒所有等待该条件变量的线程。

int pthread_cond_broadcast(pthread_cond_t *cond);
  • cond:目标条件变量。
  • 返回值:成功返回 0,失败返回错误码。

C++标准库提供了std::condition_variable。

2.3 线程安全与竞态条件

竞态条件(Race Condition) 是指程序的输出依赖于不受控制的事件序列。线程池的核心数据结构——任务队列,是一个典型的共享资源,必须通过互斥锁等手段使其成为线程安全(Thread-Safe) 的,以消除竞态条件。

三、 C++ 线程池实现的关键组件

一个最小化的线程池通常包含以下三个核心组件:

3.1 任务队列(Task Queue)的设计

任务队列是生产者和消费者之间的桥梁。生产者线程(如主线程)提交任务,消费者线程(工作线程)从队列中获取任务。

数据类型:通常使用std::function<void()>或自定义函数对象来封装任何可调用任务。

容器选择:std::queue或std::deque是简单直接的选择。对于高级场景,可以考虑优先级队列(如std::priority_queue)以实现带优先级的调度。

3.2 线程管理(Thread Workers)

工作线程是线程池中的劳动者,它们的行为模式是一个循环:

等待条件变量通知(有任务或需要停止)。

在互斥锁的保护下,从任务队列中取出一个任务。

释放锁,执行取出的任务。

回到步骤1,等待下一个任务。

3.3 线程同步与任务调度

同步机制将上述组件粘合在一起:

std::mutex:保护对任务队列、停止标志等所有共享状态的访问。

std::condition_variable:

生产者在添加新任务后,调用notify_one()或notify_all()来通知可能正在等待的工作线程。

消费者在等待任务时,使用条件变量的wait()方法,并在等待时自动释放锁,避免忙等待。

四、 线程池的核心实现步骤

条件变量封装

#pragma once
#include <pthread.h>
#include<iostream>namespace CondtionModule
{class Cond{public:// 构造函数,初始化条件变量Cond(){pthread_cond_init(&_cond, nullptr);}// 等待条件变量,同时释放互斥锁// @param mutex 互斥锁引用void Wait(pthread_mutex_t& mutex){int n = pthread_cond_wait(&_cond, &mutex);if(n < 0)std::cerr << "Cond Wait failed!" << std::endl;}// 唤醒一个等待该条件变量的线程void Signal(){int n = pthread_cond_signal(&_cond);if(n < 0)std::cerr << "Cond Signal failed!" << std::endl;}// 唤醒所有等待该条件变量的线程void Broadcast(){int n = pthread_cond_broadcast(&_cond);if(n < 0)std::cerr << "Cond Broadcast failed!" << std::endl;}// 析构函数,销毁条件变量~Cond(){int n = pthread_cond_destroy(&_cond);if(n < 0)std::cerr << "~Cond failed! " << std::endl;}private:pthread_cond_t _cond; // 条件变量};
}

自动锁封装

#ifndef __LOCKGUARD_HPP__  // 防止头文件重复包含的宏定义
#define __LOCKGUARD_HPP__  // 定义头文件宏#include <pthread.h>       // 引入POSIX线程库头文件namespace LockGuardModule  // 定义命名空间封装锁守卫类
{class Lockguard        // RAII风格的互斥锁守卫类{public:// 构造函数:初始化并锁定互斥锁Lockguard(pthread_mutex_t mutex): _mutex(mutex)  // 初始化成员变量{pthread_mutex_init(&_mutex, nullptr);  // 初始化互斥锁pthread_mutex_lock(&_mutex);          // 加锁(阻塞直到获取锁)}// 析构函数:自动释放互斥锁~Lockguard(){pthread_mutex_unlock(&_mutex);  // 解锁}private:pthread_mutex_t _mutex;  // 封装的POSIX互斥锁对象};
}#endif  // 结束头文件宏

手动锁封装

以下是为提供的 C++ 互斥锁(Mutex)头文件代码生成的完整注释,所有注释集中展示:

#ifndef __MUTEX_HPP__          // 防止头文件重复包含的宏定义
#define __MUTEX_HPP__          // 定义宏标识符
#include<pthread.h>            // 引入POSIX线程库(包含互斥锁API)
#include<unistd.h>             // 引入Unix标准库(提供系统调用接口)
#include<iostream>             // 引入标准输入输出流库(未实际使用)namespace MutexModule          // 定义命名空间隔离互斥锁实现
{class Mutex                // 互斥锁封装类{public:Mutex()                // 构造函数:初始化互斥锁{pthread_mutex_init(&_mutex, nullptr);  // 使用默认属性初始化互斥锁}void Lock()            // 加锁方法:阻塞直到获取锁{pthread_mutex_lock(&_mutex);  // 调用POSIX加锁函数}void Unlock()          // 解锁方法:释放锁{pthread_mutex_unlock(&_mutex);  // 调用POSIX解锁函数}pthread_mutex_t& Getmutex()  // 获取底层互斥锁引用(通常用于条件变量){return _mutex;     // 返回内部互斥锁对象}~Mutex()               // 析构函数:销毁互斥锁{pthread_mutex_destroy(&_mutex);  // 释放互斥锁资源}private:pthread_mutex_t _mutex;  // 内部POSIX互斥锁对象};
}#endif                        // 结束条件编译指令

简单日志封装(方便调试)

#ifndef __LOG_HPP__
#define __LOG_HPP__#include <iostream>
#include <ctime>
#include <string>
#include <pthread.h>
#include <sstream>
#include <fstream>
#include <filesystem>
#include <unistd.h>
#include <memory>
#include "Mutex.hpp"
#include "Lockguard.hpp"namespace LogModule
{using namespace MutexModule;// 默认日志文件路径和文件名std::string default_path = "./log/";std::string default_file = "log.txt";// 日志级别枚举:从调试信息到致命错误,级别依次升高enum LogLevel{DEBUG,    // 调试信息,用于开发阶段排查问题INFO,     // 普通信息,记录程序运行状态WARNING,  // 警告信息,表明可能存在潜在问题ERROR,    // 错误信息,表示功能无法正常执行FATAL     // 致命错误,可能导致程序崩溃};// 将日志级别枚举转换为对应的字符串表示std::string LogLevelToString(LogLevel level){switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOWN";  // 未知级别,用于错误处理}}// 获取当前时间的格式化字符串(线程安全版本)std::string GetCurrentTime(){std::time_t time = std::time(nullptr);      // 获取当前时间戳struct tm stm;localtime_r(&time, &stm);                  // 线程安全的时间转换函数// 格式化时间字符串:年-月-日-时-分-秒char buff[128];snprintf(buff, sizeof(buff), "%4d-%02d-%02d-%02d-%02d-%02d",stm.tm_year + 1900,   // 年份需要加1900stm.tm_mon + 1,       // 月份从0开始,需要加1stm.tm_mday,          // 日stm.tm_hour,          // 时stm.tm_min,           // 分stm.tm_sec);          // 秒return buff;}// 日志策略抽象基类:定义日志输出的统一接口class Logstrategy{public:virtual ~Logstrategy() = default;           // 虚析构函数确保正确释放派生类资源virtual void syncLog(std::string &message) = 0;  // 纯虚函数,同步输出日志消息};// 控制台日志策略:将日志输出到标准错误流(std::cerr)class ConsoleLogstrategy : public Logstrategy{public:void syncLog(std::string &message) override{std::cerr << message << std::endl;  // 输出到控制台并换行}~ConsoleLogstrategy() override{// 析构函数,清理资源(此处无特殊资源需要清理)}};// 文件日志策略:将日志输出到指定文件class FileLogstrategy : public Logstrategy{public:// 构造函数:初始化文件路径和文件名,确保日志目录存在FileLogstrategy(std::string filepath = default_path, std::string filename = default_file){_mutex.Lock();  // 加锁保护共享资源访问_filepath = filepath;_filename = filename;// 检查日志目录是否已存在if (std::filesystem::exists(filepath)) {_mutex.Unlock();  // 目录存在,直接返回return;}   try{// 递归创建目录(包括所有父目录)std::filesystem::create_directories(filepath);}catch (const std::filesystem::filesystem_error &e){// 目录创建失败,输出错误信息到标准错误std::cerr << e.what() << '\n';}_mutex.Unlock();  // 解锁}// 同步输出日志到文件void syncLog(std::string &message) override{_mutex.Lock();  // 加锁保护文件操作// 构建完整文件路径,处理路径分隔符std::string path =_filepath.back() == '/' ? _filepath + _filename : _filepath + "/" + _filename;// 以追加模式打开文件std::ofstream out(path, std::ios::app);if (!out.is_open()){_mutex.Unlock();  // 解锁后再返回std::cerr << "file open fail!" << '\n';  // 文件打开失败提示return;}out << message << '\n';  // 写入日志消息并换行_mutex.Unlock();         // 解锁out.close();             // 关闭文件}~FileLogstrategy(){// 析构函数,清理资源}private:std::string _filepath;  // 日志文件路径std::string _filename;  // 日志文件名Mutex _mutex;           // 互斥锁,保护文件操作};// 主日志类:提供日志记录的核心功能class Log{public:// 构造函数:默认使用控制台输出策略Log(){_logstrategy = std::make_unique<ConsoleLogstrategy>();}// 切换到控制台输出策略void useconsolestrategy(){_logstrategy = std::make_unique<ConsoleLogstrategy>();printf("转换控制台策略!\n");}// 切换到文件输出策略void usefilestrategy(){_logstrategy = std::make_unique<FileLogstrategy>();printf("转换文件策略!\n");}// 日志消息类:负责构建单条日志消息class LogMessage{public:// 构造函数:初始化日志消息的基本信息LogMessage(LogLevel level, std::string file, int line, Log &log): _loglevel(level), _time(GetCurrentTime())  // 获取当前时间, _file(file), _pid(getpid())  // 获取进程ID, _line(line),_log(log)  // 引用外部Log对象{// 构建日志消息头:包含时间、级别、进程ID、文件名、行号等信息std::stringstream ss;ss << "[" << _time << "] "<< "[" << LogLevelToString(_loglevel) << "] "<< "[" << _pid << "] "<< "[" << _file << "] "<< "[" << _line << "] "<< "- ";_loginfo = ss.str();  // 保存格式化后的消息头}// 重载<<运算符:支持链式输出,将各种类型数据追加到日志消息template <typename T>LogMessage &operator<<(const T &t){std::stringstream ss;ss << _loginfo << t;  // 将新内容追加到现有消息_loginfo = ss.str();  // 更新消息内容return *this;  // 返回自身引用,支持链式调用}// 析构函数:在对象销毁时实际输出日志~LogMessage(){// 检查日志策略是否有效if (_log._logstrategy){// 调用具体策略输出日志消息_log._logstrategy->syncLog(_loginfo);}}private:LogLevel _loglevel;    // 日志级别std::string _time;     // 时间戳pid_t _pid;            // 进程IDstd::string _file;     // 源文件名int _line;             // 源代码行号std::string _loginfo;  // 完整的日志消息内容Log &_log;             // 对外部Log对象的引用};// 重载函数调用运算符:创建LogMessage对象开始记录日志LogMessage operator()(LogLevel level, std::string filename, int line){return LogMessage(level, filename, line, *this);}~Log(){// 析构函数,自动释放unique_ptr管理的策略对象}private:std::unique_ptr<Logstrategy> _logstrategy;  // 智能指针管理日志策略对象};Log logger;  // 全局日志对象实例// 宏定义:简化日志调用,自动填充文件名和行号
#define Log(type) logger(type, __FILE__, __LINE__)// 宏定义:切换到控制台输出策略
#define ENABLE_LOG_CONSOLE_STRATEGY() logger.useconsolestrategy()// 宏定义:切换到文件输出策略
#define ENABLE_LOG_FILE_STRATEGY() logger.usefilestrategy()
}#endif

线程封装

#ifndef __PTHREAD_HPP__
#define __PTHREAD_HPP__#include <pthread.h>
#include <string>
#include <iostream>
#include <functional>namespace PthreadModule
{uint32_t order = 1; // 全局计数器,用于生成线程唯一名称// 线程状态枚举:表示线程的生命周期状态enum ThreadStatus{RUNNING,  // 线程正在运行NEW,      // 线程已创建但未启动STOPPED   // 线程已停止};// 线程函数类型定义:使用std::function包装返回void且无参数的函数using Thread_func_t = std::function<void()>;class Pthread{private:// 设置线程名称:基于全局计数器生成唯一名称void setname(){_thread_name = "Thread——" + std::to_string(order++);}// 静态线程入口函数:符合pthread_create要求的函数签名static void *run(void *arg){Pthread* thr = static_cast<Pthread*>(arg); // 将void*转换回Pthread对象指针thr->_thread_func(); // 执行用户提供的线程函数return nullptr; // 线程正常退出,返回空指针}public:// 构造函数:初始化线程对象,接收用户线程函数Pthread(Thread_func_t func_t): _thread_func(func_t)    // 用户提供的线程函数, _thread_status(NEW)     // 初始状态为NEW(未启动), _isjoineable(true)      // 默认可join, _isjoin(false)          // 初始未join{setname(); // 自动设置线程名称}// 设置线程为分离状态:分离后无需join,线程结束后自动释放资源bool setDetach(){if (_isjoineable) // 只有可join的线程才能设置为分离{pthread_detach(_thread_id); // 调用pthread_detach设置分离属性_isjoineable = false;       // 更新状态为不可joinreturn true;}return false; // 如果已经是分离状态,返回false}// 启动线程:创建真正的操作系统线程bool start(){if (_thread_status != ThreadStatus::RUNNING) // 确保线程未在运行{int n = pthread_create(&_thread_id, nullptr, run, (void *)this);if (n < 0) // pthread_create成功返回0,失败返回错误码return false;if (!_isjoineable) // 如果已经设置为分离状态{setDetach(); // 确保线程属性为分离}_thread_status = ThreadStatus::RUNNING; // 更新线程状态为运行中return true;}return false; // 线程已在运行,启动失败}// 取消线程:请求终止目标线程的执行bool cancel(){if (_thread_status == ThreadStatus::RUNNING) // 只有运行中的线程才能取消{int n = pthread_cancel(_thread_id); // 发送取消请求if (n < 0) // 成功返回0,失败返回错误码return false;return true;}return false; // 线程未运行,取消失败}// 等待线程结束并回收资源:阻塞调用线程直到目标线程结束bool join(){if (_thread_status == ThreadStatus::RUNNING) // 只有运行中的线程才能join{int n = pthread_join(_thread_id, &_ret); // 等待线程结束并获取返回值_isjoin = true; // 标记已joinif (n < 0) // 成功返回0,失败返回错误码return false;return true;}return false; // 线程未运行,join失败}// 获取线程名称std::string Getname(){return _thread_name;}// 析构函数:确保资源正确释放~Pthread(){if(_isjoineable && !_isjoin) // 如果线程可join但尚未join{join(); // 自动等待线程结束,避免资源泄漏}}private:pthread_t _thread_id;        // 操作系统线程IDstd::string _thread_name;    // 线程自定义名称ThreadStatus _thread_status; // 线程当前状态Thread_func_t _thread_func;  // 用户提供的线程函数void* _ret;                  // 线程返回值(暂未使用)bool _isjoineable;           // 标记线程是否可join(非分离状态)bool _isjoin;                // 标记线程是否已被join};}#endif // __PTHREAD_HPP__

线程池封装

#pragma once#include "Log.hpp"
#include "Pthread.hpp"
#include "Cond.hpp"
#include <queue>namespace ThreadPoolModule
{using namespace PthreadModule;using namespace MutexModule;using namespace LockGuardModule;using namespace CondtionModule;using namespace LogModule;template<typename T>class Thread_pool{private:// 线程处理函数:工作线程的主循环,不断从任务队列中取出并执行任务void HandlerTask(){while(true){_mutex.Lock(); // 加锁保护共享资源访问// 等待条件:任务队列不为空或线程池停止运行while(_task_queue.empty() && _isrunning){_waitnum++; // 增加等待线程计数_cond.Wait(_mutex.Getmutex()); // 等待条件变量通知(自动释放锁)// 被唤醒后_waitnum--; // 减少等待线程计数}// 检查退出条件:线程池已停止且任务队列为空if(_task_queue.empty() && !_isrunning){_mutex.Unlock(); // 解锁后再退出break; // 退出线程循环}// 获取任务并执行T t = _task_queue.front(); // 从队列头部取任务_task_queue.pop(); // 移除已取出的任务_mutex.Unlock(); // 解锁,允许其他线程访问队列t(); // 执行任务(函数对象调用)}}// 私有构造函数:实现单例模式,防止外部创建多个实例Thread_pool(int poolsize = 5): _pool_size(poolsize) // 初始化线程数量, _waitnum(0)          // 初始化等待线程数为0, _isrunning(false)    // 初始状态为未运行{// 创建指定数量的工作线程for(int i = 0; i < poolsize; i++){// 绑定线程处理函数到每个线程_threads.emplace_back(std::bind(&Thread_pool<T>::HandlerTask, this));}}public:// 删除拷贝构造函数和赋值运算符,确保单例唯一性Thread_pool(Thread_pool<T>&) = delete;Thread_pool<T>& operator=(const Thread_pool<T>&) = delete;// 获取线程池单例实例(线程安全版本)static Thread_pool<T>* Getinstance(){// 双重检查锁定:提高性能,避免每次获取实例都加锁if(_instance == nullptr) // 第一次检查(无锁){Lockguard lock(_lock.Getmutex()); // 加锁保护创建过程if(_instance == nullptr) // 第二次检查(加锁后){_instance = new Thread_pool<T>(); // 创建单例实例_instance->Run(); // 启动线程池return _instance;}}return _instance; // 返回已存在的实例}// 启动线程池:让所有工作线程开始运行bool Run(){if(_isrunning == false) // 确保线程池未在运行{for(auto& thread : _threads) // 遍历所有线程{thread.start(); // 启动线程Log(LogLevel::DEBUG) << thread.Getname() << "start!" ; // 记录日志}_isrunning = true; // 设置运行标志return true;}return false; // 如果已在运行,返回false}// 停止线程池:通知所有工作线程停止bool Stop(){if(_isrunning == false) // 如果已经停止,直接返回return false;_mutex.Lock(); // 加锁保护状态修改if(_isrunning == true){_isrunning = false; // 设置停止标志_cond.Broadcast(); // 广播通知所有等待的线程Log(LogLevel::DEBUG) << "Stop sucess!" ; // 记录日志_mutex.Unlock(); // 解锁return true;}_mutex.Unlock(); // 解锁return false;}// 等待所有工作线程结束:阻塞直到所有线程完成退出bool Wait(){for(auto& thread : _threads) // 遍历所有线程{if(thread.join()) // 等待线程结束Log(LogLevel::INFO) << thread.Getname() << " 退出..."; // 成功退出日志elseLog(LogLevel::ERROR) << "Wait error!"; // 等待失败日志}return true;}// 向任务队列添加任务(生产者接口)bool Enqueue(const T& t){_mutex.Lock(); // 加锁保护队列if(_isrunning == true) // 确保线程池在运行{_task_queue.push(t); // 任务入队if(_waitnum > 0) // 如果有线程在等待任务{_cond.Signal(); // 通知一个等待线程Log(LogLevel::DEBUG) << "唤醒一个线程!"; // 记录日志}Log(LogLevel::DEBUG) << "任务入队列成功!"; // 记录日志_mutex.Unlock(); // 解锁return true;}_mutex.Unlock(); // 解锁return false; // 线程池未运行,入队失败}// 析构函数~Thread_pool(){// 注意:单例对象通常需要显式释放,此处未实现释放逻辑}private:int _pool_size;             // 线程池大小(工作线程数量)std::vector<Pthread> _threads; // 工作线程集合Mutex _mutex;               // 互斥锁,保护任务队列和共享状态Cond _cond;                 // 条件变量,用于线程间同步int _waitnum;               // 当前等待任务的线程数量bool _isrunning;            // 线程池运行状态标志std::queue<T> _task_queue;  // 任务队列(FIFO)public:static Thread_pool<T>* _instance; // 单例实例指针static Mutex _lock;          // 用于保护单例创建的静态锁};// 静态成员初始化template<typename T>Thread_pool<T>* Thread_pool<T>::_instance = nullptr;template<typename T>Mutex Thread_pool<T>::_lock;
}
http://www.xdnf.cn/news/1431055.html

相关文章:

  • Sping Web MVC入门
  • 【机器学习深度学习】向量检索到重排序:RAG 系统中的优化实践
  • 关于ANDROUD APPIUM安装细则
  • 分页功能设计
  • MYSQL配置复制拓扑知识点
  • 【54页PPT】数字化转型数据中台解决方案(附下载方式)
  • spring boot 整合AI教程
  • 解析ELK(filebeat+logstash+elasticsearch+kibana)日志系统原理以及k8s集群日志采集过程
  • Unity学习----【数据持久化】二进制数据(五)--由Excel自动生成数据结构类与二进制文件
  • 【常见的几款棋牌室计时软件】佳易王棋牌室计时计费软件大众版,佳易王棋牌室计时计费高级版,两款软件有何不同,适配不同场景需求#软件具体教程详解
  • react+taro的使用整理
  • 将 .vcproj 文件转换为 .pro 文件
  • 企业DevOps的安全与合规关键:三大主流DevOps平台能力对比
  • 认识⼯作区、暂存区、版本库
  • Wireshark笔记-DHCP两步交互流程与数据解析
  • 简单爬一个小说页面 HTML 的title和内容
  • 基于STM32单片机智能家居wifi远程监控系统机智云app设计
  • Zookeeper分布式锁原理
  • 域名备案成功后怎么还显示没有注册
  • 基于vue3和springboot框架集成websocket
  • springboot项目使用websocket功能,使用了nginx反向代理后连接失败问题解决
  • DASK shuffle任务图分析
  • ansible循环
  • 零依赖每月工作计划备忘录:高效管理你的每一天
  • TSMC-1987《Convergence Theory for Fuzzy c-Means: Counterexamples and Repairs》
  • 电动车动力电池自动点焊机|深圳比斯特自动化
  • 证明有理数集不是完备的度量空间
  • SpringBoot 整合 RabbitMQ 的完美实践
  • 【代码随想录day 22】 力扣 40.组合总和II
  • Elasticsearch 深分页限制与解决方案