【Linux系统】进程间通信(管道)
1. 进程间通信介绍
进程间通信的本质:让各个进程看到同一份资源
1-1进程间通信目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程需要共享同样的一组资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
1-2进程间通信分类
管道
- 匿名管道pipe
- 命名管道
System VIPC
- System V消息队列
- System V共享内存
- System V信号量
POSIX IPC
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
2. 管道
- 管道是Unix中最古老的进程间通信的形式。
- 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
- 管道是Linux系统中一种简单的进程间通信(IPC)机制,它可以看做是一个内核中的缓冲区,数据从管道的一端流入,从另一端流出
- 管道提供了一种单向的数据传输通道,通常用于具有父子关系或兄弟关系进程间通信
分类:
- 匿名管道
- 命名管道
3. 匿名管道
是在内核中开辟的一块缓冲区,用于在两个具有亲属关系之间的单向传输
#include <unistd.h>
// 功能:创建一无名管道
// 原型
int pipe(int fd[2]);
// 参数
// fd: 文件描述符数组,其中fd[0]表示读端,fd[1]表示写端
// 返回值:成功返回0,并将两个有效的文件描述符存储在fd数组中
// 失败返回错误代码
3-1实例代码
// 例子: 从键盘读取数据,写入管道,读取管道,写到屏幕
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>int main(void)
{int fds[2];char buf[100];int len;if (pipe(fds) == -1){perror("make pipe");exit(1);}// read from stdinwhile (fgets(buf, 100, stdin)){len = strlen(buf);// write into pipeif (write(fds[1], buf, len) != len){perror("write to pipe");break;}memset(buf, 0x00, sizeof(buf));// read from pipeif ((len = read(fds[0], buf, 100)) == -1){perror("read from pipe");break;}// write to stdoutif (write(1, buf, len) != len){perror("write to stdout");break;}}return 0;
}
3-2用fork来共享管道原理
3-3站在文件描述符角度-深度理解管道
3-4站在内核角度-管道本质
- 所以,看待管道,就如同看待文件一样!管道的使用和文件一致,迎合了“Linux一切皆文件思想”。
3-5 测试管道读写
#include <iostream>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>// Fater process -> read
// Child process -> write
int main()
{// 1. 创建管道int fds[2] = {0};int n = pipe(fds); // fds:输出型参数if (n != 0){std::cerr << "pipe error" << std::endl;return 1;}// Fater process -> read// Child process -> write// 2. 创建子进程pid_t id = fork();if (id < 0){std::cerr << "fork error" << std::endl;return 2;}else if (id == 0){// 子进程// 3. 关闭不需要的fd,关闭readclose(fds[0]);int cnt = 0;int total = 0;while (true){std::string message = "h";// message += std::to_string(getpid());// message += ", ";// message += std::to_string(cnt);// fds[1]total += ::write(fds[1], message.c_str(), message.size());cnt++;std::cout << "total: " << total << std::endl;sleep(2);// break;}exit(0);}else{// 父进程// 3. 关闭不需要的fd,关闭writeclose(fds[1]);char buffer[1024];while (true){sleep(1);ssize_t n = ::read(fds[0], buffer, 1024);if (n > 0){buffer[n] = 0;std::cout << "child->father, message: " << buffer << std::endl;}else if(n == 0) {// 如果写端关闭// 读端读完管道内部的数据,在读取的时候,// 就会读取到返回值0,表示对端关闭,也表示读到文件结尾std::cout << "n:" << n << std::endl;std::cout << "child quit ??? me too" << std::endl;break;}close(fds[0]);break;std::cout << std::endl;}int status = 0;pid_t rid = waitpid(id, &status, 0);std::cout << "father wait child success: " << rid << " exit code: " <<((status<<8)&0xFF) << ", exit sig: " << (status & 0x7F) << std::endl;}return 0;
}
进程池
进程池是一种软件设计模式,主要用于管理多个进程。
在Linux系统下,进程池预先创建一组进程,这些进程在初始化后处于阻塞等待状态。当有任务到来时,进程池中的一个空闲进程会被分配处理该任务,任务处理完成后,进程又回到等待状态,这样就可以避免频繁创建和销毁进程所带来的系统开销
// Channel.hpp
#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__#include <iostream>
#include <string>
#include <unistd.h>// 先描述
class Channel
{
public:Channel(int wfd, pid_t who) : _wfd(wfd), _who(who){// Channel-3-1234_name = "Channel-" + std::to_string(wfd) + "-" + std::to_string(who);}std::string Name(){return _name;}void Send(int cmd){::write(_wfd, &cmd, sizeof(cmd));}void Close(){::close(_wfd); // 明确表示调用全局作用域下的close函数}pid_t Id(){return _who;}int wFd(){return _wfd;}~Channel(){}private:int _wfd; // 管道文件描述符std::string _name;pid_t _who; // 进程id
};#endif
// ProcessPool.hpp#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <functional>
#include "Task.hpp"
#include "Channel.hpp"// typedef std::function<void()> work_t;
using work_t = std::function<void()>;enum
{OK = 0,UsageError,PipeError,ForkError
};class ProcessPool
{
public:ProcessPool(int n, work_t w): processnum(n), work(w){}// channels : 输出型参数// work_t work: 回调int InitProcessPool(){// 2. 创建指定个数个进程for (int i = 0; i < processnum; i++){// 1. 先有管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return PipeError;// 2. 创建进程pid_t id = fork();if (id < 0)return ForkError;// 3. 建立通信信道if (id == 0){// 关闭历史wfdstd::cout << getpid() << ", child close history fd: ";for (auto &c : channels){std::cout << c.wFd() << " ";c.Close();}std::cout << " over" << std::endl;::close(pipefd[1]); // read// childstd::cout << "debug: " << pipefd[0] << std::endl;dup2(pipefd[0], 0);work();::exit(0);}// 父进程执行::close(pipefd[0]); // writechannels.emplace_back(pipefd[1], id);// Channel ch(pipefd[1], id);// channels.push_back(ch);}return OK;}void DispatchTask(){int who = 0;// 2. 派发任务int num = 20;while (num--){// a. 选择一个任务, 整数int task = tm.SelectTask();// b. 选择一个子进程channelChannel &curr = channels[who++];who %= channels.size();std::cout << "######################" << std::endl;std::cout << "send " << task << " to " << curr.Name() << ", 任务还剩: " << num << std::endl;std::cout << "######################" << std::endl;// c. 派发任务curr.Send(task);sleep(1);}}void CleanProcessPool(){// version 3for (auto &c : channels){c.Close();pid_t rid = ::waitpid(c.Id(), nullptr, 0);if (rid > 0){std::cout << "child " << rid << " wait ... success" << std::endl;}}// version 2// for (auto &c : channels)// for(int i = channels.size()-1; i >= 0; i--)// {// channels[i].Close();// pid_t rid = ::waitpid(channels[i].Id(), nullptr, 0); // 阻塞了!// if (rid > 0)// {// std::cout << "child " << rid << " wait ... success" << std::endl;// }// }// version 1// for (auto &c : channels)// {// c.Close();// }//?// for (auto &c : channels)// {// pid_t rid = ::waitpid(c.Id(), nullptr, 0);// if (rid > 0)// {// std::cout << "child " << rid << " wait ... success" << std::endl;// }// }}void DebugPrint(){for (auto &c : channels){std::cout << c.Name() << std::endl;}}private:std::vector<Channel> channels;int processnum;work_t work;
};
// Task.hpp#pragma once#include <iostream>
#include <unordered_map>
#include <functional>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include <vector>using task_t = std::function<void()>;class TaskManger
{
public:TaskManger(){srand(time(nullptr));tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行url解析\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行加密任务\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n"<< std::endl; });}int SelectTask(){return rand() % tasks.size();}void Excute(unsigned long number){if (number > tasks.size() || number < 0)return;tasks[number]();}~TaskManger(){}private:std::vector<task_t> tasks;
};TaskManger tm;void Worker()
{while (true){int cmd = 0;int n = ::read(0, &cmd, sizeof(cmd));if (n == sizeof(cmd)){tm.Excute(cmd);}else if (n == 0){std::cout << "pid: " << getpid() << " quit..." << std::endl;break;}else{}}
}
3-6 管道读写规则
- 当没有数据可读时
- O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
- O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
- 当管道满的时候
- O_NONBLOCK disable:write调用阻塞,直到有进程读走数据
- O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
- 如果所有管道写端对应的文件描述符被关闭,则read返回0
- 如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出
- 当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
- 当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。
3-7 管道特点
- 只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
- 管道提供流式服务
- 一般而言,进程退出,管道释放,所以管道的生命周期随进程
- 一般而言,内核会对管道操作进行同步与互斥
- 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
4. 命名管道
- 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
- 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
- 命名管道是一种特殊类型的文件,也被称为FIFO文件。
- 与匿名管道不同,命名管道又文件名,可以在文件系统中一文件形式存在。这是得其可以被多个没有血缘关系的进程所使用,只要这些进程有权限访问该命名管道
4-1 创建一个命名管道
- 命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
$ mkfifo filename
- 命名管道也可以从程序里创建,相关函数有:
int mkfifo(const char *filename,mode_t mode);
int main(int argc, char *argv[])
{mkfifo("p2", 0644);return 0;
}
4-2 匿名管道与命名管道的区别
- 匿名管道由
pipe
函数创建并打开。 - 命名管道由
mkfifo
函数创建,打开用open
。 - FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,但这些工作完成之后,它们具有相同的语义。
4-3 命名管道的打开规则
- 如果当前打开操作是为读而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
- O_NONBLOCK enable:立刻返回成功
- 如果当前打开操作是为写而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
- O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
实例1. 用命名管道实现文件拷贝
读取文件,写入命名管道:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \perror(m); \exit(EXIT_FAILURE); \
} while(0)int main(int argc, char *argv[])
{mkfifo("tp", 0644);int infd;infd = open("abc", O_RDONLY);if (infd == -1) ERR_EXIT("open");int outfd;outfd = open("tp", O_WRONLY);if (outfd == -1) ERR_EXIT("open");char buf[1024];int n;while ((n=read(infd, buf, 1024))>0){write(outfd, buf, n);}close(infd);close(outfd);return 0;
}
读取管道,写入目标文件:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \perror(m); \exit(EXIT_FAILURE); \
} while(0)int main(int argc, char *argv[])
{int outfd;outfd = open("abc.bak", O_WRONLY | O_CREAT | O_TRUNC, 0644);if (outfd == -1) ERR_EXIT("open");int infd;infd = open("tp", O_RDONLY);if (outfd == -1)ERR_EXIT("open");char buf[1024];int n;while ((n=read(infd, buf, 1024))>0){write(outfd, buf, n);}close(infd);close(outfd);unlink("tp");return 0;
}
实例2. 用命名管道实现server&client通信
# ll
total 12
-rw-r--r--. 1 root root 46 Sep 18 22:37 clientPipe.c
-rw-r--r--. 1 root root 164 Sep 18 22:37 Makefile
-rw-r--r--. 1 root root 46 Sep 18 22:38 serverPipe.c
# cat Makefile
.PHONY:all
all:clientPipe serverPipe
clientPipe:clientPipe.cgcc -o $@ $^
serverPipe:serverPipe.cgcc -o $@ $^
.PHONY:clean
clean:rm -f clientPipe serverPipe
serverPipe.c
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>#define ERR_EXIT(m) \
do{\perror(m);\exit(EXIT_FAILURE);\
}while(0)int main()
{unlink("mypipe");if(mkfifo("mypipe", 0644) < 0){ERR_EXIT("mkfifo");}int rfd = open("mypipe", O_RDONLY);if(rfd < 0)ERR_EXIT("open");char buf[1024];while(1){buf[0] = 0;printf("please wait...\n");ssize_t s = read(rfd, buf, sizeof(buf) -1);if(s > 0){buf[s-1] = 0;printf("client say: %s\n", buf);}else if(s == 0){printf("client quit, exit now!\n");exit(EXIT_SUCCESS);}else{ERR_EXIT("read");}}close(rfd);return 0;
}
clientPipe.c
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>#define ERR_EXIT(m) \
do{\perror(m);\exit(EXIT_FAILURE);\
}while(0)int main()
{int wfd = open("mypipe", O_WRONLY);if(wfd < 0)ERR_EXIT("open");char buf[1024];while(1){buf[0] = 0;printf("please enter: ");fflush(stdout);ssize_t s = read(0, buf, sizeof(buf) -1);if(s > 0){write(wfd, buf, strlen(buf));}else{ERR_EXIT("read");}}close(wfd);return 0;
}
4-4 命名管道的特点
- 半双工通信
数据可以在一个方向上流动,妖魔是写入进程到读取进程,要么是地区进程到写入进程,但不可同时双向通道 - 阻塞特性
- 读取操作:当命名管道的读取端以阻塞模式进行读取操作时,如果管道中没有数据,读取操作会阻塞,直到有数据被写入管道或者管道被关闭。
- 当命名管道的写入端进行写入操作时,如果管道的缓冲区已满,写入操作会阻塞,直到管道缓冲区有足够的空间可以继续写入数据或者管道被关闭。
- 可以通过设置非阻塞标志(如在 Linux 中使用 O_NONBLOCK 标志)来改变命名管道的阻塞特性,使打开、读写操作在无法立即完成时不会阻塞,而是立即返回一个错误码。