当前位置: 首页 > ai >正文

读者写者问题

读者写者问题

本文介绍操作系统经典的 读者-写者问题(Reader-Writers Problem)

并且用 C++20 的标准库提供的工具(如互斥锁、信号量等)来实现解决方案。

下面的注意事项是计算机操作系统的规定(也就是无论linux windows 还是什么都是这样)

注意,二值信号量和锁是有区别的!

Mutex “锁”,用于保护共享资源,强调“谁持有,谁释放”。

而二值信号量,无所有者,任何线程均可释放信号量。

什么是“读者-写者问题”?

问题背景:

假设有一个共享资源(比如一块内存、一个文件),多个线程可以访问它:

  • 读者线程(Readers):只读取数据,不修改;
  • 写者线程(Writers):会修改数据;

冲突点:

  • 多个读者同时读取是可以的(不会破坏数据);
  • 但一个写者在写时,不能有其他任何线程(包括读者和写者)访问该资源;
  • 否则会导致数据竞争或一致性错误。

额外提一下,这个共享资源应当“比较大”,访问有一定的时间开销,如果是一个变量这种,加个锁,多个线程等待访问就可以了,反正访问几乎瞬间完成。

目标

读取不会破坏原本数据,因此,多个读者可以同时读取,但是只要有人开始写入,其它人既不能读取,也不能写入(否则数据就乱套了)
因此我们要设计一种机制,使得:

  1. 多个读者可以同时读
  2. 写者独占访问资源

解决方法

读者优先

首先说明解决读者写者问题最简单的方法,也就是“读者优先”

读者优先策略含义:只要还有读者在读,新的读者可以继续进来读,而写者必须等待。

算法说明
核心变量说明:
int readers_count = 0;        // 当前正在读的读者数量
std::mutex readers_count_mutex;      // 保护 readers_count 的互斥锁
std::binary_semaphore resource_semaphore(1);//保护共享资源的二值信号量
流程图说明
👤 读者逻辑:
[开始]↓
获取 mtx_count 锁↓
read_count 增加 1↓
如果是第一个读者 → 获取 wrt 锁(阻塞写者)↓
释放 mtx_count 锁↓
【执行读操作】↓
获取 mtx_count 锁↓
read_count 减少 1↓
如果是最后一个读者 → 释放 wrt 锁(允许写者进入)↓
释放 mtx_count 锁
[结束]
✍️ 写者逻辑:
[开始]↓
获取 wrt 锁(如果已有读者在读,则等待)↓
【执行写操作】↓
释放 wrt 锁
[结束]
参考代码

reader_first.cpp

/* 读者优先 */#include <iostream>
using namespace std;#include <thread>
#include <semaphore>//需要开启 C++ 20 的支持
#include <mutex>#include <chrono>int readers_count = 0;//读者数量
mutex readers_count_mutex;//保护读者数量的互斥锁(使用原子操作 atomic 也可以)uint8_t resource[1024*1024] = { 0 };//共享的资源,这里以一块内存为例
binary_semaphore resource_semaphore(1);//保护共享资源的二值信号量/*** @brief 读者线程的函数* @param readerNumer 读者编号 调试用*/
void reader_function(int readerNumer)
{cout << "读者线程 " << readerNumer << "建立  线程id " << this_thread::get_id() << endl;while (true){/* 读者读取之前应当执行的内容 */readers_count_mutex.lock();//加锁,保护读者计数if (readers_count == 0)//如果是第一个读者{resource_semaphore.acquire();//获取资源信号量}readers_count++;//读者数量增加readers_count_mutex.unlock();//解锁/* 读者读取 */////当比较大时,如果放到栈(stack)区,会出现 栈溢出 的错误//uint8_t readerBuffer[1024 * 1024] = { 0 };uint8_t* readerBuffer = new uint8_t[1024 * 1024];//比较大需要放到堆(heap)中memcpy(readerBuffer, resource, sizeof(resource));//读取cout << " 读者" << readerNumer << "  读取到内容的前8个字节" << endl;for (int i = 0; i < 8; i++){printf("%#X ", readerBuffer[i]);}cout << endl;delete[] readerBuffer;this_thread::sleep_for(chrono::milliseconds(25));//延时,模拟对读取数据以及处理花费时间/*** @note 当它比较大时就可以模拟“读者源源不断”的到达* @note 读者可以同时读,所以读取总时间小于各个读者读取时间之和* @note 对于读者优先当这个时间超过所有写者需要的时间之和,写者就会饥饿*///this_thread::sleep_for(chrono::milliseconds(55));//延时,模拟对读取数据以及处理花费时间/* 读者读取完毕应当执行的内容 */readers_count_mutex.lock();//加锁,保护读者计数        readers_count--;//读者数量减少if (readers_count == 0)//如果是最后一个读者{resource_semaphore.release();//释放资源信号量}readers_count_mutex.unlock();//解锁this_thread::sleep_for(chrono::milliseconds(1));//延时}
}/*** @brief 写者线程* @param writerNumber 写者编号 调试用*/
void writer_function(int writerNumber)
{cout << "写者线程 " << writerNumber << "建立  线程id " << this_thread::get_id() << endl;static int count = 0;//调试用while (true){/* 写者写入之前应当执行的内容 */        resource_semaphore.acquire();//获取资源信号量        /* 写者写入 */count++;for (int i = 0; i < sizeof(resource); i++)//写入数据{resource[i] = (i + count) % 256;}cout << "写者 " << writerNumber << "写入内容的前8个字节是" << endl;for (int i = 0; i < 8; i++){printf("%#X ", resource[i]);}cout << endl;this_thread::sleep_for(chrono::milliseconds(25));//延时,模拟大量数据写入花费的时间/* 写者写入完毕应当执行的内容 */        resource_semaphore.release();//释放资源信号量this_thread::sleep_for(chrono::milliseconds(1));//延时}
}int main()
{std::cout << "开始测试\n";std::thread readers[5], writers[2];for (int i = 0; i < 5; ++i) readers[i] = std::thread(reader_function, i);for (int i = 0; i < 2; ++i) writers[i] = std::thread(writer_function, i);for (auto& t : readers) t.join();for (auto& t : writers) t.join();return 0;
}
分析

读者优先比较简单不再赘述

显然,当有源源不断的读者到达时, resource_semaphore 不会被读者释放,写者进程饥饿。

读写公平

读写公平的核心思想是:遵循到达顺序,无论是读者还是写者,先到者先获得资源,彻底避免饥饿问题。

算法说明
1. 全局变量与同步工具
变量/工具作用
int readers_count = 0当前正在读取的读者数量
std::mutex readers_count_mutex保护 readers_count 的互斥锁
std::binary_semaphore resource_semaphore(1)保护共享资源的二值信号量(读者和写者竞争)
std::mutex service_mutex服务信锁,用来实现“先来先服务”,强制所有线程按顺序请求访问(公平性的关键)

服务锁 (service_mutex)

  • 所有线程(读者和写者)必须首先获取 service,确保严格按到达顺序排队。
  • 这是公平性的核心机制,防止读者或写者插队。
  • service_mutex 的作用:强制所有线程按顺序排队,类似于现实中的“取号机”。
  • 无优先级差异:读者和写者完全平等竞争,先到者先服务。

进一步的讨论请看后面的 分析

2. 流程图说明
👤读者线程流程

读者逻辑

  • 第一个读者获取 resource_sem,最后一个读者释放。
  • 多个读者可并发读取,但必须通过 service_queue 排队。
[开始]↓
lock(service)       // 进入服务↓
lock(read_mutex)          // 保护读者计数↓
if (read_count == 0)      // 第一个读者?↓resource_sem.acquire() // 获取资源锁↓
read_count++↓
unlock(read_mutex)↓
unlock(service)     // 退出服务(允许后续线程被服务)↓
[读取共享资源]             // 临界区(可并发读)↓
lock(read_mutex)↓
read_count--↓
if (read_count == 0)      // 最后一个读者?↓resource_sem.release() // 释放资源锁↓
unlock(read_mutex)↓
[结束]
✍写者线程流程

写者逻辑

  • 写者直接请求 resource_sem,但必须通过 service_queue 排队。
  • 写者执行时会独占资源,阻塞所有后续读者和写者。
[开始]↓
lock(service)       // 进入服务↓
resource_sem.acquire()    // 直接请求资源锁(独占)↓
unlock(service)     // 退出服务↓
[写入共享资源]            // 临界区(独占)↓
resource_sem.release()↓
[结束]
参考代码
/* 读写公平 
*  严格按到达顺序服务,无论是读者还是写者,先到者先获得资源
*/#include <iostream>
using namespace std;#include <thread>
#include <semaphore>//需要开启 C++ 20 的支持
#include <mutex>#include <chrono>uint8_t resource[1024 * 1024] = { 0 };//共享的资源,这里以一块内存为例
binary_semaphore resource_semaphore(1);//保护共享资源的二值信号量int readers_count = 0;//读者数量
mutex readers_count_mutex;//保护读者数量的互斥锁(使用原子操作 atomic 也可以)mutex service_mutex;//服务锁,用来实现“先来先服务”/*** @brief 读者线程的函数* @param readerNumer 读者编号 调试用*/
void reader_function(int readerNumer)
{cout << "读者线程 " << readerNumer << "建立  线程id " << this_thread::get_id() << endl;while (true){/* 读者读取之前应当执行的内容 */service_mutex.lock();//等待服务readers_count_mutex.lock();//加锁,保护读者计数if (readers_count == 0)//如果是第一个读者{resource_semaphore.acquire();//获取资源信号量}readers_count++;//读者数量增加readers_count_mutex.unlock();//解锁service_mutex.unlock();/* 读者读取 */////当比较大时,如果放到栈(stack)区,会出现 栈溢出 的错误//uint8_t readerBuffer[1024 * 1024] = { 0 };uint8_t* readerBuffer = new uint8_t[1024 * 1024];//比较大需要放到堆(heap)中memcpy(readerBuffer, resource, sizeof(resource));//读取cout << "读者 " << readerNumer << "读取到内容的前8个字节" << endl;for (int i = 0; i < 8; i++){printf("%#X ", readerBuffer[i]);}cout << endl;delete[] readerBuffer;this_thread::sleep_for(chrono::milliseconds(25));//延时,模拟对读取到的数据进行某些处理/* 读者读取完毕应当执行的内容 */readers_count_mutex.lock();//加锁,保护读者计数        readers_count--;//读者数量减少if (readers_count == 0)//如果是最后一个读者{resource_semaphore.release();//释放资源信号量}readers_count_mutex.unlock();//解锁this_thread::sleep_for(chrono::milliseconds(1));//延时}
}/*** @brief 写者线程* @param writerNumber 写者编号 调试用*/
void writer_function(int writerNumber)
{cout << "写者线程 " << writerNumber << "建立  线程id " << this_thread::get_id() << endl;static int count = 0;//调试用while (true){/* 写者写入之前应当执行的内容 */service_mutex.lock();//等待服务resource_semaphore.acquire();//获取资源信号量service_mutex.unlock();/* 写者写入 */count++;for (int i = 0; i < sizeof(resource); i++)//写入数据{resource[i] = (i + count) % 256;}cout << "写者 " << writerNumber << "写入内容的前8个字节是" << endl;for (int i = 0; i < 8; i++){printf("%#X ", resource[i]);}cout << endl;this_thread::sleep_for(chrono::milliseconds(25));//延时,模拟大量数据写入花费的时间/* 写者写入完毕应当执行的内容 */resource_semaphore.release();//释放资源信号量        this_thread::sleep_for(chrono::milliseconds(1));//延时}
}int main()
{std::cout << "开始测试\n";std::thread readers[5], writers[2];for (int i = 0; i < 5; ++i) readers[i] = std::thread(reader_function, i);for (int i = 0; i < 2; ++i) writers[i] = std::thread(writer_function, i);for (auto& t : readers) t.join();for (auto& t : writers) t.join();return 0;
}
分析

这样想比较清晰,读写公平的算法分成了两部分

1、共享资源的获取,由 resource_semaphore 控制

2、调度服务的获取,由 service_mutex 控制

注意:

标准库中 信号量的 acquare 是“一直等,等到后获取”

标准库中 锁的 lock 是“一直等,等到后上锁”

分析以下并发执行情况

读者1 正在读取时,写者1到达,随后读者2到达

这是体现 service_mutex 作用的重要情况!

当 读者1 正在读取时,service_mutex 已经被 读者1 解锁, resource_semaphore 仍然被 读者1 获取。也就是说, 读者1 用完了 调度服务 正在占用 共享资源 。

当 写者1 到达后, 写者1 service_mutex.lock() 加锁,占用 调度服务 ,然后 resource_semaphore.acquare() 等待共享资源

当 读者2 到达后, 读者2 service_mutex.lock() 在这里挂起,等待 调度服务

所以,这种情况,读者1读取完后,写者1写入,然后读者2读取,“遵循到达顺序”体现了“读写公平”

假如没有 service_mutex(其实就是前面的读者优先算法)

读者1正在读取时,写者1到达后等待,然后,读者2到达不需要等待就可以开始读取!

读者2”插入“到了写者1的前面!

写者优先

算法说明
1. 全局变量与同步工具
变量/工具作用
int read_count = 0当前正在读取的读者数量
int write_count = 0当前等待和正在写入的写者总数
std::mutex read_mutex保护 read_count 的互斥锁
std::mutex write_mutex保护 write_count 的互斥锁
std::binary_semaphore resource_sem(1)保护共享资源的二值信号量
std::binary_semaphore read_allow_semaphore(1)是否允许读取的信号量(写者优先的关键)

2. 流程图
👤读者线程流程

读者线程需要做:

  1. 通过 read_allow_semaphore 检查是否允许读取
  2. 如果是第一个读者,获取 resource_sem
  3. 执行读取操作
  4. 如果是最后一个读者,释放 resource_sem
开始
↓
read_allow_semaphore.acquire()        检查是否有写者等待
↓
read_mutex.lock()             保护读者计数
↓
if (read_count == 0)          如果是第一个读者resource_sem.acquire()     获取资源锁
↓
read_count++                  增加读者计数
↓
read_mutex.unlock()           释放读者计数锁
↓
read_allow_semaphore.release()        允许其他读者尝试
↓
[读取共享资源]                临界区操作
↓
read_mutex.lock()             保护读者计数
↓
read_count--                  减少读者计数
↓
if (read_count == 0)          如果是最后一个读者resource_sem.release()     释放资源锁
↓
read_mutex.unlock()           释放读者计数锁
↓
结束
✍写者线程流程

写者线程需要做:

  1. 如果是第一个写者,获取 read_allow_semaphore(阻止新读者)
  2. 获取 resource_sem 进行独占写入
  3. 如果是最后一个写者,释放 read_allow_semaphore(允许新读者)
开始
↓
write_mutex.lock()            保护写者计数
↓
if (write_count == 0)         如果是第一个写者read_allow_semaphore.acquire()     阻塞新读者
↓
write_count++                 增加写者计数
↓
write_mutex.unlock()          释放写者计数锁
↓
resource_sem.acquire()        获取资源锁(独占)
↓
[写入共享资源]                临界区操作
↓
resource_sem.release()        释放资源锁
↓
write_mutex.lock()            保护写者计数
↓
write_count--                 减少写者计数
↓
if (write_count == 0)         如果是最后一个写者read_allow_semaphore.release()     允许新读者
↓
write_mutex.unlock()          释放写者计数锁
↓
结束
参考代码
/*
* 写者优先
* 当有写者等待时,新到达的读者必须等待,直到所有写者完成。
*/
#include <iostream>
using namespace std;#include <thread>
#include <semaphore>//需要开启 C++ 20 的支持
#include <mutex>#include <chrono>uint8_t resource[1024 * 1024] = { 0 };//共享的资源,这里以一块内存为例/*** @brief 保护共享资源的二值信号量* @note 多个读者可以同时读,读者a获得后可能由读者b释放,因此必须使用信号量*/
binary_semaphore resource_semaphore(1);int readers_count = 0;//正在读取的读者数量
mutex readers_count_mutex;//保护读者数量的互斥锁(使用原子操作 atomic 也可以)/*** @brief 正在写入以及等待写入的写者数量* @note 注意,是正在写和等着写的写者总数*/
int writers_count = 0;
mutex writing_mutex;//保护正在写的互斥锁(使用原子操作 atomic 也可以)binary_semaphore read_allow_semaphore(1);//是否允许读取的信号量/*** @brief 读者线程的函数* @param readerNumer 读者编号 调试用*/
void reader_function(int readerNumer)
{cout << "读者线程 " << readerNumer << "建立  线程id " << this_thread::get_id() << endl;while (true){/* 读者读取之前应当执行的内容 */read_allow_semaphore.acquire();//等待直到所有写者完毕readers_count_mutex.lock();//加锁,保护读者计数if (readers_count == 0)//如果是第一个读者{resource_semaphore.acquire();//获取资源信号量}readers_count++;//读者数量增加readers_count_mutex.unlock();//解锁read_allow_semaphore.release();/* 读者读取 */////当比较大时,如果放到栈(stack)区,会出现 栈溢出 的错误//uint8_t readerBuffer[1024 * 1024] = { 0 };uint8_t* readerBuffer = new uint8_t[1024 * 1024];//比较大需要放到堆(heap)中memcpy(readerBuffer, resource, sizeof(resource));//读取cout << "读者 " << readerNumer << " 读取到内容的前8个字节" << endl;for (int i = 0; i < 8; i++){printf("%#X ", readerBuffer[i]);}cout << endl;delete[] readerBuffer;this_thread::sleep_for(chrono::milliseconds(5));//延时,模拟对读取到的数据进行某些处理/* 读者读取完毕应当执行的内容 */readers_count_mutex.lock();//加锁,保护读者计数        readers_count--;//读者数量减少if (readers_count == 0)//如果是最后一个读者{resource_semaphore.release();//释放资源信号量}readers_count_mutex.unlock();//解锁this_thread::sleep_for(chrono::milliseconds(1));//延时}
}/*** @brief 写者线程* @param writerNumber 写者编号 调试用*/
void writer_function(int writerNumber)
{cout << "写者线程 " << writerNumber << "建立  线程id " << this_thread::get_id() << endl;static int count = 0;//调试用while (true){/* 写者写入之前应当执行的内容 */writing_mutex.lock();if (writers_count == 0)//第一个写入或等待的写者{read_allow_semaphore.acquire();//阻塞读者  }writers_count++;//正在写入或者等待的读者数量自增         writing_mutex.unlock();resource_semaphore.acquire();//获取资源信号量/* 写者写入 */count++;for (int i = 0; i < sizeof(resource); i++)//写入数据{resource[i] = (i + count) % 256;}cout << "写者 " << writerNumber << " 写入内容的前8个字节是" << endl;for (int i = 0; i < 8; i++){printf("%#X ", resource[i]);}cout << endl;this_thread::sleep_for(chrono::milliseconds(20));//延时,模拟大量数据写入的时间消耗/* 写者写入完毕应当执行的内容 */resource_semaphore.release();//释放资源信号量 writing_mutex.lock();writers_count--;if (writers_count == 0)//如果不再有正在写入或等待的写者{read_allow_semaphore.release();//允许读者 }               writing_mutex.unlock();this_thread::sleep_for(chrono::milliseconds(85));//延时/*** @note 当这个延时小于所有写者写入时间总和,就可以模拟“写者源源不断到来”* @note 对于写者优先,写者源源不断到来,则读者饥饿*///this_thread::sleep_for(chrono::milliseconds(75));//延时}
}int main()
{std::cout << "开始测试\n";std::thread readers[5], writers[4];for (int i = 0; i < 5; ++i) readers[i] = std::thread(reader_function, i);for (int i = 0; i < 4; ++i) writers[i] = std::thread(writer_function, i);for (auto& t : readers) t.join();for (auto& t : writers) t.join();return 0;
}
分析

以下是一种典型情况

写者1正在写入,然后读者1到达,再然后写者2道到达

写者1 正在写入, read_allow_semaphore 已经被写者获取,也就是不允许读取, resource_semaphore 被 写者 获取,也就是资源被 写者1 获取。

读者1 到达后, read_allow_semaphore.acquire(); 死等,直到允许读取。

写者2 到达后,writers_count自增变为2, resource_semaphore.acquire(); 等待资源,直到资源被释放。

假设某个时刻, 写者1 写入完毕

writers_count减少1,变为1,resource_semaphore 不会被写者释放,读者1 继续等待。

resource_semaphore 被 写者1 释放,写者2 获取后就会开始写入

虽然读者1先于写者2到达,但是写者2先开始操作。

读者-写者问题的三种策略对比

1. 读者优先(Readers-Preferred)

现象

  • 只要有一个读者正在读,后续读者可以直接进入,无需等待
  • 写者必须等待所有读者完成后才能写入。
  • 极端情况:如果读者持续到达,写者可能永远无法执行(写者饥饿)。

比喻
图书馆里,只要有人在看书(读者),新来的人可以直接进去看书;想修改图书的人(写者)必须等所有人离开才能动笔,但不断有新读者进来,导致写者一直等不到机会。

2. 读写公平(Fair / No Starvation)

现象

  • 读者和写者按到达顺序排队
  • 新到达的读者不会插队到等待中的写者前面。
  • 无饥饿:写者和读者均能公平执行。

比喻
图书馆门口有一个取号机,读者和写者按号码排队。即使当前有人在看书,新来的写者也会排在后续读者前面。

3. 写者优先(Writers-Preferred)

现象

  • 至少有一个写者在等待时,新到达的读者会被阻塞,直到所有写者完成。
  • 写者能更快获得资源,但读者可能被延迟。
  • 极端情况:如果写者持续到达,读者可能长时间等待(读者饥饿)。

比喻
图书馆规定,只要有人申请修改图书(写者),新来的读者必须等待,直到所有修改完成。但若写者不断到来,读者可能一直无法进入。

三者的核心区别总结

策略优先级饥饿风险典型应用场景
读者优先读者 > 写者写者可能饥饿读多写少,容忍写延迟
读写公平先到先服务无饥饿需要公平性(如数据库调度)
写者优先写者 > 读者读者可能饥饿写操作需及时响应(如日志系统)

四、参考资料

王道计算机考研 操作系统

2.3.5_2 读者-写者问题_哔哩哔哩_bilibili

C++11 多线程编程-小白零基础到手撕线程池

C++11 多线程编程-小白零基础到手撕线程池_哔哩哔哩_bilibili

http://www.xdnf.cn/news/18217.html

相关文章:

  • Linux多线程——线程池
  • Spark学习
  • MySQL基础操作
  • 网络连接的核心机制
  • HTML+CSS:浮动详解
  • Python 文件操作与异常处理全解析
  • Zemax光学设计输出3D
  • idea进阶技能掌握, 使用自带HTTP测试工具,完全可替代PostMan
  • OpenSSH 命令注入漏洞(CVE-2020-15778)修复,升级openssh9.8p1
  • rust语言 (1.88) egui (0.32.1) 学习笔记(逐行注释)(一)基本代码
  • Qt设置软件使用期限【新版防修改系统时间】
  • React响应式链路
  • 【蒸蒸日上】专栏前言
  • Google Chrome v139.0.7258.139 便携增强版
  • 云手机在社交媒体场景中的优势体现在哪些方面?
  • 趣打印高级版--手机打印软件!软件支持多种不同的连接方式,打印神器有这一个就够了!
  • AutoGLM2.0背后的云手机和虚拟机分析(非使用案例)
  • Claude Code NPM 包发布命令
  • 数据挖掘笔记:点到线段的距离计算
  • GitHub宕机生存指南:从应急协作到高可用架构设计
  • [TryHackMe]Mr Robot CTF(hydra爆破+Wordpress更改主题)
  • Leetcode 深度优先搜索 (9)
  • MPR多平面重建一:初步实现
  • linux报permission denied问题
  • 【C语言16天强化训练】从基础入门到进阶:Day 4
  • 创建Vue项目的不同方式及项目规范化配置
  • 大数据常见问题分析与解决方案
  • 《SQLAlchemy 2 In Practice》读后感
  • C++开发/Qt开发:单例模式介绍与应用
  • IDEA:控制台中文乱码