消息队列是 Linux IPC 中很常用的一种通信方式,今天分析一下Posix消息队列,本文中所讲的消息队列均为Posix消息队列。
什么是 Posix 消息队
消息队列可以认为它是一个消息链表,有足够写权限的进程可以往队列中发送消息,有足够读权限的进程可以往队列中接收消息。
每个消息都是一个记录,它由发送者赋予一个优先级。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。这也就说明消息队列具有随内核的持续性,也就是说进程关闭后,消息队列依然存在,除非内核重新自举。
Posix 消息队列有如下特点:
- 对Posix消息队列的读总是返回优先级最高最早的消息。
- 当往空的消息队列中放置一个消息时,Posix消息队列允许产生一个信号或者启动一个接收线程。
Posix 消息队列中的每条消息通常具有以下属性:
- 一个表示优先级的整数;
- 消息的数据部分的长度;
- 消息数据本身;
消息队列的基本操作
打开或创建一个 posix 消息队列操作接口
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
Link with -lrt.
参数 name 为 posix IPC 名字, 即将要被打开或创建的消息队列对象,为了便于移植,需要指定为“/name”的格式。
参数oflag必须要有O_RDONLY(只读)、标志O_RDWR (读写), O_WRONLY(只写)之一,除此之外还可以指定 O_CREAT(没有该对象则创建)、O_EXCL(如果 O_CREAT 指定,但 name 不存在,就返回错误),O_NONBLOCK(以非阻塞方式打开消息队列,在正常情况下mq_receive和mq_send 函数会阻塞的地方,使用该标志打开的消息队列会返回 EAGAIN 错误)。
当操作一个新队列时,使用 O_CREAT 标识,此时后面两个参数需要被指定,参数 mode 为指定权限位,attr 指定新创建队列的属性。
关闭进程描述符操作接口
int mq_close(mqd_t mqdes);
关闭之后告诉进程不在使用该描述符,但消息队列不会从系统中删除。
系统中删除某个消息队列操作接口
int mq_unlink(const char *name);
参数为mq_open()函数第一个参数,调用该接口后删除会马上发生,即使该队列的描述符引用计数仍然大于0。
关于消息队列中设置和和获取消息队列属性接口
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
每个消息队列有四个属性, mq_getattr返回所有的这些属性, mq_setattr设置其中的某个属性
消息队列的消息具体属性如下
struct mq_attr {
long mq_flags; /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue */
}
指向mq_attr的指针可以作为mq_open函数的第四个参数传递, 从而在创建队列初就设置好每个消息的最大长度和允许存在的最大消息数量, 另外两个成员被忽略。
mq_setattr给所指定队列设置属性, 但是只使用由attr指向的mq_attr结构的mq_flags成员, 以设置或清除非阻塞标志, 其他三个成员则被忽略(其中两个只能在创建队列时指定, 还有一个及时获取)。当然, mq_setattr的最后一个参数用于接收之前的属性和当前状态
向消息队列放置和取走消息的操作接口
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
参数msg_ptr为指向消息的指针。
msg_len为消息长度,该值不能大于属性值中mq_msgsize的值。
msg_prio为优先级,消息在队列中将按照优先级大小顺序来排列消息。
如果消息队列已满,mq_send()函数将阻塞,直到队列有可用空间再次允许放置消息或该调用被信号打断;如果O_NONBLOCK被指定,mq_send()那么将不会阻塞,而是返回EAGAIN错误。
如果队列空,mq_receive()函数将阻塞,直到消息队列中有新的消息;如果O_NONBLOCK被指定,mq_receive()那么将不会阻塞,而是返回EAGAIN错误。
消息队列的原理分析
消息队列的初始化
static int __init init_mqueue_fs(void)
{
...
//注册消息队列文件系统
error = register_filesystem(&mqueue_fs_type);
//构建struct vfsmount结构主要是获取文件系统的super_block对象与根目录的inode与dentry对象,并将这些对象加入到系统链表
if (IS_ERR(mqueue_mnt = kern_mount(&mqueue_fs_type))) {
...
}
queues_count = 0;
spin_lock_init(&mq_lock);
return 0;
out_filesystem:
out_sysctl:
return error;
}
__initcall(init_mqueue_fs);
消息队列文件系统初始化很简单,主要工作如下:
- 注册文件系统,把 mqueue_fs_type 加入到 file_systems 链表中。
- 构建struct vfsmount结构,把获取的文件系统的super_block对象与根目录的 inode 与 dentry 对象,并将这些对象加入到系统链表中。
mq_open 接口分析
asmlinkage long sys_mq_open(const char __user *u_name, int oflag, mode_t mode, struct mq_attr __user *u_attr)
{
...
//获取一个未使用的文件描述符
fd = get_unused_fd();
mutex_lock(&mqueue_mnt->mnt_root->d_inode->i_mutex);
//获取一个名字为name的dentry结构
dentry = lookup_one_len(name, mqueue_mnt->mnt_root, strlen(name));
mntget(mqueue_mnt);
//若是新创建
if (oflag & O_CREAT) {
if (dentry->d_inode) { /* entry already exists */
audit_inode(name, dentry);
error = -EEXIST;
if (oflag & O_EXCL)
goto out;
//若已经存在,则直接打开file
filp = do_open(dentry, oflag);
} else {
//创建一个file结构,把inode和dentry与之关联
filp = do_create(mqueue_mnt->mnt_root, dentry,
oflag, mode, u_attr);
}
} else { //否则,直接获取
error = -ENOENT;
if (!dentry->d_inode)
goto out;
audit_inode(name, dentry);
filp = do_open(dentry, oflag);
}
if (IS_ERR(filp)) {
error = PTR_ERR(filp);
goto out_putfd;
}
//给描述符设置close_on_exec标志
set_close_on_exec(fd, 1);
//文件描述符与file进行关联
fd_install(fd, filp);
goto out_upsem;
...
return fd;
}
mq_open 的操作很简单,操作如下:
- 获取一个未使用的文件描述符 fd;
- 根据参数name获取一个 dentry;
- 根据 oflag 表示判断是否是新创建还是使用已存在的 file,若新创建,则生成一个 file 结构,同时与 fd 、 inode、dentry 进行关联;
- 否则打开一个已存在的file。
mq_send 接口分析
当使用 mq_send 发送消息时,比如如下调用,
mq_send(mqd, msg, msg_len, msg_prio)
发送消息时,最终会调用如下函数
mq_timedsend(mqd, msg, msg_len, msg_prio, NULL)
asmlinkage long sys_mq_timedsend(mqd_t mqdes, const char __user *u_msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec __user *u_abs_timeout)
{
...
//获取file结构
filp = fget(mqdes);
if (unlikely(!filp))
goto out;
inode = filp->f_path.dentry->d_inode;
//获取inode 下的 mqueue_inode_info
info = MQUEUE_I(inode);
//把用户传来的消息转换成内核消息链表,若用户消息长度大于PAGE_SIZE,则链表结构为 msg_msg-> msg_msgseg -> msg_msgseg 每个节点都已一个页大小,除了最后一个节点
//若用户消息大小小于PAGE_SIZE,链表只有一个节点 msg_msg
// msg_ptr 指向链表头节点 msg_msg
msg_ptr = load_msg(u_msg_ptr, msg_len);
//msg_msg 中记录消息的总长度,和 消息优先级
msg_ptr->m_ts = msg_len;
msg_ptr->m_type = msg_prio;
spin_lock(&info->lock);
//若消息数量达到最大值
if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
//若非阻塞,则返回
if (filp->f_flags & O_NONBLOCK) {
spin_unlock(&info->lock);
ret = -EAGAIN;
//若超时时间小于0,则返回超时时间
} else if (unlikely(timeout < 0)) {
spin_unlock(&info->lock);
ret = timeout;
} else {
//阻塞调用,则进程休眠,把wait加入到info中的SEND等待队列中
wait.task = current;
wait.msg = (void *) msg_ptr;
wait.state = STATE_NONE;
ret = wq_sleep(info, SEND, timeout, &wait);
}
} else {
//若info 接收队列中有阻塞的进程,则把要发送的数据挂到阻塞的进程的消息节点上,然后唤醒阻塞的接收进程
receiver = wq_get_first_waiter(info, RECV);
if (receiver) {
pipelined_send(info, msg_ptr, receiver);
} else {
//把消息挂到消息队列上,并进行通知
msg_insert(msg_ptr, info);
__do_notify(info);
}
inode->i_atime = inode->i_mtime = inode->i_ctime =
CURRENT_TIME;
spin_unlock(&info->lock);
ret = 0;
}
return ret;
}
sys_mq_timedsend 功能如下:
- 发送消息前先把用户消息转换成消息链表。
- 若消息队列满了,则根据条件进行是否阻塞。
- 消息队列未满,若接收队列中存在阻塞的接收进程,则把要发送的数据挂到阻塞的进程的消息节点上,然后唤醒阻塞的接收进程;否则把消息加到消息队列中。
mq_receive 接口分析
当使用 mq_receive 接收消息时,比如如下调用,
接收消息时,最终会调用如下函数
mq_timedreceive(mqd, msg, msg_len, &msg_prio, NULL)具体实现如下
asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char __user *u_msg_ptr, size_t msg_len, unsigned int __user *u_msg_prio, const struct timespec __user *u_abs_timeout)
{
...
//获取file结构
filp = fget(mqdes);
inode = filp->f_path.dentry->d_inode;
//获取inode 下的 mqueue_inode_info
info = MQUEUE_I(inode);
audit_inode(NULL, filp->f_path.dentry);
spin_lock(&info->lock);
//消息队列当前没有消息
if (info->attr.mq_curmsgs == 0) {
//若非阻塞,则返回
if (filp->f_flags & O_NONBLOCK) {
spin_unlock(&info->lock);
ret = -EAGAIN;
msg_ptr = NULL;
//若超时时间小于0,则返回超时时间
} else if (unlikely(timeout < 0)) {
} else {
//阻塞调用,则进程休眠,把wait加入到info中的RECV等待队列中
wait.task = current;
wait.state = STATE_NONE;
ret = wq_sleep(info, RECV, timeout, &wait);
msg_ptr = wait.msg;
}
} else {
//从消息队列的最高优先级中获取一个消息
msg_ptr = msg_get(info);
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
//由于从消息队列中已经取出一个消息了,有剩余的空间,因此把等待SEND队列上的进程的消息挂到消息队里中,然后唤醒发送消息的进程
pipelined_receive(info);
spin_unlock(&info->lock);
ret = 0;
}
if (ret == 0) {
ret = msg_ptr->m_ts;
//把消息拷贝到用户态
if ((u_msg_prio && put_user(msg_ptr->m_type, u_msg_prio)) ||
store_msg(u_msg_ptr, msg_ptr, msg_ptr->m_ts)) {
ret = -EFAULT;
}
//释放消息空间
free_msg(msg_ptr);
}
out_fput:
fput(filp);
out:
return ret;
}
sys_mq_timedreceive 功能如下:
- 获取消息前,先判断消息队列中是否有消息,若没有,则根据条件进行是否阻塞。
- 若消息队列中有消息,则从消息队列的获取一个最高优先级的消息。
- 唤醒因为消息队列满而阻塞的发送进程。
- 把消息拷贝到用户缓冲区中。
mq_unlink 接口分析
asmlinkage long sys_mq_unlink(const char __user *u_name)
{
...
//引用计数减1,删除目录项
err = vfs_unlink(dentry->d_parent->d_inode, dentry);
return err;
}
该函数作用很简单,就是减少引用计数,删除目录项。
经过上述的分析,有关消息队列的内存结构可以总结如下: