17.进程间通信(三)
一、System V 消息队列基本结构与理解
消息队列是全双工通信,可以同时收发消息。
结论1:消息队列提供了一种,一个进程给另一个进程发送有类型数据块的方式!
结论2:OS中消息队列可能有多个,要对消息队列进行管理,先描述,在组织。
结论3:两个进程怎么保证看到的是同一个消息队列?用户层约定的key值。
结论4:每个消息队列的总的字节数也是有上限的(MSGMNB),系统上消息队列的总数也有上限 (MSGMNI)的。结论5:消息队列的生命周期也是随内核的。数据块结构:
struct msgbuf {long mtype; /* message type, must be > 0 */char mtext[1]; /* message data */ };
其中mtype为数据类型,由用户层约定,规定必须是大于0的数字。
mtext为数据内容,大小由用户层自定义。
消息队列的数据结构:
二、消息队列的相关接口
1.创建消息队列
int msgget(key_t key, int msgflg);
key为用户层约定标识消息队列唯一性的值。
msgflg,选项IPC_CREAT:不存在就新建,存在就返回现存的。IPC_EXCL:与IPC_CREAT配合使用,不存在就新建,存在就出错返回。
返回值:成功返回消息队列描述符,失败返回-1,错误码被设置。
2.删除消息队列
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msqid为消息队列描述符。
cmd为对应的操作:
简单点说:IPC_STAT往外带值,IPC_SET往内设值。
buf既可以作输出型参数又可以作输入性参数。
strcut msqid_ds的结构如下:
3.收发消息
发消息:
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
msqid为消息队列标识符。
msqp为消息体,即:
struct msgbuf {long mtype; /* message type, must be > 0 */char mtext[1]; /* message data */ };
msgsz为消息内容大小,注意不包括类型,只是数据的大小。
msgflg:控制着当前消息队列满 或 到达系统上限 时将要发生的事情, 0即。msgflg=IPC_NOWAIT 表示队列满不等待,返回 EAGAIN 错误 。
返回值:成功0返回,失败-1返回,错误码被设置。
收消息:
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
大体同上,mgstyp为数据类型,msgp在此为输出型参数,带出数据内容。
返回值:成功为消息的数据内容大小,失败返回-1,错误码被设置。
4.msgqueue接口使用
msgqueue.hpp
#ifndef MSGQUEUE_HPP #define MSGQUEUE_HPP#include <iostream> #include <string> #include <cstdio> #include <cstdlib> #include <cstring> #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h>#define PATH_NAME "." #define PROJ_ID 1 #define MODE_DEFAULT 0666 #define DATA_SIZE 1024 #define EXIT_ERROR(m) \do \{ \perror(m); \exit(1); \} while (0)class Msgqueue {struct Msgbuf{long mtype;char mtext[DATA_SIZE];};protected:Msgqueue(){_key = ftok(PATH_NAME, PROJ_ID);std::cout << "Msgqueue() key:0x" << std::hex << _key << std::endl;}~Msgqueue(){}void Getmsqid(const int msgflg){_msqid = msgget(_key, msgflg);if (_msqid < 0){EXIT_ERROR("msgget");}std::cout << "msgget success! msqid:" << _msqid << std::endl;}void Destroy(){int ret = msgctl(_msqid, IPC_RMID, nullptr);if (ret < 0){EXIT_ERROR("msgctl");}std::cout << "msgrm success! msqid:" << _msqid << std::endl;}void Send(const std::string &msg, const long type){Msgbuf buf;buf.mtype = type;strcpy(buf.mtext, msg.c_str());int ret = msgsnd(_msqid, &buf, DATA_SIZE, 0);if (ret < 0){EXIT_ERROR("msgsnd");}}void Receive(std::string &msg, const long type){Msgbuf buf;ssize_t n = msgrcv(_msqid, &buf, DATA_SIZE, type, 0);if (n < 0){EXIT_ERROR("msgrcv");}buf.mtext[n] = 0;msg = buf.mtext;}void Stat(){struct msqid_ds ms;int ret = msgctl(_msqid, IPC_STAT, &ms);std::cout << "Stat key:0x" << std::hex << ms.msg_perm.__key << std::endl;}protected:int _msqid;key_t _key; };const long server_type = 1; const long client_type = 2;class Server : public Msgqueue { public:Server(){Msgqueue::Getmsqid(IPC_CREAT | IPC_EXCL | MODE_DEFAULT);Stat();}~Server(){Msgqueue::Destroy();}void Receive(std::string &msg){Msgqueue::Receive(msg, client_type);} };class Client : public Msgqueue { public:Client(){Getmsqid(IPC_CREAT);Stat();}~Client(){}void Send(const std::string &msg){Msgqueue::Send(msg, client_type);} };#endif
三、责任链模式(Chain of Responsibility Pattern)
新需求:1.client 发送给 server 的输入内容,拼接上时间,进程pid信息。2.server 收到的内容持久化保存到文件中。3.文件的内容如果过大,要进行切片保存并在指定的目录下打包保存,命令自定义。实现结构:责任链模式,基类HandlerText,派生类HandlerTextFormat,HandlerTextSaveFile,HandlerTextBackupFile分别处理文本格式化,写入到文件,备份并打包。采用链式结构,将这些加工过程链接起来,加工之后给下一个任务进行处理。并设置任务的开关,以此实现功能的裁剪。
实现细节:采用C++17 filesystem和fstream来处理文件操作。注意点:tar命令打包文件如果带上路径会递归式生成目录并打包,因此要切换到指定目录在打包。验证程序退出的状态码,WIFEXITED(status) 验证子进程是否正常返回,值大于0则表示正常退出,WEXITSTATUS(status) 值为子进程的退出码,两者结合使用。实现代码如下:#ifndef CHAIN_OF_RESP_HPP #define CHAIN_OF_RESP_HPP#include <iostream> #include <string> #include <memory> #include <filesystem> #include <fstream> #include <ctime> #include <unistd.h> #include <sys/types.h> #include <sys/wait.h>#define PATH_DEFAULT "./tmp/" #define FILE_DEFAULT "text.log" #define EXIT_ERROR(m) \do \{ \perror(m); \exit(1); \} while (0)class HandlerText {public:virtual void Excute(const std::string &msg) = 0;void SetNext(const std::shared_ptr<HandlerText> &next){_next = next;}void Disable() { _status = false; }void Enable() { _status = true; }bool Stat() { return _status; }protected:std::shared_ptr<HandlerText> _next;private:bool _status = true; };// 格式化内容,拼接上时间戳和进程pid class HandlerTextFormat : public HandlerText { public:HandlerTextFormat(const std::string &pathname, const std::string &filename): _pathname(pathname), _filename(filename){}~HandlerTextFormat(){}virtual void Excute(const std::string &msg){std::string format = msg + "\n";// 动态裁剪if (Stat()){std::cout << "step 1:文本格式化" << std::endl;std::string cur = std::to_string((int)time(nullptr));format += "时间戳:" + cur + " 进程pid:" + std::to_string(getpid()) + "\n";}else{std::cout << "step 1:文本格式化(已禁用)" << std::endl;}if (_next){_next->Excute(format);}else{std::cout << "责任链终止在HandlerTextFormat" << std::endl;}}private:std::string _pathname;std::string _filename; };// 把msg写到文件中 class HandlerTextSaveFile : public HandlerText { public:HandlerTextSaveFile(const std::string &pathname, const std::string &filename): _pathname(pathname), _filename(filename){if (std::filesystem::exists(_pathname))return;// 递归创建目录结构try{std::filesystem::create_directories(_pathname);}catch (const std::filesystem::filesystem_error &e){std::cerr << e.what() << std::endl;exit(1);}}virtual void Excute(const std::string &msg){if (Stat()){std::cout << "step 2:文本保存到文件" << std::endl;std::string file = _pathname + _filename;std::ofstream append_file(file, std::ios::app);if (!append_file.is_open()){EXIT_ERROR("open file");}// 写入文件,追加写入append_file << msg;append_file.close();}else{std::cout << "step 2:文本保存到文件(已禁用)" << std::endl;}if (_next){_next->Excute(msg);}else{std::cout << "责任链终止在HandlerTextSaveFile" << std::endl;}}private:std::string _pathname;std::string _filename; };const int maxLineSize = 5;// 备份(改名,改名成文件名拼接上时间戳),打包文件,删除源文件 class HandlerTextBackupFile : public HandlerText { private:bool LineOverFlow(){std::string file = _pathname + _filename;std::ifstream read_file(file);if (!read_file.is_open()){EXIT_ERROR("open file");}std::string line;int lineCount = 0;while (std::getline(read_file, line)){lineCount++;}read_file.close();if (lineCount > _maxLineSize){return true;}return false;}// 备份void Backup(){std::string file = _pathname + _filename;// 新文件名std::string backup_filename = _filename + "." + std::to_string((int)time(nullptr));_backup_filename = backup_filename;std::string backup_file = _pathname + backup_filename;// 重命名std::filesystem::rename(file, backup_file);}// 打包并删除源文件void PackAndDelete(){std::string pack_filename = _backup_filename + ".tgz";pid_t pid = fork();if (pid == 0){// 改变子进程工作路径至_pathname用C++方式std::filesystem::current_path(_pathname);// 子进程程序替换执行tar命令,打包文件execlp("tar", "tar", "-czf", pack_filename.c_str(), _backup_filename.c_str(), nullptr);exit(1);}int status;int ret = waitpid(pid, &status, 0);if (ret < 0){EXIT_ERROR("waitpid");}else{// 子进程正常退出,且退出码为0,删除源文件if (WIFEXITED(status) && WEXITSTATUS(status) == 0){std::filesystem::path file = _pathname + _backup_filename;std::filesystem::remove(file);}}}public:HandlerTextBackupFile(const std::string &pathname, const std::string &filename): _pathname(pathname), _filename(filename), _maxLineSize(maxLineSize){}virtual void Excute(const std::string &msg){if (Stat()){std::cout << "step 3:文本备份" << std::endl;// 备份if (LineOverFlow()){std::cout << "超过最大行数限制,备份" << std::endl;Backup();std::cout << "备份文件名:" << _backup_filename << std::endl;std::cout << "打包并删除备份文件" << std::endl;PackAndDelete();}}else{std::cout << "step 3:文本备份(已禁用)" << std::endl;}if (_next){_next->Excute(msg);}else{std::cout << "责任链终止在HandlerTextBackupFile" << std::endl;}}private:std::string _pathname;std::string _filename;std::string _backup_filename;int _maxLineSize; };class HandlerEntry { private:void StatSet(bool e1, bool e2, bool e3){e1 ? _format->Enable() : _format->Disable();e2 ? _save->Enable() : _save->Disable();e3 ? _backup->Enable() : _backup->Disable();}public:HandlerEntry(bool e1, bool e2, bool e3){_format = std::make_shared<HandlerTextFormat>(PATH_DEFAULT, FILE_DEFAULT);_save = std::make_shared<HandlerTextSaveFile>(PATH_DEFAULT, FILE_DEFAULT);_backup = std::make_shared<HandlerTextBackupFile>(PATH_DEFAULT, FILE_DEFAULT);_format->SetNext(_save);_save->SetNext(_backup);StatSet(e1, e2, e3);}void Run(const std::string &msg){_format->Excute(msg);}private:std::shared_ptr<HandlerTextFormat> _format;std::shared_ptr<HandlerTextSaveFile> _save;std::shared_ptr<HandlerTextBackupFile> _backup; };#endif // CHAIN_OF_RESP_HPP