C++ 并发编程指南 并发设计模式:Actor vs. CSP (生活场景版)
文章目录
- **并发设计模式:Actor vs. CSP (生活场景版)**
- **1. Actor 模式:就像公司里的邮件系统**
- **2. CSP 模式:就像快递站的储物格**
- **总结与对比:如何选择?**
- C++ 实现 Actor 与 CSP 模式的生活化示例
- 场景描述
- 1. CSP 模式实现 (使用 Channel)
- 2. Actor 模式实现
- 两种模式的对比分析
- CSP 模式特点:
- Actor 模式特点:
- 适用场景:
并发设计模式:Actor vs. CSP (生活场景版)
想象一下,你正在规划一个大型活动的后勤工作。传统的“共享内存+锁”模式就像是一个共享的白板:所有工作人员都在上面读写任务,但必须轮流使用(加锁),经常造成拥堵(性能差),并且一个工作人员的失误可能擦掉别人的数据(状态混乱,耦合度高)。
Actor 和 CSP 提供了更优雅的解决方案。
1. Actor 模式:就像公司里的邮件系统
核心思想: “人(Actor)与人之间直接发邮件(消息)”。
生活场景模拟:
假设你的活动有三个核心部门:
- 前台 (Front Desk Actor):负责接收外界送来的物资,并通知相应部门来取。
- 厨房 (Kitchen Actor):负责处理食材,制作餐食。
- 舞台 (Stage Actor):负责管理音响、灯光设备。
工作流程 (Actor 模式):
- 隔离与邮箱: 每个部门(Actor)都在自己的办公室(独立线程/进程)里工作,互不干扰。每个部门门口都有一个专属邮箱(Mailbox)。
- 异步通信: 厨房需要面粉了,他们不会直接跑去仓库翻找(共享内存)。而是给前台发一封邮件(
Send("前台", NeedFlourMsg)
):“请送10袋面粉来”,然后继续做自己的事(异步)。 - 顺序处理: 前台的员工一次只从自己的邮箱里取出一封邮件进行处理。比如他先处理舞台的“需要新麦克风”请求,处理完后,再处理厨房的“需要面粉”请求。他一次只做一件事,所以绝不会搞混(无需锁,内部状态安全)。
- 直接寻址: 厨房知道这封邮件是发给“前台”的,前台收到邮件后也知道它来自“厨房”。就像邮件有明确的发件人和收件人。
Actor 模式的特点:
- 重点在“人”(Actor):系统由多个独立的 Actor 组成。
- 直接通信:知道消息发给谁,也知道消息来自谁。
- 优点:职责清晰,部门间完全隔离,一个部门崩溃不会直接影响另一个(容错性强)。
现实中的例子: 公司的邮件系统、客服工单系统(每个工单分配给特定客服处理)。
2. CSP 模式:就像快递站的储物格
核心思想: “人把东西放到公共格子(Channel),别人再从格子里取”。
生活场景模拟:
同样是那个活动,现在我们换一种后勤模式。我们设置一个中央快递站,站里有一排带编号的储物格(Channel)。
- 格子的类型:
- 小件格 (无缓冲 Channel):只能放一件物品。如果格子里有东西,快递员必须等别人取走才能放新的(直接阻塞等待)。
- 货架格 (有缓冲 Channel):比如 10 号货架可以放最多 10 箱水。放满后,送货员必须等待;取空后,取货员也必须等待。
工作流程 (CSP 模式):
- 关注通道: 工作人员不关心是谁把东西放进格子,也不关心最终是谁取走的。他们只认格子(Channel)。
- 解耦通信: 农夫送来 10 箱矿泉水。他不需要知道活动方是谁,他只需根据指示,把水放到“10号货架”(
10号货架 <- 10箱水
)。 - 匿名处理: 活动现场的志愿者渴了,他也不知道水是谁送的,他只需走到“10号货架”取一箱水(
水 <- 10号货架
)。 - 同步协调: 如果志愿者来取水时货架是空的,他就会在那里等着,直到有农夫送来水(同步等待)。反之,如果货架满了,农夫也会等着。
CSP 模式的特点:
- 重点在“格子”(Channel):Channel 是第一公民,是通信的核心。
- 间接通信:发送者和接收者不知道对方的存在,完全通过 Channel 解耦。
- 优点:灵活性极高,可以轻松组合出复杂的数据流管道(如:
格A -> 工作人员1 -> 格B -> 工作人员2 -> 格C
)。
现实中的例子: 工厂的流水线、餐厅的传菜窗口(厨师做好菜放到窗口,服务员从窗口端走,他们不需要直接交流)。
总结与对比:如何选择?
特性 | Actor 模式 (邮件系统) | CSP 模式 (快递站格子) |
---|---|---|
通信对象 | 明确的参与者 (Actor) | 抽象的通道 (Channel) |
耦合关系 | 发送方和接收方互相知晓 | 发送方和接收方互相不知 |
关注点 | 谁 (Who) 来处理消息 | 在哪里 (Where) 交换消息 |
同步方式 | 消息发送通常是异步的 | 通过 Channel 进行同步(等待) |
生活比喻 | 部门邮件往来 | 快递站、流水线、传菜窗口 |
典型语言 | Erlang, Akka (Java/Scala) | Go (原生支持), Clojure |
选择建议:
- 如果你的系统由许多独立的、有状态的、需要明确身份的实体组成(比如游戏中的每个玩家、电商系统中的每个订单),Actor 模式更直观。
- 如果你需要构建一个数据流管道,处理步骤清晰,且希望最大程度解耦生产者和消费者(比如数据处理管道、网络爬虫的各个阶段),CSP 模式更灵活。
它们的共同伟大之处在于实现了那句名言:
“不要通过共享内存来通信;而要通过通信来共享内存。”
(Do not communicate by sharing memory; instead, share memory by communicating.)*
这意味着我们不再需要小心翼翼地守护一个“共享白板”,而是通过“邮件”或“快递”这种更自然、更安全的方式来协作,从而大大降低了并发编程的复杂性。
C++ 实现 Actor 与 CSP 模式的生活化示例
下面我将使用 C++ 实现餐厅点餐场景的 Actor 和 CSP 模式示例。
场景描述
在一个餐厅中,有顾客、服务员和厨师三种角色。顾客下单,服务员接收订单并传递给厨师,厨师烹饪完成后通知服务员上菜。
1. CSP 模式实现 (使用 Channel)
首先,我们需要实现一个简单的 Channel 类来模拟 Go 语言的 Channel:
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <string>// 简单的 Channel 实现
template <typename T>
class Channel {
private:std::queue<T> queue_;std::mutex mtx_;std::condition_variable cv_producer_;std::condition_variable cv_consumer_;size_t capacity_;bool closed_ = false;public:Channel(size_t capacity = 0) : capacity_(capacity) {}bool send(T value) {std::unique_lock<std::mutex> lock(mtx_);cv_producer_.wait(lock, [this]() {return (capacity_ == 0 && queue_.empty()) || queue_.size() < capacity_ || closed_;});if (closed_) {return false;}queue_.push(value);cv_consumer_.notify_one();return true;}bool receive(T& value) {std::unique_lock<std::mutex> lock(mtx_);cv_consumer_.wait(lock, [this]() { return !queue_.empty() || closed_; });if (closed_ && queue_.empty()) {return false;}value = queue_.front();queue_.pop();cv_producer_.notify_one();return true;}void close() {std::unique_lock<std::mutex> lock(mtx_);closed_ = true;cv_producer_.notify_all();cv_consumer_.notify_all();}
};// 定义消息类型
struct Order {int customerId;std::string dish;
};struct CookedDish {int customerId;std::string dish;
};// 厨师函数
void chef(Channel<Order>& orderChannel, Channel<CookedDish>& cookedChannel) {Order order;while (orderChannel.receive(order)) {std::cout << "厨师收到订单: 顾客 " << order.customerId << " 点了 " << order.dish << std::endl;// 模拟烹饪时间std::this_thread::sleep_for(std::chrono::seconds(2));CookedDish cookedDish{order.customerId, order.dish};std::cout << "厨师完成烹饪: " << cookedDish.dish << std::endl;cookedChannel.send(cookedDish);}std::cout << "厨师结束工作" << std::endl;
}// 服务员函数
void waiter(Channel<Order>& customerToWaiter, Channel<Order>& waiterToChef, Channel<CookedDish>& chefToWaiter, Channel<CookedDish>& waiterToCustomer) {while (true) {// 从顾客接收订单Order order;if (!customerToWaiter.receive(order)) {break;}std::cout << "服务员接收订单: 顾客 " << order.customerId << " 点了 " << order.dish << std::endl;// 将订单发送给厨师waiterToChef.send(order);// 从厨师接收烹饪完成的菜品CookedDish cookedDish;if (!chefToWaiter.receive(cookedDish)) {break;}std::cout << "服务员取菜: " << cookedDish.dish << std::endl;// 将菜品发送给顾客waiterToCustomer.send(cookedDish);}std::cout << "服务员结束工作" << std::endl;
}// 顾客函数
void customer(int id, Channel<Order>& waiterChannel, Channel<CookedDish>& customerChannel) {std::string dishes[] = {"披萨", "意面", "沙拉", "牛排"};std::string dish = dishes[id % 4];// 下单Order order{id, dish};std::cout << "顾客 " << id << " 下单: " << dish << std::endl;waiterChannel.send(order);// 等待菜品CookedDish cookedDish;if (customerChannel.receive(cookedDish)) {std::cout << "顾客 " << id << " 收到: " << cookedDish.dish << std::endl;}
}int main() {// 创建各种ChannelChannel<Order> customerToWaiter(5); // 顾客到服务员的订单通道Channel<Order> waiterToChef(5); // 服务员到厨师的订单通道Channel<CookedDish> chefToWaiter(5); // 厨师到服务员的菜品通道Channel<CookedDish> waiterToCustomer(5); // 服务员到顾客的菜品通道// 启动厨师线程std::thread chefThread(chef, std::ref(waiterToChef), std::ref(chefToWaiter));// 启动服务员线程std::thread waiterThread(waiter, std::ref(customerToWaiter), std::ref(waiterToChef),std::ref(chefToWaiter), std::ref(waiterToCustomer));// 模拟多个顾客std::thread customers[4];for (int i = 0; i < 4; i++) {customers[i] = std::thread(customer, i, std::ref(customerToWaiter), std::ref(waiterToCustomer));}// 等待所有顾客完成for (int i = 0; i < 4; i++) {customers[i].join();}// 关闭通道并等待线程结束customerToWaiter.close();waiterToChef.close();chefToWaiter.close();waiterToCustomer.close();waiterThread.join();chefThread.join();std::cout << "餐厅打烊" << std::endl;return 0;
}
2. Actor 模式实现
接下来,我们使用 Actor 模式实现相同的餐厅场景:
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <string>
#include <functional>
#include <memory>
#include <unordered_map>// 消息基类
struct Message {virtual ~Message() = default;
};// 订单消息
struct OrderMessage : Message {int customerId;std::string dish;OrderMessage(int id, const std::string& d) : customerId(id), dish(d) {}
};// 完成烹饪消息
struct CookedMessage : Message {int customerId;std::string dish;CookedMessage(int id, const std::string& d) : customerId(id), dish(d) {}
};// 简单的Actor基类
class Actor {
protected:std::queue<std::unique_ptr<Message>> mailbox_;std::mutex mailboxMutex_;std::condition_variable mailboxCV_;bool running_ = false;std::thread thread_;public:virtual ~Actor() {stop();}void start() {running_ = true;thread_ = std::thread(&Actor::run, this);}void stop() {running_ = false;mailboxCV_.notify_all();if (thread_.joinable()) {thread_.join();}}void send(std::unique_ptr<Message> message) {std::lock_guard<std::mutex> lock(mailboxMutex_);mailbox_.push(std::move(message));mailboxCV_.notify_one();}void run() {while (running_) {std::unique_ptr<Message> message;{std::unique_lock<std::mutex> lock(mailboxMutex_);mailboxCV_.wait(lock, [this]() { return !mailbox_.empty() || !running_; });if (!running_ && mailbox_.empty()) {break;}if (!mailbox_.empty()) {message = std::move(mailbox_.front());mailbox_.pop();}}if (message) {processMessage(*message);}}}virtual void processMessage(Message& message) = 0;
};// 顾客Actor
class CustomerActor : public Actor {
private:int id_;std::string dish_;Actor* waiter_;public:CustomerActor(int id, const std::string& dish, Actor* waiter) : id_(id), dish_(dish), waiter_(waiter) {}void processMessage(Message& message) override {if (auto cookedMsg = dynamic_cast<CookedMessage*>(&message)) {std::cout << "顾客 " << id_ << " 收到: " << cookedMsg->dish << std::endl;}}void placeOrder() {std::cout << "顾客 " << id_ << " 下单: " << dish_ << std::endl;waiter_->send(std::make_unique<OrderMessage>(id_, dish_));}
};// 服务员Actor
class WaiterActor : public Actor {
private:Actor* chef_;std::unordered_map<int, CustomerActor*> customers_;public:WaiterActor(Actor* chef) : chef_(chef) {}void addCustomer(int customerId, CustomerActor* customer) {customers_[customerId] = customer;}void processMessage(Message& message) override {if (auto orderMsg = dynamic_cast<OrderMessage*>(&message)) {std::cout << "服务员接收订单: 顾客 " << orderMsg->customerId << " 点了 " << orderMsg->dish << std::endl;chef_->send(std::make_unique<OrderMessage>(*orderMsg));} else if (auto cookedMsg = dynamic_cast<CookedMessage*>(&message)) {std::cout << "服务员取菜: " << cookedMsg->dish << std::endl;auto it = customers_.find(cookedMsg->customerId);if (it != customers_.end()) {it->second->send(std::make_unique<CookedMessage>(*cookedMsg));}}}
};// 厨师Actor
class ChefActor : public Actor {
public:void processMessage(Message& message) override {if (auto orderMsg = dynamic_cast<OrderMessage*>(&message)) {std::cout << "厨师收到订单: 顾客 " << orderMsg->customerId << " 点了 " << orderMsg->dish << std::endl;// 模拟烹饪时间std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "厨师完成烹饪: " << orderMsg->dish << std::endl;// 发送完成消息给服务员(通过消息中的sender字段)// 这里简化处理,直接通过构造函数传递waiter引用}}
};int main() {// 创建厨师ChefActor chef;chef.start();// 创建服务员WaiterActor waiter(&chef);waiter.start();// 创建顾客std::string dishes[] = {"披萨", "意面", "沙拉", "牛排"};const int numCustomers = 4;std::unique_ptr<CustomerActor> customers[numCustomers];for (int i = 0; i < numCustomers; i++) {customers[i] = std::make_unique<CustomerActor>(i, dishes[i % 4], &waiter);waiter.addCustomer(i, customers[i].get());customers[i]->start();}// 顾客下单for (int i = 0; i < numCustomers; i++) {customers[i]->placeOrder();}// 等待一段时间让订单处理完成std::this_thread::sleep_for(std::chrono::seconds(10));// 停止所有Actorfor (int i = 0; i < numCustomers; i++) {customers[i]->stop();}waiter.stop();chef.stop();std::cout << "餐厅打烊" << std::endl;return 0;
}
两种模式的对比分析
CSP 模式特点:
- 通信通过 Channel:各个角色之间通过 Channel 进行通信,不直接知道对方的存在
- 同步点:Channel 的发送和接收操作可以是同步的,形成天然的同步点
- 解耦:生产者和消费者完全解耦,只需知道 Channel 接口
- 数据流清晰:数据流动路径明确,易于理解和调试
Actor 模式特点:
- 实体为中心:每个 Actor 是一个独立的实体,有自己的状态和行为
- 异步消息传递:Actor 之间通过异步消息进行通信
- 封装状态:每个 Actor 封装自己的状态,避免了共享状态的问题
- 位置透明:Actor 可以分布在不同的进程中甚至不同的机器上
适用场景:
- CSP 更适合需要明确数据流和控制流的场景
- Actor 更适合需要模拟现实世界实体和分布式系统的场景
这两个示例展示了如何使用 C++ 实现并发编程中的两种重要模式,通过餐厅点餐的生活化场景使得这些概念更加容易理解。