当前位置: 首页 > web >正文

实现一个进程池(精讲)

目录

写进程池前的理论扫盲

进程池的实现


写进程池前的理论扫盲

父进程创建子进程,父子俩都看见同一片资源,这片资源被俩进程利用,用来通信,这片资源就是管道,如图所示,能很好地诠释管道。

那么什么是进程池呢?

我画了一副图,这幅图也很好的解释了什么是进程池,父进程通过管道给子进程分配任务,或者通信,有多个进程,就叫作进程池

而我们开始之前,已经将编译器转换成了vscode,环境变成了Ubuntu,语言使用C++。按照上图来,父进程向管道写,子进程从管道读。

进程池的实现

进程池最重要的就是管道,首先就要在头文件里面写类,首先就是管道类,然后是管道组织类,因为我们要把管道组织起来,最后就是进程池类,我们需要对进程池实现一些函数,首先,有一个头文件 .hpp和 .cc文件.

#include<iostream>
#include<vector>//管道类,先描述
class channel
{
public:channel(){}~channel(){}private:};const int gdefaultnum = 5;//管道数量,暂时定为5个//管道管理类,再组织
class ChannelManage
{
public:ChannelManage(){}~ChannelManage(){}
private:std::vector<channel> _Channels;
};//进程池类
class ProcessPool
{
public:ProcessPool(){}~ProcessPool(){}
private:ChannelManage _cm;
};

管道管理类的变量就是一个内容为 channel 的 vector ,名字叫 _Channels,而进程池类的变量就是这个vector。我们还定义了一个全局变量,gdefaultnum,表示为管道数量。

接下来,我们该创建管道了。将函数定为start,我们看看怎么写的。

#ifndef _PROCESS_POOL_HPP_
#define _PROCESS_POOL_HPP_#include<iostream>
#include<vector>
#include<unistd.h>
#include<cstdlib>//管道类,先描述,建立信道
class channel
{
public:channel(int fd,pid_t pid):_wfd(fd),pid(pid){std::cout << "channel-" <<std::to_string(_wfd)+std::to_string(pid);}~channel(){}private:int _wfd;pid_t pid;std::string name;};const int gdefaultnum = 5;//管道数量,暂时定为5个//管道管理类,再组织
class ChannelManage
{
public:ChannelManage(){}void BuildChannel(int wfd,pid_t pid ){//vector的一个函数,不需要构建临时对象,直接就可以尾插到vector里面_Channels.emplace_back(wfd,pid);}~ChannelManage(){}
private:std::vector<channel> _Channels;
};//进程池类
class ProcessPool
{
public:ProcessPool():_process_num(gdefaultnum){}void Work(int rfd){std::cout<<"子进程工作" <<std::endl;}bool Start(){for (int i = 0; i < _process_num; i++){//1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if(n < 0)return false;//2.创建进程pid_t pid = fork();if(pid < 0){//创建进程失败return false;}else if(pid == 0){//子进程//我们要求的是父进程去写,子进程去读,所以我们要关掉不要用的读写端,0是读,1是写close(pipefd[1]);//子进程关掉写//建立子进程,那么子进程就要工作,先不弄太复杂,就简单写个函数Work(pipefd[0]);close(pipefd[0]);//到最后都要关掉}else{//父进程//关掉读close(pipefd[0]);//父进程创建子进程之后,我们要给子进程建立一个通信信道_cm.BuildChannel(pipefd[1],pid);close(pipefd[1]);//到最后都要关掉}}return true;}~ProcessPool(){}
private:ChannelManage _cm;int _process_num;//管道数量
};#endif
  • 创建管道,使用pipe函数(记得要包的头文件哈!),然后创建进程,子进程负责完成任务,父进程负责为子进程开通管道,自然而然地,有了BuildChannel函数。
  • 我们看到管理管道类里面的BuildChannel函数,我们虽然使用了vector的emplace_back函数,可以不用创建临时对象的,但是我们应该明白底层逻辑是什么样的,
  • 底层逻辑就是创建了一个channel(管道)临时对象,将他尾插到_cm里面之后,再将其销毁。而创建管道是需要父进程的写入端和子进程的pid。

我们创建了子进程,并为他开创了信道之后,就可以得到命令,然后去执行命令了,但是要执行命令,也要选择合适的子进程,让合适的子进程去执行任务。所以下一步需要解决的就是选择合适的子进程执行命令。

那么,怎么挑选合适的子进程呢?有一种常用的方法,轮询这个方法就是从第一个子进程开始,依次往下执行,直到结束,然后再从第一个开始,我们就使用轮询的方法吧。

#ifndef _PROCESS_POOL_HPP_
#define _PROCESS_POOL_HPP_#include<iostream>
#include<vector>
#include<unistd.h>
#include<cstdlib>//管道类,先描述,建立信道
class channel
{
public:channel(int fd,pid_t pid):_wfd(fd),pid(pid){std::cout << "channel-" <<std::to_string(_wfd)+std::to_string(pid);}~channel(){}private:int _wfd;pid_t pid;std::string name;};const int gdefaultnum = 5;//管道数量,暂时定为5个//管道管理类,再组织
class ChannelManage
{
public:ChannelManage():next(0){}void BuildChannel(int wfd,pid_t pid ){//vector的一个函数,不需要构建临时对象,直接就可以尾插到vector里面_Channels.emplace_back(wfd,pid);}//挑选合适的子进程去执行命令channel& Select(){//轮询auto& c = _Channels[0];next++;next %= _Channels.size();return c;}~ChannelManage(){}
private:std::vector<channel> _Channels;int next;
};//进程池类
class ProcessPool
{
public:ProcessPool():_process_num(gdefaultnum){}void Work(int rfd){std::cout<<"子进程工作" <<std::endl;}bool Start(){for (int i = 0; i < _process_num; i++){//1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if(n < 0)return false;//2.创建进程pid_t pid = fork();if(pid < 0){//创建进程失败return false;}else if(pid == 0){//子进程//我们要求的是父进程去写,子进程去读,所以我们要关掉不要用的读写端,0是读,1是写close(pipefd[1]);//子进程关掉写//建立子进程,那么子进程就要工作,先不弄太复杂,就简单写个函数Work(pipefd[0]);close(pipefd[0]);//到最后都要关掉}else{//父进程//关掉读close(pipefd[0]);//父进程创建子进程之后,我们要给子进程建立一个通信信道_cm.BuildChannel(pipefd[1],pid);close(pipefd[1]);//到最后都要关掉}}return true;}void Run(){auto &c = _cm.Select();}~ProcessPool(){}
private:ChannelManage _cm;int _process_num;//管道数量
};#endif

我们已经完成了开创信道,选择子进程,接下来的任务就是写向写入端写入命令。

class channel
{
public:channel(int fd,pid_t pid):_wfd(fd),pid(pid){std::cout << "channel-" <<std::to_string(_wfd)+std::to_string(pid);}void Send(int code){int n = write(_wfd,&code,sizeof(code));(void)n;}~channel(){}private:int _wfd;pid_t pid;std::string name;};
void Run(){int task_code = 0;//选择一个子进程auto &c = _cm.Select();//向写入端发送命令c.Send(task_code);}

上面我们还不知道需要发送什么命令,所以就随便设置了一个0,现在我们需要一套完整的命令了,直接写一个task.hpp文件,设置一套完整的命令。下面是task.hpp的代码编写

#pragma once#include<iostream>
#include<vector>
#include<ctime>typedef void (*task_t)();////////////////执行任务///////////////////////////////////////////////////////
void Printlog()
{std::cout << "我是一个打印日志的任务"<<std::endl;
}void Download()
{std::cout << "我是一个下载的任务"<<std::endl;
}void Upload()
{std::cout << "我是一个上传的任务"<<std::endl;
}///////////////////////////////////////////////////////////////////////////////class TaskManager
{
public:TaskManager(){srand(time(nullptr));}//将执行函数放进去void Register(task_t t){_tasks.push_back(t);}int Code(){return rand() % _tasks.size();}void Execute(int code){if(code >=0 && code < _tasks.size()){_tasks[code];}}~TaskManager(){}private:std::vector<task_t> _tasks;
};

写了任务表了,那就要让子进程在读取管道信息的时候接收到这个任务命令,那么将变动Work函数,让他工作。下面是将命令融合到整个程序里去的代码

#ifndef _PROCESS_POOL_HPP_
#define _PROCESS_POOL_HPP_#include<iostream>
#include<vector>
#include<unistd.h>
#include<cstdlib>
#include"task.hpp"//管道类,先描述,建立信道
class channel
{
public:channel(int fd,pid_t pid):_wfd(fd),pid(pid){std::cout << "channel-" <<std::to_string(_wfd)+std::to_string(pid);}void Send(int code){int n = write(_wfd,&code,sizeof(code));(void)n;}~channel(){}int getwfd(){return _wfd; }pid_t getpid(){return pid;}std::string getname(){return name;}private:int _wfd;pid_t pid;std::string name;};const int gdefaultnum = 5;//管道数量,暂时定为5个//管道管理类,再组织
class ChannelManage
{
public:ChannelManage():next(0){}void BuildChannel(int wfd,pid_t pid ){//vector的一个函数,不需要构建临时对象,直接就可以尾插到vector里面_Channels.emplace_back(wfd,pid);}//挑选合适的子进程去执行命令channel& Select(){//轮询auto& c = _Channels[0];next++;next %= _Channels.size();return c;}~ChannelManage(){}
private:std::vector<channel> _Channels;int next;
};//进程池类
class ProcessPool
{
public:ProcessPool():_process_num(gdefaultnum){//注册任务_tm.Register(Printlog);_tm.Register(Download);_tm.Register(Upload);}void Work(int rfd){while(true){int code = 0;size_t n = read(rfd,&code,sizeof(code));//从读端读到了code,之前父进程写进管道的任务码if(n > 0){if(n != sizeof(code)){continue;}//读到规范的了std::cout << "进程: "<<getpid()<<"收到一个任务码: "<<code<<std::endl;_tm.Execute(code);}else if(n == 0){std::cout<<"子进程退出"<<std::endl;}else{std::cout<<"读取错误"<<std::endl;}}}bool Start(){for (int i = 0; i < _process_num; i++){//1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if(n < 0)return false;//2.创建进程pid_t pid = fork();if(pid < 0){//创建进程失败return false;}else if(pid == 0){//子进程//我们要求的是父进程去写,子进程去读,所以我们要关掉不要用的读写端,0是读,1是写close(pipefd[1]);//子进程关掉写//建立子进程,那么子进程就要工作,先不弄太复杂,就简单写个函数Work(pipefd[0]);close(pipefd[0]);//到最后都要关掉}else{//父进程//关掉读close(pipefd[0]);//父进程创建子进程之后,我们要给子进程建立一个通信信道_cm.BuildChannel(pipefd[1],pid);close(pipefd[1]);//到最后都要关掉}}return true;}void Run(){int task_code = _tm.Code();//随机生成的一个code//选择一个子进程auto &c = _cm.Select();std::cout <<"选择了一个子进程: "<<c.getname() <<std::endl;//向写入端发送命令c.Send(task_code);std::cout <<"发送了一个任务码: "<<task_code <<std::endl;}~ProcessPool(){}
private:ChannelManage _cm;int _process_num;//管道数量TaskManager _tm;//命令管理
};#endif

尤其可以注意一下Work的变动。

上面所实现的就是我们需要的功能,现在我们需要实现一些关闭和等待的功能,我们的管道和进程都是需要实现关闭功能的。

    void Close(int wfd){close(_wfd);}void Wait(){pid_t id = waitpid(pid,nullptr,0);(void)id;}
    void StopSubProcess(){for(auto& channel : _Channels){channel.Close();std::cout<<"关闭:"<<channel.getname()<<std::endl;}}void WaitSubProcess(){for(auto& channel : _Channels){channel.Wait();std::cout<<"回收子进程" <<std::endl;}}
    void Stop(){_cm.StopSubProcess();_cm.WaitSubProcess();}

这就是三个类分别的关闭等待代码,从下往上,是不是感受到了层层调用?

至此,我们的代码已经写完了,我会将完整代码包括测试代码链接贴在下面,大家自行取用。

https://gitee.com/i-still-want-to-be-an-npc/vscode-code

http://www.xdnf.cn/news/17440.html

相关文章:

  • 大模型量化上溢及下溢解析
  • 常见数据结构介绍(顺序表,单链表,双链表,单向循环链表,双向循环链表、内核链表、栈、队列、二叉树)
  • LeetCode 刷题【34. 在排序数组中查找元素的第一个和最后一个位置、35. 搜索插入位置】
  • Redis7集群搭建与原理分析
  • 基于Web的交互式坐标系变换矩阵计算工具
  • BGP综合实验练习作业
  • 使用OAK相机实现智能物料检测与ABB机械臂抓取
  • 从零构建TransformerP2-新闻分类Demo
  • Langchain入门:构建一个基于SQL数据的问答系统
  • 後端開發技術教學(三) 表單提交、數據處理
  • 汽车零部件深孔加工质控升级:新启航激光频率梳 3D 测量解决传统光学扫描遮挡
  • 应急响应流程
  • ADB 命令执行模块开发:双模式(普通模式Shell交互模式)实现、线程安全与资源管理优化
  • Nextcloud容器化部署新范式:Docker与Cpolar如何重塑私有云远程访问能力
  • 为什么输入 URL 后会显示页面?HTTP 协议的 “幕后操作”
  • docker缓存目录转移设置和生效过程
  • WPF 双击行为实现详解:DoubleClickBehavior 源码分析与实战指南
  • linux信号量和日志
  • 杂谈 001 · VScode / Copilot 25.08 更新
  • 【系统编程】进程初识
  • 用JOIN替代子查询的查询性能优化
  • GESP2023年12月认证C++一级( 第三部分编程题(2)小杨报数)
  • 行业速览:中国新能源汽车市场格局与关键趋势
  • 解码华为云安全“铁三角”:用“分层防御”化解安全挑战
  • mac电脑解决在不同项目需要频繁手动切换node版本的困扰
  • JDY后端一二三面经(已OC)
  • 分享超图提供的、很不错的WebGIS学习资源
  • Dixon‘s 因子分解法——C语言实现
  • 基于R语言,“上百种机器学习模型”学习教程 | Mime包
  • 手搓MCP全流程指南:从本地开发部署到PyPI公开发布