消息队列(Message Queue)提供一个异步通信机制,消息的发送者不必苦苦等待着消息被处理完成,转而继续自己的工作。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候再用。消息队列在企业中应用很广泛,可选择的有ActiveMQ、RabbitMQ,Kafka,阿里巴巴自主开发RocketMQ等。本文讨论 RabbitMQ 。
RabbitMQ
欲了解 RabbitMQ 先要了解 MQ。 RabbitMQ 是 MQ 的一种实现。
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
它由这些组成:
作用:
RabbitMQ 是 MQ 的一种实现,下面介绍下 RabBMQ。
RabBMQ是一个广泛部署的开源消息代理。
RabbitMQ 流式管道
特点:
RabbitMQ 是一个实现了 AMQP协议 的工具软件,所以 AMQP 中的概念和准则也适用于 RabbitMQ。下面重点介绍AMQP,它能帮助我们深刻的理解。
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用 和消息中间件代理之间进行通信。
软件系统中存在不同厂商的不兼容产品的问题,异构系统的集成是非常昂贵和复杂的。早期的消息传递解决方案也非常昂贵,往往专门用于大公司负担得起。
AMQP 设计目标:
AMQP 不仅使这些不同的系统能够相互通信,而且能够实现不同的产品。
“消息代理” 收到 “消息生产者”并将它们 “路由” 到 “消费者”。
(发布它们的应用程序) --> 消息代理 ---> (处理它们的应用程序)
它工作过程如下图:
image.png
一些其他情况: 消息属性: 发布消息时可以给消息指定各种消息属性(message meta-data)。有些属性会被消息代理(brokers)使用,有些只能被接收消息的应用所使用。
回执(acknowledgement): 网络是不可靠的,有可能在处理消息的时候失败。AMQP 包含了一个消息确认的概念:当一个消息成功到达消费者后(consumer),消费者会通知一下消息代理(broker),这个可以是自动的也可以由开发者执行。
当“消息确认”被启用的时候,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。
无法到达 当一个消息无法被成功路由时,消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。
image.png
交换机是要掌握的重点,这一章节重点来讲。
交换机 用来传输消息的,交换机拿到一个消息之后将它路由给一个队列。
它的传输策略是由交换机类型和被称作绑定(bindings)的规则所决定的。
四种交换机:
Name(交换机类型) | 默认名称 |
---|---|
Direct exchange(直连交换机) | (空字符串) , amq.direct |
Fanout exchange(扇型交换机) | amq.fanout |
Topic exchange(主题交换机) | amq.topic |
Headers exchange(头交换机) | amq.match (and amq.headers in RabbitMQ) |
交换机状态 交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会。并不是所有的应用场景都需要持久化的交换机。
下面分别说明四种交换机类型
消息可以携带一个属性 “路由键(routing key)”,以辅助标识被路由的方式。直连型交换机(direct exchange)根据消息携带的路由键将消息投递给对应队列的。
它如何工作:
直连型交换机图例
总结: Binding 的 Routing Key 要和 消息的 Routing Key 完全匹配
扇型交换机将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。
如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
案例:
扇型交换机图例
总结 不管 消息的Routing Key,广播给这个交换机下的所有绑定队列。
主题交换机通过对消息的路由键
和 “绑定的主题名称” 进行模式匹配,将消息路由给匹配成功的队列。
它的工作方式:
主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
image.png
使用案例:
总结: 绑定 的 Routing Key 和 消息的 Routing Key 进行字符串的模糊匹配。
头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
在实际中并不常用。
队列 存储着即将被应用消费掉的消息。
名称 可以为队列指定一个名称。
队列持久化
绑定是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。
如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。
路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
消费者即使用消息的客户。
消费者标识 每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。
一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。
什么时候删除消息才是正确的?有两种情况
在显式模式下,由消费者来选择什么时候发送确认回执(acknowledgement)。
如果一个消费者在尚未发送确认回执的情况下挂掉了,那代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
拒绝消息
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。
应用可以向消息代理表明,本条消息由于“拒绝消息(Rejecting Messages)”的原因处理失败了(或者未能在此时完成)。当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。
消息的组成:
消息属性(Attributes)常见的有:
消息体:
AMQP 连接通常是长连接。AMQP是一个使用TCP提供可靠投递的应用层协议。AMQP使用认证机制并且提供TLS(SSL)保护。
当一个应用不再需要连接到AMQP代理的时候,需要优雅的释放掉AMQP连接,而不是直接将TCP连接关闭。
AMQP 提供了通道(channels)来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。
这可以应对有些应用需要建立多个连接的情形,开启多个TCP连接会消耗掉过多的系统资源。
在多线程/进程的应用中,为每个线程/进程开启一个通道(channel)是很常见的,并且这些通道不能被线程/进程共享。
通道号 通道之间是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。
这跟Web servers虚拟主机概念非常相似,这为AMQP实体提供了完全隔离的环境。当连接被建立的时候,AMQP客户端来指定使用哪个虚拟主机。
https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html
https://rubydoc.info/github/ruby-amqp/amqp/master/file/docs/AMQP091ModelExplained.textile
https://www.rabbitmq.com/blog/tag/amqp-10/
https://github.com/rabbitmq/rabbitmq-amqp1.0
https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
https://www.cnblogs.com/sgh1023/p/11217017.html
END