mq_open mq_close mq_unlink mq_setattr mq_getattr mq_send mq_receive
是 POSIX 消息队列(POSIX message queues)中用于发送和接收消息的函数。POSIX 消息队列是一种进程间通信(IPC)机制,允许进程以消息的形式交换数据。
哈哈哈哈,先了解一下函数,最后来个案例。
功能:打开(如果已存在)或创建一个消息队列。
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
返回值:成功时返回消息队列描述符,失败时返回 (mqd_t)-1 并设置 errno。
功能:用于将一条消息发送到指定的消息队列中。
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
mqdes:消息队列描述符,由 mq_open 返回。 msg_ptr:指向要发送的消息的指针。 msg_len:消息的长度(以字节为单位),必须小于或等于消息队列的 mq_msgsize 属性。 msg_prio:消息的优先级,一个无符号整数,值越大优先级越高。
如果函数成功,返回 0;如果失败,返回 -1 并设置 errno 以指示错误。
mq_receive 函数用于从指定的消息队列中接收一条消息。
#include <mqueue.h>
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
mqdes:消息队列描述符。 msg_ptr:指向接收消息的缓冲区的指针。 msg_len:缓冲区的长度(以字节为单位),应足够大以容纳可能接收到的最大消息。 msg_prio:如果非 NULL,则用于存储接收到的消息的优先级。
如果成功,mq_receive 返回接收到的消息的实际长度(以字节为单位)。如果失败,则返回 -1 并设置 errno。
功能:关闭消息队列描述符。
#include <mqueue.h>
int mq_close(mqd_t mqdes);
返回值:成功时返回 0,失败时返回 -1 并设置 errno。
功能:删除消息队列。
#include <mqueue.h>
int mq_unlink(const char *name);
name:消息队列的名称。
返回值:成功时返回 0,失败时返回 -1 并设置 errno。
功能:设置消息队列的属性。
#include <mqueue.h>
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
mqdes:消息队列描述符。 newattr:指向新的 mq_attr 结构体的指针。 oldattr:如果非 NULL,则用于存储旧属性的副本。
返回值:成功时返回 0,失败时返回 -1 并设置 errno。
功能:获取消息队列的属性。
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat);
mqdes:消息队列描述符。 mqstat:指向 mq_attr 结构体的指针,用于存储队列的属性。
返回值:成功时返回 0,失败时返回 -1 并设置 errno。
接收端代码
#include <iostream>
#include <fcntl.h> // For O_* constants
#include <sys/stat.h> // For mode constants
#include <mqueue.h>
const char* QUEUE_NAME = "/my_message_queue";
const int MAX_MSG_SIZE = 1024;
int main() {
int rc;
struct mq_attr mqAttr;
printf ("Bringing up server.\n");
rc = mq_unlink (QUEUE_NAME);
if (rc < 0) {
printf (" Warning on server mq_unlink.\n");
}
mqAttr.mq_maxmsg = 10;
mqAttr.mq_msgsize = 1024;
// 创建消息队列
mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, S_IWUSR|S_IRUSR, &mqAttr);
if (mq == -1) {
perror("mq_open");
return 1;
}
// 阻塞通信阶段
std::cout << "Starting in blocking mode." << std::endl;
char buffer[MAX_MSG_SIZE];
ssize_t bytesRead;
// 尝试从消息队列读取消息(阻塞)
bytesRead = mq_receive(mq, buffer, MAX_MSG_SIZE, nullptr);
if (bytesRead == -1) {
perror("mq_receive in blocking mode");
mq_close(mq);
return 1;
}
std::cout << "Received in blocking mode: " << buffer << std::endl;
// 获取当前属性
struct mq_attr attr;
if (mq_getattr(mq, &attr) == -1) {
perror("mq_getattr");
mq_close(mq);
return 1;
}
// 切换到非阻塞通信阶段
std::cout << "Switching to non-blocking mode." << std::endl;
attr.mq_flags |= O_NONBLOCK;
if (mq_setattr(mq, &attr, nullptr) == -1) {
perror("mq_setattr");
mq_close(mq);
return 1;
}
// 尝试从消息队列读取消息(非阻塞)
bytesRead = mq_receive(mq, buffer, MAX_MSG_SIZE, nullptr);
if (bytesRead == -1) {
if (errno == EAGAIN) {
std::cout << "No message available in the queue in non-blocking mode." << std::endl;
} else {
perror("mq_receive in non-blocking mode");
}
} else {
std::cout << "Received message in non-blocking mode: " << buffer << std::endl;
}
// 开始销毁消息队列
std::cout << "开始销毁消息队列" << std::endl;
// 关闭消息队列
mq_close(mq);
// 删除消息队列(可选)
mq_unlink(QUEUE_NAME);
return 0;
}
发送端代码
#include <iostream>
#include <fcntl.h> // For O_* constants
#include <sys/stat.h> // For mode constants
#include <mqueue.h>
#include <string.h>
const char* QUEUE_NAME = "/my_message_queue";
const int MAX_MSG_SIZE = 1024;
int main() {
// 创建消息队列
mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0666, nullptr);
if (mq == -1) {
perror("mq_open");
return 1;
}
// 阻塞通信阶段
std::cout << "Starting in blocking mode." << std::endl;
char buffer[MAX_MSG_SIZE];
ssize_t bytesRead;
// 发送消息到消息队列(阻塞)
const char* messageToSend = "Hello from sender in blocking mode!";
if (mq_send(mq, messageToSend, strlen(messageToSend), 0) == -1) {
perror("mq_send in blocking mode");
mq_close(mq);
return 1;
}
// 获取当前属性
struct mq_attr attr;
if (mq_getattr(mq, &attr) == -1) {
perror("mq_getattr");
mq_close(mq);
return 1;
}
// 切换到非阻塞通信阶段
std::cout << "Switching to non-blocking mode." << std::endl;
attr.mq_flags |= O_NONBLOCK;
if (mq_setattr(mq, &attr, nullptr) == -1) {
perror("mq_setattr");
mq_close(mq);
return 1;
}
// 发送消息到消息队列(非阻塞)
const char* anotherMessageToSend = "Hello again in non-blocking mode!";
if (mq_send(mq, anotherMessageToSend, strlen(anotherMessageToSend), 0) == -1) {
if (errno == EAGAIN) {
std::cout << "Queue is full. Cannot send message in non-blocking mode." << std::endl;
} else {
perror("mq_send in non-blocking mode");
}
}
// 开始销毁消息队列
std::cout << "开始销毁消息队列" << std::endl;
// 关闭消息队列
mq_close(mq);
// 删除消息队列(可选)
mq_unlink(QUEUE_NAME);
return 0;
}
要运行这段代码实现进程间通信,可以按照以下步骤进行:
g++ mqsend.cpp -o mqsend g++ mqrecv.cpp -o mqrecv
2. 运行程序: 打开两个终端窗口,分别代表两个不同的进程。 在一个终端中运行编译后的程序:
./mqrecv
3. 在另一个终端中,稍等片刻后再次运行编译后的程序mqsend。这样两个进程就会尝试通过消息队列进行通信。
./mqsend
mq_recv出现如下错误 Message too long
添加图片注释,不超过 140 字(可选)
怎么解决的呢,大家对比一下前面 发送端案例 和接收端案例 时的 mq_open 。出现错误时, 发送端案例 和接收端案例 mq_open 设置都是如下。
int main() {
// 创建消息队列
mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0666, nullptr);
接收端案例 mq_open设置修改为如下后,就可以跑了。据说是mq_maxmsg和mq_msgsize的设置导致的。
int main() {
in rc;
struct mq_attr mqAttr;
printf ("Bringing up server.\n");
rc = mq_unlink (QUEUE_NAME);
if (rc < 0) {
printf (" Warning on server mq_unlink.\n");
}
mqAttr.mq_maxmsg = 10;
mqAttr.mq_msgsize = 1024;
// 创建消息队列
mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, S_IWUSR|S_IRUSR, &mqAttr);
当我把mqAttr.mq_maxmsg = 10;改为mqAttr.mq_maxmsg = 100;就报下面的错
添加图片注释,不超过 140 字(可选)
改回去就好。
mq_maxmsg限定消息队列中的最大消息数,mq_msgsize限定每个消息的最大字节数。
消息队列数据传输时,出现了数据不全或数据乱码的情况。
添加图片注释,不超过 140 字(可选)
添加图片注释,不超过 140 字(可选)
将代码中的sizeof修改为strlen就解决了。sizeof和strlen的区别参考这
获取char*字符串指针指向的数组长度时,记得用strlen,而不是sizeof-CSDN博客
当然在一个终端上可以收到另一个终端的消息啦
添加图片注释,不超过 140 字(可选)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。