消息队列

为了提供与其他系统的兼容性,Linux 也支持 3 种 system V 的进程间通信机制:消息、信号量(semaphores)和共享内存,Linux 对这些机制的实施大同小异。我们把信号量、消息和共享内存统称 System V IPC 的对象,每一个对象都具有同样类型的接口,即系统调用。就像每个文件都有一个打开文件号一样,每个对象也都有唯一的识别号,进程可以通过系统调用传递的识别号来存取这些对象,与文件的存取一样,对这些对象的存取也要验证存取权限,System V IPC 可以通过系统调用对对象的创建者设置这些对象的存取权限。1
一个或多个进程可向消息队列写入消息,而一个或多个进程可从消息队列中读取消息,这种进程间通信机制通常使用在客户/服务器模型中,客户向服务器发送请求消息,服务器读取消息并执行相应请求。在许多微内核结构的操作系统中,内核和各组件之间的基本通信方式就是消息队列。2


IPC: Inter-Process Communication(进程间通信)

流程

一般流程3如下:

  1. 通过 ftok 获得确定的 IPC Key
  2. 使用 msgget 获得用于消息队列的特定 IPC ID
  3. 使用 msgsnd/msgrcv 收发消息
  4. 使用 msgctl 移除队列

函数

常用函数4如下:

ftok

将路径文件名(存在且可访问)和一个 8bit 标识符转换成 System V IPC 键值。

#include <sys/types.h>
#include <sys/ipc.h>

key_t ftok(const char *pathname, int proj_id)
  • pathname:文件名,必须存在
  • proj_id:仅 8bit 有效,不可为零
  • return:-1,发生错误,可通过 errno 查看;其他返回表示得到的键值


只要 pathnameproj_id 一致则返回的键值就一直,因此两个进程可以通过此函数获得同一个键值。

msgget

建立一个消息队列。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg)
  • key:ftok 中返回的键值
  • msgflg:使用 | 连接
    • 权限:可使用 0666 等表示,也可使用 openmode
    • IPC_CREAT:若不存在则创建
    • IPC_EXCL:若存在则返回错误(和 IPC_CREAT 一起使用)
  • return:-1,发生错误,可通过 errno 查看;非负数,消息队列 ID


系统中的消息队列可通过 ipcs -q 查看。

msgctl

对消息队列的控制,查询、设置、删除。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msqid, int cmd, struct msqid_ds *buf)
  • msqid:msgget 得到 ID
  • cmd:常用如下
    • IPC_STAT:从内核获得消息队列的信息到 buf
    • IPC_SET:通过 buf 修改内核中该消息的一些信息
    • IPC_RMID:从内核移除该消息
  • buf:消息队列的相关信息
  • return:-1,发生错误,可通过 errno 查看


系统中已存在的消息也可通过 ipcrm -q/Q 来删除。

msgsnd/msgrcv

发送/接收消息到队列。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
  • msqid:msgget 得到 ID

  • msgp:形式如下

      struct msgbuf {
          long mtype;       /* message type, must be > 0 */
          char mtext[1];    /* message data */
      };
    

    mtype 必须包含,data 部分可以是自定义的数据结构(总长度不能超过 8192 字节)5

  • msgsz:消息长度

  • msgflg:

    • 0:忽略
    • IPC_NOWAIT:消息队列满则返回程序继续执行,否则阻塞
    • 0:忽略
    • IPC_NOWAIT:如果消息队列为空则程序继续执行,否则阻塞
    • MSG_EXCEPT:msgtyp > 0 时作用,取不等于
    • MSG_NOERROR:如果消息大于 msgsz 自动截断丢弃,否则不会被取出
  • msgtyp:

    • 等于 0:取出队列中最早的消息
    • 大于 0:取出消息中最早 mtype 等于此值的消息
    • 小于 0:取出消息中最早 mtype 小于等于此值绝对值的消息
  • return:-1,发生错误,可通过 errno 查看

实例

服务器/客户端例子6

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/stat.h>
#include <sys/msg.h>
#include <signal.h>

#define MSG_FILE "server.c"
#define BUFFER 255
#define PERM S_IRUSR|S_IWUSR

int msgid;

static void clean_exit(int sig)
{
// 从内核中移除消息
if (msgctl(msgid, IPC_RMID, NULL) == -1) {
fprintf(stderr,"Remove Message Error:%s\a\n",strerror(errno));
exit(1);
}
exit(sig);
}

struct msgtype {
long mtype;
char buffer[BUFFER+1];
};

int main()
{
signal(SIGINT, clean_exit);

struct msgtype msg;
key_t key;
// 获得消息键
if((key=ftok(MSG_FILE,'a'))==-1) {
fprintf(stderr,"Creat Key Error:%s\a\n",strerror(errno));
exit(1);
}
// 创建消息
if((msgid=msgget(key,PERM|IPC_CREAT|IPC_EXCL))==-1) {
fprintf(stderr,"Creat Message Error:%s\a\n",strerror(errno));
exit(1);
}
while(1) {
msgrcv(msgid,&msg,sizeof(struct msgtype),1,0);
fprintf(stderr,"Server Receive:%s\n",msg.buffer);
msg.mtype=2;
msgsnd(msgid,&msg,sizeof(struct msgtype),0);
}
exit(0);
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>

#define MSG_FILE "../server/server.c"
#define BUFFER 255
#define PERM S_IRUSR|S_IWUSR

struct msgtype {
long mtype;
char buffer[BUFFER+1];
};

int main(int argc,char **argv)
{
struct msgtype msg;
key_t key;
int msgid;

if(argc!=2) {
fprintf(stderr,"Usage:%s string\n\a",argv[0]);
exit(1);
}
if((key=ftok(MSG_FILE,'a'))==-1) {
fprintf(stderr,"Creat Key Error:%s\a\n",strerror(errno));
exit(1);
}
if((msgid=msgget(key,PERM))==-1) {
fprintf(stderr,"Creat Message Error:%s\a\n",strerror(errno));
exit(1);
}
msg.mtype=1;
strncpy(msg.buffer,argv[1],BUFFER);
msgsnd(msgid,&msg,sizeof(struct msgtype),0);
memset(&msg,'\0',sizeof(struct msgtype));
msgrcv(msgid,&msg,sizeof(struct msgtype),2,0);
fprintf(stderr,"Client receive:%s\n",msg.buffer);
exit(0);
}

  1. 1.深入分析Linux内核源代码,7.3
  2. 2.深入分析Linux内核源代码,7.3.2
  3. 3.使用 UNIX System V IPC 机制共享应用程序数据
  4. 4.linux man 手册
  5. 5.深入分析Linux内核源代码,7.3.2,P284
  6. 6.linux c学习笔记----消息队列(ftok,msgget,msgsnd,msgrcv,msgctl)

消息队列
https://wishlily.github.io/article/linux/2016/08/31/undefined/
作者
Wishlily
发布于
2016年8月31日
许可协议