当前位置: 首页 > news >正文

C++ 并发编程指南 实现无锁队列

文章目录

    • 环形队列
    • ​**​无锁队列:旋转寿司店的智慧​**​
      • ​**​传统方案:锁(慢但安全)​**​
      • **​无锁方案:原子标签(高效协作)​**​
    • ​**​无锁队列核心代码实现 (C++11)​**​
      • ​**​关键点解释 (结合寿司店例子):​**​
      • ​**​优势:​**​
      • ​**​注意事项 (寿司店的局限):​**​
      • ​**​总结:​**​
  • 另一个详细点的说明
  • 从零开始理解无锁并发队列:像旋转餐桌一样高效存取数据
    • 现实生活中的环形队列:旋转餐桌
    • 有锁的实现:需要餐厅经理协调
    • 无锁的实现:厨师和客人自主协调
      • 基础无锁实现:单一的协调标记
      • 改进的无锁实现:独立的头和尾标记
      • 处理极端情况:确保数据完整性
      • 性能优化:精细控制内存访问顺序
    • 总结
  • 无锁并发队列:旋转餐桌的高效协作
    • 现实生活类比
    • 完整实现代码
    • 代码说明
      • 1. Dish类 - 代表餐桌上的菜品
      • 2. LockFreeCircularQueue类 - 无锁环形队列(旋转餐桌)
      • 3. 多线程模拟
    • 运行示例
    • 关键优势

环形队列

我们要实现无锁并发,经常会用到一种结构无锁队列,而无锁队列和我们经常使用的队列颇有不同,它采用的是环状的队列结构,为什么成环呢?主要有两个好处,一个是成环的队列大小是固定的,另外一个我们通过移动头和尾就能实现数据的插入和取出。

我们看下图是一个环形队列的基本结构
在这里插入图片描述

图1表示队列为空的时候,头节点和尾节点交会在一起,指向同一个扇区。

图2表示当我们你插入一个数字1后,队列大小为1,此时tail指针移动到下一个扇区,head指向头部,1被存储在头部了。

图3表示当我们将数字1出队后,head指针向后移动一个扇区,此时head和tail指向同一个扇区,表示队列又为空了。那有人会问队列中数字1为什么不清空呢?其实不用清空,因为当我们插入新数据时就可以覆盖掉1这个无效的数据。

比如我们继续3图,连续插入几个数字,将队列填满。

在这里插入图片描述


​无锁队列:旋转寿司店的智慧​

想象一下一家旋转寿司店:

  1. ​环形传送带(队列)​​:寿司在环形传送带上移动。
  2. ​厨师(生产者)​​:在传送带尾部放入新做好的寿司(push)。
  3. ​顾客(消费者)​​:在传送带头部取走寿司(pop)。
  4. ​空盘信号​​:当传送带上全是空盘时(head == tail),顾客知道没寿司了。
  5. ​满盘信号​​:当传送带的下一个位置就是头部时((tail + 1) % size == head),厨师知道没位置放新寿司了。

​问题:​​ 如果厨师和顾客很多(多线程),如何避免他们争抢位置时发生混乱(数据竞争)?

​传统方案:锁(慢但安全)​

就像给传送带入口和出口各装一扇门(mutex),厨师或顾客要操作时,必须关门(加锁),操作完再开门(解锁)。虽然安全,但效率低(排队等待)。

​无锁方案:原子标签(高效协作)​

我们给传送带装上智能标签系统(原子变量):

  1. _head标签​​:指向顾客下一个该取的盘子位置。
  2. _tail标签​​:指向厨师下一个该放寿司的盘子位置。
  3. _tail_update标签​​:指向​​已完成​​放置的最后一个盘子位置(解决“寿司还没放好就被拿”的问题)。

​无锁队列核心代码实现 (C++11)​

#include <iostream>
#include <atomic>
#include <memory>
#include <thread>
#include <vector>// 示例数据类
class SushiPlate {
public:int id;SushiPlate(int id = -1) : id(id) {}friend std::ostream& operator<<(std::ostream& os, const SushiPlate& plate) {os << "🍣 Plate #" << plate.id;return os;}
};template<typename T, size_t Cap>
class LockFreeCircularQueue {
private:static constexpr size_t _max_size = Cap + 1; // 实际容量+1 (区分空和满)T* _data;                                    // 存储数据的数组 (传送带)std::atomic<size_t> _head{0};                // 消费者取餐点 (原子标签)std::atomic<size_t> _tail{0};                // 生产者放餐点 (原子标签)std::atomic<size_t> _tail_update{0};          // 生产者完成放置点 (原子标签)// 内存管理辅助 (分配和释放数组)struct AllocHelper : public std::allocator<T> {T* allocate(size_t n) { return std::allocator<T>::allocate(n); }void deallocate(T* p, size_t n) { std::allocator<T>::deallocate(p, n); }} _alloc;public:// 构造函数:分配"传送带"LockFreeCircularQueue() : _data(_alloc.allocate(_max_size)) {}// 禁用拷贝 (多线程环境下拷贝无意义且危险)LockFreeCircularQueue(const LockFreeCircularQueue&) = delete;LockFreeCircularQueue& operator=(const LockFreeCircularQueue&) = delete;// 析构函数:清理"传送带"~LockFreeCircularQueue() {// 简单处理:假设队列已空。生产环境需更严谨。_alloc.deallocate(_data, _max_size);}// 厨师放寿司 (Push)bool push(const T& plate) {size_t current_tail;// 步骤1: 申请一个空盘子位置do {current_tail = _tail.load(std::memory_order_relaxed); // 放松读:位置不重要// 检查传送带是否已满 (下一个位置就是头)if ((current_tail + 1) % _max_size ==_head.load(std::memory_order_acquire)) { // 获取读:确保看到最新的_head// std::cout << "Queue full! Can't add plate #" << plate.id << std::endl;return false;}// 尝试将_tail标签推进到下一个位置 (CAS: 比较并交换)} while (!_tail.compare_exchange_strong(current_tail, (current_tail + 1) % _max_size,std::memory_order_release,    // 成功时:释放屏障 (保证下面的写操作不会被重排到上面)std::memory_order_relaxed)); // 失败时:放松 (继续尝试)// 步骤2: 安全地将寿司放到申请到的盘子上_data[current_tail] = plate; // 写入数据// 步骤3: 更新完成标签_tail_update (告诉顾客这个位置的寿司已放好)size_t expected_update = current_tail;while (!_tail_update.compare_exchange_strong(expected_update, (expected_update + 1) % _max_size,std::memory_order_release,    // 成功时:释放屏障 (保证_data写入先于_tail_update更新)std::memory_order_relaxed)) { // 失败时:放松 (继续尝试)expected_update = current_tail; // 重置期望值 (其他线程可能修改了_tail_update)}// std::cout << "Chef placed: " << plate << " at position " << current_tail << std::endl;return true;}// 顾客取寿司 (Pop)bool pop(T& plate) {size_t current_head;do {current_head = _head.load(std::memory_order_relaxed); // 放松读:位置不重要// 检查传送带是否为空 (头尾相遇)if (current_head == _tail.load(std::memory_order_acquire)) { // 获取读:确保看到最新的_tail// std::cout << "Queue empty! Nothing to take." << std::endl;return false;}// 检查目标位置的寿司是否真的放好了 (避免厨师还在摆盘)if (current_head == _tail_update.load(std::memory_order_acquire)) { // 获取读:确保看到最新的_tail_updatereturn false; // 这个位置的寿司还没摆好,稍后再试}// 步骤1: 取出寿司 (注意:用拷贝,不能移动!其他顾客可能还在看这个位置)plate = _data[current_head]; // 读取数据// 步骤2: 尝试将_head标签推进到下一个位置 (CAS)} while (!_head.compare_exchange_strong(current_head, (current_head + 1) % _max_size,std::memory_order_release,    // 成功时:释放屏障 (保证plate读取先于_head更新)std::memory_order_relaxed)); // 失败时:放松 (继续尝试)// std::cout << "Customer took: " << plate << " from position " << current_head << std::endl;return true;}
};// 测试函数:模拟厨师和顾客
void testLockFreeQueue() {const size_t capacity = 5; // 传送带容量 (实际可放4盘寿司)LockFreeCircularQueue<SushiPlate, capacity> sushiBelt;// 厨师线程 (生产者)auto chef = [&sushiBelt]() {for (int i = 1; i <= 10; ++i) { // 尝试做10盘寿司SushiPlate plate(i);while (!sushiBelt.push(plate)) {// 传送带满了,厨师休息一下std::this_thread::yield();}std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 模拟做寿司时间}};// 顾客线程 (消费者)auto customer = [&sushiBelt]() {SushiPlate plate;for (int i = 0; i < 5; ++i) { // 尝试吃5盘寿司while (!sushiBelt.pop(plate)) {// 传送带空了或想吃的还没摆好,顾客等待std::this_thread::yield();}std::cout << "Eaten: " << plate << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟吃寿司时间}};// 启动1个厨师和2个顾客std::thread chefThread(chef);std::thread customerThread1(customer);std::thread customerThread2(customer);chefThread.join();customerThread1.join();customerThread2.join();std::cout << "All sushi served and eaten!" << std::endl;
}int main() {testLockFreeQueue();return 0;
}

​关键点解释 (结合寿司店例子):​

  1. push 流程 (厨师放寿司):​

    • ​申请空位:​​ 厨师查看_tail找到下一个空盘位置。如果发现下个位置就是_head(传送带满),等待。
    • ​移动_tail标签:​​ 厨师用CAS操作把_tail标签贴到下一个空位 (原子操作,防止多个厨师抢同一个位置)。
    • ​放寿司:​​ 厨师把寿司放到刚刚申请到的盘子上 (_data[current_tail] = plate)。
    • ​更新完成标签:​​ 厨师把_tail_update标签移动到当前盘子位置,告诉顾客“这盘可以吃了”。
  2. pop 流程 (顾客取寿司):​

    • ​查看位置:​​ 顾客查看_head位置。如果_head == _tail(传送带空),等待。
    • ​检查是否就绪:​​ 顾客检查_head位置的盘子,其位置是否小于_tail_update(寿司是否摆好)。如果没摆好 (_head == _tail_update),等待。
    • ​取寿司:​​ 顾客从_head位置端走寿司 (plate = _data[current_head])。(拷贝,不能移动!其他顾客可能还在看这个位置)。
    • ​移动_head标签:​​ 顾客用CAS操作把_head标签贴到下一个有寿司的位置。
  3. ​内存顺序 (memory_order):​

    • acquire (获取):​​ 顾客在读取_tail_tail_update时用acquire。这确保顾客能看到厨师在这些标签​​之前​​对_data的写入(即看到最新做好的寿司)。
    • release (释放):​​ 厨师在更新_tail_tail_update时用release。这确保厨师对_data的写入(放寿司)​​一定完成​​在这些标签更新​​之前​​(顾客看到标签更新时,寿司肯定已放好)。
    • relaxed (放松):​​ 用于非关键操作(如读取当前_head/_tail尝试值),允许编译器/CPU优化,提高性能。
  4. CAS (Compare-And-Swap):​

    • 这是无锁并发的基石。它原子性地检查标签值是否还是我上次看到的 (expected),如果是,就更新为新值 (desired)。如果不是(被其他线程改了),就告诉我失败,我重试。
    • 这避免了使用锁,厨师和顾客可以同时操作(只要他们操作的是传送带上不同的位置)。

​优势:​

  • ​高并发:​​ 多个厨师和顾客可以同时操作传送带的不同部分,效率远高于锁。
  • ​无阻塞:​​ 一个线程操作失败(CAS失败)不会阻塞,它会立即重试或做其他事,避免死锁。

​注意事项 (寿司店的局限):​

  • ​适合小数据:​​ 就像寿司店适合放小份寿司,这个队列最适合存放int, 指针等小数据。如果存放大对象(如满汉全席),pop时的拷贝 (plate = _data[current_head]) 开销很大,且多个顾客竞争时失败的拷贝操作是浪费。
  • ​容量固定:​​ 传送带大小是固定的。
  • ​ABA问题:​​ 这个简单实现没有处理极端情况下的ABA问题(对于基本类型或指针通常没问题)。更复杂的场景需要版本号或double-word CAS

​总结:​

这个无锁队列就像一家高效的旋转寿司店,厨师和顾客通过智能标签 (_head, _tail, _tail_update) 和原子操作 (CAS) 协同工作,无需排队等待锁 (mutex),大大提升了并发处理能力。核心在于精细的标签管理和内存顺序保证,确保数据在并发读写时的正确性。



另一个详细点的说明

从零开始理解无锁并发队列:像旋转餐桌一样高效存取数据

现实生活中的环形队列:旋转餐桌

想象一下餐厅里的旋转餐桌,厨师将菜品放在转盘上,客人从转盘上取用菜品。这个转盘有固定数量的位置,厨师在一个位置放菜,客人在另一个位置取菜,两者互不干扰。这就是环形队列的现实例子。

当转盘空的时候,厨师和客人的位置标记会重合;当转盘满的时候,厨师需要等待客人取走菜品才能继续放新菜。这种设计既高效又自然,避免了厨师和客人之间的直接冲突。

有锁的实现:需要餐厅经理协调

在多线程环境中,如果我们使用传统的锁机制来实现这个"旋转餐桌",就好比聘请了一位餐厅经理。每次厨师要放菜或客人要取菜时,都需要先获得经理的许可。

// 简化的有锁队列核心代码
bool push(const T& val) {std::lock_guard<std::mutex> lock(_mtx); // 获取经理许可if (队列已满) return false;            // 经理检查转盘是否已满// 在指定位置放置菜品_data[_tail] = val;_tail = (_tail + 1) % _max_size;       // 移动尾部标记return true;
}

这种方式虽然安全,但效率不高——每个操作都需要经理协调,即使厨师和客人操作的是转盘的不同部分。

无锁的实现:厨师和客人自主协调

无锁并发就像去掉餐厅经理,让厨师和客人自主协调。他们通过观察转盘上的标记来决定何时可以操作,无需等待中央协调员。

基础无锁实现:单一的协调标记

最初的想法是使用一个原子标记来表示转盘是否正在被使用:

template<typename T, size_t Cap>
class CircularQueSeq {// ... 其他成员std::atomic<bool> _atomic_using; // 标记转盘是否正在使用bool emplace(Args&&... args) {bool use_expected = false, use_desired = true;// 循环尝试获取使用权do {use_expected = false;use_desired = true;} while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));// 实际操作...// 释放使用权do {use_expected = true;use_desired = false;} while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));return true;}
};

这种方法虽然实现了无锁,但仍然像有一个隐形的经理——同一时间只能有一个厨师或客人操作转盘。

改进的无锁实现:独立的头和尾标记

更高效的方法是让厨师和客人有各自的标记,他们只需要关注自己需要操作的位置:

template<typename T, size_t Cap>
class CircularQueLight {// ... 其他成员std::atomic<size_t> _head; // 客人取菜位置std::atomic<size_t> _tail; // 厨师放菜位置bool push(const T& val) {size_t t;do {t = _tail.load();           // 查看当前放菜位置if (转盘已满) return false; // 检查是否还有空位} while (!_tail.compare_exchange_strong(t, (t + 1) % _max_size));_data[t] = val; // 放置菜品return true;}bool pop(T& val) {size_t h;do {h = _head.load();            // 查看当前取菜位置if (转盘为空) return false;  // 检查是否有菜品可取val = _data[h];              // 取走菜品} while (!_head.compare_exchange_strong(h, (h + 1) % _max_size));return true;}
};

这种方法更加高效,因为多个厨师可以同时在不同位置放菜,多个客人也可以同时在不同位置取菜。

处理极端情况:确保数据完整性

但上面的简单实现有一个问题:当厨师刚移动了尾部标记但还没放好菜时,客人可能已经来取菜了。这就像厨师刚标记了一个位置要放菜,但菜还没放到转盘上,客人就试图从这个位置取菜。

为了解决这个问题,我们引入第三个标记_tail_update,表示菜品已实际放置完成的位置:

template<typename T, size_t Cap>
class CircularQueLight {// ... 其他成员std::atomic<size_t> _head;         // 客人取菜位置std::atomic<size_t> _tail;         // 厨师计划放菜位置std::atomic<size_t> _tail_update;  // 菜品实际已放好的位置bool push(const T& val) {size_t t;do {t = _tail.load();if (转盘已满) return false;} while (!_tail.compare_exchange_strong(t, (t + 1) % _max_size));_data[t] = val; // 放置菜品// 更新实际已放好菜品的位置size_t tailup;do {tailup = t;} while (_tail_update.compare_exchange_strong(tailup, (tailup + 1) % _max_size));return true;}bool pop(T& val) {size_t h;do {h = _head.load();if (转盘为空) return false;// 检查想要取菜的位置是否已实际放好菜品if (h == _tail_update.load()) return false;val = _data[h];} while (!_head.compare_exchange_strong(h, (h + 1) % _max_size));return true;}
};

这样,只有当菜品实际放好后,客人才会取走它,保证了数据的完整性。

性能优化:精细控制内存访问顺序

最后,我们可以通过精细控制内存访问顺序来进一步提升性能。默认的内存顺序保证过于严格,我们可以根据实际需要放松一些要求:

bool push(const T& val) {size_t t;do {t = _tail.load(std::memory_order_relaxed); //  relaxed 顺序,性能更高if ((t + 1) % _max_size == _head.load(std::memory_order_acquire)) // 需要acquire顺序return false;} while (!_tail.compare_exchange_strong(t, (t + 1) % _max_size, std::memory_order_release, std::memory_order_relaxed));_data[t] = val;size_t tailup;do {tailup = t;} while (_tail_update.compare_exchange_strong(tailup, (tailup + 1) % _max_size,std::memory_order_release, std::memory_order_relaxed));return true;
}

这种优化相当于告诉处理器:哪些操作需要严格顺序,哪些可以放松要求,从而提升整体性能。

总结

无锁并发队列就像高效的旋转餐桌:

  • 有锁实现需要中央经理协调所有操作
  • 基础无锁实现去掉了经理,但仍有隐形的协调员
  • 改进无锁实现让厨师和客人自主协调,效率更高
  • 完整无锁实现通过额外标记保证数据完整性
  • 性能优化通过精细控制内存顺序进一步提升效率

无锁编程的优势在于高并发场景下的性能表现,但它更适合存储简单的数据类型。对于复杂的对象,拷贝开销可能会抵消无锁带来的性能优势。

这种设计模式广泛应用于高性能计算、游戏服务器、金融交易系统等对并发性能要求极高的领域。

无锁并发队列:旋转餐桌的高效协作

下面我将使用现实生活中旋转餐桌的例子,实现一个完整的无锁并发队列。

现实生活类比

想象一个旋转餐桌:

  • 厨师(生产者线程)在餐桌的一侧放置菜品
  • 客人(消费者线程)在餐桌的另一侧取用菜品
  • 餐桌标记(原子变量)指示下一个放置/取用的位置
  • 菜品放置完成标记(额外原子变量)确保客人不会取到还未放好的菜品

完整实现代码

#include <iostream>
#include <atomic>
#include <memory>
#include <thread>
#include <vector>// 简单的数据类,代表餐桌上的菜品
class Dish {
public:Dish() : id(0), name("Empty") {}Dish(int id, const std::string& name) : id(id), name(name) {}friend std::ostream& operator<<(std::ostream& os, const Dish& dish) {os << "Dish #" << dish.id << " (" << dish.name << ")";return os;}private:int id;std::string name;
};// 无锁环形队列(旋转餐桌)
template<typename T, size_t Capacity>
class LockFreeCircularQueue {
public:LockFreeCircularQueue() : max_size(Capacity + 1), data(std::make_unique<T[]>(max_size)),head(0), tail(0),tail_update(0) {}// 禁止拷贝LockFreeCircularQueue(const LockFreeCircularQueue&) = delete;LockFreeCircularQueue& operator=(const LockFreeCircularQueue&) = delete;// 放置菜品(生产者)bool push(const T& dish) {size_t current_tail;// 循环尝试获取放置位置do {current_tail = tail.load(std::memory_order_relaxed);// 检查餐桌是否已满if ((current_tail + 1) % max_size == head.load(std::memory_order_acquire)) {std::cout << "Table is full! Can't place more dishes.\n";return false;}// 尝试移动尾部标记,如果失败则重试(其他厨师可能抢先了一步)} while (!tail.compare_exchange_weak(current_tail, (current_tail + 1) % max_size,std::memory_order_release,std::memory_order_relaxed));// 在实际位置放置菜品data[current_tail] = dish;std::cout << "Placed " << dish << " at position " << current_tail << "\n";// 更新菜品放置完成标记size_t current_tail_update = current_tail;while (!tail_update.compare_exchange_weak(current_tail_update,(current_tail_update + 1) % max_size,std::memory_order_release,std::memory_order_relaxed)) {// 如果更新失败,重试(其他厨师可能也在更新)current_tail_update = tail_update.load(std::memory_order_relaxed);}return true;}// 取用菜品(消费者)bool pop(T& dish) {size_t current_head;// 循环尝试获取取用位置do {current_head = head.load(std::memory_order_relaxed);// 检查餐桌是否为空if (current_head == tail_update.load(std::memory_order_acquire)) {std::cout << "Table is empty! No dishes to take.\n";return false;}// 检查菜品是否已实际放置好if (current_head == tail_update.load(std::memory_order_acquire)) {return false;}// 获取当前菜品dish = data[current_head];// 尝试移动头部标记,如果失败则重试(其他客人可能抢先了一步)} while (!head.compare_exchange_weak(current_head,(current_head + 1) % max_size,std::memory_order_release,std::memory_order_relaxed));std::cout << "Took " << dish << " from position " << current_head << "\n";return true;}// 查看餐桌状态void print_status() const {std::cout << "Table status: Head=" << head.load() << ", Tail=" << tail.load() << ", TailUpdate=" << tail_update.load() << ", Capacity=" << Capacity << "\n";}private:const size_t max_size;std::unique_ptr<T[]> data;// 原子变量:标记下一个放置/取用的位置alignas(64) std::atomic<size_t> head;  // 客人取菜位置alignas(64) std::atomic<size_t> tail;  // 厨师计划放菜位置alignas(64) std::atomic<size_t> tail_update;  // 菜品实际已放好的位置
};// 厨师线程函数(生产者)
void chef_thread(LockFreeCircularQueue<Dish, 5>& table, int chef_id) {std::vector<Dish> specials = {Dish(1, "Spring Rolls"),Dish(2, "Kung Pao Chicken"),Dish(3, "Sweet and Sour Pork"),Dish(4, "Mapo Tofu"),Dish(5, "Peking Duck"),Dish(6, "Dim Sum"),Dish(7, "Fried Rice"),Dish(8, "Wonton Soup")};for (const auto& dish : specials) {// 随机延迟,模拟准备时间std::this_thread::sleep_for(std::chrono::milliseconds(100 + rand() % 200));if (table.push(dish)) {std::cout << "Chef " << chef_id << " successfully placed " << dish << "\n";} else {std::cout << "Chef " << chef_id << " failed to place " << dish << "\n";}// 偶尔查看餐桌状态if (rand() % 4 == 0) {table.print_status();}}
}// 客人线程函数(消费者)
void guest_thread(LockFreeCircularQueue<Dish, 5>& table, int guest_id) {Dish dish;for (int i = 0; i < 6; ++i) {// 随机延迟,模拟用餐时间std::this_thread::sleep_for(std::chrono::milliseconds(150 + rand() % 300));if (table.pop(dish)) {std::cout << "Guest " << guest_id << " is enjoying " << dish << "\n";} else {std::cout << "Guest " << guest_id << " found no dish to take\n";}// 偶尔查看餐桌状态if (rand() % 5 == 0) {table.print_status();}}
}int main() {std::cout << "=== Restaurant Table Simulation ===\n";std::cout << "A circular table with 5 seats, 2 chefs and 3 guests\n\n";LockFreeCircularQueue<Dish, 5> table;// 创建厨师和客人线程std::vector<std::thread> threads;// 2个厨师线程for (int i = 1; i <= 2; ++i) {threads.emplace_back(chef_thread, std::ref(table), i);}// 3个客人线程for (int i = 1; i <= 3; ++i) {threads.emplace_back(guest_thread, std::ref(table), i);}// 等待所有线程完成for (auto& t : threads) {t.join();}std::cout << "\n=== Simulation ended ===\n";table.print_status();return 0;
}

代码说明

1. Dish类 - 代表餐桌上的菜品

  • 每个菜品有ID和名称
  • 简单易懂的输出操作符

2. LockFreeCircularQueue类 - 无锁环形队列(旋转餐桌)

  • push方法:厨师放置菜品

    • 获取下一个可用位置
    • 检查餐桌是否已满
    • 放置菜品并更新标记
  • pop方法:客人取用菜品

    • 获取下一个可取用的位置
    • 检查餐桌是否为空
    • 确保菜品已实际放置好
    • 取用菜品并更新标记
  • 内存顺序优化

    • memory_order_relaxed:用于不需要严格同步的操作
    • memory_order_acquire/release:确保关键操作的可见性和顺序性

3. 多线程模拟

  • 厨师线程:不断准备并放置菜品
  • 客人线程:不断取用并享用菜品
  • 随机延迟:模拟真实世界中的不确定时间

运行示例

运行此程序,你会看到类似以下输出:

=== Restaurant Table Simulation ===
A circular table with 5 seats, 2 chefs and 3 guestsPlaced Dish #1 (Spring Rolls) at position 0
Chef 1 successfully placed Dish #1 (Spring Rolls)
Took Dish #1 (Spring Rolls) from position 0
Guest 1 is enjoying Dish #1 (Spring Rolls)
Placed Dish #2 (Kung Pao Chicken) at position 1
Chef 2 successfully placed Dish #2 (Kung Pao Chicken)
Table status: Head=1, Tail=2, TailUpdate=2, Capacity=5
...

这个实现展示了多个厨师和客人如何高效协作,无需显式锁机制,就像现实生活中的旋转餐桌一样自然流畅。

关键优势

  1. 高并发性能:多个生产者和消费者可以同时操作
  2. 无锁设计:避免了锁竞争带来的性能瓶颈
  3. 内存顺序优化:精细控制内存访问,提升性能
  4. 线程安全:通过原子操作和适当的内存屏障保证数据一致性

这种无锁队列设计非常适合高性能服务器、实时系统和其他需要高并发处理的场景。

http://www.xdnf.cn/news/1481527.html

相关文章:

  • Sentinel服务治理:服务降级、熔断与线程隔离
  • 新后端漏洞(上)- Weblogic SSRF漏洞
  • 「数据获取」《中国服务业统计与服务业发展(2014)》
  • 详解flink性能优化
  • docker使用nginxWebUI配置
  • OSG工具集
  • Python错误测试与调试——文档测试
  • ElemenetUI之常用小组件
  • Elasticsearch面试精讲 Day 10:搜索建议与自动补全
  • GEE:基于自定义的年度时序数据集进行LandTrendr变化检测
  • Qt UDP通信学习
  • 《sklearn机器学习——模型的持久性》joblib 和 pickle 进行模型保存和加载
  • python的数据结构
  • redission实现读写锁的原理
  • TDengine 时间函数 WEEKDAY() 用户手册
  • 【PCIe EP 设备入门学习专栏 -- 8 PCIe EP 架构详细介绍】
  • dask.dataframe.shuffle.set_index中获取 divisions 的步骤分析
  • 单例模式(巨通俗易懂)普通单例,懒汉单例的实现和区别,依赖注入......
  • 【C++题解】DFS和BFS
  • leetcode 75 颜色分类
  • OS项目构建效能改进策划方案
  • 神马 M60S++ 238T矿机参数解析:高效SHA-256算法比拼
  • Docker加速下载镜像的配置指南
  • 计算机网络:物理层---数据通信基础知识
  • 【C++ 11 模板类】tuple 元组
  • 嵌入式笔记系列——UART:TTL-UART、RS-232、RS-422、RS-485
  • 旧电脑改造linux服务器2:安装系统
  • 软考中级习题与解答——第二章_程序语言与语言处理程序(3)
  • AD渗透中服务账号相关攻击手法总结(Kerberoasting、委派)
  • 数据仓库概要