当前位置: 首页 > news >正文

Linux 进程间通信之消息队列:原理 + API 与实战 (System-V IPC)

目录

  • System V 消息队列的特点
  • msgget函数
    • 函数原型
  • msgctl函数
    • 函数原型
    • 操作结构体cmd命令详解
  • msgsnd函数
    • 函数原型
    • 消息结构体
  • msgrcv函数
    • 函数原型
  • 综合练习
    • 使用消息队列实现两个程序之间互相聊天
    • 利用消息队列配合共享内存,实现A程序每次更改完共享内存,B程序才打印共享内存的数据出来

System V 消息队列的特点

  1. 内核管理
    • 消息队列由内核维护,进程通过队列的标识符(msqid)进行访问。
    • 消息在发送方进程和接收方进程之间由内核负责存储和传递。
  2. 异步通信
    • 发送方发送消息后可以立即返回,无需等待接收方接收。
    • 消息会在队列中等待,直到接收方读取。
  3. 支持消息类型
    • 消息可以带有类型(mtype),接收方可以按类型读取特定的消息。
  4. 消息优先级
    • 类型 mtype 可以作为优先级使用,接收方可以优先读取某些类型的消息。
  5. 持久性
    • 消息队列存在于内核中,直到显式删除(通过 msgctl 删除),即使所有进程退出,队列仍然存在。

msgget函数

函数功能:获取一个消息队列的对象的 id,他可以创建及打开一个 IPC 对象的消息队列

函数原型

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgget(key_t key, int msgflg);

参数

  • key:IPC 对象的 key 值
  • msgflg: 消息队列操作标记位
    • IPC_CREAT:如果消息队列不存在,则创建一个新的队列。
    • IPC_EXCL:与 IPC_CREAT
      一起使用,如果队列已存在则返回错误。
    • IPC_CREAT基础上 | 上一个八进制数设置权限如0666

返回值

  • 成功:返回一个非负整数,表示消息队列的标识符(msqid)。
  • 失败:返回 -1,并设置
    errno。常见错误码:
    • EEXIST:指定了 IPC_CREAT | IPC_EXCL,但消息队列已存在。
    • EINVALkey
      msgflg 参数无效。
    • ENOSPC:系统已达到消息队列的最大限制。
    • EACCES:没有权限访问或创建消息队列。

作用

  • 创建新的消息队列或获取现有的消息队列

示例:

int msg_id = msgget(key,IPC_CREAT,0666);//创建消息队列并返回消息队列唯一标识符号

msgctl函数

函数功能:对消息队列执行 管理操作,如获取消息队列的信息、设置消息队列的属性、删除消息队列等。

函数原型

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);

参数

msqid:消息队列标识符,由msgget函数创建

cmd:控制操作的命令。

  • IPC_STAT:获取消息队列的状态信息,填充到 buf 中。
  • IPC_SET:设置消息队列的属性(如权限),从 buf 中读取新值。
  • IPC_RMID:删除消息队列。
  • IPC_INFO(系统扩展):获取系统级的消息队列限制信息(某些系统可能不支持)。

buf:指向 shmid_ds 结构的指针,用于获取或设置共享内存的属性。


结构体struct msqid_ds

  • msgctl 的核心结构是 msqid_ds,用于描述消息队列的状态和属性。
struct msqid_ds {struct ipc_perm msg_perm;   // 权限和 UID、GID 信息time_t msg_stime;           // 上次发送消息的时间time_t msg_rtime;           // 上次接收消息的时间time_t msg_ctime;           // 队列最后修改的时间unsigned long msg_cbytes;   // 当前队列中的字节数unsigned long msg_qnum;     // 队列中当前消息数unsigned long msg_qbytes;   // 队列的最大字节数pid_t msg_lspid;            // 最后发送消息的进程 PIDpid_t msg_lrpid;            // 最后接收消息的进程 PID
};

返回值

  • 成功:返回 0
  • 失败:返回 -1,并设置
    errno
    • 常见错误码:
      • EINVALmsqid
        cmd 参数无效。
      • EACCES:权限不足。
      • EPERM:尝试删除消息队列,但没有管理员权限。

操作结构体cmd命令详解

IPC_STAT:

struct msqid_ds buf;
msgctl(msg_id,IPC_STAT,&buf);//获取当前消息队列状态

IPC_RMID:典型用法,完成队列操作后删除队列,删除后无法继续访问该队列

msgctl(msg_id,IPC_RMID,NULL);//删除队列

msgsnd函数

函数功能:用于 向消息队列发送消息。消息会存储在消息队列中,等待接收进程通过 msgrcv 函数读取。

函数原型

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

参数

msqid:消息队列ID,由msgget创建。

msgp:指向消息缓冲区的指针,通常是一个
msgbuf 类型的结构。

msgsz消息正文的大小(不包括 mtype 的大小)

msgflg发送行为标志

  • 0:代表正常发送(缓冲区满了会阻塞等待)
  • IPC_NOWAIT:如果消息队列已满,立即返回错误,而不是阻塞。

返回值

  • 成功则返回 0,失败则返回-1,errno 会被设置

消息结构体

struct msgbuf {long mtype;      // 消息类型,必须大于 0char mtext[1];   // 消息正文,实际大小可以大于 1
};

注意这个mtext[]如果在结构体指定为1,那么在msgsnd中msgsz参数要指定大于1的数如100,需要动态分配大小,否则会导致未定义行为,导致内存访问越界

  • mtext[1]的情况下动态分配正文大小为100
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>// 定义消息结构,mtext[1] 为最小定义
struct msgbuf {long mtype;      // 消息类型char mtext[1];   // 消息正文
};int main() {key_t key = ftok("/tmp", 65);int msqid = msgget(key, IPC_CREAT | 0666);if (msqid == -1) {perror("msgget failed");return 1;}// 分配动态内存struct msgbuf *message = malloc(sizeof(struct msgbuf) + 100);//结构体本身大小+100动态分配的字节if (message == NULL) {perror("malloc failed");return 1;}message->mtype = 1; // 设置消息类型strcpy(message->mtext, "This is a long message exceeding the fixed size of 1 byte!");// 使用 msgsz 动态指定正文的实际大小if (msgsnd(msqid, message, strlen(message->mtext) + 1, 0) == -1) {//包含结束符号'\0'perror("msgsnd failed");free(message);return 1;}printf("Message sent: %s\n", message->mtext);free(message);return 0;
}
  • 为什么不能直接动态分配 mtext
    • 在 C 中,结构体的大小是固定的(sizeof(struct) 的结果)。
    • 结构体中的数组大小必须在编译时确定,无法在运行时动态扩展。
    • 换句话说,mtext 的大小在编译时已经固定为 1 字节(或者你定义的其他大小,如 mtext[100]),无法动态调整。
    • 因此使用最小的 mtext[1]mtext[0] 作为占位符,然后通过动态分配额外的内存实现变长(通过指针)。

最简单的方式直接定死正文大小,mtext设置成100,在msgsnd的时候也写100


msgrcv函数

函数功能:用于从消息队列中接收消息。它可以根据消息类型进行有选择地接收,也可以按消息到达的顺序接收。

函数原型

include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,int msgflg);

参数

msqid: 消息队列标识符,由msgget返回

msgp指向消息缓冲区的指针,通常是一个
msgbuf 类型的结构。

msgsz: 正文大小

msgtyp接收的消息类型

  • 常用取值:
    • 0:接收队列中第一个消息(按到达顺序)。
    • 大于 0:接收指定类型的第一个消息。
    • 小于 0:接收类型值小于等于 abs(msgtyp) 的优先级最高的消息。

msgflg操作标记位

  • 0:代表正常发送(缓冲区满了会阻塞等待)
  • IPC_NOWAIT:如果没有符合条件的消息,立即返回错误 EAGAIN
  • MSG_NOERROR:如果消息正文大小超过 msgsz,截断消息并不报错。

返回值

  • 成功返回实际接收到的消息正文大小(单位:字节)。
  • 失败返回 -1,并设置 errno

消息结构体

struct msgbuf {long mtype;      // 消息类型,必须大于 0char mtext[1];   // 消息正文
};
  • mtype
    • 用于标识消息类型。
    • 消息发送时由发送端设置,接收端可根据类型有选择地接收消息。
  • mtext
    • 存储消息正文。
    • 实际大小由 msgsz 决定。

示例:

  • 接收任何类型的消息,mtype0
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>// 定义消息结构
struct msgbuf {long mtype;char mtext[100];
};int main() {// 获取消息队列key_t key = ftok("/tmp", 65);int msqid = msgget(key, 0666);if (msqid == -1) {perror("msgget failed");exit(1);}struct msgbuf message;// 接收消息if (msgrcv(msqid, &message, sizeof(message.mtext), 0, 0) == -1) {perror("msgrcv failed");exit(1);}printf("Message received:\nType: %ld\nText: %s\n", message.mtype, message.mtext);return 0;
}
  • 接收特定类型的消息
struct msgbuf message;if (msgrcv(msqid, &message, sizeof(message.mtext), 2, 0) == -1) {perror("msgrcv failed");exit(1);
}printf("Received type 2 message: %s\n", message.mtext);
  • 接收高优先级的消息:类型值小于等于3的数据
struct msgbuf message;if (msgrcv(msqid, &message, sizeof(message.mtext), -3, 0) == -1) {perror("msgrcv failed");exit(1);
}printf("Received high-priority message: %s\n", message.mtext);
  • 处理消息正文大小超过限制:使用MSG_NOERROR
struct msgbuf message;// 接收消息,并允许截断
if (msgrcv(msqid, &message, 50, 0, MSG_NOERROR) == -1) {perror("msgrcv failed");exit(1);
}printf("Received (truncated) message: %s\n", message.mtext);

综合练习

使用消息队列实现两个程序之间互相聊天

#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/msg.h>
#include <signal.h>
#include <sys/wait.h>struct msgbuf {long mtype;       /* message type, must be > 0 */char mtext[100];  /* message data */
};int main(int argc, const char *argv[]) 
{key_t ipc_key;int msg_id;ipc_key = ftok(".", 1); // 获取 IPC 对象的 key 值msg_id = msgget(ipc_key, IPC_CREAT | 0664); // 获取消息队列pid_t pid = fork();if (pid < 0){perror("fork error");}struct msgbuf my_msg;int rcv_size;//子进程接收消息类型为250的信息if(pid == 0){while (1) {/****** 接收来自 B 的消息 (类型 250) ******/rcv_size = msgrcv(msg_id, &my_msg, sizeof(my_msg.mtext), 250, 0); // 接收类型为 250 的消息if (rcv_size == -1) {perror("rcv msg error");exit(1);}printf("B: %s\n", my_msg.mtext);fflush(stdout); // 刷新标准输出缓冲区,确保输出立即显示}}else//父进程发送信号类型为260的信息{/****** 发送消息给 B (类型 260) ******/my_msg.mtype = 260; // 设置发送类型为 260while (1) {fgets(my_msg.mtext, sizeof(my_msg.mtext), stdin); // 从标准输入获取发送内容char *newline = strchr(my_msg.mtext, '\n'); // 查找换行符if (newline) {*newline = '\0'; // 替换换行符为字符串结束符}if (strcmp(my_msg.mtext, "exit") == 0){kill(pid, SIGKILL); // 向子进程发送终止信号wait(NULL); // 等待子进程终止msgctl(msg_id,IPC_RMID,0); // 删除消息队列exit(0); // 退出父进程}if (msgsnd(msg_id, &my_msg, sizeof(my_msg.mtext), 0) == -1) // 发送消息{perror("send msg error");exit(1);}}}return 0;
}
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/msg.h>
#include <signal.h>
#include <sys/wait.h>struct msgbuf {long mtype;       /* message type, must be > 0 */char mtext[100];  /* message data */
};int main(int argc, const char *argv[]) {key_t ipc_key;int msg_id;ipc_key = ftok(".", 1); // 获取 IPC 对象的 key 值msg_id = msgget(ipc_key, IPC_CREAT | 0664); // 获取消息队列pid_t pid = fork();if (pid < 0){perror("fork error");exit(1);}struct msgbuf my_msg;int rcv_size;if (pid == 0){while (1){/****** 接收来自 A 的消息 (类型 260) ******/rcv_size = msgrcv(msg_id, &my_msg, sizeof(my_msg.mtext), 260, 0); // 接收类型为 260 的消息if (rcv_size == -1) {perror("rcv msg error");exit(1);}printf("A: %s\n", my_msg.mtext);fflush(stdout);}}else{/****** 发送消息给 A (类型 250) ******/my_msg.mtype = 250; // 设置发送类型为 250while (1) {fgets(my_msg.mtext, 100, stdin); // 从标准输入获取发送内容char *newline = strchr(my_msg.mtext, '\n'); // 查找换行符if (newline) {*newline = '\0'; // 替换换行符为字符串结束符}if (strcmp(my_msg.mtext, "exit") == 0){kill(pid, SIGKILL);//向子进程发送信号,终止子进程wait(NULL);//等待子进程结束msgctl(msg_id, IPC_RMID, NULL); // 删除消息队列exit(0);}if (msgsnd(msg_id, &my_msg, sizeof(my_msg.mtext), 0) == -1) // 发送消息{perror("send msg error");exit(1);}}}return 0;
}

利用消息队列配合共享内存,实现A程序每次更改完共享内存,B程序才打印共享内存的数据出来

#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <string.h>
#include <errno.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/msg.h>
#include <sys/shm.h>/*利用消息队列配合共享内存,实现A程序每次更改完共享内存,B程序才打印共享内存的数据出来
*/struct msgbuf {long mtype;       /* message type, must be > 0 */char mtext[100];    /* message data */};int main(void)
{char* shm_addr;//共享内存的地址key_t ipc_key = ftok(".", 1);//拿到共享内存的key值key_t msg_key = ftok(".", 2);//拿到消息队列的key值if (ipc_key == -1){fprintf(stderr,"获取IPCkey失败:%s\n",strerror(errno));return -1;}if (msg_key == -1){fprintf(stderr,"获取MSGkey失败:%s\n",strerror(errno));return -1;}int shm_id = shmget(ipc_key,4096,IPC_CREAT|0666);       //创建共享内存if (shm_id == -1){perror("创建共享内存失败");return -1;}shm_addr = shmat(shm_id, NULL, 0);                     //将共享内存映射到当前进程的地址空间if(shm_addr == (void*)-1){fprintf(stderr, "映射共享内存失败:%s\n", strerror(errno));return -1;}int msg_id = msgget(msg_key, IPC_CREAT|0664);          //创建消息队列if (msg_id == -1){fprintf(stderr,"创建消息队列失败:%s\n",strerror(errno));return -1;}struct msgbuf msg;//消息队列的数据结构msg.mtype = 1;//消息类型/************主逻辑****************/while (1){/****************写数据入共享内存**********************/printf("请输入要写入共享内存的数据:(输入exit退出)\n");fgets(shm_addr,100,stdin);//写入共享内存char* newline = strchr(shm_addr,'\n');//去除换行符if(newline)*newline = '\0';if(strcmp(shm_addr,"exit") == 0){	//输入exit退出strcpy(msg.mtext,"exit");msgsnd(msg_id, &msg, sizeof(msg.mtext), 0); // 通知 B 程序退出break;}/***************通知B程序共享内存已更新*****************/if (msgsnd(msg_id,&msg,0,0) == -1){perror("消息发送失败");break;}	}//解除共享内存映射shmdt(shm_addr);//删除队列msgctl(msg_id, IPC_RMID, NULL);//删除共享内存shmctl(shm_id, IPC_RMID, NULL);return 0;
}
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <string.h>
#include <errno.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/msg.h>
#include <sys/shm.h>struct msgbuf {long mtype;       /* message type, must be > 0 */char mtext[100];    /* message data */};//./msgsend  消息类型
int main(int argc, const char *argv[])
{char* shm_addr;//共享内存的地址key_t ipc_key = ftok(".", 1);//拿到共享内存的key值key_t msg_key = ftok(".", 2);//拿到消息队列的key值if (ipc_key == -1){fprintf(stderr,"获取IPCkey失败:%s\n",strerror(errno));return -1;}if (msg_key == -1){fprintf(stderr,"获取MSGkey失败:%s\n",strerror(errno));return -1;}int shm_id = shmget(ipc_key,4096,IPC_CREAT|0666);       //创建共享内存if (shm_id == -1){perror("创建共享内存失败");return -1;}shm_addr = (char*)shmat(shm_id, NULL, 0);                     //将共享内存映射到当前进程的地址空间if(shm_addr == (void*)-1){fprintf(stderr, "映射共享内存失败:%s\n", strerror(errno));return -1;}int msg_id = msgget(msg_key, IPC_CREAT|0664);          //创建消息队列if (msg_id == -1){fprintf(stderr,"创建消息队列失败:%s\n",strerror(errno));return -1;}struct msgbuf msg;/**********主逻辑****************/while(1){/*****等待A的通知*************/if(msgrcv(msg_id,&msg,sizeof(msg.mtext),1,0) == -1){perror("msgrvc失败");break;}//如果发送的是exit,则退出if (strcmp(msg.mtext,"exit") == 0){printf("退出\n");break;}/*******打印共享内存的数据**************/printf("共享内存中的数据为:%s\n",shm_addr);}shmdt(shm_addr); //解除共享内存映射return 0;
}

后续会更新FreeRTOS的消息队列😘

http://www.xdnf.cn/news/44569.html

相关文章:

  • 人工智能-机器学习其他技术(决策树,异常检测,主成分分析)
  • 论文笔记(七十八)Do generative video models understand physical principles?
  • vscode使用技巧
  • SpringBoot 3 与 SpringDoc 打造完美接口文档
  • 面试常用基础算法
  • JSON-RPC远程控制
  • Linux中的信号量
  • 健身房管理系统设计与实现(springboot+ssm+vue+mysql)含万字详细文档
  • 01.04、回文排序
  • AI日报 - 2025年04月21日
  • 高效获取淘宝实时商品数据:API 接口开发与数据采集实战指南
  • Vue3核心源码解析
  • nvm管理node版本 与 nvm常用指令的使用
  • SpringBoot3集成ES8.15实现余额监控
  • Docker镜像仓库
  • 深拷贝和浅拷贝的区别
  • React Router V7使用详解
  • LeetCode[232]用栈实现队列
  • PySide6 GUI 学习笔记——常用类及控件使用方法(常用类矩阵QRect)
  • Hello, Dirty page
  • 【Flutter】使用LiveKit和Flutter构建实时视频聊天应用
  • Linux操作系统--进程的创建和终止
  • java面试篇(常见的集合底层原理)
  • 中国占全球工业机器人装机量的52%,国产机器人崛起加速洗牌,拆分机器人业务独立上市,软硬件协同增强,AI工业机械臂催生业务再增长
  • Opencv图像处理:轮廓检测、轮廓近似、绘制外接圆外接矩形
  • Linux学习——TCP
  • Viper配置管理笔记
  • 基于springboot+vue的仓库管理系统
  • AI日报 - 2025年04月19日
  • 《Operating System Concepts》阅读笔记:p748-p748