进程间通信:管道与共享内存
目录
一:进程间通信介绍
1.1进程间通信目的
1.2进程间通信发展
1.3进程间通信分类
二:管道
三:匿名管道
3.1代码演示
3.2用fork来共享管道原理
3.3站在文件描述符角度-深度理解管道
3.4站在内核角度-管道本质
3.5管道读写规则
3.6管道特点
3.7验证管道通信的4种情况
3.8管道应用---进程池
一:进程间通信介绍
1.1进程间通信目的
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:⼀个进程需要向另⼀个或⼀组进程发送消息,通知它(它们)发生了某种事件(如进 程终止时要通知父进程)。
进程控制:有些进程希望完全控制另⼀个进程的执行(如Debug进程),此时控制进程希望能够 拦截另⼀个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程通信的本质:由OS提供一份公共资源
1.2进程间通信发展
本地通信:同一台主机,同一个OS,不同的进程之间的通信
管道
System V进程间通信
POSIX进程间通信
1.3进程间通信分类
管道 :匿名管道pipe , 命名管道
System V IPC:System V 消息队列, System V 共享内存, System V 信号量
POSIX IPC :消息队列 , 共享内存,信号量 ,互斥量 ,条件变量 ,读写锁
二:管道
什么是管道 ?
管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
三:匿名管道
参数 pipefd:文件描述符数组,其中pipefd[0]表示读端,pipefd[1]表是写端
返回值:成功返回0,失败返回错误代码
进程自己的:new malloc的空间,在堆区上, 是用户级别的,只有自己可以看到
大多数数据结构和共享空间,都有一个引用计数器ref,只有ref为0时,空间才会释放
struct_file中的ref记录文件的读取位置,每个进程读同一个文件的读取位置是不同的独立的
如果父子进程同时向一个文件写入,最后结果会出现打印错乱的现象
3.1代码演示
从键盘读取数据,写⼊管道,读取管道,写到屏幕
#include<iostream>
#include<cstdio>
#include<cstdlib>
#include<string.h>
#include<unistd.h>int main()
{int fds[2];char buf[100];int len = 0;if(pipe(fds) == -1){perror("make pipe");return 1;}//read from stdinwhile(fgets(buf,100,stdin)){//write into pipelen = strlen(buf);if(write(fds[1],buf,len) != len){perror("write to pipe");break;}memset(buf,0x00,sizeof(buf));//read from pipeif((len == read(fds[0],buf,sizeof(buf))) == -1){perror("read to pipe");break;}//write into stdoutif(write(1,buf,len)!= len){perror("write to stdout");break;} }return 0;}
3.2用fork来共享管道原理
3.3站在文件描述符角度-深度理解管道
管道只能单向通信
如果使用完之后不关闭fd,可能会导致内存泄漏或者误操作,使用完之后,最好关闭
先创建管道,再创建子进程,利用子进程可以看到父进程这一特性,只有父进程先创建文件,之后子进程才能看到管道资源。如果先创建子进程,再创建管道,父子进程无法看到同一个资源
父进程打开文件时,要以读写的方式打开文件,这样子进程继承之后,对文件可以R/W,如果父进程只是读(写),那子进程也只能读(写)
管道不需要路径不要名字 ---> 匿名管道
#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0)// 子进程写,父进程读
int main(int argc, char *argv[])
{int pipfd[2];if (pipe(pipfd) == -1){ERR_EXIT("pipe error");}pid_t id = fork();if (id < 0)ERR_EXIT("fork error");else if (id == 0){close(pipfd[0]);write(pipfd[1], "hello", 6);close(pipfd[1]);ERR_EXIT(EXIT_SUCCESS);}else{char buff[1024];close(pipfd[1]);read(pipfd[0], buff, sizeof(buff));printf("buff=%s\n", buff);//close(pipfd[0]);}return 0;
}
3.4站在内核角度-管道本质
所以,看待管道,就如同看待文件一样!管道的使用和文件⼀致,迎合了“Linux⼀切皆文件的思想”
重定向到log.txt的文件里面
同时可以:
#将标准输出和标准错误的信息都放在ok.txt中
./mypipe 1>ok.txt 2>&1
#进行输出是分离信息,将正常打印的放在ok.txt中,将错误信息放在err.txt中
./mypipe 1>ok.txt 2>err.txt
3.5管道读写规则
当没有数据可读时:
O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直到有数据来为止
O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
当管道满的时候:
O_NONBLOCK disable:write调用阻塞,直到有进程读走数据
O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
如果所有管道写端对应的文件描述符被关闭,则read返回0
如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出
当要写入的数据量不大于PIPE_BUF(ubuntu下是64KB)时,linux将保证写入的原子性。
当要写入的数据量大于PIPE_BUF时,linux将不再保证写⼊的原子性
3.6管道特点
只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;
通常,一个管道由一个进程创建,然后该进程调用fork,此后父子进程之间就可应用该管道。
管道提供流式服务(面向字节流):读端不管写端写了多少内容,读端只按照自己读的方式读这个管道,写端也不管读端是怎么读的,只按照自己的格式向管道写内容,对于读写两端管道是流式的
⼀般而言,进程退出,管道释放,所以管道的生命周期随进程
⼀般而言,内核会对管道操作进行同步与互斥
管道是半双工的,数据只能向⼀个方向流动(单向数据通信);需要双方通信时,需要建立起两个管道
3.7验证管道通信的4种情况
// 验证管道的四种读写情况
//子进程写,父进程读
int main()
{// 1.创建管道int fds[2] = {0};int n = pipe(fds); // fds:输出型参数if (n != 0){perror("pipe");return 1;}// Father process read// Chile process write// 2.创建子进程pid_t id = fork();if (id < 0){perror("fork");return 2;}else if (id == 0){// 子进程// 3.关闭不需要的fd,关闭readclose(fds[0]);int cont = 0;int total = 0;while (true){std::string message = "h";message += std::to_string(getpid());message += ",";message += std::to_string(cont);// fds[1]//:: 表示是系统调用,与库函数调用区分total += ::write(fds[1], message.c_str(), message.size());cont++;std::cout << "total : " << total << std::endl;sleep(2);// break;}exit(0);}else{// 父进程// 3.关闭不需要的fd,关闭write// close(fds[1]);char buff[1024];int cont = 0;int total = 0;while (true){ssize_t n = ::read(fds[0], buff, sizeof(buff));if (n > 0){buff[n] = 0;std::cout << "child->father,message : " << buff << std::endl;}else if (n == 0){// 写段关闭// 读端读完管道内的数据// 在读取的时候就会读到返回值0// 也表示读到文件结尾std::cout << "n : " << n << std::endl;std::cout << "chile quit ?? me too " << std::endl;break;}}int status = 0;// pid_t rid = waitpid(id,nullptr,0);// std::cout <<"father wait success "<<std::endl;pid_t rid = waitpid(id, &status, 0);std::cout << "father wait success: " << rid<< ",eixt code : " << ((status << 8) & 0XFF) << ", exit :" << (status & 0X7F) << std::endl;}return 0;
}
读正常&&写满
read会进行阻塞
写正常&&读空
write进行堵塞
写关闭&&读正常
读端读到0,表示读到文件的结尾
读关闭&&写正常
OS会直接杀死进程,杀死的方式是向进程发送13信号
3.8管道应用---进程池
Task.hpp:
#pragma once#include <iostream>
#include <unordered_map>
#include <functional>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>using task_t = std::function<void()>;class TaskManger
{
public:TaskManger(){srand(time(nullptr));tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行url解析\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行加密任务\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n"<< std::endl; });}int SelectTask(){return rand() % tasks.size();}void Excute(unsigned long number){if (number > tasks.size() || number < 0)return;tasks[number]();}~TaskManger(){}private:std::vector<task_t> tasks;
};TaskManger tm;void Worker()
{while (true){int cmd = 0;//输入任务码int n = ::read(0, &cmd, sizeof(cmd));//执行特定任务码的任务if (n == sizeof(cmd)){tm.Excute(cmd);}else if (n == 0){std::cout << "pid: " << getpid() << " quit..." << std::endl;break;}else{}}
}
Channel.hpp:渠道,每一个子进程都有一个对应的通道,但是对于父进程来说,他要管理子进程这些通道,先描述在组织
#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__#include <iostream>
#include <unistd.h>
#include <string>// 先描述在组织
class Channel
{
public:Channel(int wfd, pid_t who) : _wfd(wfd), _who(who){// Channel-3-1234_name = "Channel-" + std::to_string(wfd) + "-" + std::to_string(who);}std::string Name(){return _name;}pid_t Id(){return _who;}int Wfd(){return _wfd;}void Send(int cmd){::write(_wfd, &cmd, sizeof(cmd));}void Close(){::close(_wfd);}~Channel(){}private:int _wfd;//写端,文件inodestd::string _name;pid_t _who;//进程id
};#endif
ProcessPool.hpp:
#ifndef __PROCESSPOOL_HPP__
#define __PROCESSPOOL_HPP__#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <functional>
#include "Task.hpp"
#include "Channel.hpp"// typedef std::function<void()> work_t;
using work_t = std::function<void()>;enum
{OK = 0,UsageError,PipeError,ForkError
};class ProcessPool
{
public:ProcessPool(int n, work_t w) : processnum(n), work(w){}// channel是输出型参数// work是回调函数int InitProcessPool(){// 2.创建制定个数的进程for (int i = 0; i < processnum; i++){// 1.现由管道int fds[2];int n = pipe(fds);if (n == -1){return PipeError;}// 2.创建进程pid_t id = fork();if (id < 0)return ForkError;else if (id == 0) // 建立通信的信道{// child::close(fds[1]); // readstd::cout << "debug :" << fds[0] << std::endl;dup2(fds[0], 0);work();exit(0);}// 父进程执行::close(fds[0]); // writechannels.emplace_back(fds[1], id);}return OK;}// 分发任务void DispatchTask(){int who = 0;// 2.派发任务int num = 20;while (num--){// 1.选择一个任务,整数int task = tm.SelectTask();// 2.选择一个子进程:采用轮询Channel &curr = channels[who++];who %= channels.size();std::cout << "#######################"<<std::endl;std::cout << "send" <<task<<"to" << curr.Name()<<",任务还剩:"<<num<<std::endl;std::cout << "########################"<<std::endl;//3.派发任务curr.Send(task);sleep(1);}}void CleanProcessPool(){for(auto&c : channels){c.Close();pid_t rid = waitpid(c.Id(),nullptr,0);if(rid > 0){std::cout << "child: "<<rid << "wait....success"<<std::endl;}}}void DebugPrint(){for(auto& c : channels){std::cout << c.Name()<<std::endl;}}~ProcessPool(){}private:std::vector<Channel> channels;//保存的是master打开的所有写端int processnum;work_t work;
};#endif
Main.cc:
#include "ProcessPool.hpp"
#include "Task.hpp"void Usage(std::string proc)
{std::cout << "Usage: " << proc << " process-num" << std::endl;
}// 我们自己就是master
int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);return UsageError;}int num = std::stoi(argv[1]);ProcessPool *pp = new ProcessPool(num, Worker);// 1. 初始化进程池pp->InitProcessPool();// 2. 派发任务pp->DispatchTask();// 3. 退出进程池pp->CleanProcessPool();delete pp;return 0;
}
但是上面的代码最终父进程等不到子进程退出,这是为什么呢?
由上图可以看出,channel相当于是保存了历史上master的所以打开的写端
如何解决这个问题 ---> 反向释放之前的写端
改进后的ProcessPool.hpp中的InitProcessPool函数:
int InitProcessPool(){// 2.创建制定个数的进程for (int i = 0; i < processnum; i++){// 1.现由管道int fds[2];int n = pipe(fds);if (n == -1){return PipeError;}// 2.创建进程pid_t id = fork();if (id < 0)return ForkError;else if (id == 0) // 建立通信的信道{// 关闭历史的wfdstd::cout << getpid() << ",child close history fd: ";for (auto &c : channels){std::cout << c.Wfd() << " ";c.Close();}std::cout << "over" << std::endl;// child::close(fds[1]); // readdup2(fds[0], 0);work();exit(0);}// 父进程执行::close(fds[0]); // writestd::cout << "debug :" << fds[1] << std::endl;channels.emplace_back(fds[1], id);}return OK;}