百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

进程间通信(IPC) 系列 Posix 消息队列

toyiye 2024-06-21 12:21 14 浏览 0 评论

消息队列是 Linux IPC 中很常用的一种通信方式,今天分析一下Posix消息队列,本文中所讲的消息队列均为Posix消息队列。

什么是 Posix 消息队

消息队列可以认为它是一个消息链表,有足够写权限的进程可以往队列中发送消息,有足够读权限的进程可以往队列中接收消息。

每个消息都是一个记录,它由发送者赋予一个优先级。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。这也就说明消息队列具有随内核的持续性,也就是说进程关闭后,消息队列依然存在,除非内核重新自举。


Posix 消息队列有如下特点

  • 对Posix消息队列的读总是返回优先级最高最早的消息。
  • 当往空的消息队列中放置一个消息时,Posix消息队列允许产生一个信号或者启动一个接收线程。


Posix 消息队列中的每条消息通常具有以下属性

  • 一个表示优先级的整数;
  • 消息的数据部分的长度;
  • 消息数据本身;


消息队列的基本操作

打开或创建一个 posix 消息队列操作接口


mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
Link with -lrt.

参数 name 为 posix IPC 名字, 即将要被打开或创建的消息队列对象,为了便于移植,需要指定为“/name”的格式。

参数oflag必须要有O_RDONLY(只读)、标志O_RDWR (读写), O_WRONLY(只写)之一,除此之外还可以指定 O_CREAT(没有该对象则创建)、O_EXCL(如果 O_CREAT 指定,但 name 不存在,就返回错误),O_NONBLOCK(以非阻塞方式打开消息队列,在正常情况下mq_receive和mq_send 函数会阻塞的地方,使用该标志打开的消息队列会返回 EAGAIN 错误)。

当操作一个新队列时,使用 O_CREAT 标识,此时后面两个参数需要被指定,参数 mode 为指定权限位,attr 指定新创建队列的属性。


关闭进程描述符操作接口

int mq_close(mqd_t mqdes);  

关闭之后告诉进程不在使用该描述符,但消息队列不会从系统中删除。


系统中删除某个消息队列操作接口

int mq_unlink(const char *name);  

参数为mq_open()函数第一个参数,调用该接口后删除会马上发生,即使该队列的描述符引用计数仍然大于0。


关于消息队列中设置和和获取消息队列属性接口

mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr); 


每个消息队列有四个属性, mq_getattr返回所有的这些属性, mq_setattr设置其中的某个属性

消息队列的消息具体属性如下

struct mq_attr {
long mq_flags; /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue */
}


指向mq_attr的指针可以作为mq_open函数的第四个参数传递, 从而在创建队列初就设置好每个消息的最大长度和允许存在的最大消息数量, 另外两个成员被忽略。

mq_setattr给所指定队列设置属性, 但是只使用由attr指向的mq_attr结构的mq_flags成员, 以设置或清除非阻塞标志, 其他三个成员则被忽略(其中两个只能在创建队列时指定, 还有一个及时获取)。当然, mq_setattr的最后一个参数用于接收之前的属性和当前状态


向消息队列放置和取走消息的操作接口

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); 
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); 


参数msg_ptr为指向消息的指针。

msg_len为消息长度,该值不能大于属性值中mq_msgsize的值。

msg_prio为优先级,消息在队列中将按照优先级大小顺序来排列消息。

如果消息队列已满,mq_send()函数将阻塞,直到队列有可用空间再次允许放置消息或该调用被信号打断;如果O_NONBLOCK被指定,mq_send()那么将不会阻塞,而是返回EAGAIN错误。

如果队列空,mq_receive()函数将阻塞,直到消息队列中有新的消息;如果O_NONBLOCK被指定,mq_receive()那么将不会阻塞,而是返回EAGAIN错误。

消息队列的原理分析

消息队列的初始化

static int __init init_mqueue_fs(void)
{
        ...

        //注册消息队列文件系统
        error = register_filesystem(&mqueue_fs_type);

        //构建struct vfsmount结构主要是获取文件系统的super_block对象与根目录的inode与dentry对象,并将这些对象加入到系统链表
        if (IS_ERR(mqueue_mnt = kern_mount(&mqueue_fs_type))) {
            ...
        }

        queues_count = 0;
        spin_lock_init(&mq_lock);

        return 0;

out_filesystem:

out_sysctl:

        return error;
}

__initcall(init_mqueue_fs);


消息队列文件系统初始化很简单,主要工作如下:

  • 注册文件系统,把 mqueue_fs_type 加入到 file_systems 链表中。
  • 构建struct vfsmount结构,把获取的文件系统的super_block对象与根目录的 inode 与 dentry 对象,并将这些对象加入到系统链表中。


mq_open 接口分析

asmlinkage long sys_mq_open(const char __user *u_name, int oflag, mode_t mode, struct mq_attr __user *u_attr)
{

        ...


        //获取一个未使用的文件描述符
        fd = get_unused_fd();

        mutex_lock(&mqueue_mnt->mnt_root->d_inode->i_mutex);
        //获取一个名字为name的dentry结构  
        dentry = lookup_one_len(name, mqueue_mnt->mnt_root, strlen(name));

        mntget(mqueue_mnt);
        //若是新创建
        if (oflag & O_CREAT) {
                if (dentry->d_inode) {  /* entry already exists */
                        audit_inode(name, dentry);
                        error = -EEXIST;
                        if (oflag & O_EXCL)
                                goto out;
                        //若已经存在,则直接打开file
                        filp = do_open(dentry, oflag);
                } else {
                        //创建一个file结构,把inode和dentry与之关联
                        filp = do_create(mqueue_mnt->mnt_root, dentry,
                                                oflag, mode, u_attr);
                }
        } else { //否则,直接获取
                error = -ENOENT;
                if (!dentry->d_inode)
                        goto out;
                audit_inode(name, dentry);
                filp = do_open(dentry, oflag);
        }

        if (IS_ERR(filp)) {
                error = PTR_ERR(filp);
                goto out_putfd;
        }

        //给描述符设置close_on_exec标志
        set_close_on_exec(fd, 1);

        //文件描述符与file进行关联
        fd_install(fd, filp);

        goto out_upsem;

        ...

        return fd;
}


mq_open 的操作很简单,操作如下:

  • 获取一个未使用的文件描述符 fd;
  • 根据参数name获取一个 dentry;
  • 根据 oflag 表示判断是否是新创建还是使用已存在的 file,若新创建,则生成一个 file 结构,同时与 fd 、 inode、dentry 进行关联;
  • 否则打开一个已存在的file。


mq_send 接口分析

当使用 mq_send 发送消息时,比如如下调用,

mq_send(mqd, msg, msg_len, msg_prio) 

发送消息时,最终会调用如下函数

mq_timedsend(mqd, msg, msg_len, msg_prio, NULL)


asmlinkage long sys_mq_timedsend(mqd_t mqdes, const char __user *u_msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec __user *u_abs_timeout)
{ 
...

//获取file结构
filp = fget(mqdes);
if (unlikely(!filp))
goto out;

inode = filp->f_path.dentry->d_inode;
//获取inode 下的 mqueue_inode_info
info = MQUEUE_I(inode);

//把用户传来的消息转换成内核消息链表,若用户消息长度大于PAGE_SIZE,则链表结构为 msg_msg-> msg_msgseg -> msg_msgseg 每个节点都已一个页大小,除了最后一个节点
//若用户消息大小小于PAGE_SIZE,链表只有一个节点 msg_msg 
// msg_ptr 指向链表头节点 msg_msg
msg_ptr = load_msg(u_msg_ptr, msg_len);
//msg_msg 中记录消息的总长度,和 消息优先级
msg_ptr->m_ts = msg_len; 
msg_ptr->m_type = msg_prio;

spin_lock(&info->lock);
//若消息数量达到最大值
if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
//若非阻塞,则返回
if (filp->f_flags & O_NONBLOCK) {
spin_unlock(&info->lock);
ret = -EAGAIN;
//若超时时间小于0,则返回超时时间
} else if (unlikely(timeout < 0)) {
spin_unlock(&info->lock);
ret = timeout;
} else {
//阻塞调用,则进程休眠,把wait加入到info中的SEND等待队列中
wait.task = current;
wait.msg = (void *) msg_ptr;
wait.state = STATE_NONE;
ret = wq_sleep(info, SEND, timeout, &wait);
}

} else {
//若info 接收队列中有阻塞的进程,则把要发送的数据挂到阻塞的进程的消息节点上,然后唤醒阻塞的接收进程
receiver = wq_get_first_waiter(info, RECV);
if (receiver) {
pipelined_send(info, msg_ptr, receiver);
} else {
//把消息挂到消息队列上,并进行通知
msg_insert(msg_ptr, info);
__do_notify(info);
}
inode->i_atime = inode->i_mtime = inode->i_ctime =
CURRENT_TIME;
spin_unlock(&info->lock);
ret = 0;
}

return ret;
}

sys_mq_timedsend 功能如下:

  • 发送消息前先把用户消息转换成消息链表。
  • 若消息队列满了,则根据条件进行是否阻塞。
  • 消息队列未满,若接收队列中存在阻塞的接收进程,则把要发送的数据挂到阻塞的进程的消息节点上,然后唤醒阻塞的接收进程;否则把消息加到消息队列中。


mq_receive 接口分析

当使用 mq_receive 接收消息时,比如如下调用,

接收消息时,最终会调用如下函数

mq_timedreceive(mqd, msg, msg_len, &msg_prio, NULL)具体实现如下
asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char __user *u_msg_ptr, size_t msg_len, unsigned int __user *u_msg_prio, const struct timespec __user *u_abs_timeout)
{

        ...

        //获取file结构
        filp = fget(mqdes);

        inode = filp->f_path.dentry->d_inode;
        //获取inode 下的 mqueue_inode_info
        info = MQUEUE_I(inode);
        audit_inode(NULL, filp->f_path.dentry);

        spin_lock(&info->lock);
        //消息队列当前没有消息
        if (info->attr.mq_curmsgs == 0) {
                //若非阻塞,则返回
                if (filp->f_flags & O_NONBLOCK) {
                        spin_unlock(&info->lock);
                        ret = -EAGAIN;
                        msg_ptr = NULL;
                //若超时时间小于0,则返回超时时间
                } else if (unlikely(timeout < 0)) {

                } else {
                        //阻塞调用,则进程休眠,把wait加入到info中的RECV等待队列中
                        wait.task = current;
                        wait.state = STATE_NONE;
                        ret = wq_sleep(info, RECV, timeout, &wait);
                        msg_ptr = wait.msg;
                }
        } else {
                //从消息队列的最高优先级中获取一个消息
                msg_ptr = msg_get(info);

                inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;

                //由于从消息队列中已经取出一个消息了,有剩余的空间,因此把等待SEND队列上的进程的消息挂到消息队里中,然后唤醒发送消息的进程
                pipelined_receive(info);
                spin_unlock(&info->lock);
                ret = 0;
        }
        if (ret == 0) {
                ret = msg_ptr->m_ts;
                //把消息拷贝到用户态
                if ((u_msg_prio && put_user(msg_ptr->m_type, u_msg_prio)) ||
                        store_msg(u_msg_ptr, msg_ptr, msg_ptr->m_ts)) {
                        ret = -EFAULT;
                }
                //释放消息空间
                free_msg(msg_ptr);
        }
out_fput:
        fput(filp);
out:
        return ret;
}

sys_mq_timedreceive 功能如下:

  • 获取消息前,先判断消息队列中是否有消息,若没有,则根据条件进行是否阻塞。
  • 若消息队列中有消息,则从消息队列的获取一个最高优先级的消息。
  • 唤醒因为消息队列满而阻塞的发送进程。
  • 把消息拷贝到用户缓冲区中。


mq_unlink 接口分析

asmlinkage long sys_mq_unlink(const char __user *u_name)
{
        ...
        //引用计数减1,删除目录项
        err = vfs_unlink(dentry->d_parent->d_inode, dentry);

        return err;
}


该函数作用很简单,就是减少引用计数,删除目录项。


经过上述的分析,有关消息队列的内存结构可以总结如下:

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码