Linux——简单线程池封装案例
文章目录
- Linux C++ 线程池技术:从理论到实践的高性能并发编程
- 引言
- 一、 线程池的基本概念
- 1.1 线程池的定义与作用
- 1.2 线程池的适用场景
- 1.3 线程池的优势
- 二、 Linux 线程基础
- 2.1 POSIX 线程(pthread)简介
- POSIX 线程库常用函数及详解
- pthread_create
- pthread_join
- pthread_exit
- pthread_detach
- pthread_attr_init
- pthread_attr_destroy
- pthread_attr_setdetachstate
- 2.2 线程创建与同步机制
- 互斥锁(Mutex)
- pthread_mutex_init
- pthread_mutex_lock
- pthread_mutex_unlock
- 条件变量(Condition Variable)
- pthread_cond_init
- pthread_cond_wait
- pthread_cond_signal
- pthread_cond_broadcast
- 2.3 线程安全与竞态条件
- 三、 C++ 线程池实现的关键组件
- 3.1 任务队列(Task Queue)的设计
- 3.2 线程管理(Thread Workers)
- 3.3 线程同步与任务调度
- 四、 线程池的核心实现步骤
- 条件变量封装
- 自动锁封装
- 手动锁封装
- 简单日志封装(方便调试)
- 线程封装
- 线程池封装
Linux C++ 线程池技术:从理论到实践的高性能并发编程
引言
在现代多核CPU架构下,并发编程是挖掘硬件性能、提升应用吞吐量的关键技术。然而,频繁地创建和销毁线程会带来显著的性能开销,并可能导致系统资源耗尽。线程池(Thread Pool)作为一种经典的并发编程模式,应运而生。它通过预先创建并管理一组线程,实现了任务提交与任务执行的解耦,是构建高性能、高可靠性C++服务的基石。本文将探讨Linux环境下使用C++实现线程池的核心概念、实现细节。
一、 线程池的基本概念
1.1 线程池的定义与作用
线程池是一种多线程处理形式,其核心思想是预先创建一定数量的线程,放入一个“池子”中管理。当有任务需要执行时,它并不立即创建一个新线程,而是将任务提交到池中的任务队列中,等待一个空闲线程从队列中取出并执行。
它的主要作用可以概括为:
-
资源复用:避免线程频繁创建和销毁带来的性能损耗(如内存分配、系统调用)。
-
流量控制:通过队列缓冲,平滑处理突发的大量任务请求,防止系统过载。
-
统一管理:方便对线程进行统一的监控、管理和调优。
1.2 线程池的适用场景
高并发服务器:如Web服务器、文件服务器、游戏服务器,需要快速响应大量网络请求。
异步任务处理:如后台日志记录、数据批量处理、邮件发送等不阻塞主线程的任务。
并行计算:将一个大型计算任务拆分成多个小任务,由线程池并行处理,加速计算。
1.3 线程池的优势
性能优化:线程复用降低了创建销毁的开销,任务队列减少了调度延迟。
资源管理:限制了系统最大并发线程数,防止因线程过多导致内存耗尽或过度上下文切换。
稳定性提升:对异步任务进行了统一管理,避免了野线程和资源泄漏问题。
二、 Linux 线程基础
2.1 POSIX 线程(pthread)简介
在Linux上,线程通常通过POSIX线程库(pthread)实现。虽然C++11提供了标准的std::thread,但理解其底层基础(通常是pthread)对于调试和深度优化至关重要。
POSIX 线程库常用函数及详解
pthread_create
用于创建一个新的线程。
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);
- thread:指向线程标识符的指针。
- attr:线程属性,通常设为
NULL
使用默认属性。 - start_routine:线程运行的函数入口。
- arg:传递给线程函数的参数。
- 返回值:成功返回 0,失败返回错误码。
pthread_join
等待指定线程终止并回收资源。
int pthread_join(pthread_t thread, void **retval);
- thread:目标线程标识符。
- retval:存储线程退出状态的指针,可为
NULL
。 - 返回值:成功返回 0,失败返回错误码。
pthread_exit
终止调用线程并返回状态值。
void pthread_exit(void *retval);
- retval:线程退出状态,可被其他线程通过
pthread_join
获取。
pthread_detach
分离线程,使其终止时自动释放资源。
int pthread_detach(pthread_t thread);
- thread:目标线程标识符。
- 返回值:成功返回 0,失败返回错误码。
pthread_attr_init
初始化线程属性对象。
int pthread_attr_init(pthread_attr_t *attr);
- attr:指向属性对象的指针。
- 返回值:成功返回 0,失败返回错误码。
pthread_attr_destroy
销毁线程属性对象。
int pthread_attr_destroy(pthread_attr_t *attr);
- attr:目标属性对象。
- 返回值:成功返回 0,失败返回错误码。
pthread_attr_setdetachstate
设置线程的分离状态属性。
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
- attr:属性对象。
- detachstate:
PTHREAD_CREATE_JOINABLE
(默认)或PTHREAD_CREATE_DETACHED
。 - 返回值:成功返回 0,失败返回错误码。
2.2 线程创建与同步机制
实现线程池,本质上是管理多个线程协同工作,这离不开线程同步机制。
互斥锁(Mutex)
用于保护共享资源(如任务队列),防止多个线程同时访问导致数据竞争(Data Race)。
pthread_mutex_init
初始化互斥锁。
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
- mutex:指向互斥锁的指针。
- attr:锁属性,通常设为
NULL
使用默认属性。 - 返回值:成功返回 0,失败返回错误码。
pthread_mutex_lock
加锁,若锁已被占用则阻塞。
int pthread_mutex_lock(pthread_mutex_t *mutex);
- mutex:目标互斥锁。
- 返回值:成功返回 0,失败返回错误码。
pthread_mutex_unlock
解锁。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
- mutex:目标互斥锁。
- 返回值:成功返回 0,失败返回错误码。
C++标准库提供了std::mutex,其底层在Linux通常是pthread_mutex_t。
条件变量(Condition Variable)
用于线程间的通信与同步。当某个条件不满足时(如任务队列为空),线程可以阻塞在条件变量上等待;当条件可能满足时(如新任务到来),通知等待的线程。
pthread_cond_init
初始化条件变量。
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
- cond:指向条件变量的指针。
- attr:属性,通常设为
NULL
。 - 返回值:成功返回 0,失败返回错误码。
pthread_cond_wait
阻塞等待条件变量被触发。
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
- cond:目标条件变量。
- mutex:关联的互斥锁,调用时会自动释放锁。
- 返回值:成功返回 0,失败返回错误码。
pthread_cond_signal
唤醒一个等待该条件变量的线程。
int pthread_cond_signal(pthread_cond_t *cond);
- cond:目标条件变量。
- 返回值:成功返回 0,失败返回错误码。
pthread_cond_broadcast
唤醒所有等待该条件变量的线程。
int pthread_cond_broadcast(pthread_cond_t *cond);
- cond:目标条件变量。
- 返回值:成功返回 0,失败返回错误码。
C++标准库提供了std::condition_variable。
2.3 线程安全与竞态条件
竞态条件(Race Condition) 是指程序的输出依赖于不受控制的事件序列。线程池的核心数据结构——任务队列,是一个典型的共享资源,必须通过互斥锁等手段使其成为线程安全(Thread-Safe) 的,以消除竞态条件。
三、 C++ 线程池实现的关键组件
一个最小化的线程池通常包含以下三个核心组件:
3.1 任务队列(Task Queue)的设计
任务队列是生产者和消费者之间的桥梁。生产者线程(如主线程)提交任务,消费者线程(工作线程)从队列中获取任务。
数据类型:通常使用std::function<void()>或自定义函数对象来封装任何可调用任务。
容器选择:std::queue或std::deque是简单直接的选择。对于高级场景,可以考虑优先级队列(如std::priority_queue)以实现带优先级的调度。
3.2 线程管理(Thread Workers)
工作线程是线程池中的劳动者,它们的行为模式是一个循环:
等待条件变量通知(有任务或需要停止)。
在互斥锁的保护下,从任务队列中取出一个任务。
释放锁,执行取出的任务。
回到步骤1,等待下一个任务。
3.3 线程同步与任务调度
同步机制将上述组件粘合在一起:
std::mutex:保护对任务队列、停止标志等所有共享状态的访问。
std::condition_variable:
生产者在添加新任务后,调用notify_one()或notify_all()来通知可能正在等待的工作线程。
消费者在等待任务时,使用条件变量的wait()方法,并在等待时自动释放锁,避免忙等待。
四、 线程池的核心实现步骤
条件变量封装
#pragma once
#include <pthread.h>
#include<iostream>namespace CondtionModule
{class Cond{public:// 构造函数,初始化条件变量Cond(){pthread_cond_init(&_cond, nullptr);}// 等待条件变量,同时释放互斥锁// @param mutex 互斥锁引用void Wait(pthread_mutex_t& mutex){int n = pthread_cond_wait(&_cond, &mutex);if(n < 0)std::cerr << "Cond Wait failed!" << std::endl;}// 唤醒一个等待该条件变量的线程void Signal(){int n = pthread_cond_signal(&_cond);if(n < 0)std::cerr << "Cond Signal failed!" << std::endl;}// 唤醒所有等待该条件变量的线程void Broadcast(){int n = pthread_cond_broadcast(&_cond);if(n < 0)std::cerr << "Cond Broadcast failed!" << std::endl;}// 析构函数,销毁条件变量~Cond(){int n = pthread_cond_destroy(&_cond);if(n < 0)std::cerr << "~Cond failed! " << std::endl;}private:pthread_cond_t _cond; // 条件变量};
}
自动锁封装
#ifndef __LOCKGUARD_HPP__ // 防止头文件重复包含的宏定义
#define __LOCKGUARD_HPP__ // 定义头文件宏#include <pthread.h> // 引入POSIX线程库头文件namespace LockGuardModule // 定义命名空间封装锁守卫类
{class Lockguard // RAII风格的互斥锁守卫类{public:// 构造函数:初始化并锁定互斥锁Lockguard(pthread_mutex_t mutex): _mutex(mutex) // 初始化成员变量{pthread_mutex_init(&_mutex, nullptr); // 初始化互斥锁pthread_mutex_lock(&_mutex); // 加锁(阻塞直到获取锁)}// 析构函数:自动释放互斥锁~Lockguard(){pthread_mutex_unlock(&_mutex); // 解锁}private:pthread_mutex_t _mutex; // 封装的POSIX互斥锁对象};
}#endif // 结束头文件宏
手动锁封装
以下是为提供的 C++ 互斥锁(Mutex)头文件代码生成的完整注释,所有注释集中展示:
#ifndef __MUTEX_HPP__ // 防止头文件重复包含的宏定义
#define __MUTEX_HPP__ // 定义宏标识符
#include<pthread.h> // 引入POSIX线程库(包含互斥锁API)
#include<unistd.h> // 引入Unix标准库(提供系统调用接口)
#include<iostream> // 引入标准输入输出流库(未实际使用)namespace MutexModule // 定义命名空间隔离互斥锁实现
{class Mutex // 互斥锁封装类{public:Mutex() // 构造函数:初始化互斥锁{pthread_mutex_init(&_mutex, nullptr); // 使用默认属性初始化互斥锁}void Lock() // 加锁方法:阻塞直到获取锁{pthread_mutex_lock(&_mutex); // 调用POSIX加锁函数}void Unlock() // 解锁方法:释放锁{pthread_mutex_unlock(&_mutex); // 调用POSIX解锁函数}pthread_mutex_t& Getmutex() // 获取底层互斥锁引用(通常用于条件变量){return _mutex; // 返回内部互斥锁对象}~Mutex() // 析构函数:销毁互斥锁{pthread_mutex_destroy(&_mutex); // 释放互斥锁资源}private:pthread_mutex_t _mutex; // 内部POSIX互斥锁对象};
}#endif // 结束条件编译指令
简单日志封装(方便调试)
#ifndef __LOG_HPP__
#define __LOG_HPP__#include <iostream>
#include <ctime>
#include <string>
#include <pthread.h>
#include <sstream>
#include <fstream>
#include <filesystem>
#include <unistd.h>
#include <memory>
#include "Mutex.hpp"
#include "Lockguard.hpp"namespace LogModule
{using namespace MutexModule;// 默认日志文件路径和文件名std::string default_path = "./log/";std::string default_file = "log.txt";// 日志级别枚举:从调试信息到致命错误,级别依次升高enum LogLevel{DEBUG, // 调试信息,用于开发阶段排查问题INFO, // 普通信息,记录程序运行状态WARNING, // 警告信息,表明可能存在潜在问题ERROR, // 错误信息,表示功能无法正常执行FATAL // 致命错误,可能导致程序崩溃};// 将日志级别枚举转换为对应的字符串表示std::string LogLevelToString(LogLevel level){switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOWN"; // 未知级别,用于错误处理}}// 获取当前时间的格式化字符串(线程安全版本)std::string GetCurrentTime(){std::time_t time = std::time(nullptr); // 获取当前时间戳struct tm stm;localtime_r(&time, &stm); // 线程安全的时间转换函数// 格式化时间字符串:年-月-日-时-分-秒char buff[128];snprintf(buff, sizeof(buff), "%4d-%02d-%02d-%02d-%02d-%02d",stm.tm_year + 1900, // 年份需要加1900stm.tm_mon + 1, // 月份从0开始,需要加1stm.tm_mday, // 日stm.tm_hour, // 时stm.tm_min, // 分stm.tm_sec); // 秒return buff;}// 日志策略抽象基类:定义日志输出的统一接口class Logstrategy{public:virtual ~Logstrategy() = default; // 虚析构函数确保正确释放派生类资源virtual void syncLog(std::string &message) = 0; // 纯虚函数,同步输出日志消息};// 控制台日志策略:将日志输出到标准错误流(std::cerr)class ConsoleLogstrategy : public Logstrategy{public:void syncLog(std::string &message) override{std::cerr << message << std::endl; // 输出到控制台并换行}~ConsoleLogstrategy() override{// 析构函数,清理资源(此处无特殊资源需要清理)}};// 文件日志策略:将日志输出到指定文件class FileLogstrategy : public Logstrategy{public:// 构造函数:初始化文件路径和文件名,确保日志目录存在FileLogstrategy(std::string filepath = default_path, std::string filename = default_file){_mutex.Lock(); // 加锁保护共享资源访问_filepath = filepath;_filename = filename;// 检查日志目录是否已存在if (std::filesystem::exists(filepath)) {_mutex.Unlock(); // 目录存在,直接返回return;} try{// 递归创建目录(包括所有父目录)std::filesystem::create_directories(filepath);}catch (const std::filesystem::filesystem_error &e){// 目录创建失败,输出错误信息到标准错误std::cerr << e.what() << '\n';}_mutex.Unlock(); // 解锁}// 同步输出日志到文件void syncLog(std::string &message) override{_mutex.Lock(); // 加锁保护文件操作// 构建完整文件路径,处理路径分隔符std::string path =_filepath.back() == '/' ? _filepath + _filename : _filepath + "/" + _filename;// 以追加模式打开文件std::ofstream out(path, std::ios::app);if (!out.is_open()){_mutex.Unlock(); // 解锁后再返回std::cerr << "file open fail!" << '\n'; // 文件打开失败提示return;}out << message << '\n'; // 写入日志消息并换行_mutex.Unlock(); // 解锁out.close(); // 关闭文件}~FileLogstrategy(){// 析构函数,清理资源}private:std::string _filepath; // 日志文件路径std::string _filename; // 日志文件名Mutex _mutex; // 互斥锁,保护文件操作};// 主日志类:提供日志记录的核心功能class Log{public:// 构造函数:默认使用控制台输出策略Log(){_logstrategy = std::make_unique<ConsoleLogstrategy>();}// 切换到控制台输出策略void useconsolestrategy(){_logstrategy = std::make_unique<ConsoleLogstrategy>();printf("转换控制台策略!\n");}// 切换到文件输出策略void usefilestrategy(){_logstrategy = std::make_unique<FileLogstrategy>();printf("转换文件策略!\n");}// 日志消息类:负责构建单条日志消息class LogMessage{public:// 构造函数:初始化日志消息的基本信息LogMessage(LogLevel level, std::string file, int line, Log &log): _loglevel(level), _time(GetCurrentTime()) // 获取当前时间, _file(file), _pid(getpid()) // 获取进程ID, _line(line),_log(log) // 引用外部Log对象{// 构建日志消息头:包含时间、级别、进程ID、文件名、行号等信息std::stringstream ss;ss << "[" << _time << "] "<< "[" << LogLevelToString(_loglevel) << "] "<< "[" << _pid << "] "<< "[" << _file << "] "<< "[" << _line << "] "<< "- ";_loginfo = ss.str(); // 保存格式化后的消息头}// 重载<<运算符:支持链式输出,将各种类型数据追加到日志消息template <typename T>LogMessage &operator<<(const T &t){std::stringstream ss;ss << _loginfo << t; // 将新内容追加到现有消息_loginfo = ss.str(); // 更新消息内容return *this; // 返回自身引用,支持链式调用}// 析构函数:在对象销毁时实际输出日志~LogMessage(){// 检查日志策略是否有效if (_log._logstrategy){// 调用具体策略输出日志消息_log._logstrategy->syncLog(_loginfo);}}private:LogLevel _loglevel; // 日志级别std::string _time; // 时间戳pid_t _pid; // 进程IDstd::string _file; // 源文件名int _line; // 源代码行号std::string _loginfo; // 完整的日志消息内容Log &_log; // 对外部Log对象的引用};// 重载函数调用运算符:创建LogMessage对象开始记录日志LogMessage operator()(LogLevel level, std::string filename, int line){return LogMessage(level, filename, line, *this);}~Log(){// 析构函数,自动释放unique_ptr管理的策略对象}private:std::unique_ptr<Logstrategy> _logstrategy; // 智能指针管理日志策略对象};Log logger; // 全局日志对象实例// 宏定义:简化日志调用,自动填充文件名和行号
#define Log(type) logger(type, __FILE__, __LINE__)// 宏定义:切换到控制台输出策略
#define ENABLE_LOG_CONSOLE_STRATEGY() logger.useconsolestrategy()// 宏定义:切换到文件输出策略
#define ENABLE_LOG_FILE_STRATEGY() logger.usefilestrategy()
}#endif
线程封装
#ifndef __PTHREAD_HPP__
#define __PTHREAD_HPP__#include <pthread.h>
#include <string>
#include <iostream>
#include <functional>namespace PthreadModule
{uint32_t order = 1; // 全局计数器,用于生成线程唯一名称// 线程状态枚举:表示线程的生命周期状态enum ThreadStatus{RUNNING, // 线程正在运行NEW, // 线程已创建但未启动STOPPED // 线程已停止};// 线程函数类型定义:使用std::function包装返回void且无参数的函数using Thread_func_t = std::function<void()>;class Pthread{private:// 设置线程名称:基于全局计数器生成唯一名称void setname(){_thread_name = "Thread——" + std::to_string(order++);}// 静态线程入口函数:符合pthread_create要求的函数签名static void *run(void *arg){Pthread* thr = static_cast<Pthread*>(arg); // 将void*转换回Pthread对象指针thr->_thread_func(); // 执行用户提供的线程函数return nullptr; // 线程正常退出,返回空指针}public:// 构造函数:初始化线程对象,接收用户线程函数Pthread(Thread_func_t func_t): _thread_func(func_t) // 用户提供的线程函数, _thread_status(NEW) // 初始状态为NEW(未启动), _isjoineable(true) // 默认可join, _isjoin(false) // 初始未join{setname(); // 自动设置线程名称}// 设置线程为分离状态:分离后无需join,线程结束后自动释放资源bool setDetach(){if (_isjoineable) // 只有可join的线程才能设置为分离{pthread_detach(_thread_id); // 调用pthread_detach设置分离属性_isjoineable = false; // 更新状态为不可joinreturn true;}return false; // 如果已经是分离状态,返回false}// 启动线程:创建真正的操作系统线程bool start(){if (_thread_status != ThreadStatus::RUNNING) // 确保线程未在运行{int n = pthread_create(&_thread_id, nullptr, run, (void *)this);if (n < 0) // pthread_create成功返回0,失败返回错误码return false;if (!_isjoineable) // 如果已经设置为分离状态{setDetach(); // 确保线程属性为分离}_thread_status = ThreadStatus::RUNNING; // 更新线程状态为运行中return true;}return false; // 线程已在运行,启动失败}// 取消线程:请求终止目标线程的执行bool cancel(){if (_thread_status == ThreadStatus::RUNNING) // 只有运行中的线程才能取消{int n = pthread_cancel(_thread_id); // 发送取消请求if (n < 0) // 成功返回0,失败返回错误码return false;return true;}return false; // 线程未运行,取消失败}// 等待线程结束并回收资源:阻塞调用线程直到目标线程结束bool join(){if (_thread_status == ThreadStatus::RUNNING) // 只有运行中的线程才能join{int n = pthread_join(_thread_id, &_ret); // 等待线程结束并获取返回值_isjoin = true; // 标记已joinif (n < 0) // 成功返回0,失败返回错误码return false;return true;}return false; // 线程未运行,join失败}// 获取线程名称std::string Getname(){return _thread_name;}// 析构函数:确保资源正确释放~Pthread(){if(_isjoineable && !_isjoin) // 如果线程可join但尚未join{join(); // 自动等待线程结束,避免资源泄漏}}private:pthread_t _thread_id; // 操作系统线程IDstd::string _thread_name; // 线程自定义名称ThreadStatus _thread_status; // 线程当前状态Thread_func_t _thread_func; // 用户提供的线程函数void* _ret; // 线程返回值(暂未使用)bool _isjoineable; // 标记线程是否可join(非分离状态)bool _isjoin; // 标记线程是否已被join};}#endif // __PTHREAD_HPP__
线程池封装
#pragma once#include "Log.hpp"
#include "Pthread.hpp"
#include "Cond.hpp"
#include <queue>namespace ThreadPoolModule
{using namespace PthreadModule;using namespace MutexModule;using namespace LockGuardModule;using namespace CondtionModule;using namespace LogModule;template<typename T>class Thread_pool{private:// 线程处理函数:工作线程的主循环,不断从任务队列中取出并执行任务void HandlerTask(){while(true){_mutex.Lock(); // 加锁保护共享资源访问// 等待条件:任务队列不为空或线程池停止运行while(_task_queue.empty() && _isrunning){_waitnum++; // 增加等待线程计数_cond.Wait(_mutex.Getmutex()); // 等待条件变量通知(自动释放锁)// 被唤醒后_waitnum--; // 减少等待线程计数}// 检查退出条件:线程池已停止且任务队列为空if(_task_queue.empty() && !_isrunning){_mutex.Unlock(); // 解锁后再退出break; // 退出线程循环}// 获取任务并执行T t = _task_queue.front(); // 从队列头部取任务_task_queue.pop(); // 移除已取出的任务_mutex.Unlock(); // 解锁,允许其他线程访问队列t(); // 执行任务(函数对象调用)}}// 私有构造函数:实现单例模式,防止外部创建多个实例Thread_pool(int poolsize = 5): _pool_size(poolsize) // 初始化线程数量, _waitnum(0) // 初始化等待线程数为0, _isrunning(false) // 初始状态为未运行{// 创建指定数量的工作线程for(int i = 0; i < poolsize; i++){// 绑定线程处理函数到每个线程_threads.emplace_back(std::bind(&Thread_pool<T>::HandlerTask, this));}}public:// 删除拷贝构造函数和赋值运算符,确保单例唯一性Thread_pool(Thread_pool<T>&) = delete;Thread_pool<T>& operator=(const Thread_pool<T>&) = delete;// 获取线程池单例实例(线程安全版本)static Thread_pool<T>* Getinstance(){// 双重检查锁定:提高性能,避免每次获取实例都加锁if(_instance == nullptr) // 第一次检查(无锁){Lockguard lock(_lock.Getmutex()); // 加锁保护创建过程if(_instance == nullptr) // 第二次检查(加锁后){_instance = new Thread_pool<T>(); // 创建单例实例_instance->Run(); // 启动线程池return _instance;}}return _instance; // 返回已存在的实例}// 启动线程池:让所有工作线程开始运行bool Run(){if(_isrunning == false) // 确保线程池未在运行{for(auto& thread : _threads) // 遍历所有线程{thread.start(); // 启动线程Log(LogLevel::DEBUG) << thread.Getname() << "start!" ; // 记录日志}_isrunning = true; // 设置运行标志return true;}return false; // 如果已在运行,返回false}// 停止线程池:通知所有工作线程停止bool Stop(){if(_isrunning == false) // 如果已经停止,直接返回return false;_mutex.Lock(); // 加锁保护状态修改if(_isrunning == true){_isrunning = false; // 设置停止标志_cond.Broadcast(); // 广播通知所有等待的线程Log(LogLevel::DEBUG) << "Stop sucess!" ; // 记录日志_mutex.Unlock(); // 解锁return true;}_mutex.Unlock(); // 解锁return false;}// 等待所有工作线程结束:阻塞直到所有线程完成退出bool Wait(){for(auto& thread : _threads) // 遍历所有线程{if(thread.join()) // 等待线程结束Log(LogLevel::INFO) << thread.Getname() << " 退出..."; // 成功退出日志elseLog(LogLevel::ERROR) << "Wait error!"; // 等待失败日志}return true;}// 向任务队列添加任务(生产者接口)bool Enqueue(const T& t){_mutex.Lock(); // 加锁保护队列if(_isrunning == true) // 确保线程池在运行{_task_queue.push(t); // 任务入队if(_waitnum > 0) // 如果有线程在等待任务{_cond.Signal(); // 通知一个等待线程Log(LogLevel::DEBUG) << "唤醒一个线程!"; // 记录日志}Log(LogLevel::DEBUG) << "任务入队列成功!"; // 记录日志_mutex.Unlock(); // 解锁return true;}_mutex.Unlock(); // 解锁return false; // 线程池未运行,入队失败}// 析构函数~Thread_pool(){// 注意:单例对象通常需要显式释放,此处未实现释放逻辑}private:int _pool_size; // 线程池大小(工作线程数量)std::vector<Pthread> _threads; // 工作线程集合Mutex _mutex; // 互斥锁,保护任务队列和共享状态Cond _cond; // 条件变量,用于线程间同步int _waitnum; // 当前等待任务的线程数量bool _isrunning; // 线程池运行状态标志std::queue<T> _task_queue; // 任务队列(FIFO)public:static Thread_pool<T>* _instance; // 单例实例指针static Mutex _lock; // 用于保护单例创建的静态锁};// 静态成员初始化template<typename T>Thread_pool<T>* Thread_pool<T>::_instance = nullptr;template<typename T>Mutex Thread_pool<T>::_lock;
}