消息队列亦称报文队列,也叫做信箱。意思是说,这种通信机制传递的数据具有某种结构,而不是简单的字节流。消息队列的工作机制如下所示:
消息的结构
用户空间的消息缓冲区
为在进程与内核之间传递消息,无论发送进程还是接收进程,都需要在进程空间中用消息缓冲区来暂存消息。该消息缓冲区的结构定义如下:
struct msgbuf { long mtype; /* 消息的类型 */ char mtext[1]; /* 消息正文 */ };
从这个缓冲区的定义可以看到,消息通信的最大特点在于,发送进程可以在域mtype中定义消息的类型,这样就为接收进程提供了一个方便,即接收进程可以根据mtype来判断队列中的一个消息是否为它所等待的消息,从而使接收进程可以有选择地进行接收。
域mtext[]为存放消息正文的数组,发送进程可以根据消息的大小定义该数组的长度。
内核空间的消息结构
为便于内核对消息的维护和管理,以及要将大型消息分页存放,所以内存中用于消息通信的数据结构要比进程消息缓冲区的结构稍微复杂一些。内核空间消息结构分为首页结构和一般页结构。其首页结构msg_msg的定义如下:
struct msg_msg { struct list_head m_list; long m_type; //消息结构 int m_ts; /* 消息文本大小 */ struct msg_msgseg* next; //下一个消息片段页 void *security; //下面是可能存在的消息内容 };
当进程发送消息时,负责发送消息的系统调用会把进程空间的msgbuf中的消息正文复制到内核空间消息首页结构msg_msg的后面。
- 当消息的大小加上msg_msg结构的大小小于一个页面时,消息正文紧跟着msg_msg结构的后面存放;
- 当消息的大小加上msg_msg结构的大小大于一个页面时,则需要将消息分段存放。即消息正文的开头部分存放在首页结构msg_msg中,剩余部分则分别存放在多个一般页结构msg_msgseg中,然后把首页和一般页按逻辑顺序用指针next连接成链表形成一个消息。
一般页结构msg_msgseg的定义如下:
struct msg_msgseg { struct msg_msgseg* next; /* the next part of the message follows immediately */ };
一个大型消息的结构如下所示:
消息队列的结构
每个消息队列都有一个msg_queue结构类型的队列头。msg_queue结构的定义如下:
struct msg_queue { struct kern_ipc_perm q_perm; time_t q_stime; /* 最后发送消息时间 */ time_t q_rtime; /* 最后接收消息时间 */ time_t q_ctime; /* 最后改变时间 */ unsigned long q_cbytes; /* 当前队列字节数 */ unsigned long q_qnum; /* 当前队列消息数 */ unsigned long q_qbytes; /* 队列的最大字节数 */ pid_t q_lspid; /* 最后发送消息进程PID */ pid_t q_lrpid; /* 最后接受消息进程PID */ struct list_head q_messages; //消息队列 struct list_head q_receivers; //接收信号进程等待队列 struct list_head q_senders; //发送消息进程等待队列 };
消息队列的结构图如下所示:
结构msg_queue中还分别记录了接收和发送进程的等待队列。每个正在等待接收进程的描述结构如下:
struct msg_receiver { struct list_head r_list; struct task_struct *r_tsk; //进程控制块指针 int r_mode; //读方式 long r_msgtype; //读的消息类型 long r_maxsize; //读的消息最大尺寸 struct msg_msg *volatile r_msg; //消息指针 };
每个正在等待发送进程的描述结构如下:
struct msg_sender { struct list_head list; struct task_struct *tsk; //进程控制块指针 };
与共享内存的管理方式一样,Linux把所有消息队列都组织在一个数组中。在文件linux/ipc/util.h如下:
struct ipc_id_ary { int size; struct kern_ipc_perm *p[0]; //存放段描述结构的数组 };
同时,在文件linux/ipc/msg.c中也定义了一个全局变量msg_ids来管理消息队列:
static struct ipc_ids msg_ids;
这个变量的数据类型为ipc_ids结构。在文件linux/ipc/util.h中声明的ipc_ids结构代码如下:
struct ipc_ids { int in_use; unsigned short seq; unsigned short seq_max; struct rw_semaphore rw_mutex; struct idr ipcs_idr; struct ipc_id_ary *entries; //指向struct ipc_id_ary的指针 };
消息队列的整体结构如下所示:
消息队列的创建与打开
头文件:
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h>
消息队列的内核持续性要求每个消息队列都在系统范围内对应唯一的键值,所以,要获得一个消息队列的描述符,只需提供该消息队列的键值即可。
消息队列描述字是由在系统范围内唯一的键值生成的,而键值可以看作对应系统内的一条路径。
进程可以通过调用函数msgget()来创建一个消息队列。msgget()对应的系统调用如下:
int msgget(key_t key, int msgflg); asmlinkage long sys_msgget(key_t key, int msgflg);
其中,参数key是用户给定的键值。如果该值为0,系统会为进程创建一个进程自用的消息队列,即供其自发自收;否则创建或打开一个消息队列。参数msgflg是该函数的功能标志。
消息队列的读写
消息读写操作非常简单,对于开发人员来说,每个消息都类似于如下的数据结构,即需要用户声明一个数据结构:
struct msgbuf { long mtype; /* 消息的类型 */ char mtext[1]; /* 消息正文 */ };
其中,mtype成员代表消息类型,从消息队列中读取消息的一个重要依据就是消息的类型;mtext是消息内容,当然长度不一定为1。
对于发送消息来说,首先预置一个msgbuf缓冲区并写入消息类型和内容,调用相应的发送函数即可;对于读取消息来说,首先分配这样一个msgbuf缓冲区,然后把消息读入该缓冲区即可。
int msgsnd(int msqid, struct msgbuf * msgp, int msgsz, int msgflg);
向消息队列发送一条消息。其中,msqid为已打开的消息队列ID;msgp为存放消息的结构;msgsz为消息数据的长度;msgflg为发送标志。有意义的msgflg标志为IPC_NOWAIT,指明在消息队列没有足够空间容纳要发送的消息时,msgsnd是否等待。
int msgrcv(int msqid, struct msgbuf * msgp, int msgsz, long msgtyp, int msgflg);
从msqid代表的消息队列中读取一个消息,并把消息存放在msgp指向的msgbuf结构中。在成功地读取了一条消息后,队列中的这条消息将被删除。
操作成功时返回0,失败返回-1。
例子:父进程向消息队列发送消息,子进程从消息队列接收消息。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <stdio.h> #include <string.h> #include <stdlib.h> int main() { int pid, msqid; struct msgbuf { long mtype; char mtext[20]; }send_buf, receive_buf; if ((msqid = msgget(IPC_PRIVARE, 0700)) < 0) { //创建消息队列 printf("msgget建立消息队列失败"); exit(1); }else printf("msgget建立消息队列成功"); if ((pid = fork()) < 0) { //创建子进程 printf("fork()函数调用失败"); exit(2); }else if (pid > 0) { //父进程,发送消息到消息队列 send_buf.mtype = 1; strcpy(send_buf.mtext, "Hello World"); printf("发送到消息队列的信息内容为:%s", send_buf.mtext); if (msgsnd(msqid, &send_buf, 20, IPC_NOWAIT) < 0) { //发送消息到消息队列 printf("消息发送失败"); exit(3); }else printf("消息发送成功"); sleep(2); exit(0); }else { //子进程,从消息队列中接收消息 sleep(2); int infolen; if ((infolen = msgrcv(msqid, &receive_buf, 20, 0, IPC_NOWAIT)) < 0) { //从消息队列中接收消息 printf("消息读取错误"); exit(4); }else printf("消息读取成功"); if ((msgctl(msqid, IPC_RWID, NULL)) < 0) { //删除消息队列中的消息 printf("消息删除错误"); exit(5); } else printf("消息删除成功"); exit(0); } return 0; }