当前位置: 首页 > news >正文

17.进程间通信(三)

一、System V 消息队列基本结构与理解

消息队列是全双工通信,可以同时收发消息。

结论1:消息队列提供了一种,一个进程给另一个进程发送有类型数据块的方式!

结论2:OS中消息队列可能有多个,要对消息队列进行管理,先描述,在组织。

结论3:两个进程怎么保证看到的是同一个消息队列?用户层约定的key值。

结论4:每个消息队列的总的字节数也是有上限的(MSGMNB),系统上消息队列的总数也有上限 (MSGMNI)的。
结论5:消息队列的生命周期也是随内核的。

数据块结构:

struct msgbuf {long mtype;       /* message type, must be > 0 */char mtext[1];    /* message data */
};

其中mtype为数据类型,由用户层约定,规定必须是大于0的数字。

mtext为数据内容,大小由用户层自定义。

消息队列的数据结构:

二、消息队列的相关接口

1.创建消息队列

int msgget(key_t key, int msgflg);

key为用户层约定标识消息队列唯一性的值。

msgflg,选项IPC_CREAT:不存在就新建,存在就返回现存的。IPC_EXCL:与IPC_CREAT配合使用,不存在就新建,存在就出错返回。

返回值:成功返回消息队列描述符,失败返回-1,错误码被设置。

2.删除消息队列

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

msqid为消息队列描述符。

cmd为对应的操作:

简单点说:IPC_STAT往外带值,IPC_SET往内设值。

buf既可以作输出型参数又可以作输入性参数。

strcut msqid_ds的结构如下:

3.收发消息

发消息:

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

msqid为消息队列标识符。

msqp为消息体,即:

struct msgbuf {long mtype;       /* message type, must be > 0 */char mtext[1];    /* message data */
};

msgsz为消息内容大小,注意不包括类型,只是数据的大小。

msgflg:控制着当前消息队列满 或 到达系统上限 时将要发生的事情, 0即。msgflg=IPC_NOWAIT 表示队列满不等待,返回 EAGAIN 错误 。

返回值:成功0返回,失败-1返回,错误码被设置。

收消息:

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

大体同上,mgstyp为数据类型,msgp在此为输出型参数,带出数据内容。

返回值:成功为消息的数据内容大小,失败返回-1,错误码被设置。

4.msgqueue接口使用

msgqueue.hpp

#ifndef MSGQUEUE_HPP
#define MSGQUEUE_HPP#include <iostream>
#include <string>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>#define PATH_NAME "."
#define PROJ_ID 1
#define MODE_DEFAULT 0666
#define DATA_SIZE 1024
#define EXIT_ERROR(m) \do                \{                 \perror(m);    \exit(1);      \} while (0)class Msgqueue
{struct Msgbuf{long mtype;char mtext[DATA_SIZE];};protected:Msgqueue(){_key = ftok(PATH_NAME, PROJ_ID);std::cout << "Msgqueue() key:0x" << std::hex << _key << std::endl;}~Msgqueue(){}void Getmsqid(const int msgflg){_msqid = msgget(_key, msgflg);if (_msqid < 0){EXIT_ERROR("msgget");}std::cout << "msgget success! msqid:" << _msqid << std::endl;}void Destroy(){int ret = msgctl(_msqid, IPC_RMID, nullptr);if (ret < 0){EXIT_ERROR("msgctl");}std::cout << "msgrm success! msqid:" << _msqid << std::endl;}void Send(const std::string &msg, const long type){Msgbuf buf;buf.mtype = type;strcpy(buf.mtext, msg.c_str());int ret = msgsnd(_msqid, &buf, DATA_SIZE, 0);if (ret < 0){EXIT_ERROR("msgsnd");}}void Receive(std::string &msg, const long type){Msgbuf buf;ssize_t n = msgrcv(_msqid, &buf, DATA_SIZE, type, 0);if (n < 0){EXIT_ERROR("msgrcv");}buf.mtext[n] = 0;msg = buf.mtext;}void Stat(){struct msqid_ds ms;int ret = msgctl(_msqid, IPC_STAT, &ms);std::cout << "Stat key:0x" << std::hex << ms.msg_perm.__key << std::endl;}protected:int _msqid;key_t _key;
};const long server_type = 1;
const long client_type = 2;class Server : public Msgqueue
{
public:Server(){Msgqueue::Getmsqid(IPC_CREAT | IPC_EXCL | MODE_DEFAULT);Stat();}~Server(){Msgqueue::Destroy();}void Receive(std::string &msg){Msgqueue::Receive(msg, client_type);}
};class Client : public Msgqueue
{
public:Client(){Getmsqid(IPC_CREAT);Stat();}~Client(){}void Send(const std::string &msg){Msgqueue::Send(msg, client_type);}
};#endif

三、责任链模式(Chain of Responsibility Pattern)

新需求:
1.client 发送给 server 的输入内容,拼接上时间,进程pid信息。
2.server 收到的内容持久化保存到文件中。
3.文件的内容如果过大,要进行切片保存并在指定的目录下打包保存,命令自定义。
实现结构:
责任链模式,基类HandlerText,派生类HandlerTextFormat,HandlerTextSaveFile,HandlerTextBackupFile分别处理文本格式化,写入到文件,备份并打包。采用链式结构,将这些加工过程链接起来,加工之后给下一个任务进行处理。并设置任务的开关,以此实现功能的裁剪。
实现细节:
采用C++17 filesystem和fstream来处理文件操作。
注意点:
tar命令打包文件如果带上路径会递归式生成目录并打包,因此要切换到指定目录在打包。
验证程序退出的状态码,WIFEXITED(status) 验证子进程是否正常返回,值大于0则表示正常退出,WEXITSTATUS(status) 值为子进程的退出码,两者结合使用。
实现代码如下:
#ifndef CHAIN_OF_RESP_HPP
#define CHAIN_OF_RESP_HPP#include <iostream>
#include <string>
#include <memory>
#include <filesystem>
#include <fstream>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>#define PATH_DEFAULT "./tmp/"
#define FILE_DEFAULT "text.log"
#define EXIT_ERROR(m) \do                \{                 \perror(m);    \exit(1);      \} while (0)class HandlerText
{public:virtual void Excute(const std::string &msg) = 0;void SetNext(const std::shared_ptr<HandlerText> &next){_next = next;}void Disable() { _status = false; }void Enable() { _status = true; }bool Stat() { return _status; }protected:std::shared_ptr<HandlerText> _next;private:bool _status = true;
};// 格式化内容,拼接上时间戳和进程pid
class HandlerTextFormat : public HandlerText
{
public:HandlerTextFormat(const std::string &pathname, const std::string &filename): _pathname(pathname), _filename(filename){}~HandlerTextFormat(){}virtual void Excute(const std::string &msg){std::string format = msg + "\n";// 动态裁剪if (Stat()){std::cout << "step 1:文本格式化" << std::endl;std::string cur = std::to_string((int)time(nullptr));format += "时间戳:" + cur + " 进程pid:" + std::to_string(getpid()) + "\n";}else{std::cout << "step 1:文本格式化(已禁用)" << std::endl;}if (_next){_next->Excute(format);}else{std::cout << "责任链终止在HandlerTextFormat" << std::endl;}}private:std::string _pathname;std::string _filename;
};// 把msg写到文件中
class HandlerTextSaveFile : public HandlerText
{
public:HandlerTextSaveFile(const std::string &pathname, const std::string &filename): _pathname(pathname), _filename(filename){if (std::filesystem::exists(_pathname))return;// 递归创建目录结构try{std::filesystem::create_directories(_pathname);}catch (const std::filesystem::filesystem_error &e){std::cerr << e.what() << std::endl;exit(1);}}virtual void Excute(const std::string &msg){if (Stat()){std::cout << "step 2:文本保存到文件" << std::endl;std::string file = _pathname + _filename;std::ofstream append_file(file, std::ios::app);if (!append_file.is_open()){EXIT_ERROR("open file");}// 写入文件,追加写入append_file << msg;append_file.close();}else{std::cout << "step 2:文本保存到文件(已禁用)" << std::endl;}if (_next){_next->Excute(msg);}else{std::cout << "责任链终止在HandlerTextSaveFile" << std::endl;}}private:std::string _pathname;std::string _filename;
};const int maxLineSize = 5;// 备份(改名,改名成文件名拼接上时间戳),打包文件,删除源文件
class HandlerTextBackupFile : public HandlerText
{
private:bool LineOverFlow(){std::string file = _pathname + _filename;std::ifstream read_file(file);if (!read_file.is_open()){EXIT_ERROR("open file");}std::string line;int lineCount = 0;while (std::getline(read_file, line)){lineCount++;}read_file.close();if (lineCount > _maxLineSize){return true;}return false;}// 备份void Backup(){std::string file = _pathname + _filename;// 新文件名std::string backup_filename = _filename + "." + std::to_string((int)time(nullptr));_backup_filename = backup_filename;std::string backup_file = _pathname + backup_filename;// 重命名std::filesystem::rename(file, backup_file);}// 打包并删除源文件void PackAndDelete(){std::string pack_filename = _backup_filename + ".tgz";pid_t pid = fork();if (pid == 0){// 改变子进程工作路径至_pathname用C++方式std::filesystem::current_path(_pathname);// 子进程程序替换执行tar命令,打包文件execlp("tar", "tar", "-czf", pack_filename.c_str(), _backup_filename.c_str(), nullptr);exit(1);}int status;int ret = waitpid(pid, &status, 0);if (ret < 0){EXIT_ERROR("waitpid");}else{// 子进程正常退出,且退出码为0,删除源文件if (WIFEXITED(status) && WEXITSTATUS(status) == 0){std::filesystem::path file = _pathname + _backup_filename;std::filesystem::remove(file);}}}public:HandlerTextBackupFile(const std::string &pathname, const std::string &filename): _pathname(pathname), _filename(filename), _maxLineSize(maxLineSize){}virtual void Excute(const std::string &msg){if (Stat()){std::cout << "step 3:文本备份" << std::endl;// 备份if (LineOverFlow()){std::cout << "超过最大行数限制,备份" << std::endl;Backup();std::cout << "备份文件名:" << _backup_filename << std::endl;std::cout << "打包并删除备份文件" << std::endl;PackAndDelete();}}else{std::cout << "step 3:文本备份(已禁用)" << std::endl;}if (_next){_next->Excute(msg);}else{std::cout << "责任链终止在HandlerTextBackupFile" << std::endl;}}private:std::string _pathname;std::string _filename;std::string _backup_filename;int _maxLineSize;
};class HandlerEntry
{
private:void StatSet(bool e1, bool e2, bool e3){e1 ? _format->Enable() : _format->Disable();e2 ? _save->Enable() : _save->Disable();e3 ? _backup->Enable() : _backup->Disable();}public:HandlerEntry(bool e1, bool e2, bool e3){_format = std::make_shared<HandlerTextFormat>(PATH_DEFAULT, FILE_DEFAULT);_save = std::make_shared<HandlerTextSaveFile>(PATH_DEFAULT, FILE_DEFAULT);_backup = std::make_shared<HandlerTextBackupFile>(PATH_DEFAULT, FILE_DEFAULT);_format->SetNext(_save);_save->SetNext(_backup);StatSet(e1, e2, e3);}void Run(const std::string &msg){_format->Excute(msg);}private:std::shared_ptr<HandlerTextFormat> _format;std::shared_ptr<HandlerTextSaveFile> _save;std::shared_ptr<HandlerTextBackupFile> _backup;
};#endif // CHAIN_OF_RESP_HPP
http://www.xdnf.cn/news/775225.html

相关文章:

  • ps可选颜色调整
  • 每日一道面试题---ArrayList的自动扩容机制(口述版本)
  • LLM模型量化从入门到精通:Shrink, Speed, Repeat
  • Java线程生命周期详解
  • 【数据分析】第三章 numpy(1)
  • 第二十一章 格式化输出
  • 制作开发AI试衣换装小程序系统介绍
  • URP - 水效果Shader
  • 类和对象(二)
  • 《Pytorch深度学习实践》ch3-反向传播
  • 使用ArcPy批量处理矢量数据
  • 力扣刷题Day 67:N 皇后(51)
  • 树莓派实验
  • 使用Bambi包进行贝叶斯混合效应模型分析
  • 强化学习-深度学习和强化学习领域
  • 通讯录Linux的实现
  • 如何选择合适的哈希算法以确保数据安全?
  • 列表推导式(Python)
  • 线程间和进程间是如何进行通信
  • PH热榜 | 2025-05-30
  • Linux中的mysql逻辑备份与恢复
  • 【AI+若依框架】基础应用篇
  • CUDA内存溢出问题解决方案
  • C++学习打卡1.01
  • SAP BC 修复MM60 报错的问题
  • MySQL 核心知识整理【一】
  • AI智能体|扣子(Coze)搭建【合同/文档审查】工作流
  • 应用程序错误 application error (0xc000007b) 处理方法
  • URL的结构与作用
  • ubuntu系统扩容