“分享RocketMQ5.X Pop,Ack源码解读。内容较多建议PC上对照代码查看,手机你可能会晕”
01 RocketMQ5 架构
RocketMQ已经开启5.X时代,4.X已成为LTS版本。
各大云厂商也推出支持RocketMQ5.X版本的产品,在介绍Pop,ack之前需要先了解RocketMQ5的架构。
如上图,在RocketMQ5.X的架构中,新增组件如下:
注意:社区很多朋友咨询这个客户端是否可以访问4.X 集群,答案是不支持。
5.X其实有2个客户端,优先推荐使用gRPC客户端:
5.X gRPC客户端源码: https://github.com/apache/rocketmq-clients
5.X remoting客户端源码:
https://github.com/apache/rocketmq/tree/develop/client
5.X gRPC客户端使用gRPC协议访问proxy,5.X remoting客户端可以使用remoting协议访问namesrv和proxy。
其余组件是RocketMQ4.X的原组件。想要了解更多5.X和4.X的差别,请看:RocketMQ 5.0 vs 4.9.X 图解架构对比。
pop主要解决push、pull消费者常见的4种问题:
下图是pop消费者订阅consumer queue的情况:
从上图可以看出来,每个pop client消费全部broker的全部consumer queue。
如果pop client2卡住了,其他的pop client会消费全部的consumer queue,在push消费中queue由于消费卡住或者无人消费而堆积的问题得到解决。
02 Pop流程
在Broker中,Pop的实现代码从
PopMessageProcessor.processRequest(ChannelHandlerContext, RemotingCommand)方法开始的。
笔者把pop message主要分为5个流程,这里忽略数据校验、pop前参数准备。
这里大家直接看代码PopMessageProcessor.processRequest()中调用了popMsgFromQueue()方法,此方法为pop消息的实现关键,笔者将其分解为5步:
在pop消息的时候,调用queueLockManager.tryLock(lockKey)方法,实现加锁。 锁key的格式如下:
String lockKey =
topic + // topic名字
PopAckConstants.SPLIT + // 分隔符
requestHeader.getConsumerGroup() + // 消费者组
PopAckConstants.SPLIT + // 分隔符
queueId; // consumer queue id
可以得知:一个consumer queue同时只会被同一个消费者组中的某一个1个消费者实例锁住。
如果同一个消费者组中同时有2个消费者实例来pop消息,只有一个会锁成功。
这个锁的实现代码如下:
关于这把锁有2点需要注意:
pop offset表示当前需要从这个consumer queue的哪个offset开始拉取消息。
在pop消息流程中有2处地方计算了pop offset。
读取消息调用this.brokerController.getMessageStore().getMessageAsync()方法读取,下次有机会再细聊。
check point消息(简称ck消息),其中记录了每次pop的消息信息。
在读取完消息后,将生成一个check point消息(简称ck消息)。ck消息将写入一个buffer中。调用代码如下:
写入buffer后,这些消息将进入不可见时间,也就是同一个消费者组的其他消费者实例无法再读取到,为什么呢?
在写入buffer后,其他消费者实例计算pop offset时,会把buffer中已经pop的消息计算在内,所以就不会读取到消息。
如果不可见时间到了,用户也没有ack,这些被pop 的消息会被revive服务再次恢复到用户的topic中被用户消费
这个没啥说的,直接看加锁的那个图。
pop offset是每个消费者实例在pop消息的时候会计算的,被pop 的queue中可以被消费的消息的起始位点。
rocketmq会用这个pop offset去存储中读取消息。
读取消息的流程和4.X差不多,这里不再赘述。
下图展示了pop offset是如何计算的:
pop offset的值计算有3个来源:
这个是5.X中新增的逻辑, 如果broker配置
useServerSideResetOffset=true,则通过admin api可以直接重置位点, 重置的位点会临时保存,提供给pop这个时候使用。
数据结构如下:
public class PopCheckPoint implements Comparable<PopCheckPoint> {
// 本次pop消息d的起始consumer queue offset
private long startOffset;
// 本次pop时的时间戳,单位毫秒
private long popTime;
// 本次pop消息d的不可见时间,单位毫秒
// 一般来自pop客户端请求的request header
private long invisibleTime;
// 特别重要
// 记录本次pop消息的ack情况
private int bitMap;
// 本次pop消息d的条数
private byte num;
// 本次pop的consumer queue id
private int queueId;
// 本次pop 的topic
private String topic;
// 本次pop 的消费者组
private String cid;
// 特别重要
// revieve topic的位点,后面详细讲解
private long reviveOffset;
// 特别重要
// 本次拉取消息d的每个消息的queue offset 减去 pop offset
// 的差值
private List<Integer> queueOffsetDiff;
// 本次pop 消息所在d的broker
private String brokerName;
}
下面将一些特别重要的字段详细说明:
通过上图我们可以知道,所谓的句柄其实是消息的一堆属性拼接起来的一个字符串。
这个字符串实际长这个样子:
这个数据结构主要在proxy中被用到,用来帮助构造pop_ck, 也就是pop消息的句柄。因为数据简单, 样例大家自行debug看看吧。
proxy中使用的代码如下:
上文不是说句柄broker已经构造了, 为什么proxy还需要再构造一次呢? 大家可以思考下。
这个值是一次pop一个值, 记录了pop的每个消息的位点信息, 实际格式如下:
这个数据结构主要在proxy中被用到,用来帮助构造pop_ck, 也就是pop消息的句柄。因为数据简单, 样例大家自行debug看看吧。
proxy中使用的代码如下:
proxy中使用的代码如下:
通过以上核心数据结构,我们可以看出来:broker针对pop输出了很多数据结构给proxy使用。
这里笔者也有一个疑问:这些数据结构加大了proxy与broker的耦合逻辑,这使得proxy做纯粹的无状态变得困难。
是否可以只做到接口耦合,不用做逻辑耦合?
03 Ack流程
ack是针对pop的, 一次pop可以pop出多条消息, 但是ack的需要解决以下几种情况:
上面3种情况的结果有4种:
基于上面的几种场景, RocketMQ ack是如何实现的呢?
笔者总结了ack的流程:
在broker中, ack的入口是
AckMessageProcessor.processRequest()方法,其中虚线是异步的流程,实线是同步流程。笔者将其分为以下5步。
用户提交ack请求,ack请求被Broker的AckMessageProcessor.processRequest(Channel, RemotingCommand, boolean)方法处理,并解析AckMessageRequestHeader。
AckMessageRequestHeader中包含pop ck信息, 这里逻辑上区分单个消息ack还是批量消息ack:
标记1: 单个消息ack
标记2: 批量消息ack
标记3: appendAck()方法是ack核心逻辑,后面的全部逻辑都在这个方法中实现。
标记4: 批量执行appendAck()方法。
可以看到标记4处理非原子操作是一种风险,批量提交结果未知,以最终结果一致为准。
经过第一步后,我们知道核心逻辑在appendAck()中: rocketmq将ack request header解析为AckMsg,并且调用PopBufferMergeService.addAk()将ack msg写入PopBufferMergeService的缓存中。
PopBufferMergeService顾名思义,是一个在内存中提供合并的服务。
合并什么呢, 合并ack和ck消息,也就是用ack 的consumer queue offset去标记ck中的bitmap。
其实就是标记一个ck中的哪些消息被ack了,也就是标记了消费进度。
下面讲解一些关键变量:
point:是当前ack对应的pop check point对象,里面有一个bitmap用来标记每个消息是否被ack,
具体如何标记呢:
图示如下:
我们从pop check point对象初始化的时候可以知道, bitmap是一个int,并且初始化的值为0。将0转化为二进制,可以知道每一个bit都是0。
我们用这个bitmap的前4个bit来举例说明是如何标记每条消息是否ack的。
将int转化为bitmap,是一个bit数组,每个数组元素的下标表示pop的消息的下标。
比如pop了4条消息,按照consumer queue offset从小到大排序就会有4个consumer queue offset的下标。
假如在时间t1pop了4条消息,consumer queue offset为100, 101, 102, 103。
如果第一次ack了100,则bitmap中下标=0的bit设置为1。
bit数组的结果就是上图第一列。
如果第一次ack了101,则bitmap中下标=1的bit设置为1。
bit数组的结果就是上图第列二列。
如果分别ack了第一个、第三个消息,则bitmap的结果如上图最右一列。
每次ack后,bitmap都可以转化为int,并且将这个int保存到pop check中。
这里会有3个问题
这些问题在下一步会被处理。
在上一步中, 如果消息全部被ack了, 这个是正常情况, 将最终的消费位点提交到consumer offset manager中,consumer offset manager会定时自动持久化消费位点。
如果用户在允许的时间内,没有ack完成全部的消息, 此时pop check point会被删除,这些消息用户可以继续pop。
下面介绍了这个超时时间是如何计算的:来自pop时间和不可见时间。这里可以解释不可见时间超过后, 为什么可以再次pop到消息了。
如果用户在ack的时候, pop check point消息不存在了怎么办?
首先是为什么pop check point会不存在?
超过后, pop check point消息会直接持久化到revieve topic。
如果check point不存在了, 则将ack消息保存到revieve topic中,方便与持久化的pop check point再次匹配标记哪些消息被ack了。
经过上一步,我们知道有一些check point信息和ack信息会被持久化到revieve topic。
PopBufferMergeService服务是一个后台服务, 会消费revieve topic中的ack、ck信息,然后做异步匹配, 来标记ck信息中的用户消息哪些被ack了。
这里细节特别多, 建议大家debug查看,这里如果需要细讲大家留言我们再出一期。
经过scan后,可以知道哪些ck中的用户消息被全部ack了, 就会提交消费位点到 consumer queue offset manager:
如果经过这一步,还是有ck没有完全被ack呢?请看下一步。
如果经过上一步还有ck没有被ack完全匹配,此时这些ck对应的用户消息将被重新可见,用户可以重新pop。
这个过程是在 PopReviveService服务中实现的, 这也是一个后台服务, 会定时检查哪些ck没有被完全ack, 然后根据ck将这个ck包含的全部消息重新恢复到重试topic中。
04 结语
PopBufferMergeService还有大量的细节, 建议大家通过在每个关键点打日志,然后生产消费模拟ack的几种情况再查看日志输出,再结合代码很快就会了解更多的细节。
结尾也留2个问题,欢迎大家讨论
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。