Linux 进程间通信之消息队列:原理 + API 与实战 (System-V IPC)
目录
- System V 消息队列的特点
- msgget函数
- 函数原型
- msgctl函数
- 函数原型
- 操作结构体cmd命令详解
- msgsnd函数
- 函数原型
- 消息结构体
- msgrcv函数
- 函数原型
- 综合练习
- 使用消息队列实现两个程序之间互相聊天
- 利用消息队列配合共享内存,实现A程序每次更改完共享内存,B程序才打印共享内存的数据出来
System V 消息队列的特点
- 内核管理:
- 消息队列由内核维护,进程通过队列的标识符(
msqid
)进行访问。 - 消息在发送方进程和接收方进程之间由内核负责存储和传递。
- 消息队列由内核维护,进程通过队列的标识符(
- 异步通信:
- 发送方发送消息后可以立即返回,无需等待接收方接收。
- 消息会在队列中等待,直到接收方读取。
- 支持消息类型:
- 消息可以带有类型(
mtype
),接收方可以按类型读取特定的消息。
- 消息可以带有类型(
- 消息优先级:
- 类型
mtype
可以作为优先级使用,接收方可以优先读取某些类型的消息。
- 类型
- 持久性:
- 消息队列存在于内核中,直到显式删除(通过
msgctl
删除),即使所有进程退出,队列仍然存在。
- 消息队列存在于内核中,直到显式删除(通过
msgget函数
函数功能:获取一个消息队列的对象的 id,他可以创建及打开一个 IPC 对象的消息队列
函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgget(key_t key, int msgflg);
参数
key
:IPC 对象的 key 值msgflg
: 消息队列操作标记位
IPC_CREAT
:如果消息队列不存在,则创建一个新的队列。IPC_EXCL
:与IPC_CREAT
一起使用,如果队列已存在则返回错误。- 在
IPC_CREAT
基础上 | 上一个八进制数设置权限如0666
返回值
- 成功:返回一个非负整数,表示消息队列的标识符(
msqid
)。- 失败:返回
-1
,并设置
errno
。常见错误码:
EEXIST
:指定了IPC_CREAT | IPC_EXCL
,但消息队列已存在。EINVAL
:key
或
msgflg
参数无效。ENOSPC
:系统已达到消息队列的最大限制。EACCES
:没有权限访问或创建消息队列。
作用
- 创建新的消息队列或获取现有的消息队列
示例:
int msg_id = msgget(key,IPC_CREAT,0666);//创建消息队列并返回消息队列唯一标识符号
msgctl函数
函数功能:对消息队列执行 管理操作,如获取消息队列的信息、设置消息队列的属性、删除消息队列等。
函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
参数
msqid
:消息队列标识符,由msgget
函数创建
cmd
:控制操作的命令。
IPC_STAT
:获取消息队列的状态信息,填充到buf
中。IPC_SET
:设置消息队列的属性(如权限),从buf
中读取新值。IPC_RMID
:删除消息队列。IPC_INFO
(系统扩展):获取系统级的消息队列限制信息(某些系统可能不支持)。
buf
:指向shmid_ds
结构的指针,用于获取或设置共享内存的属性。
结构体struct msqid_ds
msgctl
的核心结构是msqid_ds
,用于描述消息队列的状态和属性。
struct msqid_ds {struct ipc_perm msg_perm; // 权限和 UID、GID 信息time_t msg_stime; // 上次发送消息的时间time_t msg_rtime; // 上次接收消息的时间time_t msg_ctime; // 队列最后修改的时间unsigned long msg_cbytes; // 当前队列中的字节数unsigned long msg_qnum; // 队列中当前消息数unsigned long msg_qbytes; // 队列的最大字节数pid_t msg_lspid; // 最后发送消息的进程 PIDpid_t msg_lrpid; // 最后接收消息的进程 PID
};
返回值
- 成功:返回
0
。- 失败:返回
-1
,并设置
errno
。
- 常见错误码:
EINVAL
:msqid
或cmd
参数无效。EACCES
:权限不足。EPERM
:尝试删除消息队列,但没有管理员权限。
操作结构体cmd命令详解
IPC_STAT
:
struct msqid_ds buf;
msgctl(msg_id,IPC_STAT,&buf);//获取当前消息队列状态
IPC_RMID
:典型用法,完成队列操作后删除队列,删除后无法继续访问该队列
msgctl(msg_id,IPC_RMID,NULL);//删除队列
msgsnd函数
函数功能:用于 向消息队列发送消息。消息会存储在消息队列中,等待接收进程通过 msgrcv
函数读取。
函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数
msqid
:消息队列ID,由msgget
创建。
msgp
:指向消息缓冲区的指针,通常是一个
msgbuf
类型的结构。
msgsz
:消息正文的大小(不包括mtype
的大小)
msgflg
:发送行为标志
0
:代表正常发送(缓冲区满了会阻塞等待)IPC_NOWAIT
:如果消息队列已满,立即返回错误,而不是阻塞。
返回值
- 成功则返回 0,失败则返回-1,errno 会被设置
消息结构体
struct msgbuf {long mtype; // 消息类型,必须大于 0char mtext[1]; // 消息正文,实际大小可以大于 1
};
注意这个mtext[]
如果在结构体指定为1
,那么在msgsnd中msgsz
参数要指定大于1
的数如100
,需要动态分配大小,否则会导致未定义行为,导致内存访问越界
mtext[1]
的情况下动态分配正文大小为100
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>// 定义消息结构,mtext[1] 为最小定义
struct msgbuf {long mtype; // 消息类型char mtext[1]; // 消息正文
};int main() {key_t key = ftok("/tmp", 65);int msqid = msgget(key, IPC_CREAT | 0666);if (msqid == -1) {perror("msgget failed");return 1;}// 分配动态内存struct msgbuf *message = malloc(sizeof(struct msgbuf) + 100);//结构体本身大小+100动态分配的字节if (message == NULL) {perror("malloc failed");return 1;}message->mtype = 1; // 设置消息类型strcpy(message->mtext, "This is a long message exceeding the fixed size of 1 byte!");// 使用 msgsz 动态指定正文的实际大小if (msgsnd(msqid, message, strlen(message->mtext) + 1, 0) == -1) {//包含结束符号'\0'perror("msgsnd failed");free(message);return 1;}printf("Message sent: %s\n", message->mtext);free(message);return 0;
}
- 为什么不能直接动态分配
mtext
?- 在 C 中,结构体的大小是固定的(
sizeof(struct)
的结果)。 - 结构体中的数组大小必须在编译时确定,无法在运行时动态扩展。
- 换句话说,
mtext
的大小在编译时已经固定为 1 字节(或者你定义的其他大小,如mtext[100]
),无法动态调整。 - 因此使用最小的
mtext[1]
或mtext[0]
作为占位符,然后通过动态分配额外的内存实现变长(通过指针)。
- 在 C 中,结构体的大小是固定的(
最简单的方式直接定死正文大小,mtext
设置成100
,在msgsnd
的时候也写100
msgrcv函数
函数功能:用于从消息队列中接收消息。它可以根据消息类型进行有选择地接收,也可以按消息到达的顺序接收。
函数原型
include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,int msgflg);
参数
msqid
: 消息队列标识符,由msgget返回
msgp
:指向消息缓冲区的指针,通常是一个
msgbuf
类型的结构。
msgsz
: 正文大小
msgtyp
:接收的消息类型
- 常用取值:
0
:接收队列中第一个消息(按到达顺序)。- 大于
0
:接收指定类型的第一个消息。- 小于
0
:接收类型值小于等于abs(msgtyp)
的优先级最高的消息。
msgflg
:操作标记位
0
:代表正常发送(缓冲区满了会阻塞等待)IPC_NOWAIT
:如果没有符合条件的消息,立即返回错误EAGAIN
。MSG_NOERROR
:如果消息正文大小超过msgsz
,截断消息并不报错。
返回值
- 成功返回实际接收到的消息正文大小(单位:字节)。
- 失败返回
-1
,并设置errno
。
消息结构体
struct msgbuf {long mtype; // 消息类型,必须大于 0char mtext[1]; // 消息正文
};
mtype
:
- 用于标识消息类型。
- 消息发送时由发送端设置,接收端可根据类型有选择地接收消息。
mtext
:
- 存储消息正文。
- 实际大小由
msgsz
决定。
示例:
- 接收任何类型的消息,
mtype
为0
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>// 定义消息结构
struct msgbuf {long mtype;char mtext[100];
};int main() {// 获取消息队列key_t key = ftok("/tmp", 65);int msqid = msgget(key, 0666);if (msqid == -1) {perror("msgget failed");exit(1);}struct msgbuf message;// 接收消息if (msgrcv(msqid, &message, sizeof(message.mtext), 0, 0) == -1) {perror("msgrcv failed");exit(1);}printf("Message received:\nType: %ld\nText: %s\n", message.mtype, message.mtext);return 0;
}
- 接收特定类型的消息
struct msgbuf message;if (msgrcv(msqid, &message, sizeof(message.mtext), 2, 0) == -1) {perror("msgrcv failed");exit(1);
}printf("Received type 2 message: %s\n", message.mtext);
- 接收高优先级的消息:类型值小于等于3的数据
struct msgbuf message;if (msgrcv(msqid, &message, sizeof(message.mtext), -3, 0) == -1) {perror("msgrcv failed");exit(1);
}printf("Received high-priority message: %s\n", message.mtext);
- 处理消息正文大小超过限制:使用
MSG_NOERROR
struct msgbuf message;// 接收消息,并允许截断
if (msgrcv(msqid, &message, 50, 0, MSG_NOERROR) == -1) {perror("msgrcv failed");exit(1);
}printf("Received (truncated) message: %s\n", message.mtext);
综合练习
使用消息队列实现两个程序之间互相聊天
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/msg.h>
#include <signal.h>
#include <sys/wait.h>struct msgbuf {long mtype; /* message type, must be > 0 */char mtext[100]; /* message data */
};int main(int argc, const char *argv[])
{key_t ipc_key;int msg_id;ipc_key = ftok(".", 1); // 获取 IPC 对象的 key 值msg_id = msgget(ipc_key, IPC_CREAT | 0664); // 获取消息队列pid_t pid = fork();if (pid < 0){perror("fork error");}struct msgbuf my_msg;int rcv_size;//子进程接收消息类型为250的信息if(pid == 0){while (1) {/****** 接收来自 B 的消息 (类型 250) ******/rcv_size = msgrcv(msg_id, &my_msg, sizeof(my_msg.mtext), 250, 0); // 接收类型为 250 的消息if (rcv_size == -1) {perror("rcv msg error");exit(1);}printf("B: %s\n", my_msg.mtext);fflush(stdout); // 刷新标准输出缓冲区,确保输出立即显示}}else//父进程发送信号类型为260的信息{/****** 发送消息给 B (类型 260) ******/my_msg.mtype = 260; // 设置发送类型为 260while (1) {fgets(my_msg.mtext, sizeof(my_msg.mtext), stdin); // 从标准输入获取发送内容char *newline = strchr(my_msg.mtext, '\n'); // 查找换行符if (newline) {*newline = '\0'; // 替换换行符为字符串结束符}if (strcmp(my_msg.mtext, "exit") == 0){kill(pid, SIGKILL); // 向子进程发送终止信号wait(NULL); // 等待子进程终止msgctl(msg_id,IPC_RMID,0); // 删除消息队列exit(0); // 退出父进程}if (msgsnd(msg_id, &my_msg, sizeof(my_msg.mtext), 0) == -1) // 发送消息{perror("send msg error");exit(1);}}}return 0;
}
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/msg.h>
#include <signal.h>
#include <sys/wait.h>struct msgbuf {long mtype; /* message type, must be > 0 */char mtext[100]; /* message data */
};int main(int argc, const char *argv[]) {key_t ipc_key;int msg_id;ipc_key = ftok(".", 1); // 获取 IPC 对象的 key 值msg_id = msgget(ipc_key, IPC_CREAT | 0664); // 获取消息队列pid_t pid = fork();if (pid < 0){perror("fork error");exit(1);}struct msgbuf my_msg;int rcv_size;if (pid == 0){while (1){/****** 接收来自 A 的消息 (类型 260) ******/rcv_size = msgrcv(msg_id, &my_msg, sizeof(my_msg.mtext), 260, 0); // 接收类型为 260 的消息if (rcv_size == -1) {perror("rcv msg error");exit(1);}printf("A: %s\n", my_msg.mtext);fflush(stdout);}}else{/****** 发送消息给 A (类型 250) ******/my_msg.mtype = 250; // 设置发送类型为 250while (1) {fgets(my_msg.mtext, 100, stdin); // 从标准输入获取发送内容char *newline = strchr(my_msg.mtext, '\n'); // 查找换行符if (newline) {*newline = '\0'; // 替换换行符为字符串结束符}if (strcmp(my_msg.mtext, "exit") == 0){kill(pid, SIGKILL);//向子进程发送信号,终止子进程wait(NULL);//等待子进程结束msgctl(msg_id, IPC_RMID, NULL); // 删除消息队列exit(0);}if (msgsnd(msg_id, &my_msg, sizeof(my_msg.mtext), 0) == -1) // 发送消息{perror("send msg error");exit(1);}}}return 0;
}
利用消息队列配合共享内存,实现A程序每次更改完共享内存,B程序才打印共享内存的数据出来
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <string.h>
#include <errno.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/msg.h>
#include <sys/shm.h>/*利用消息队列配合共享内存,实现A程序每次更改完共享内存,B程序才打印共享内存的数据出来
*/struct msgbuf {long mtype; /* message type, must be > 0 */char mtext[100]; /* message data */};int main(void)
{char* shm_addr;//共享内存的地址key_t ipc_key = ftok(".", 1);//拿到共享内存的key值key_t msg_key = ftok(".", 2);//拿到消息队列的key值if (ipc_key == -1){fprintf(stderr,"获取IPCkey失败:%s\n",strerror(errno));return -1;}if (msg_key == -1){fprintf(stderr,"获取MSGkey失败:%s\n",strerror(errno));return -1;}int shm_id = shmget(ipc_key,4096,IPC_CREAT|0666); //创建共享内存if (shm_id == -1){perror("创建共享内存失败");return -1;}shm_addr = shmat(shm_id, NULL, 0); //将共享内存映射到当前进程的地址空间if(shm_addr == (void*)-1){fprintf(stderr, "映射共享内存失败:%s\n", strerror(errno));return -1;}int msg_id = msgget(msg_key, IPC_CREAT|0664); //创建消息队列if (msg_id == -1){fprintf(stderr,"创建消息队列失败:%s\n",strerror(errno));return -1;}struct msgbuf msg;//消息队列的数据结构msg.mtype = 1;//消息类型/************主逻辑****************/while (1){/****************写数据入共享内存**********************/printf("请输入要写入共享内存的数据:(输入exit退出)\n");fgets(shm_addr,100,stdin);//写入共享内存char* newline = strchr(shm_addr,'\n');//去除换行符if(newline)*newline = '\0';if(strcmp(shm_addr,"exit") == 0){ //输入exit退出strcpy(msg.mtext,"exit");msgsnd(msg_id, &msg, sizeof(msg.mtext), 0); // 通知 B 程序退出break;}/***************通知B程序共享内存已更新*****************/if (msgsnd(msg_id,&msg,0,0) == -1){perror("消息发送失败");break;} }//解除共享内存映射shmdt(shm_addr);//删除队列msgctl(msg_id, IPC_RMID, NULL);//删除共享内存shmctl(shm_id, IPC_RMID, NULL);return 0;
}
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <string.h>
#include <errno.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/msg.h>
#include <sys/shm.h>struct msgbuf {long mtype; /* message type, must be > 0 */char mtext[100]; /* message data */};//./msgsend 消息类型
int main(int argc, const char *argv[])
{char* shm_addr;//共享内存的地址key_t ipc_key = ftok(".", 1);//拿到共享内存的key值key_t msg_key = ftok(".", 2);//拿到消息队列的key值if (ipc_key == -1){fprintf(stderr,"获取IPCkey失败:%s\n",strerror(errno));return -1;}if (msg_key == -1){fprintf(stderr,"获取MSGkey失败:%s\n",strerror(errno));return -1;}int shm_id = shmget(ipc_key,4096,IPC_CREAT|0666); //创建共享内存if (shm_id == -1){perror("创建共享内存失败");return -1;}shm_addr = (char*)shmat(shm_id, NULL, 0); //将共享内存映射到当前进程的地址空间if(shm_addr == (void*)-1){fprintf(stderr, "映射共享内存失败:%s\n", strerror(errno));return -1;}int msg_id = msgget(msg_key, IPC_CREAT|0664); //创建消息队列if (msg_id == -1){fprintf(stderr,"创建消息队列失败:%s\n",strerror(errno));return -1;}struct msgbuf msg;/**********主逻辑****************/while(1){/*****等待A的通知*************/if(msgrcv(msg_id,&msg,sizeof(msg.mtext),1,0) == -1){perror("msgrvc失败");break;}//如果发送的是exit,则退出if (strcmp(msg.mtext,"exit") == 0){printf("退出\n");break;}/*******打印共享内存的数据**************/printf("共享内存中的数据为:%s\n",shm_addr);}shmdt(shm_addr); //解除共享内存映射return 0;
}
后续会更新FreeRTOS
的消息队列😘