最近一段项目实践中大量使用了基于RabbitMQ的消息中间件,也积累的一些经验和思考,特此成文,望大家不吝赐教。 本文包括RabbitMQ基本概念、进阶概念、实践与思考等三部分,着重强调相关概念和基于RabbitMQ进行扩展开发的思路,并简要展示RabbitMQ客户端的编码,接下来通过一个思维导图来展示整体思路,红星表示重点部分。
官方文档: http://www.rabbitmq.com/#getstarted
进入详细介绍之前,先来看一张简化版的消息流转的模型图。
Exchange交换器
核心概念,可以简化理解为路由器,其不存储数据,其通常会和一个队列绑定,但也可以绑定到另一个交换器上。其包括4种类型的交换器类型,生产实践中主要使用可以精细管理的direct和topic两种。direct,路由规则为完全匹配;topic,支持完全匹配,也支持模糊匹配;fanout,会将消息转发到该交换器绑定的所有队列中;header,实际中无应用。
Queue队列
用于存储消息,和Kafka的消息模型完全不同,其会将消息存储在Topic中。因此在实现类似ConsumerGroup概念时差异很大,Kafka是可以回溯消息的,但Rabbit新绑定的队列的数据是空的,不能回溯。
Binding绑定
其通过绑定键将交换器和队列关联起来
RouteKey & BindingKey 路由键和绑定键
通常会将路由键和绑定键都称为路由键,其差异是路由键是包含在消息标识中的,而绑定键是用于在交换器和队列间建立绑定关系的,消息会通过它们的匹配情况进行路由。
通信实体
包括Connection连接和Channel通道,连接通常对应一个基于TCP的Socket,建立Connection的关键参数包括用户名、密码、虚拟主机、主机地址和端口。一个连接可以建立多个Channel实例,推荐控制数量(比如10个),但Channel实例不能在线程间共享,应用程序需要为每一个线程开辟一个Channel。
AMQP协议
Java技术栈汇中,关于消息通信听到比较多的是JMS,而AMQP协议相对更加严格一些,其包括Module Layer,Session Layer, Transport Layer三个层次,业务开发主要接触到的是Module Layer,客户端可以通过Queue.Declare、Basic.Consume等命令进行操作。
vhost
虚拟主机,可以在逻辑上看做一台RabbitMQ服务器,其拥有自己的交换器、队列和绑定关系等。RabbitMQ对权限的管理就是基于vhost进行的,默认会创建一个全局的/
虚拟主机,通常不推荐直接使用该vhost,而是需要自定义一个vhost便于管理。
User 对于某一个用户,通常包括3种类型的权限:read,允许读取队列数据;write,允许向队列发送数据;config,允许创建队列,如果客户端需要支持添加队列,需要添加该权限,否则会报无权限错误【踩过坑】。
TTL过期时间
目前在两个不同的粒度设置消息的TTL,分别是队列粒度和消息粒度。由于RabbitMQ实际机制的原因,通常都选择的是队列粒度,对于队列粒度来说,队列头的消息一定是最先失效的,因此可以高效的判断和丢弃。而对于消息粒度,其需要在消息真正投递到消费者时进行判断,如果该消息之前的消息并没有失效,那么它将一直存活。
死信交换器DLX
全称为 Dead-Letter-Exchange,也是RabbitMQ扩展开发的核心概念,当一条消息在一个队列中变成死信之后,它能自动的被转发到一个交换器中,这个交换器就是DLX,很多地方称和这个交换器绑定的队列是死信队列, 我并不是完全认同。
消息变为死信的原因
消息被拒绝Nack,Reject,并且requeue参数为false(重点强调一下,生产实践中通常不能打开requeue,因为打开后队列中的消息就会出现乱序的情况,且性能很差);消息过期;队列达到最大长度。 DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定 ,实际上就是设置某个队列的属性。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key" , "dlx-routing-key");
channel.exchangeDeclare("dlx_exchange" , "direct"); //创建 DLX:
channel.queueDeclare("normal_queue", false, false, false, args); //为队列normal_queue添加 DLX
命名规范:队列类型,[生产者.消费者.队列名后缀];Topic类型,[生产者.exchange.队列名后缀]
延迟队列
延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。使用延迟消息场景如在订单系统中,希望用户下单后30分钟内支付,否则取消订单。那么业务系统可以在下单后,发送延迟消息,到达指定时间后消费该消息来判断是否支持。该方式在数据量比较大的场景中比通过Job扫描数据表合适。
在AMQP协议中,或者RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过前面 所介绍的DLX和TTL模拟出延迟队列的功能,这部分在实践与思考部分进行介绍。
持久化
交换器和队列元数据持久化和消息的持久化,消息的持久化可以直接使用MessageProperties.PERSISTENT_TEXT_PLAIN
。
生产者客户端的代码比较简洁,如下所示。
byte[] messageBodyBytes = "Hello , Xionger!".getBytes();
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
从高可用HA的角度不经要问,消息的生产者将消息发送出去之后,Broker是否收到消息。RabbitMQ针对这个问题提供了两种解决方案,分别是事务机制和发送方确认PublisherConfirm。发送者确认的实现继续细分为3种形式,包括单条同步、批量同步和异步方式。事务机制和单条同步确认方式的性能都比较差,通常只能达到2000QPS左右,因此通常推荐使用发送方确认的批量方式和异步方式,其QPS可以达到8000QPS以上。其中批量方式也存在一个隐患,即发送一批消息到服务端时,如果有一条消息失败,那么该批次所有消息都需要重试。因此目前生产实践中 ,使用的是异步方式,简化的代码实践如下所示。
SortedSet confirmSet = Sets.newTreeSet();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag - 1);
} else {
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//omit
//消息重新投递处理
}
});
tip: 这部分在服务端ack时有一个优化,只会回传当前最大的标识,可以有效减少比对次数。
消费模式:拉模式,推模式,RabbitMQ推荐推模式,保持消息消费的有序性。
boolean autoAck = false;
channel.basicQos(64);//prefetchCount
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
});
tip:
对于消息生产者,过去还有一个消息投递不可达被返回的概念,涉及mandatory和immediate两个参数,但其在生产实践中并不常用。
安装:Mac环境 brew install rabbitmq
,非常简便
管理界面
在介绍了RabbitMQ主要知识后,扩展的分享一个简易的基于RabbitMQ消息中间件的思路。由于RabbitMQ是基于Erlang开发,虽然很棒但毕竟比较小众,Java技术栈的工程师一般很难去修改RabbitMQ的源码,因此通常只是通过构建一个合理的客户端SDK来支持业务开发。
生产者
生产者目标比较简单,需要实现健壮性强的的发送者确认机制【异步】和支持队列分片,队列分片可以给队列加上后缀标识,然后轮训处理即可。
消费者
消费者部分希望支持消费失败的重试机制、死信队列及其报警机制,以支持3次重试消费为例,整体思路如下图所示。【借助之前介绍的TTL和DLX】
RabbitMQ最大的特点是成熟度高,管理功能全面,近似开箱即用,二次开发实现一个简单靠谱的客户端就足以满足大部分的场景,尤其对于初创企业、中小企业来说是一个非常棒的选择。
[1]朱忠华.RabbitMQ实战指南[M].电子工业出版社:北京,2017.11:.
下期预告:深入理解MySQL索引机制 善良比聪明更重要--张小龙