GDPU操作系统实验:生产者消费者问题
文章目录
- 一、实验目的
- 二、实验内容
- 1. 生产者消费者问题(信号量实现)
- 1.1 数据结构说明
- 1.2 实验流程
- 1.3 代码实现
- 1.4 运行效果
- 2. 进阶实现:选择性消费
- 2.1 实验内容
- 2.2 数据结构说明
- 2.3 实验流程
- 2.4 代码实现
- 2.5 运行效果
- 3. 睡觉的理发师问题
- 3.1 数据结构说明
- 3.2 实验流程
- 3.3 Java实现
- 信号量类 (SemaPhores.java)
- 理发师进程 (Barber.java)
- 顾客进程 (Consumer.java)
- 主函数 (TestMain.java)
- 3.4 运行效果
一、实验目的
- 理解和掌握共享互斥资源管理方式
- 理解和掌握进程同步工作原理
- 掌握经典进程同步问题——生产者消费者工作方式
- 掌握经典进程同步问题——理发师睡觉问题
二、实验内容
请用C/Java/python语言编程实现
1. 生产者消费者问题(信号量实现)
参考教材中的生产者消费者算法,创建5个进程,其中三个进程为生产者进程,两个进程为消费者进程。三个生产者不停的产生小写字符放入缓冲区(缓冲区大小自定义)。两个消费者不断地从缓冲中读取一个字符并输出。为了使得程序的输出易于看到结果,仿照的实例程序,分别在生产者和消费者进程的合适的位置加入一些随机睡眠时间。(每次执行的结果都不同,仅作参考)
1.1 数据结构说明
- 缓冲区:字符数组
buffer[BUFFER_SIZE]
,用于存储生产者生成的产品 - 信号量:
empty
:记录空闲缓冲区数量,初始值为BUFFER_SIZE
full
:记录已占用缓冲区数量,初始值为0mutex
:互斥锁,用于保护对缓冲区的访问,初始值为1
- 指针:
in
:生产者下一次存放产品的位置out
:消费者下一次取出产品的位置
1.2 实验流程
- 初始化信号量和缓冲区
- 创建3个生产者线程和2个消费者线程
- 生产者线程循环执行:
- 生成随机字母
- 等待空闲缓冲区(
empty
) - 获取互斥锁(
mutex
) - 将产品放入缓冲区
- 释放互斥锁(
mutex
) - 增加已占用缓冲区计数(
full
) - 睡眠一段时间
- 消费者线程循环执行:
- 等待有产品的缓冲区(
full
) - 获取互斥锁(
mutex
) - 从缓冲区取出产品
- 释放互斥锁(
mutex
) - 增加空闲缓冲区计数(
empty
) - 睡眠一段时间
- 等待有产品的缓冲区(
- 等待所有线程结束
- 销毁信号量
1.3 代码实现
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>#define BUFFER_SIZE 10// 定义缓冲区
char buffer[BUFFER_SIZE];// 定义信号量
sem_t empty, full, mutex;int in = 0, out = 0;void* producer(void* arg) {int id = *((int*) arg);while (1) {// 随机生成字母char item;if (rand() % 2 == 0)item = 'a' + rand() % 26;elseitem = 'A' + rand() % 26;sem_wait(&empty);sem_wait(&mutex);buffer[in] = item;in = (in + 1) % BUFFER_SIZE;sem_post(&mutex);sem_post(&full);printf("Producer %d produced %c\n", id, item);sleep(1);}
}void* consumer(void* arg) {int id = *((int*)arg);while (1) {sem_wait(&full);sem_wait(&mutex);char item = buffer[out];out = (out + 1) % BUFFER_SIZE;sem_post(&mutex);sem_post(&empty);printf("Consumer %d consumed %c\n", id, item);sleep(1);}
}int main() {pthread_t prod_threads[3], cons_threads[2];int prod_ids[3], cons_ids[2];// 初始化信号量sem_init(&empty, 0, BUFFER_SIZE);sem_init(&full, 0, 0);sem_init(&mutex, 0, 1);srand(time(NULL));// 创建生产者线程for (int i = 0; i < 3; i++) {prod_ids[i] = i+1;pthread_create(&prod_threads[i], NULL, producer, &prod_ids[i]);}// 创建消费者线程for (int i = 0; i < 2; i++) {cons_ids[i] = i+1;pthread_create(&cons_threads[i], NULL, consumer, &cons_ids[i]);}// 等待线程结束for (int i = 0; i < 3; i++) {pthread_join(prod_threads[i], NULL);}for (int i = 0; i < 2; i++) {pthread_join(cons_threads[i], NULL);}// 销毁信号量sem_destroy(&empty);sem_destroy(&full);sem_destroy(&mutex);return 0;
}
1.4 运行效果
2. 进阶实现:选择性消费
实验进阶:在上面实验的基础上实现生产者和消费者有选择地生产或消费某些产品。例如一个消费者只消费小写字符,一个消费者只消费大写字母,而另一个消费者则无选择地消费任何产品。消费者要消费的产品没有时,消费者进程被阻塞。注意缓冲的管理。
2.1 实验内容
生产者随机生产大小写字母,消费者分为三类:
- 消费者 1:仅消费小写字母。
- 消费者 2:仅消费大写字母。
- 消费者 3:可消费任意字母。
通过环形缓冲区存储产品,使用信号量实现线程同步和互斥,确保多个生产者和消费者协同工作且不会发生数据竞争或死锁。
2.2 数据结构说明
-
环形缓冲区:
-
使用数组 buffer[BUFFER_SIZE] 模拟环形缓冲区,存储生产者生成的产品。
-
字段:
-
item:字母内容,可以为 ‘a’ 到 ‘z’ 或 ‘A’ 到 ‘Z’。
-
type:产品类型,1 表示小写字母,2 表示大写字母。
-
-
指针:
- in:生产者存放产品的位置。
- out:消费者取出产品的位置。
-
信号量:
- empty:记录缓冲区中空闲位置数量,初始值为 BUFFER_SIZE。
- full:记录缓冲区中已存储产品的数量,初始值为 0。
- mutex:互斥锁,保护缓冲区的并发访问。
- lowercase 和 uppercase:分别记录小写字母和大写字母产品的数量,初始值均为 0。
2.3 实验流程
生产者流程:
-
生产者生成一个随机字母:
- 随机决定生成小写字母 (‘a’ 到 ‘z’) 或大写字母 (‘A’ 到 ‘Z’)。
- 设置产品类型,1 表示小写字母,2 表示大写字母。
-
检查缓冲区是否有空闲位置:
- 等待信号量 empty(空闲位置数量 > 0)。
- 进入临界区(锁定 mutex),将产品存入缓冲区,更新生产指针 in。
-
根据产品类型更新分类信号量:
- 小写字母增加 lowercase 信号量。
- 大写字母增加 uppercase 信号量。
-
解锁互斥锁 mutex,增加 full 信号量(已占用位置数量 +1)。
消费者流程:
-
消费者检查缓冲区:
- 消费者 1 等待 lowercase 信号量(有小写字母)。
- 消费者 2 等待 uppercase 信号量(有大写字母)。
- 消费者 3 等待 full 信号量(任意字母)。
-
确认产品类型:
- 分类消费者需确认产品类型是否匹配,否则重新等待。
- 通用消费者直接取走产品。
-
消费产品:
- 进入临界区,取出产品,更新消费指针 out。
- 解锁互斥锁 mutex,释放 empty 信号量(空闲位置数量 +1)。
-
输出消费日志,显示消费者 ID 和消费的产品内容。
main函数流程:
-
初始化信号量:
- empty:空闲位置数量,初始值为缓冲区大小 BUFFER_SIZE。
- full:已占用位置数量,初始值为 0。
- mutex:互斥锁,用于控制缓冲区的并发访问。
- lowercase 和 uppercase:记录小写和大写字母产品的数量,初始值均为 0。
-
创建生产者线程和消费者线程:
- 启动 3 个生产者线程,每个线程随机生成字母。
- 启动 3 个消费者线程,分别处理小写字母、大写字母和任意字母。
-
等待线程完成:
- 主线程通过 pthread_join 等待所有线程结束。
2.4 代码实现
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
#include <time.h>#define BUFFER_SIZE 10typedef struct {char item;int type; // 1:小写字母, 2:大写字母
} Product;Product buffer[BUFFER_SIZE];sem_t empty, full, mutex, lowercase, uppercase;int in = 0, out = 0;void* producer(void* arg) {int id = *((int*)arg);while (1) {// 随机生成字母和类型char item;int type = rand() % 2 + 1;if (type == 1) {item = 'a' + rand() % 26;} else {item = 'A' + rand() % 26;}sem_wait(&empty);sem_wait(&mutex);// 生产产品buffer[in].item = item;buffer[in].type = type;in = (in + 1) % BUFFER_SIZE;// 根据产品类型更新信号量if (type == 1) {sem_post(&lowercase);} else {sem_post(&uppercase);}sem_post(&full);sem_post(&mutex);printf("Producer %d produced %c\n", id, item);sleep(1);}
}void* consumer(void* arg) {int id = *((int*)arg);while (1) {if (id == 3) {// 消费者3:消费任意字母sem_wait(&full);sem_wait(&mutex);Product item = buffer[out];out = (out + 1) % BUFFER_SIZE;sem_post(&mutex);sem_post(&empty);printf("Consumer %d consumed(random) %c\n", id, item.item);} else {// 消费者1和2:消费特定类型字母if (id == 1) {sem_wait(&lowercase);} else {sem_wait(&uppercase);}sem_wait(&mutex);// 确认产品类型是否匹配while (buffer[out].type != id) {sem_post(&mutex);if (id == 1) {sem_wait(&lowercase);} else {sem_wait(&uppercase);}sem_wait(&mutex);}Product item = buffer[out];out = (out + 1) % BUFFER_SIZE;sem_post(&mutex);sem_post(&empty);if (id == 1) {printf("Consumer %d consumed(lowercase) %c\n", id, item.item);} else {printf("Consumer %d consumed(UPPERCASE) %c\n", id, item.item);}}sleep(1);}
}int main() {pthread_t prod_threads[3], cons_threads[3];int prod_ids[3], cons_ids[3];// 初始化信号量sem_init(&empty, 0, BUFFER_SIZE);sem_init(&full, 0, 0);sem_init(&mutex, 0, 1);sem_init(&lowercase, 0, 0);sem_init(&uppercase, 0, 0);srand(time(NULL));// 创建生产者线程for (int i = 0; i < 3; i++) {prod_ids[i] = i + 1;pthread_create(&prod_threads[i], NULL, producer, &prod_ids[i]);}// 创建消费者线程for (int i = 0; i < 3; i++) {cons_ids[i] = i + 1;pthread_create(&cons_threads[i], NULL, consumer, &cons_ids[i]);}// 等待线程结束for (int i = 0; i < 3; i++) {pthread_join(prod_threads[i], NULL);pthread_join(cons_threads[i], NULL);}// 销毁信号量sem_destroy(&empty);sem_destroy(&full);sem_destroy(&mutex);sem_destroy(&lowercase);sem_destroy(&uppercase);return 0;
}
2.5 运行效果
3. 睡觉的理发师问题
理发师问题的描述:一个理发店接待室有10张椅子,工作室有1张椅子;没有顾客时,理发师睡觉;第一个顾客来到时,必须将理发师唤醒;顾客来时如果还有空座的话,他就坐在一个座位上等待;如果顾客来时没有空座位了,他就离开,不理发了;当理发师处理完所有顾客,而又没有新顾客来时,他又开始睡觉。(测试构造一个理发师进程,100个顾客进程,在代码中加入随机睡眠时间,测试不同时间的效果)
理发师问题的伪代码如下:
int waiting = 0;//等候理发顾客坐的椅子数
int CHAIRS = N;//为顾客准备的椅子数(实际代码中换成一个常数)
semaphore customers = 0;//顾客数量
semaphore barbers = 0;//理发师初始状态为睡着状态,故初始化为0
semaphore mutex = 1;
//理发师进程
process barber(){while(true){P(customers);//有顾客吗?若无顾客,理发师睡眠P(mutex);//若有顾客时,进入临界区waiting--;//等候顾客数少一个V(barbers);//理发师准备为顾客理发V(mutex);//退出临界区cut_hair();//理发师正在理发(非临界区)}
}
//顾客进程
process customer i(){P(mutex);//进入临界区if(waiting < CHAIRS){//有空椅子吗waiting++;//等候顾客数加1V(customers);//唤醒理发师V(mutex);//退出临界区P(barbers);//理发师忙,顾客坐下等待get_haircut();//否则顾客坐下理发}else{V(mutex); // 人满了,走吧!}
}
3.1 数据结构说明
- 共享变量:
waiting
:当前等待理发的顾客数量CHAIRS
:等待区椅子的最大数量
- 信号量:
customers
:表示等待理发的顾客数量,初始值为0barbers
:表示理发师状态,初始值为0(睡觉状态)mutex
:互斥锁,用于保护对共享变量的访问,初始值为1
3.2 实验流程
- 初始化信号量和共享变量
- 创建理发师线程并启动
- 循环创建100个顾客线程:
- 随机间隔时间创建顾客线程
- 顾客线程尝试获取互斥锁(
mutex
) - 检查等待区是否有空位:
- 有空位:顾客加入等待区,增加等待计数,通知理发师,释放互斥锁,等待理发师服务
- 无空位:顾客离开,释放互斥锁
- 理发师线程循环执行:
- 等待顾客信号(
customers
) - 获取互斥锁(
mutex
) - 减少等待顾客计数
- 通知顾客可以理发(
barbers
) - 释放互斥锁(
mutex
) - 执行理发操作
- 等待顾客信号(
- 顾客接受理发服务后离开
3.3 Java实现
信号量类 (SemaPhores.java)
package com.gulu.OSexperiment;
import java.util.concurrent.Semaphore;public class SemaPhores {static int waiting = 0; // 等候理发顾客坐的椅子数 static final int CHAIRS = 10; // 为顾客准备的椅子数Semaphore customers = new Semaphore(0); // 顾客数量Semaphore barbers = new Semaphore(0); // 理发师初始状态为睡着状态Semaphore mutex = new Semaphore(1); // 互斥信号量
}
理发师进程 (Barber.java)
package com.gulu.OSexperiment;public class Barber extends Thread {private SemaPhores paras;public void setParas(SemaPhores paras) {this.paras = paras;}public void run() {try {while(true) {paras.customers.acquire(); // 等待顾客paras.mutex.acquire(); // 进入临界区SemaPhores.waiting--; // 等候顾客数减1System.out.println("理发师准备为顾客理发。等待顾客数:" + SemaPhores.waiting);paras.barbers.release(); // 通知顾客可以理发paras.mutex.release(); // 退出临界区cut_hair(); // 理发}} catch(InterruptedException e) {e.printStackTrace();}}private void cut_hair() {System.out.println("理发师正在理发……");try {Thread.sleep(1000); // 理发时间} catch (InterruptedException e) {e.printStackTrace();}}
}
顾客进程 (Consumer.java)
package com.gulu.OSexperiment;public class Consumer extends Thread {private SemaPhores paras;public void setParas(SemaPhores paras) {this.paras = paras;}public void run() {try {paras.mutex.acquire(); // 进入临界区if (SemaPhores.waiting < SemaPhores.CHAIRS) {SemaPhores.waiting++; // 等候顾客数加1System.out.println("顾客" + Thread.currentThread().getName() + "进入等待区。当前等待人数:" + SemaPhores.waiting);paras.customers.release(); // 通知理发师有顾客paras.mutex.release(); // 退出临界区paras.barbers.acquire(); // 等待理发师get_haircut(); // 理发} else {System.out.println("顾客" + Thread.currentThread().getName() + "离开了,等待区已满。");paras.mutex.release(); // 退出临界区}} catch(InterruptedException e) {e.printStackTrace();}}private void get_haircut() {System.out.println("顾客" + Thread.currentThread().getName() + "正在理发……");try {Thread.sleep(300); // 理发时间} catch (InterruptedException e) {e.printStackTrace();}}
}
主函数 (TestMain.java)
package com.gulu.OSexperiment;
import java.util.Random;public class TestMain {public static void main(String args[]) {SemaPhores semaphores = new SemaPhores();Barber barber = new Barber();barber.setParas(semaphores);barber.start(); // 启动理发师进程// 启动100个顾客进程for (int i = 1; i <= 100; i++) {Consumer customer = new Consumer();customer.setParas(semaphores);customer.setName(String.valueOf(i));customer.start();// 随机间隔生成顾客try {Thread.sleep(new Random().nextInt(300));} catch (InterruptedException e) {e.printStackTrace();}}}
}