前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >消息队列编程和案例,进程间通信 mq_open mq_close mq_unlink mq_setattr mq_getattr mq_send mq_rece

消息队列编程和案例,进程间通信 mq_open mq_close mq_unlink mq_setattr mq_getattr mq_send mq_rece

原创
作者头像
爱串门的小马驹
发布2024-09-01 11:05:03
1850
发布2024-09-01 11:05:03
举报
文章被收录于专栏:网络通信基础

一、介绍

mq_open mq_close mq_unlink mq_setattr mq_getattr mq_send mq_receive

是 POSIX 消息队列(POSIX message queues)中用于发送和接收消息的函数。POSIX 消息队列是一种进程间通信(IPC)机制,允许进程以消息的形式交换数据。

哈哈哈哈,先了解一下函数,最后来个案例。

二、mq_open

功能:打开(如果已存在)或创建一个消息队列。

代码语言:javascript
复制
#include <mqueue.h>  
#include <fcntl.h>  
#include <sys/stat.h>  

mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
  • name:消息队列的名称,必须是以斜杠(/)开头的绝对路径名。
  • oflag:操作标志,可以是 O_NONBLOCK(非阻塞模式)和 O_CREAT(如果队列不存在则创建)的组合。
  • mode:如果 O_CREAT 被设置,则指定新创建队列的权限。
  • attr:指向 mq_attr 结构体的指针,用于指定队列的属性(如最大消息大小和队列容量)。如果为 NULL,则使用默认属性。

返回值:成功时返回消息队列描述符,失败时返回 (mqd_t)-1 并设置 errno。

三、mq_send

功能:用于将一条消息发送到指定的消息队列中。

代码语言:javascript
复制
#include <mqueue.h>  

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);

mqdes:消息队列描述符,由 mq_open 返回。 msg_ptr:指向要发送的消息的指针。 msg_len:消息的长度(以字节为单位),必须小于或等于消息队列的 mq_msgsize 属性。 msg_prio:消息的优先级,一个无符号整数,值越大优先级越高。

如果函数成功,返回 0;如果失败,返回 -1 并设置 errno 以指示错误。

四、mq_receive

mq_receive 函数用于从指定的消息队列中接收一条消息。

代码语言:javascript
复制
#include <mqueue.h>  
  
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);

mqdes:消息队列描述符。 msg_ptr:指向接收消息的缓冲区的指针。 msg_len:缓冲区的长度(以字节为单位),应足够大以容纳可能接收到的最大消息。 msg_prio:如果非 NULL,则用于存储接收到的消息的优先级。

如果成功,mq_receive 返回接收到的消息的实际长度(以字节为单位)。如果失败,则返回 -1 并设置 errno。

五、mq_close

功能:关闭消息队列描述符。

代码语言:javascript
复制
#include <mqueue.h>  

int mq_close(mqd_t mqdes);
  • mqdes:消息队列描述符。

返回值:成功时返回 0,失败时返回 -1 并设置 errno。

六. mq_unlink

功能:删除消息队列。

代码语言:javascript
复制
​
#include <mqueue.h>  

int mq_unlink(const char *name);

name:消息队列的名称。

返回值:成功时返回 0,失败时返回 -1 并设置 errno。

七、mq_setattr

功能:设置消息队列的属性。

代码语言:javascript
复制
​#include <mqueue.h>  

int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);

mqdes:消息队列描述符。 newattr:指向新的 mq_attr 结构体的指针。 oldattr:如果非 NULL,则用于存储旧属性的副本。

返回值:成功时返回 0,失败时返回 -1 并设置 errno。

八、 mq_getattr

功能:获取消息队列的属性。

代码语言:javascript
复制
 ​#include <mqueue.h>  

int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat);

mqdes:消息队列描述符。 mqstat:指向 mq_attr 结构体的指针,用于存储队列的属性。

返回值:成功时返回 0,失败时返回 -1 并设置 errno。

九、消息队列案例

接收端代码

代码语言:javascript
复制
#include <iostream>
#include <fcntl.h>           // For O_* constants
#include <sys/stat.h>        // For mode constants
#include <mqueue.h>

const char* QUEUE_NAME = "/my_message_queue";
const int MAX_MSG_SIZE = 1024;

int main() {

    int rc;
    struct mq_attr mqAttr;

    printf ("Bringing up server.\n");
    rc = mq_unlink (QUEUE_NAME);
    if (rc < 0) {
        printf ("   Warning on server mq_unlink.\n");
    }

    mqAttr.mq_maxmsg = 10;
    mqAttr.mq_msgsize = 1024;

    // 创建消息队列
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, S_IWUSR|S_IRUSR, &mqAttr);
    if (mq == -1) {
        perror("mq_open");
        return 1;
    }

    // 阻塞通信阶段
    std::cout << "Starting in blocking mode." << std::endl;
    char buffer[MAX_MSG_SIZE];
    ssize_t bytesRead;

    // 尝试从消息队列读取消息(阻塞)
    bytesRead = mq_receive(mq, buffer, MAX_MSG_SIZE, nullptr);
    if (bytesRead == -1) {
        perror("mq_receive in blocking mode");
        mq_close(mq);
        return 1;
    }
    std::cout << "Received in blocking mode: " << buffer << std::endl;


    // 获取当前属性
    struct mq_attr attr;
    if (mq_getattr(mq, &attr) == -1) {
        perror("mq_getattr");
        mq_close(mq);
        return 1;
    }

    // 切换到非阻塞通信阶段
    std::cout << "Switching to non-blocking mode." << std::endl;
    attr.mq_flags |= O_NONBLOCK;
    if (mq_setattr(mq, &attr, nullptr) == -1) {
        perror("mq_setattr");
        mq_close(mq);
        return 1;
    }

    // 尝试从消息队列读取消息(非阻塞)
    bytesRead = mq_receive(mq, buffer, MAX_MSG_SIZE, nullptr);
    if (bytesRead == -1) {
        if (errno == EAGAIN) {
            std::cout << "No message available in the queue in non-blocking mode." << std::endl;
        } else {
            perror("mq_receive in non-blocking mode");
        }
    } else {
        std::cout << "Received message in non-blocking mode: " << buffer << std::endl;
    }


    // 开始销毁消息队列
    std::cout << "开始销毁消息队列" << std::endl;

    // 关闭消息队列
    mq_close(mq);

    // 删除消息队列(可选)
    mq_unlink(QUEUE_NAME);

    return 0;
}

发送端代码

代码语言:javascript
复制
 #include <iostream>
#include <fcntl.h>           // For O_* constants
#include <sys/stat.h>        // For mode constants
#include <mqueue.h>
#include <string.h>

const char* QUEUE_NAME = "/my_message_queue";
const int MAX_MSG_SIZE = 1024;

int main() {
    // 创建消息队列
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0666, nullptr);
    if (mq == -1) {
        perror("mq_open");
        return 1;
    }

    // 阻塞通信阶段
    std::cout << "Starting in blocking mode." << std::endl;
    char buffer[MAX_MSG_SIZE];
    ssize_t bytesRead;


    // 发送消息到消息队列(阻塞)
    const char* messageToSend = "Hello from sender in blocking mode!";
    if (mq_send(mq, messageToSend, strlen(messageToSend), 0) == -1) {
        perror("mq_send in blocking mode");
        mq_close(mq);
        return 1;
    }

    // 获取当前属性
    struct mq_attr attr;
    if (mq_getattr(mq, &attr) == -1) {
        perror("mq_getattr");
        mq_close(mq);
        return 1;
    }

    // 切换到非阻塞通信阶段
    std::cout << "Switching to non-blocking mode." << std::endl;
    attr.mq_flags |= O_NONBLOCK;
    if (mq_setattr(mq, &attr, nullptr) == -1) {
        perror("mq_setattr");
        mq_close(mq);
        return 1;
    }


    // 发送消息到消息队列(非阻塞)
    const char* anotherMessageToSend = "Hello again in non-blocking mode!";
    if (mq_send(mq, anotherMessageToSend, strlen(anotherMessageToSend), 0) == -1) {
        if (errno == EAGAIN) {
            std::cout << "Queue is full. Cannot send message in non-blocking mode." << std::endl;
        } else {
            perror("mq_send in non-blocking mode");
        }
    }

    // 开始销毁消息队列
    std::cout << "开始销毁消息队列" << std::endl;

    // 关闭消息队列
    mq_close(mq);

    // 删除消息队列(可选)
    mq_unlink(QUEUE_NAME);

    return 0;
}

要运行这段代码实现进程间通信,可以按照以下步骤进行:

  1. 编译代码: 使用 C++ 编译器(如 g++)编译代码。
代码语言:javascript
复制
 g++ mqsend.cpp -o mqsend g++ mqrecv.cpp -o mqrecv

2. 运行程序: 打开两个终端窗口,分别代表两个不同的进程。 在一个终端中运行编译后的程序:

代码语言:javascript
复制
 ./mqrecv

3. 在另一个终端中,稍等片刻后再次运行编译后的程序mqsend。这样两个进程就会尝试通过消息队列进行通信。

代码语言:javascript
复制
 ./mqsend

十、Message too long 和 Invalid argument错误处理

10.1 Message too long

mq_recv出现如下错误 Message too long

添加图片注释,不超过 140 字(可选)

怎么解决的呢,大家对比一下前面 发送端案例 和接收端案例 时的 mq_open 。出现错误时, 发送端案例 和接收端案例 mq_open 设置都是如下。

代码语言:javascript
复制
​int main() {
    // 创建消息队列
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0666, nullptr);

接收端案例 mq_open设置修改为如下后,就可以跑了。据说是mq_maxmsg和mq_msgsize的设置导致的。

代码语言:javascript
复制
int main() {

    in rc;
    struct mq_attr mqAttr;

    printf ("Bringing up server.\n");
    rc = mq_unlink (QUEUE_NAME);
    if (rc < 0) {
        printf ("   Warning on server mq_unlink.\n");
    }

    mqAttr.mq_maxmsg = 10;
    mqAttr.mq_msgsize = 1024;

    // 创建消息队列
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, S_IWUSR|S_IRUSR, &mqAttr);

10.2 Invalid argument

当我把mqAttr.mq_maxmsg = 10;改为mqAttr.mq_maxmsg = 100;就报下面的错

添加图片注释,不超过 140 字(可选)

改回去就好。

mq_maxmsg限定消息队列中的最大消息数,mq_msgsize限定每个消息的最大字节数。

10.3 数据不全和数据乱码

消息队列数据传输时,出现了数据不全或数据乱码的情况。

添加图片注释,不超过 140 字(可选)

添加图片注释,不超过 140 字(可选)

将代码中的sizeof修改为strlen就解决了。sizeof和strlen的区别参考这

获取char*字符串指针指向的数组长度时,记得用strlen,而不是sizeof-CSDN博客

十一、结果

当然在一个终端上可以收到另一个终端的消息啦

添加图片注释,不超过 140 字(可选)

queue - mq_receive:消息太长 - Stack Overflow

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、介绍
  • 二、mq_open
  • 三、mq_send
  • 四、mq_receive
  • 五、mq_close
  • 六. mq_unlink
  • 七、mq_setattr
  • 八、 mq_getattr
  • 九、消息队列案例
  • 十、Message too long 和 Invalid argument错误处理
    • 10.1 Message too long
      • 10.2 Invalid argument
        • 10.3 数据不全和数据乱码
        • 十一、结果
        相关产品与服务
        消息队列
        腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档