C++ ThreadPool
一个简单的线程池库,用于处理多线程任务。
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

namespace utils {
// Fixed-size pool of worker threads. Tasks are stored in a queue and are
// dequeued by the workers in the order they were added. The pool is neither
// copyable nor movable because the workers capture `this`.
class ThreadPool {
 private:
  using task_t = std::function<void()>;

 public:
  // Starts `thrd_cnt` worker threads, each running the worker loop.
  // With `thrd_cnt == 0` no worker exists and queued tasks are never executed.
  explicit ThreadPool(std::size_t thrd_cnt) noexcept {
    threads_.reserve(thrd_cnt);
    for (std::size_t i = 0; i < thrd_cnt; ++i) {
      threads_.emplace_back([this]() { worker_loop(); });
    }
  }

  ThreadPool(const ThreadPool &) = delete;
  ThreadPool &operator=(const ThreadPool &) = delete;
  ThreadPool(ThreadPool &&) = delete;
  ThreadPool &operator=(ThreadPool &&) = delete;

  // Joins all workers (see stop()).
  ~ThreadPool() noexcept { stop(); }

  // Queues `fn(args...)` for execution on one of the workers.
  //
  // `fn` and `args` are bound into a std::function<void()>; any value returned
  // by `fn` is discarded. The arguments are taken by value and moved into the
  // bound task, so the caller's objects are not referenced afterwards.
  // Tasks added after stop() has completed are queued but never run, because
  // no worker is left to pick them up.
  template <class FnT, class... ArgsT>
  void add(FnT &&fn, ArgsT... args) noexcept {
    // Build the task before taking the lock to keep the critical section short.
    task_t task =
        std::bind(std::forward<FnT>(fn), std::forward<ArgsT>(args)...);
    {
      std::lock_guard<std::mutex> lock(mtx_);
      tasks_.push(std::move(task));
    }
    cond_.notify_one();
  }

  // Requests shutdown and joins every worker. Workers finish the tasks that
  // are already queued before they exit. Calling stop() again is a no-op.
  void stop() noexcept {
    {
      // The flag is checked and set under the mutex so that concurrent callers
      // and the workers never access it unsynchronized.
      std::lock_guard<std::mutex> lock(mtx_);
      if (stop_) {
        return;
      }
      stop_ = true;
    }
    cond_.notify_all();
    for (auto &worker : threads_) {
      if (worker.joinable()) {
        worker.join();
      }
    }
  }

  // Returns the process-wide pool, created on first use. It is sized to
  // std::thread::hardware_concurrency(), falling back to a single worker when
  // that call reports 0 (value not computable).
  static ThreadPool &GetGlobalThreadPool() noexcept {
    static ThreadPool k_thread_pool([]() -> std::size_t {
      const unsigned int hw = std::thread::hardware_concurrency();
      return hw == 0 ? 1U : hw;
    }());
    return k_thread_pool;
  }

 private:
  // Body of every worker thread: blocks until a task is available or shutdown
  // was requested, runs tasks outside the lock, and returns once shutdown was
  // requested and the queue is empty.
  void worker_loop() noexcept {
    for (;;) {
      task_t task;
      {
        std::unique_lock<std::mutex> lock(mtx_);
        cond_.wait(lock, [this]() { return stop_ || !tasks_.empty(); });
        if (tasks_.empty()) {
          // The predicate held with an empty queue, so stop_ must be set.
          return;
        }
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();
    }
  }

 private:
  bool stop_ = false;                 // Guarded by mtx_.
  std::mutex mtx_;                    // Protects stop_ and tasks_.
  std::condition_variable cond_;      // Signalled on new task and on stop.
  std::queue<task_t> tasks_;          // Pending tasks, guarded by mtx_.
  std::vector<std::thread> threads_;  // Worker threads.
};

// One-shot style signal carrying a result state. Waiters block until the state
// leaves INVALID; reset() puts it back to INVALID.
class Event {
 public:
  // Possible states of an event. INVALID means "not signalled yet".
  enum State {
    INVALID = 0,
    SUCCESS = 1,
    ERROR = 2,
  };

  // Creates the event in the given state (INVALID by default).
  explicit Event(State state = INVALID) noexcept : state_(state) {}

  // Puts the event back into the INVALID (not signalled) state.
  void reset() noexcept {
    std::lock_guard<std::mutex> lock(mtx_);
    state_ = INVALID;
  }

  // Stores `state` and wakes every waiting thread.
  void set(State state) noexcept {
    std::lock_guard<std::mutex> lock(mtx_);
    state_ = state;
    // Notify while still holding the lock so a waiter cannot observe the new
    // state and destroy the event before the notification has been issued.
    cond_.notify_all();
  }

  // Blocks until the state is no longer INVALID and returns it.
  //
  // `tms` is the timeout in milliseconds; a value <= 0 (the default) waits
  // without a time limit. On timeout the current state is returned, which is
  // INVALID unless the event was signalled in the meantime.
  State wait(std::int64_t tms = 0 /* ms */) noexcept {
    // state_ is only read with the mutex held; an unlocked fast path would
    // race with set() and reset().
    std::unique_lock<std::mutex> lock(mtx_);
    const auto signalled = [this]() { return state_ != INVALID; };
    if (tms <= 0) {
      cond_.wait(lock, signalled);
    } else {
      cond_.wait_for(lock, std::chrono::milliseconds(tms), signalled);
    }
    return state_;
  }

 private:
  State state_ = INVALID;         // Current state, guarded by mtx_.
  std::mutex mtx_;                // Protects state_.
  std::condition_variable cond_;  // Signalled by set().
};
} // namespace utils

// Convenience macro for the global thread pool instance: it expands to a call
// to utils::ThreadPool::GetGlobalThreadPool(), which returns a reference to
// that pool.
#define GLOBAL_THREAD_POOL utils::ThreadPool::GetGlobalThreadPool()
测试代码
#include <algorithm>
#include <iostream>#include "thread_pool.h"using namespace utils;// 主函数,测试线程池的功能
int main(int argc, char *argv[]) {// 向全局线程池中添加10个任务,每个任务打印一个数字for (std::size_t i = 0; i < 10; ++i) {GLOBAL_THREAD_POOL.add([](auto i) { printf("-> %ld\n", i); }, i);}// 创建一个包含10个Event对象的向量,用于同步任务的完成状态std::vector<Event> events(10);// 向全局线程池中添加10个任务,每个任务打印一个数字并设置Event对象的状态为成功for (std::size_t i = 0; i < 10; ++i) {auto &evnet = events[i];evnet.reset();GLOBAL_THREAD_POOL.add([&evnet](auto i) {printf("=> %ld\n", i);evnet.set(Event::SUCCESS);},i);}// 检查所有Event对象的状态,如果有一个任务没有在1秒内成功完成,则输出"FAILED"if (std::any_of(events.begin(), events.end(), [](auto &event) {return (event.wait(1) != Event::SUCCESS);})) {std::cout << "FAILED" << std::endl;}// 返回程序成功结束的状态码return EXIT_SUCCESS;
}