C语言进程间通信:基础篇
进程间通信(IPC)是指允许进程之间进行通信并同步其执行的机制和方法。默认情况下,进程拥有独立的内存空间且不共享数据。然而实际上,进程经常需要共享数据和资源。此外,它们还需要协调执行以实现预期行为,这被称为进程同步。
匿名管道是最简单的IPC机制之一。它允许相关进程之间的单向通信,通常是父子进程之间。数据只能单向流动(单向通信),只有具有共同祖先的进程才能使用这种机制。它的存在依赖于父进程,并且是基于内存的,因为它在内核内存中使用缓冲区。
具体来说,匿名管道通过pipe()
系统调用创建,它返回两个文件描述符:pipefd[0]
用于读取,pipefd[1]
用于写入。创建管道后,父进程可以使用pipefd[1]
向管道写入数据,子进程可以使用pipefd[0]
从管道读取数据。需要注意的是,这种管道有缓冲区大小限制(通常为几KB)。
管道设置和进程派生
我们让每个进程P_i
创建一个到P_{i+1}
的管道。P_0
调用create_children
。
void create_children(int n, int pipes[][2]) {for (int i = 0; i < n - 1; i++) {pipe(pipes[i]); // 创建管道pid_t pid = fork(); // 创建子进程if (pid == 0) { // 子进程close(pipes[i][1]); // 关闭管道的写入端if (i == n - 2) break; // 最后一个子进程退出循环} else { // 父进程close(pipes[i][0]); // 关闭管道的读取端break;}}
}
在上面的代码中,pipes[][2]是一个管道数组,其中每个管道是一个包含两个文件描述符的数组。第一个文件描述符用于读取,第二个用于写入。每次迭代中,都会创建一个新管道并派生一个子进程。然后子进程关闭写入端(因为它只需要读取),父进程关闭读取端(因为它只需要写入)。最后一个子进程会退出循环。
在命令行(shell)中,|操作符在命令之间创建匿名管道。当我们链接一系列管道时,我们将每个进程的输出作为下一个进程的输入。例如,ls -l | grep "txt"会列出当前目录中的所有文件和目录,然后过滤输出只显示以"txt"结尾的文件和目录。单个管道只支持单向通信。两个管道可以实现双向通信。管道没有名称(因此称为匿名管道)。管道是UNIX操作系统最基本的方面之一。它是构建基于进程的并行和分布式应用程序的极其强大的概念。
消息传递
现在让我们看看如何编码/解码消息,以及send_message和receive_message函数。
首先我们如下定义Message结构体:
struct {int dest_id; // 目标进程IDint src_id; // 源进程IDchar msg[50]; // 实际的消息内容(最多50个字符)
} packet;
其中进程根据dest_id转发消息。
消息发送
发送消息时,我们将执行以下操作:
- 创建一个包含目标ID、源ID和消息的数据包
- 安全地将消息复制到数据包中
- 根据进程ID确定要使用的正确管道
- 将整个数据包写入管道
void send_message(int process, char *msg, int msg_len, int src_id, int pipes[][2], int n) {// 初始化数据包,包含目标、源和消息struct { int dest_id; int src_id; char msg[50]; } packet = {process, src_id, {0}};// 将消息安全地复制到数据包中(使用strncpy)strncpy(packet.msg, msg, 50);if (src_id < process && src_id < n - 1)write(pipes[src_id][1], &packet, sizeof(packet));else if (src_id > process)write(pipes[src_id - 1][1], &packet, sizeof(packet));
}
消息接收
接收消息时,我们将执行以下操作:
- 尝试从该进程可能连接的所有管道读取
- 对于每个管道:
- 如果消息是给当前进程的(packet.dest_id == curr_id),则将其复制到输出缓冲区
- 如果消息是给其他进程的,则将其转发到正确的方向
- 使用进程ID确定消息转发的方向
void receive_message(char *buffer, int *nread, int buffer_max, int curr_id, int pipes[][2], int n) {struct { int dest_id; int src_id; char msg[50]; } packet;// 检查该进程可能读取的所有管道for (int i = 0; i < n - 1; i++) {// 检查该进程是否应该从pipe[i]读取if (curr_id == i + 1) {// 如果这条消息是给我的,将其复制到缓冲区*nread = read(pipes[i][0], &packet, sizeof(packet));if (*nread > 0) {if (packet.dest_id == curr_id) {strncpy(buffer, packet.msg, buffer_max);*nread = strlen(packet.msg);} // 否则,如果需要,将其转发到下一个进程else if (packet.dest_id > curr_id && curr_id < n - 1)write(pipes[curr_id][1], &packet, sizeof(packet));}}// 检查该进程是否应该从pipe[i-1]读取(用于向左传递的消息)if (curr_id == i && i < n - 1) {*nread = read(pipes[i][0], &packet, sizeof(packet));if (*nread > 0 && packet.dest_id < curr_id)write(pipes[i - 1][1], &packet, sizeof(packet));}}
}
消息路由
为了路由消息,我们将:
- 让每条消息包含用于路由的dest_id和src_id
- 使用进程根据目标ID转发消息
- 双向通信:
- 系统允许消息双向流动
- 每个进程既可以是发送者也可以是接收者
- 进程链:
- 进程按线性链排列
- 每个进程通过管道与其邻居连接
- 流量控制:
- 协议确保消息只向目标方向转发
- 使用进程ID确定方向,防止无限循环
示例流程
- 进程1想向进程3发送"Hello"
- 进程1调用send_message(3, “Hello”, 5, 1, pipes, n)
- 消息被写入进程1和进程2之间的管道
- 进程2收到消息,发现dest_id = 3(不是给自己的)
- 进程2将消息转发给进程3
- 进程3收到消息,发现dest_id = 3(是给自己的),然后处理它
资源清理
最后,我们必须free_resources,因为关闭管道对于防止资源泄漏和确保正确清理资源很重要。
void free_resources(int curr_id, int pipes[][2], int n) {// 遍历系统中的所有可能管道// n个进程有n-1个管道(因为它们按链式排列)for (int i = 0; i < n - 1; i++) {// 如果这个进程是pipe[i]的左端(进程i)if (curr_id == i) // 关闭pipe[i]的写入端(索引1是写入端)close(pipes[i][1]);// 如果这个进程是pipe[i]的右端(进程i+1)if (curr_id == i + 1) // 关闭pipe[i]的读取端(索引0是读取端)close(pipes[i][0]);}
}
其中:
curr_id
:当前进程的ID(0到n-1)pipes[][2]
:二维数组,每行代表一个管道,包含[read_fd, write_fd]
n
:系统中的进程总数- 为了防止循环,我们需要确保进程只在
dest_id != curr_id
时转发消息。为了确保收敛,我们使用线性链确保消息通过定向转发准确地到达dest_id
一次。