当前位置: 首页 > backend >正文

进程间通信:管道与共享内存

目录

一:进程间通信介绍

1.1进程间通信目的 

1.2进程间通信发展

1.3进程间通信分类

二:管道

三:匿名管道 

3.1代码演示

3.2用fork来共享管道原理 

3.3站在文件描述符角度-深度理解管道 

3.4站在内核角度-管道本质

3.5管道读写规则

3.6管道特点

3.7验证管道通信的4种情况

3.8管道应用---进程池


一:进程间通信介绍

1.1进程间通信目的 

数据传输:一个进程需要将它的数据发送给另一个进程

资源共享:多个进程之间共享同样的资源。

通知事件:⼀个进程需要向另⼀个或⼀组进程发送消息,通知它(它们)发生了某种事件(如进 程终止时要通知父进程)。

进程控制:有些进程希望完全控制另⼀个进程的执行(如Debug进程),此时控制进程希望能够 拦截另⼀个进程的所有陷入和异常,并能够及时知道它的状态改变。

进程通信的本质:由OS提供一份公共资源 


1.2进程间通信发展

本地通信:同一台主机,同一个OS,不同的进程之间的通信

管道

System V进程间通信

POSIX进程间通信


1.3进程间通信分类

管道 :匿名管道pipe , 命名管道

System V IPC:System V 消息队列,  System V 共享内存, System V 信号量

POSIX IPC  :消息队列 , 共享内存,信号量 ,互斥量 ,条件变量 ,读写锁


二:管道

什么是管道 ?

管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”


三:匿名管道 

参数 pipefd:文件描述符数组,其中pipefd[0]表示读端,pipefd[1]表是写端

返回值:成功返回0,失败返回错误代码 

进程自己的:new malloc的空间,在堆区上, 是用户级别的,只有自己可以看到

大多数数据结构和共享空间,都有一个引用计数器ref,只有ref为0时,空间才会释放

 

struct_file中的ref记录文件的读取位置,每个进程读同一个文件的读取位置是不同的独立的

如果父子进程同时向一个文件写入,最后结果会出现打印错乱的现象


3.1代码演示

从键盘读取数据,写⼊管道,读取管道,写到屏幕

#include<iostream>
#include<cstdio>
#include<cstdlib>
#include<string.h>
#include<unistd.h>int main()
{int fds[2];char buf[100];int len = 0;if(pipe(fds) == -1){perror("make pipe");return 1;}//read from stdinwhile(fgets(buf,100,stdin)){//write into pipelen = strlen(buf);if(write(fds[1],buf,len) != len){perror("write to pipe");break;}memset(buf,0x00,sizeof(buf));//read from pipeif((len == read(fds[0],buf,sizeof(buf))) == -1){perror("read to pipe");break;}//write into stdoutif(write(1,buf,len)!= len){perror("write to stdout");break;} }return 0;}


3.2用fork来共享管道原理 


3.3站在文件描述符角度-深度理解管道 

管道只能单向通信

如果使用完之后不关闭fd,可能会导致内存泄漏或者误操作,使用完之后,最好关闭

先创建管道,再创建子进程,利用子进程可以看到父进程这一特性,只有父进程先创建文件,之后子进程才能看到管道资源。如果先创建子进程,再创建管道,父子进程无法看到同一个资源

父进程打开文件时,要以读写的方式打开文件,这样子进程继承之后,对文件可以R/W,如果父进程只是读(写),那子进程也只能读(写)

管道不需要路径不要名字 ---> 匿名管道 

#define ERR_EXIT(m)         \do                      \{                       \perror(m);          \exit(EXIT_FAILURE); \} while (0)// 子进程写,父进程读
int main(int argc, char *argv[])
{int pipfd[2];if (pipe(pipfd) == -1){ERR_EXIT("pipe error");}pid_t id = fork();if (id < 0)ERR_EXIT("fork error");else if (id == 0){close(pipfd[0]);write(pipfd[1], "hello", 6);close(pipfd[1]);ERR_EXIT(EXIT_SUCCESS);}else{char buff[1024];close(pipfd[1]);read(pipfd[0], buff, sizeof(buff));printf("buff=%s\n", buff);//close(pipfd[0]);}return 0;
}


3.4站在内核角度-管道本质

所以,看待管道,就如同看待文件一样!管道的使用和文件⼀致,迎合了“Linux⼀切皆文件的思想”

 

重定向到log.txt的文件里面

同时可以:

#将标准输出和标准错误的信息都放在ok.txt中
./mypipe 1>ok.txt 2>&1
#进行输出是分离信息,将正常打印的放在ok.txt中,将错误信息放在err.txt中
./mypipe 1>ok.txt 2>err.txt

3.5管道读写规则

当没有数据可读时:

O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直到有数据来为止

O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。

当管道满的时候:

O_NONBLOCK disable:write调用阻塞,直到有进程读走数据

O_NONBLOCK enable:调用返回-1,errno值为EAGAIN

如果所有管道写端对应的文件描述符被关闭,则read返回0

如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出

当要写入的数据量不大于PIPE_BUF(ubuntu下是64KB)时,linux将保证写入的原子性。

当要写入的数据量大于PIPE_BUF时,linux将不再保证写⼊的原子性


3.6管道特点

只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;

通常,一个管道由一个进程创建,然后该进程调用fork,此后父子进程之间就可应用该管道。

管道提供流式服务(面向字节流):读端不管写端写了多少内容,读端只按照自己读的方式读这个管道,写端也不管读端是怎么读的,只按照自己的格式向管道写内容,对于读写两端管道是流式的

⼀般而言,进程退出,管道释放,所以管道的生命周期随进程

⼀般而言,内核会对管道操作进行同步与互斥

管道是半双工的,数据只能向⼀个方向流动(单向数据通信);需要双方通信时,需要建立起两个管道

 


3.7验证管道通信的4种情况

// 验证管道的四种读写情况
//子进程写,父进程读
int main()
{// 1.创建管道int fds[2] = {0};int n = pipe(fds); // fds:输出型参数if (n != 0){perror("pipe");return 1;}// Father process read// Chile process write// 2.创建子进程pid_t id = fork();if (id < 0){perror("fork");return 2;}else if (id == 0){// 子进程// 3.关闭不需要的fd,关闭readclose(fds[0]);int cont = 0;int total = 0;while (true){std::string message = "h";message += std::to_string(getpid());message += ",";message += std::to_string(cont);// fds[1]//:: 表示是系统调用,与库函数调用区分total += ::write(fds[1], message.c_str(), message.size());cont++;std::cout << "total : " << total << std::endl;sleep(2);// break;}exit(0);}else{// 父进程// 3.关闭不需要的fd,关闭write// close(fds[1]);char buff[1024];int cont = 0;int total = 0;while (true){ssize_t n = ::read(fds[0], buff, sizeof(buff));if (n > 0){buff[n] = 0;std::cout << "child->father,message : " << buff << std::endl;}else if (n == 0){// 写段关闭// 读端读完管道内的数据// 在读取的时候就会读到返回值0// 也表示读到文件结尾std::cout << "n : " << n << std::endl;std::cout << "chile quit ?? me too " << std::endl;break;}}int status = 0;// pid_t rid = waitpid(id,nullptr,0);// std::cout <<"father wait success "<<std::endl;pid_t rid = waitpid(id, &status, 0);std::cout << "father wait success:  " << rid<< ",eixt code : " << ((status << 8) & 0XFF) << ", exit :" << (status & 0X7F) << std::endl;}return 0;
}

读正常&&写满

read会进行阻塞 

写正常&&读空

write进行堵塞 

写关闭&&读正常

读端读到0,表示读到文件的结尾

读关闭&&写正常

OS会直接杀死进程,杀死的方式是向进程发送13信号


3.8管道应用---进程池

Task.hpp: 

#pragma once#include <iostream>
#include <unordered_map>
#include <functional>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>using task_t = std::function<void()>;class TaskManger
{
public:TaskManger(){srand(time(nullptr));tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行url解析\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行加密任务\n"<< std::endl; });tasks.push_back([](){ std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n"<< std::endl; });}int SelectTask(){return rand() % tasks.size();}void Excute(unsigned long number){if (number > tasks.size() || number < 0)return;tasks[number]();}~TaskManger(){}private:std::vector<task_t> tasks;
};TaskManger tm;void Worker()
{while (true){int cmd = 0;//输入任务码int n = ::read(0, &cmd, sizeof(cmd));//执行特定任务码的任务if (n == sizeof(cmd)){tm.Excute(cmd);}else if (n == 0){std::cout << "pid: " << getpid() << " quit..." << std::endl;break;}else{}}
}

Channel.hpp:渠道,每一个子进程都有一个对应的通道,但是对于父进程来说,他要管理子进程这些通道,先描述在组织

#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__#include <iostream>
#include <unistd.h>
#include <string>// 先描述在组织
class Channel
{
public:Channel(int wfd, pid_t who) : _wfd(wfd), _who(who){// Channel-3-1234_name = "Channel-" + std::to_string(wfd) + "-" + std::to_string(who);}std::string Name(){return _name;}pid_t Id(){return _who;}int Wfd(){return _wfd;}void Send(int cmd){::write(_wfd, &cmd, sizeof(cmd));}void Close(){::close(_wfd);}~Channel(){}private:int _wfd;//写端,文件inodestd::string _name;pid_t _who;//进程id
};#endif

 ProcessPool.hpp:

#ifndef __PROCESSPOOL_HPP__
#define __PROCESSPOOL_HPP__#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <functional>
#include "Task.hpp"
#include "Channel.hpp"// typedef std::function<void()> work_t;
using work_t = std::function<void()>;enum
{OK = 0,UsageError,PipeError,ForkError
};class ProcessPool
{
public:ProcessPool(int n, work_t w) : processnum(n), work(w){}// channel是输出型参数// work是回调函数int InitProcessPool(){// 2.创建制定个数的进程for (int i = 0; i < processnum; i++){// 1.现由管道int fds[2];int n = pipe(fds);if (n == -1){return PipeError;}// 2.创建进程pid_t id = fork();if (id < 0)return ForkError;else if (id == 0) // 建立通信的信道{// child::close(fds[1]); // readstd::cout << "debug :" << fds[0] << std::endl;dup2(fds[0], 0);work();exit(0);}// 父进程执行::close(fds[0]); // writechannels.emplace_back(fds[1], id);}return OK;}// 分发任务void DispatchTask(){int who = 0;// 2.派发任务int num = 20;while (num--){// 1.选择一个任务,整数int task = tm.SelectTask();// 2.选择一个子进程:采用轮询Channel &curr = channels[who++];who %= channels.size();std::cout << "#######################"<<std::endl;std::cout << "send" <<task<<"to" << curr.Name()<<",任务还剩:"<<num<<std::endl;std::cout << "########################"<<std::endl;//3.派发任务curr.Send(task);sleep(1);}}void CleanProcessPool(){for(auto&c : channels){c.Close();pid_t rid = waitpid(c.Id(),nullptr,0);if(rid > 0){std::cout << "child: "<<rid << "wait....success"<<std::endl;}}}void DebugPrint(){for(auto& c : channels){std::cout << c.Name()<<std::endl;}}~ProcessPool(){}private:std::vector<Channel> channels;//保存的是master打开的所有写端int processnum;work_t work;
};#endif

 Main.cc:

#include "ProcessPool.hpp"
#include "Task.hpp"void Usage(std::string proc)
{std::cout << "Usage: " << proc << " process-num" << std::endl;
}// 我们自己就是master
int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);return UsageError;}int num = std::stoi(argv[1]);ProcessPool *pp = new ProcessPool(num, Worker);// 1. 初始化进程池pp->InitProcessPool();// 2. 派发任务pp->DispatchTask();// 3. 退出进程池pp->CleanProcessPool();delete pp;return 0;
}

 

但是上面的代码最终父进程等不到子进程退出,这是为什么呢?

 

由上图可以看出,channel相当于是保存了历史上master的所以打开的写端

如何解决这个问题 ---> 反向释放之前的写端

改进后的ProcessPool.hpp中的InitProcessPool函数:

int InitProcessPool(){// 2.创建制定个数的进程for (int i = 0; i < processnum; i++){// 1.现由管道int fds[2];int n = pipe(fds);if (n == -1){return PipeError;}// 2.创建进程pid_t id = fork();if (id < 0)return ForkError;else if (id == 0) // 建立通信的信道{// 关闭历史的wfdstd::cout << getpid() << ",child close history fd: ";for (auto &c : channels){std::cout << c.Wfd() << " ";c.Close();}std::cout << "over" << std::endl;// child::close(fds[1]); // readdup2(fds[0], 0);work();exit(0);}// 父进程执行::close(fds[0]); // writestd::cout << "debug :" << fds[1] << std::endl;channels.emplace_back(fds[1], id);}return OK;}

 

http://www.xdnf.cn/news/16973.html

相关文章:

  • Antlr学习笔记 02、使用antlr4实现简易版计算器
  • 【无标题】标准 I/O 中的一些函数,按功能分类说明其用法和特点
  • 【LeetCode刷题集】--排序(一)
  • clocking_cb驱动之坑
  • BackgroundTasks 如何巧妙驾驭多任务并发?
  • 测试-概念篇(3)
  • <PhotoShop><JavaScript><脚本>基于JavaScript,利用脚本实现PS软件批量替换图片,并转换为智能对象?
  • Linux 逻辑卷管理
  • 深入理解Spring中的循环依赖及解决方案
  • ssh连接VirtualBox中的Ubuntu24.04(win11、putty、NAT 模式)
  • 模型蒸馏(Distillation):原理、算法、应用
  • 每日任务day0804:小小勇者成长记之药剂师的小咪
  • 深入剖析Java Stream API性能优化实践指南
  • AgxOrin平台JetPack5.x版本fix multi-cam race condition 补丁
  • (2023ICML)BLIP-2:使用冻结图像编码器和大语言模型引导语言-图像预训练
  • Ubuntu共享文件夹权限设置
  • 【数据结构初阶】--顺序表(一)
  • 使用AWS for PHP SDK实现Minio文件上传
  • nodejs 封装方法将打印日志输出到指定文件
  • mybatis-plus报错Caused by: java.sql.SQLException: 无效的列类型: 1111
  • 论文Review LIO Multi-session Voxel-SLAM | 港大MARS出品!体素+平面特征的激光SLAM!经典必读!
  • Spring Boot 应用结合 Knife4j 进行 API 分组授权管理配置
  • 【世纪龙科技】汽车自动变速器拆装虚拟实训软件
  • 国产化低代码平台如何筑牢企业数字化安全底座
  • Go语言 并发安全sync
  • Linux 磁盘管理
  • 如何选择一个容易被搜索引擎发现的域名?
  • 从零开始的云计算生活——项目实战
  • Perl 面向对象编程深入解析
  • 京东商品销量数据如何获取?API接口调用操作详解