一,消息队列简介
消息队列是一种进程间的通信机制,用于在不同进程之间同步消息。通信期间,一个进程将消息放入该队列中,然后另一个进程就可以从该队列中取出这条消息。
消息队列可以是异步的,即发送方无需等待接收方的确认或回复就可以立即执行下一步的操作。
消息队列是一种缓冲机制,即使接收方当前无法处理某个消息,该消息也不会立即丢失,而是被存储在队列中。
消息队列的通信方式减少了进程间的耦合,提高了系统的可扩展性和可维护性。
消息队列还可以用于实现分布式的任务调度和负载均衡。
常见的消息队列框架有:
ZeroMQ, RabbitMQ, ActiveMQ, Apache Kafka, MQTT等。
二,ZeroMQ框架介绍
ZeroMZeroMQ,简称"zmq",是一种高效、开源的消息传递框架,它提供了多种消息传递模式和编程语言支持。相比于传统的Socket网络编程,ZeroMQ提供了更高层次的抽象,使得程序员能够更专注于业务逻辑的实现而非底层网络通信。
ZeroMQ提供了多种消息传递模式,包括Request-Reply、Publish-Subscribe、Push-Pull等。这些模式可用于不同的场景,例如,Request-Reply适用于客户端与服务器之间的交互,Publish-Subscribe适用于发布-订阅模式,Push-Pull适用于任务分发和负载均衡等。
在ZeroMQ中,消息是通过Socket进行发送和接收的,ZeroMQ支持多种Socket类型。
ZeroMQ支持多种编程语言,包括C/C++、Java、Python等,这使得不同语言编写的应用程序之间可以互相通信,进而可以实现跨平台或者跨设备的数据传输。
三,ZeroMQ基础用法
1.创建zmq上下文
zmq::context_t context(1);
2.创建zmq通信期间的socket套接字
server端:
zmq::socket_t socket(context, ZMQ_REP)
client端:
zmq::socket_t socket(context, ZMQ_REQ);
3. 绑定或连接到对应的socket
server端:
socket.bind("tcp://*:5555");
client端:
socket.connect("tcp://localhost:5555");
4. 发送或接收消息
zmq::message_t msg(5);
memcpy(msg.data(), "hello", 5);
socket.send(msg);
zmq::message_t reply;
socket.recv(&reply);
5.关闭socket和zmq上下文,释放资源
socket.close();
context.close();
四,ZeroMQ应用场景
1.分布式计算:ZeroMQ的分布式特性使其非常适用于构建分布式计算系统,可以在多个计算节点之间进行高效的消息传递和任务分发,从而加速计算过程。举个例子,某些区块链相关的应用就基于ZeroMQ实现了消息分发机制。
2.服务端开发:ZeroMQ可以用于构建轻量级的服务架构,服务之间通过ZeroMQ通信,可以实现高可用性和可扩展性。
3.日志框架开发:ZeroMQ可以用于实时日志收集,它可以在不同的进程之间传递日志消息,并将它们进行聚合和存储。
4.消息队列构建:ZeroMQ可以用于构建高性能的消息队列机制,多个生产者可以向一个队列发送消息,多个消费者可以从队列中取出消息进行处理。
5.实时通信:ZeroMQ可以用于构建实时通信系统,例如聊天应用、游戏服务器等,通过ZeroMQ可以进行高效的消息传递和实时状态同步。
6.跨设备数据传输:在嵌入式开发场景,可以通过ZeroMQ传输从各个传感器采集到的数据,也可以实现嵌入式设备到移动App端的通信。
五,ZeroMQ主要通信模式
1.请求-应答模式(Request-Reply)
用于服务端和客户端的直接通信。
客户端发送请求,服务端接收请求并给出响应。
2.发布-订阅模式(Publish-Subscribe)
以广播的方式传递消息,发布者将数据分发给多个订阅者。
发布者将消息发送到一个或多个主题,订阅者可以订阅特定的主题并接收消息。
3.异步队列模式(Push-Pull)
Push端将消息推到队列中,Pull端从队列中取出消息进行处理。
该模式也被称为管道模式(Pipeline)。
4.排他对接模式(Exclusive Pair)
点对点的模式,将两个套接字一对一地连接起来,用于两个节点之间的通信,这种模式应用场景很少。
六,ZeroMQ常用函数接口
zmq_ctx_new:创建zmq上下文对象。
zmq_socket:创建zmq套接字对象。
zmq_bind:将套接字绑定到指定端口上。
zmq_connect:将套接字连接到指定端口上。
zmq_send:往套接字上发送消息。
zmq_recv:从套接字上接收消息。
zmq_poll:等待多个套接字上的事件。
zmq_msg_init:初始化空的zmq消息。
zmq_msg_send:往套接字上发送消息,支持更复杂的操作。
zmq_msg_recv:从套接字上接收消息,支持更复杂的操作。
1.zmq消息的构造
//创建空的zmq消息
zmq::message_t msg;
//给消息分配内存空间
const size_t size = 1024;
zmq::message_t msg(size);
//使用外部数据初始化消息
zmq::message_t msg("hello world!", 12);
2.发送zmq消息
zmq::message_t msg = ...;
auto res = sock.send(msg, zmq::send_flags::none);
auto res = sock.send(std::move(msg), zmq::send_flags::none);
auto res = sock.send(zmq::str_buffer("hello world"), zmq::send_flags::none);
3.接收zmq消息
auto res = sock.recv(msg, zmq::recv_flags::none);
auto res = sock.recv(buf, zmq::recv_flags::none);
4.设置或读取套接字属性
sock.set(zmq::sockopt::immediate, false);
sock.set(zmq::sockopt::routing_id, "100");
auto rid = sock.get(zmq::sockopt::routing_id);
5.poller轮询器操作
zmq::poller_t<> in_poller, out_poller;
//创建两个输入poller,一个输出poller
in_poller.add(input_socket1, zmq::event_flags::pollout);
in_poller.add(input_socket2, zmq::event_flags::pollout);
out_poller.add(output_socket, zmq::event_flags::pollout);
const std::chrono::milliseconds timeout{100};
std::vector<zio::poller_event<>> in_events(2);
std::vector<zio::poller_event<>> out_events(1);
while (true) {
const auto nin = in_poller.wait_all(in_events, timeout);
if (!nin) {
std::cout << "input timeout, try again" << std::endl;
continue;
}
for (int ind=0; ind<nin; ++ind) {
zmq::message_t msg;
auto rres = in_events[ind].socket.recv(msg, zmq::recv_flags::none);
const auto nout = out_poller.wait_all(out_events, timeout);
if (!nout) {
std::cout << "output timeout, freakout" << std::endl;
abort();
}
auto sres = out_events[0].socket.send(msg, zmq::send_flags::none);
}
}
七,ZeroMQ的编码与集成
1.zmq的Linux版本安装
下载官方发行的Linux版本zmq代码,下载完成后在本地编译生成依赖库和头文件。
下载地址1:
https://github.com/zeromq/zeromq4-1/releases
进入zmq代码目录,使用以下命令进行编译:
sh autogen.sh
./configure
make
make install
运行完以上命令,不配置自定义路径的时候,会在"/user/local/"下生成对应的so文件和头文件。
├── include
│ ├── zmq.h
│ └── zmq_utils.h
├── lib
│ ├── libzmq.a
│ ├── libzmq.la
│ ├── libzmq.so -> libzmq.so.5.0.3
│ ├── libzmq.so.5 -> libzmq.so.5.0.3
│ ├── libzmq.so.5.0.3
│ └── pkgconfig
注:"/user/local/"路径很容易被编译器找到,因此,编译的时候,只需要在gcc或g++命令后面加上"-lzmq"参数即可。
2.C语言版本的zmq集成
a.操作步骤:
完成以上安装即可。
b.引入的头文件:
include <zmq.h>
3.C++语言版本的zmq集成
a.操作步骤:
1.完成以上安装。
2.下载并解压官方的cppzmq压缩包,从中拷贝需要依赖的hpp头文件到之前的include目录中。
下载地址2:
https://github.com/zeromq/cppzmq/archive/master.zip
cp zmq.hpp /user/local/include/
cp zmq_addon.hpp /user/local/include/
b.引入的头文件:
zmq.hpp: 包含zmq消息、上下文、缓冲区、套接字、监视器、轮询器等的具体实现。
zmq_addon.hpp:zeromq库的扩展,包含更多高级功能以及另一种形式的轮询器的实现。
include <zmq.h>
include <zmq.hpp>
//实现复杂功能会需要zmq_addon.hpp
//#include <zmq_addon.hpp>
4.完整的集成过程
a.项目结构:
── zmq_demo
├── CMakeLists.txt
├── zmq_client.cpp
└── zmq_server.cpp
b.CMakeLists.txt配置
cmake_minimum_required(VERSION 3.5)
project(zmq_demo)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
include_directories(/user/local/include)
link_directories(/user/local/lib)
add_compile_options(-Wno-error=unused-parameter)
#Add executable
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
add_executable(zmq_server zmq_server.cpp)
add_executable(zmq_client zmq_client.cpp)
#Link zmq library
target_link_libraries(zmq_server -lzmq)
target_link_libraries(zmq_client -lzmq)
c.服务端代码:zmq_server.cpp
#include <zmq.h>
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
std::string s_recv(zmq::socket_t & socket, int flags = 0) {
zmq::message_t message;
auto recv_flags = (flags ==0)? zmq::recv_flags::none: zmq::recv_flags::dontwait;
(void)socket.recv(message, recv_flags);
return std::string(static_cast<char*>(message.data()), message.size());
}
bool s_send(zmq::socket_t & socket, const std::string & string, int flags = 0) {
zmq::message_t message(string.size());
memcpy (message.data(), string.data(), string.size());
bool rc = socket.send (message, static_cast<zmq::send_flags>(flags)).has_value();
return (rc);
}
int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP);
socket.bind("tcp://*:5555");
int count=0;
while (true)
{
std::string recvStr;
recvStr = s_recv(socket);
std::cout << "server Received " << recvStr << " " << count << std::endl;
sleep(1);
//Send reply back to client
std::string sendStr = "World";
s_send(socket, sendStr);
std::cout << "server Send " << sendStr << " " << count << std::endl;
count++;
}
return 0;
}
d.客户端代码:zmq_client.cpp
#include <zmq.hpp>
#include <zmq.h>
#include <string>
#include <iostream>
std::string s_recv(zmq::socket_t & socket, int flags = 0) {
zmq::message_t message;
auto recv_flags = (flags ==0)? zmq::recv_flags::none: zmq::recv_flags::dontwait;
(void)socket.recv(message, recv_flags);
return std::string(static_cast<char*>(message.data()), message.size());
}
bool s_send(zmq::socket_t & socket, const std::string & string, int flags = 0) {
zmq::message_t message(string.size());
memcpy (message.data(), string.data(), string.size());
bool rc = socket.send (message, static_cast<zmq::send_flags>(flags)).has_value();
return (rc);
}
int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REQ);
std::cout << "Connecting to hello world server..." << std::endl;
socket.connect("tcp://localhost:5555");
for (int request_nbr = 0; request_nbr != 6; request_nbr++)
{
std::string sendStr = "Hello";
s_send(socket, sendStr);
std::cout << "client Send " << sendStr << " " << request_nbr << std::endl;
//Get the reply.
std::string recvStr;
recvStr = s_recv(socket);
std::cout << "client Received " << recvStr << " " << request_nbr << std::endl;
}
return 0;
}
编译过程:
Scanning dependencies of target zmq_server
[ 25%] Building CXX object CMakeFiles/zmq_server.dir/zmq_server.cpp.o
[ 50%] Linking CXX executable ../bin/zmq_server
[ 50%] Built target zmq_server
Scanning dependencies of target zmq_client
[ 75%] Building CXX object CMakeFiles/zmq_client.dir/zmq_client.cpp.o
[100%] Linking CXX executable ../bin/zmq_client
[100%] Built target zmq_client
运行结果:
客户端:
root@ubuntu:/home/zmq_demo/bin# ./zmq_client
Connecting to hello world server...
client Send Hello 0
client Received World 0
client Send Hello 1
client Received World 1
client Send Hello 2
client Received World 2
client Send Hello 3
client Received World 3
client Send Hello 4
client Received World 4
client Send Hello 5
client Received World 5
服务端:
root@ubuntu:/home/zmq_demo/bin# ./zmq_server
server Received Hello 0
server Send World 0
server Received Hello 1
server Send World 1
server Received Hello 2
server Send World 2
server Received Hello 3
server Send World 3
server Received Hello 4
server Send World 4
server Received Hello 5
server Send World 5
八,ZeroMQ代码实战
发布者端:
#include <iostream>
#include <string>
#include <unistd.h>
#include <zmq.h>
#include <zmq.hpp>
int main()
{
// Create ZMQ Context
zmq::context_t context ( 1 );
// Create the Publish socket
zmq::socket_t publisher ( context, ZMQ_PUB );
// Bind to a tcp socket
publisher.bind( "tcp://*:5556" );
usleep( 1000000 );
// Message to send to the subscribers
std::string msg = "msg from [pub]";
// loop 6 times
for ( int i = 1; i <= 6; i++ )
{
// Create zmq message
zmq::message_t request( msg.length() );
// Copy contents to zmq message
memcpy( request.data(), msg.c_str(), msg.length() );
// Publish the message
publisher.send( request );
std::cout << "sending: " << i << std::endl;
}
}
订阅者端:
#include <iostream>
#include <string>
#include <zmq.h>
#include <zmq.hpp>
int main()
{
// Create ZMQ Context
zmq::context_t context ( 1 );
// Create the Subscribe socket
zmq::socket_t subscriber ( context, ZMQ_SUB );
// Connect to a tcp socket
subscriber.connect( "tcp://localhost:5556" );
// Set the socket option to subscribe
subscriber.setsockopt( ZMQ_SUBSCRIBE, "", 0 );
// infinite loop to receive messages
for ( int i = 1; i > 0; i++ )
{
// Receive the message and convert to string
zmq::message_t update;
subscriber.recv( &update );
std::string msg = update.to_string();
// Print the message
std::cout << "Num: " << i << ", message: " << msg << std::endl;
}
}
运行结果:
发布者端:
root@ubuntu:/home/zmq_demo/bin# ./zmq_pub
sending: 1
sending: 2
sending: 3
sending: 4
sending: 5
sending: 6
订阅者端:
root@ubuntu:/home/zmq_demo/bin# ./zmq_sub
Num: 1, message: msg from [pub]
Num: 2, message: msg from [pub]
Num: 3, message: msg from [pub]
Num: 4, message: msg from [pub]
Num: 5, message: msg from [pub]
Num: 6, message: msg from [pub]
九,参考阅读
https://zeromq.org/socket-api/
git://github.com/imatix/zguide.git
https://zguide.zeromq.org/docs/chapter2/
https://wizardforcel.gitbooks.io/zmq-guide/content/chapter1.html