消息队列具有高性能,高可用性,高并发的特点,是后端程序员必备的技能,本文叙述常见的使用消息队列的问题和最佳实践
应用场景:消息队列最常被使用的三种场景:异步处理、流量控制和服务解耦
一手资料地址:
RabbitMQ 官方文档: https://www.rabbitmq.com/documentation.html
RocketMQ 官方文档: https://rocketmq.apache.org/docs/quick-start/
RocketMQ 中国开发者中心:http://rocketmq.cloud/zh-cn/
Kafka 官方文档: http://kafka.apache.org/documentation/
RabbitMQ Kafka RocketMQ
支持事务消息,Kafka,RocketMQ
技术选型:
优缺点:
RabbitMQ Erlang语言开发的
RabbitMQ Java
Kafka
比较项 | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|
开发语言 | Erlang | Java | Java | Scale |
支持量级 | 几万到十几万 | 几十万 | 几十万 |
是否支持事务 | 否 | 是 | 是 |
是否保证消息可靠 | 是 | 是 | 是 |
模式 | 消息队列 | 发布订阅 | 发布订阅 |
RabbitMQ当消息积压的时候,性能急剧下降
RocketMQ有很好的性能表现
RabbitMQ 消息队列模式,Exchange配置路由规则,发动到指定队列,如图
RocketMQ 发布订阅模式,订阅主题,满足不同系统对队列的需要,各组件如下:
Kafka:分区(partition)对应RocketMQ的queue
划重点:一个消费组内的消费者是竞争关系,一个队列只能让一个消费者实例消费,这是为了防止消息空洞,不同消费组互相不影响,
消费组、消费者和队列这几个概念的对应关系(RabbitMQ没有消费组)
每个消费组就是一份订阅,它要消费主题 MyTopic 下,所有队列的全部消息。注意,队列里的消息并不是消费掉就没有了,这里的“消费”,只是去队列里面读了消息,并没有删除,消费完这条消息还是在队列里面。
多个消费组在消费同一个主题时,消费组之间是互不影响的。比如我们有 2 个消费组:G0 和 G1。G0 消费了哪些消息,G1 是不知道的,也不用知道。G0 消费过的消息,G1 还可以消费。即使 G0 积压了很多消息,对 G1 来说也没有任何影响。
然后我们再说消费组的内部,一个消费组中可以包含多个消费者的实例。比如说消费组 G1,包含了 2 个消费者 C0 和 C1,那这 2 个消费者又是怎么和主题 MyTopic 的 5 个队列对应的呢?
由于消费确认机制的限制,这里面有一个原则是,在同一个消费组里面,每个队列只能被一个消费者实例占用。至于如何分配,这里面有很多策略,我就不展开说了。总之保证每个队列分配一个消费者就行了。比如,我们可以让消费者 C0 消费 Q0,Q1 和 Q2,C1 消费 Q3 和 Q4,如果 C0 宕机了,会触发重新分配,这时候 C1 同时消费全部 5 个队列。
再强调一下,队列占用只是针对消费组内部来说的,对于其他的消费组来说是没有影响的。比如队列 Q2 被消费组 G1 的消费者 C1 占用了,对于消费组 G2 来说,是完全没有影响的,G2 也可以分配它的消费者来占用和消费队列 Q2。
最后说一下消费位置,每个消费组内部维护自己的一组消费位置,每个队列对应一个消费位置。消费位置在服务端保存,并且,消费位置和消费者是没有关系的。每个消费位置一般就是一个整数,记录这个消费组中,这个队列消费到哪个位置了,这个位置之前的消息都成功消费了,之后的消息都没有消费或者正在消费。
从消费生产到消费的整个环节,都要保证消息不丢失:
生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。
上述3种中间件产品都适用
从生产者(比如Kafka)发送消息需要ACK手动应答
方式1:同步发送,并且如果捕获异常,需要重发
方式2:异步发送并且提供接口回查
存储阶段:主要为了防止机器故障,比如进程死掉了或者服务器宕机了,消费丢失
如果是单机,设置磁盘写成功后,返回ACK,例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘
如果是集群,需要设置集群2个以上的节点写入成功,再给客户端返回ACK
消费阶段:不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。
为了保证消息可靠,Broker和消费者都会存在重复消息,并且按着MQTT消息的质量标准要求,我们大部分的消息队列中间件采用At least once语义,Broker无法去除重复消息,只能依靠消费者在业务层进行幂等处理
从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。
常见的幂等处理方法:
1.版本号
将请求发来的消息数据解析后,在数据库更新的时候,比对现有数据库的版本号是否一致,如果一致更新数据库,并将版本号递增
2.将接受到的消息放到唯一性记录表中,并记录消费状态,业务属性+状态设置为唯一索引,利用局部唯一特性,只针对当前业务
如果该条消息查询成功,并且是消费成功了,则直接返回成功,而不是将其置为消费失败。(这个时候需要防止不同的消费组的消费者同时消费同一条记录做同样的业务操作,一般这种情况也不存在,毕竟不同的消费组就是定义不同的业务处理单元)
基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。
3.利用数据库的唯一约束是最后一道保证幂等的保证,同样,如果触发唯一约束,返回处理成功,ACK成功
4.先将消息标记记录,消费时候进行标记检查
全局唯一递增id标记消息,到消费者,需要先进行检查然后进行更新(更新需要是原子性)
在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。
比如说,对于同一条消息:“全局 ID 为 8,操作为:给 ID 为 666 账户增加 100 元”,有可能出现这样的情况:
t0 时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“账户增加 100 元”;
t1 时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,因为这个时刻,Consumer A 还未来得及更新消息执行状态。
这样就会导致账户被错误地增加了两次 100 元,这是一个在分布式系统中非常容易犯的错误,一定要引以为戒。
对于这个问题,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务还是分布式锁都是比较难解决问题。
查询与更新分为了两部分,更新前先检查查询之前的标记值
从整个链路分析,消费积压可能是生产者发送太快,或者是消费者消费速度跟不上
一般生产者可以批量发送或者并发发送消息即可
瓶颈一般是消费者消费速度跟不上
通过监控如果发现消费者速度跟不上,可以扩容消费者实例,
在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。
对于系统发生消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可用性是首先要解决的问题,如果短时间无法定位问题和扩容无法解决积压问题,可以先记录,直接丢弃消息,低峰时间段再进行补偿(再次发送消息进行消费)
所以,我们在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。
下文继续讲解
怎么使用事务消息保证事务一致性
怎么保证消息顺序消费?
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。