C++ 并发 future, promise和async
文章目录
- 前言
- 1. std::async
- async用法
- async的启动策略
- 2. std::promise
- 3. std::future
- std::future::get():
- std::future::wait():
- 将任务和future关联
- 异常处理
- std::packaged_task
- 什么是std::packaged_task?
- std::packaged_task的基本使用
- 总结
- 线程池
前言
同步与异步是描述任务执行方式的两种基本模式,核心区别在于任务执行的顺序性和阻塞性:同步要求任务按顺序执行且必须等待当前操作完成,而异步允许任务独立执行并在完成后通过特定机制通知结果。
基本定义与核心区别
同步(Synchronous):
- 顺序执行:操作必须严格按照代码书写顺序执行,前一个任务未完成时后续任务会被阻塞。
- 阻塞特性:例如普通函数调用或同步读取文件时,程序会暂停执行直到获得结果。
异步(Asynchronous):
-
非阻塞执行:任务触发后无需等待结果,可立即执行后续代码,待任务完成后再通过回调、事件通知等方式处理结果。
-
并发优势:适用于网络请求、文件读写等高延迟操作,能显著提升系统吞吐量和响应速度。
技术实现与典型应用
同步实现方式:
-
普通函数调用(如 result = sync_function())。
-
同步 I/O 操作(如 Java 中 BufferedReader.readLine() 阻塞读取)。
异步编程方法:
1. 回调函数:通过嵌套回调处理结果,但易引发“回调地狱”。
2. Promise/Future:链式调用 .then() 管理异步状态,增强代码可读性。
3. async/await:以同步语法实现异步逻辑(如 await fetch()),降低代码复杂度。
4. 事件驱动模型:通过监听事件触发后续逻辑(如 Node.js 文件读取)。
扩展应用场景
-
电子信号传输。
- 同步需依赖公共时钟信号协调收发(如 SPI 通信协议)。
- 异步通过起始/停止位自同步(如 UART 串口通信)。
-
电机控制领域。
- 同步电机转子与定子磁场转速严格一致(用于精密调速场景)。
- 异步电机转子转速滞后于磁场转速,通过电磁感应产生转矩(常见于工业驱动)。
在 C++11 中引入的 头文件提供了一套用于异步编程的工具,主要包括 std::future、std::async 和 std::promise。
1. std::async
async用法
std::async
是一个用于异步执行函数的模板函数,它返回一个 std::future
对象,该对象用于获取函数的返回值。
以下是一个使用 std::async
的示例:
#include <iostream>
#include <future>
#include <chrono>// 定义一个异步任务
std::string fetchDataFromDB(std::string query) {// 模拟一个异步任务,比如从数据库中获取数据std::this_thread::sleep_for(std::chrono::seconds(5));return "Data: " + query;
}int main() {// 使用 std::async 异步调用 fetchDataFromDBstd::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");// 在主线程中做其他事情std::cout << "Doing something else..." << std::endl;// 从 future 对象中获取数据std::string dbData = resultFromDB.get();std::cout << dbData << std::endl;return 0;
}
在这个示例中,std::async
创建了一个新的线程(或从内部线程池中挑选一个线程)并自动与一个 std::promise
对象相关联。std::promise
对象被传递给 fetchDataFromDB
函数,函数的返回值被存储在 std::future
对象中。在主线程中,我们可以使用 std::future::get
方法从 std::future
对象中获取数据。注意,在使用 std::async
的情况下,我们必须使用 std::launch::async
标志来明确表明我们希望函数异步执行。
上面的例子输出
Doing something else...
Data: Data
async的启动策略
std::async
函数可以接受几个不同的启动策略,这些策略在std::launch
枚举中定义。除了std::launch::async
之外,还有以下启动策略:
std::launch::deferred
:这种策略意味着任务将在调用std::future::get()
或std::future::wait()
函数时延迟执行。换句话说,任务将在需要结果时同步执行。std::launch::async | std::launch::deferred
:这种策略是上面两个策略的组合。任务可以在一个单独的线程上异步执行,也可以延迟执行,具体取决于实现。
默认情况下,std::async
使用std::launch::async | std::launch::deferred
策略。这意味着任务可能异步执行,也可能延迟执行,具体取决于实现。需要注意的是,不同的编译器和操作系统可能会有不同的默认行为。
2. std::promise
C++11引入了std::promise
和std::future
两个类,用于实现异步编程。std::promise
用于在某一线程中设置某个值或异常,而std::future
则用于在另一线程中获取这个值或异常。
下面是std::promise
的基本用法:
#include <iostream>
#include <thread>
#include <future>void set_value(std::promise<int> prom) {// 设置 promise 的值prom.set_value(10);
}int main() {// 创建一个 promise 对象std::promise<int> prom;// 获取与 promise 相关联的 future 对象std::future<int> fut = prom.get_future();// 在新线程中设置 promise 的值std::thread t(set_value, std::move(prom));// 在主线程中获取 future 的值std::cout << "Waiting for the thread to set the value...\n";std::cout << "Value set by the thread: " << fut.get() << '\n';t.join();return 0;
}
程序输出
Waiting for the thread to set the value...
promise set value successValue set by the thread:
10
在上面的代码中,我们首先创建了一个std::promise<int>
对象,然后通过调用get_future()
方法获取与之相关联的std::future<int>
对象。然后,我们在新线程中通过调用set_value()
方法设置promise
的值,并在主线程中通过调用fut.get()
方法获取这个值。注意,在调用fut.get()
方法时,如果promise
的值还没有被设置,则该方法会阻塞当前线程,直到值被设置为止。
除了set_value()
方法外,std::promise
还有一个set_exception()
方法,用于设置异常。该方法接受一个std::exception_ptr
参数,该参数可以通过调用std::current_exception()
方法获取。下面是一个例子:
#include <iostream>
#include <thread>
#include <future>void set_exception(std::promise<void> prom) {try {// 抛出一个异常throw std::runtime_error("An error occurred!");} catch(...) {// 设置 promise 的异常prom.set_exception(std::current_exception());}
}int main() {// 创建一个 promise 对象std::promise<void> prom;// 获取与 promise 相关联的 future 对象std::future<void> fut = prom.get_future();// 在新线程中设置 promise 的异常std::thread t(set_exception, std::move(prom));// 在主线程中获取 future 的异常try {std::cout << "Waiting for the thread to set the exception...\n";fut.get();} catch(const std::exception& e) {std::cout << "Exception set by the thread: " << e.what() << '\n';}t.join();return 0;
}
上述代码输出
Waiting for the thread to set the exception...
Exception set by the thread: An error occurred!
当然我们使用promise
时要注意一点,如果promise
被释放了,而其他的线程还未使用与promise
关联的future
,当其使用这个future
时会报错。如下是一段错误展示
void use_promise_destruct() {std::thread t;std::future<int> fut;{// 创建一个 promise 对象std::promise<int> prom;// 获取与 promise 相关联的 future 对象fut = prom.get_future();// 在新线程中设置 promise 的值t = std::thread(set_value, std::move(prom));}// 在主线程中获取 future 的值std::cout << "Waiting for the thread to set the value...\n";std::cout << "Value set by the thread: " << fut.get() << '\n';t.join();
}
随着局部作用域}的结束,prom可能被释放也可能会被延迟释放, 如果立即释放则fut.get()获取的值会报error_value的错误。
3. std::future
std::future是C++11标准库中的⼀个模板类,它表示⼀个异步操作的结果。当我们在多线程编程中使⽤异步任务时,std::future可以帮助我们在需要的时候获取任务的执行结果。std::future的⼀个重要特性是能够阻塞当前线程,直到异步操作完成,从⽽确保我们在获取结果时不会遇到未完成的操作。
std::future::get()
和 std::future::wait()
是 C++ 中用于处理异步任务的两个方法,它们的功能和用法有一些重要的区别。
std::future::get():
std::future::get()
是一个阻塞调用,用于获取std::future
对象表示的值或异常。如果异步任务还没有完成,get()
会阻塞当前线程,直到任务完成。如果任务已经完成,get()
会立即返回任务的结果。重要的是,get()
只能调用一次,因为它会移动或消耗掉 std::future
对象的状态。一旦 get()
被调用,std::future
对象就不能再被用来获取结果。
std::future::wait():
std::future::wait()
也是一个阻塞调用,但它与get()
的主要区别在于 wait()
不会返回任务的结果。它只是等待异步任务完成。如果任务已经完成,wait()
会立即返回。如果任务还没有完成,wait()
会阻塞当前线程,直到任务完成。与 get()
不同,wait()
可以被多次调用,它不会消耗掉 std::future
对象的状态。
总结一下,这两个方法的主要区别在于:
std::future::get()
用于获取并返回任务的结果,而 std::future::wait()
只是等待任务完成。
get()
只能调用一次,而 wait()
可以被多次调用。
如果任务还没有完成,get()
和 wait()
都会阻塞当前线程,但get()
会一直阻塞直到任务完成并返回结果,而wait()
只是在等待任务完成。
你可以使用std::future
的wait_for()
或wait_until()
方法来检查异步操作是否已完成。这些方法返回一个表示操作状态的std::future_status
值。
if(fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { // 操作已完成
} else { // 操作尚未完成
}
将任务和future关联
std::packaged_task
和std::future
是C++11中引入的两个类,它们用于处理异步任务的结果。
std::packaged_task
是一个可调用目标,它包装了一个任务,该任务可以在另一个线程上运行。它可以捕获任务的返回值或异常,并将其存储在std::future
对象中,以便以后使用。
std::future
代表一个异步操作的结果。它可以用于从异步任务中获取返回值或异常。
以下是使用std::packaged_task
和std::future
的基本步骤:
创建一个std::packaged_task
对象,该对象包装了要执行的任务。
调用std::packaged_task
对象的get_future()
方法,该方法返回一个与任务关联的std::future
对象。
在另一个线程上调用std::packaged_task
对象的operator()
,以执行任务。
在需要任务结果的地方,调用与任务关联的std::future
对象的get()
方法,以获取任务的返回值或异常。
以下是一个简单的示例代码:
int my_task() {std::this_thread::sleep_for(std::chrono::seconds(5));std::cout << "my task run 5 s" << std::endl;return 42;
}void use_package() {// 创建一个包装了任务的 std::packaged_task 对象 std::packaged_task<int()> task(my_task);// 获取与任务关联的 std::future 对象 std::future<int> result = task.get_future();// 在另一个线程上执行任务 std::thread t(std::move(task));t.detach(); // 将线程与主线程分离,以便主线程可以等待任务完成 // 等待任务完成并获取结果 int value = result.get();std::cout << "The result is: " << value << std::endl;}
在上面的示例中,我们创建了一个包装了任务的std::packaged_task
对象,并获取了与任务关联的std::future
对象。然后,我们在另一个线程上执行任务,并等待任务完成并获取结果。最后,我们输出结果。
我们可以使用 std::function
和 std::package_task
来包装带参数的函数。std::package_task
是一个模板类,它包装了一个可调用对象,并允许我们将其作为异步任务传递。
异常处理
std::future
是C++的一个模板类,它用于表示一个可能还没有准备好的异步操作的结果。你可以通过调用 std::future::get
方法来获取这个结果。如果在获取结果时发生了异常,那么 std::future::get
会重新抛出这个异常。
以下是一个例子,演示了如何在 std::future
中获取异常:
#include <iostream>
#include <future>
#include <stdexcept>
#include <thread>void may_throw()
{// 这里我们抛出一个异常。在实际的程序中,这可能在任何地方发生。throw std::runtime_error("Oops, something went wrong!");
}int main()
{// 创建一个异步任务std::future<void> result(std::async(std::launch::async, may_throw));try{// 获取结果(如果在获取结果时发生了异常,那么会重新抛出这个异常)result.get();}catch (const std::exception &e){// 捕获并打印异常std::cerr << "Caught exception: " << e.what() << std::endl;}return 0;
}
在这个例子中,我们创建了一个异步任务 may_throw
,这个任务会抛出一个异常。然后,我们创建一个 std::future
对象 result
来表示这个任务的结果。在 main
函数中,我们调用 result.get()
来获取任务的结果。如果在获取结果时发生了异常,那么 result.get()
会重新抛出这个异常,然后我们在 catch
块中捕获并打印这个异常。
上面的例子输出
Caught exception: Oops, something went wrong!
std::packaged_task
什么是std::packaged_task?
std::packaged_task是一个模板类,用于打包任务(如可调用对象、函数、lambda表达式、bind表达式或者其他函数对象),以便异步地执行它,并获取其结果。它与std::future和std::promise一起构成了C++标准库中异步编程的基础。std::packaged_task的目的是为了封装任务以便在线程之间传递,实现任务的异步处理。
std::packaged_task的基本使用
std::packaged_task
封装的函数的计算结果会通过与之关联的std::future::get
获取(可以在其它线程中异步获取)。关联的std::future
可以通过std::packaged_task::get_future
获取,该方法只能调用一次,多次调用会触发std::future_error
异常。
和std::function
类似,std::packaged_task
是一个多态的、能够感知内存分配的容器:存储的可调用目标可以分配到堆上,也可以通过提供的内存分配器定义存储方式。
使用场景
异步任务执行:将任务封装起来,并在稍后或在A线程中执行它。在B线程获取结果。
线程池:在实现线程池时,可以使用std::packaged_task
封装任务,并将其提交到线程池中。
任务取消和重试:通过将任务封装在std::packaged_task
中,可以更方便地管理任务的取消和重试。
代码示例
以下是一个简单的示例,展示了如何使用std::packaged_task
和std::future
进行异步任务处理:
#include <iostream>
#include <future>
#include <thread>
#include <chrono>int compute(int a, int b) {std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟长时间计算return a + b;
}void task1() {std::packaged_task<int(int, int)> task(compute);std::future<int> result = task.get_future();std::thread t(std::move(task), 5, 4);std::cout << "Waiting for result..." << std::endl;std::cout << "Result: " << result.get() << std::endl;t.join();
}int main() {task1();return 0;
}
在这个示例中:
创建了一个std::packaged_task
对象,并封装了compute
函数。
获取与任务关联的std::future
对象。
在一个新的线程中执行任务。
等待并输出结果。
确保线程结束。
使用std::bind
和std::packaged_task
std::bind
用于创建一个新的可调用对象,将函数与部分或全部参数绑定,从而生成一个新的函数对象或函数指针。这个新对象可以存储并在稍后调用,而不需要再次提供参数。结合std::packaged_task
和std::bind
可以方便地封装和调度任务。
代码示例
#include <iostream>
#include <future>
#include <thread>
#include <functional>int packagedTaskMethod(int val, std::string str) {std::cout << "run packagedTaskMethod: val = " << val << " , str = " << str << std::endl;return 555999;
}void task2() {auto boundTask = std::bind(packagedTaskMethod, 42, std::placeholders::_1);std::packaged_task<int(std::string)> task1(boundTask);std::future<int> ret1 = task1.get_future();std::thread t(std::move(task1), "hhh");std::cout << "Result: " << ret1.get() << std::endl;t.join();
}int main() {task2();return 0;
}
在这个示例中:
使用std::bind
将packagedTaskMethod
的部分参数固定下来,生成一个新的可调用对象。
将该对象封装到std::packaged_task
中。
在一个新的线程中执行任务,并传递参数。
等待并输出结果。
确保线程结束。
总结
通过std::packaged_task
和std::future
,可以轻松地在不同线程间传递任务和获取结果,实现并发编程。同时,结合std::bind
可以简化参数管理和任务调度,提供更高的灵活性和可读性。这些特性使得std::packaged_task
在异步编程中非常有用,尤其是在复杂的任务管理和多线程环境中。
线程池
我们可以利用上面提到的std::packaged_task
和std::promise
构建线程池,提高程序的并发能力。 先了解什么是线程池:
线程池是一种多线程处理形式,它处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
线程池可以避免在处理短时间任务时创建与销毁线程的代价,它维护着多个线程,等待着监督管理者分配可并发执行的任务,从而提高了整体性能。
下面是我提供的一套线程池源码,目前用在公司的项目中
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>class ThreadPool {
public:ThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;static ThreadPool& instance() {static ThreadPool ins;return ins;}using Task = std::packaged_task<void()>;~ThreadPool() {stop();}template <class F, class... Args>auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {using RetType = decltype(f(args...));if (stop_.load())return std::future<RetType>{};auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<RetType> ret = task->get_future();{std::lock_guard<std::mutex> cv_mt(cv_mt_);tasks_.emplace([task] { (*task)(); });}cv_lock_.notify_one();return ret;}int idleThreadCount() {return thread_num_;}private:ThreadPool(unsigned int num = 5): stop_(false) {{if (num < 1)thread_num_ = 1;elsethread_num_ = num;}start();}void start() {for (int i = 0; i < thread_num_; ++i) {pool_.emplace_back([this]() {while (!this->stop_.load()) {Task task;{std::unique_lock<std::mutex> cv_mt(cv_mt_);this->cv_lock_.wait(cv_mt, [this] {return this->stop_.load() || !this->tasks_.empty();});if (this->tasks_.empty())return;task = std::move(this->tasks_.front());this->tasks_.pop();}this->thread_num_--;task();this->thread_num_++;}});}}void stop() {stop_.store(true);cv_lock_.notify_all();for (auto& td : pool_) {if (td.joinable()) {std::cout << "join thread " << td.get_id() << std::endl;td.join();}}}private:std::mutex cv_mt_;std::condition_variable cv_lock_;std::atomic_bool stop_;std::atomic_int thread_num_;std::queue<Task> tasks_;std::vector<std::thread> pool_;
};#endif // !__THREAD_POOL_H__
顺便介绍一下decltype
decltype
是C++11新增的一个关键字,和auto
的功能一样,用来在编译时期进行自动类型推导。引入decltype
是因为auto
并不适用于所有的自动类型推导场景,在某些特殊情况下auto
用起来很不方便,甚至压根无法使用。
典型应用场景
1.推导函数返回类型(配合 auto 和尾置返回类型)
C++11 中,当函数返回类型依赖参数类型时,可用 decltype 推导:
template <typename T1, typename T2>
auto add(T1 a, T2 b) -> decltype(a + b) { // 尾置返回类型,推导返回值为 a+b 的类型 return a + b;
}
C++14 后支持直接用 auto 作为返回类型(编译器自动推导,本质依赖 decltype):
template <typename T1, typename T2>
auto add(T1 a, T2 b) { // C++14 及以后,等价于上述写法 return a + b;
}
2.获取容器迭代器类型(避免手动书写复杂类型)
#include <vector>
std::vector<int> vec;
decltype(vec.begin()) it = vec.begin(); // 推导 it 为 std::vector<int>::iterator
3.定义模板中的类型别名(结合 typedef/using)
template <typename T>
using ptr_type = decltype(&T::member); // 推导类成员指针的类型
4.处理表达式的精确类型(包括引用 /const 限定)
int x = 0;
const int& rx = x;
decltype(rx) var1 = x; // var1 是 const int&(保留引用和 const)
decltype(x) var2 = rx; // var2 是 int(x 是普通变量,推导为值类型,丢失 const 和引用)