近日阅读RocketMQ源代码时,在思考一个问题。集群中只有一组broker时,而这组的master停机或者宕机时,消费端又恰好出现了消费异常,会有什么现象?以及由此引发的其他问题。今天在这里做下记录也算是做下探讨,抛砖引玉,理解不对的地方,欢迎批评指正。
为方便层层递进解释问题,以1,2,3,4的序号描述每种状况。
首先说下消息消费的具体流程,我们都知道RocketMQ的消费端实际上是主动从RocketMQ的服务器上拉取消息进行消费的,客户端底层都是Pull模式。如果master正常,就从master上拉取消息;如果master挂掉,会从slave上拉取消息(虽然slave不能写消息,但是可以写消费进度)。下面我们会用到DefaultMQPushConsumer消费客户端,这个客户端是依赖Pull模式实现的长轮询Push消息模式。
首先是消费者拉取一个topic在某个broker上部分或者全部MessageQueue上面的消息。在这里,我们假定ConsumerGroupLusong 去broker-1(master,slave)上拉取topic为lusong(假如topic在broker-1上有4个queue)的消息,因为消费客户端会做负载均衡,默认均衡算法是AllocateMessageQueueAveragely,就是平均分配每一个MessageQueue。我们又假定ConsumerGroupLusong 下面的一台服务器consumerA, 分配到broker-1上topic为lusong的QueueId为1,和3的MessageQueue。
我们先来捋一捋broker正常、消费端也正常的情况。consumerA在消费拉取到的消息完成后,每消费完一批消息(批的大小consumeMessageBatchMaxSize在DefaultMQPushConsumer中可以设置,默认是1),就会把消费进度写到消费端的内存中,然后是定时(persistConsumerOffsetInterval,默认每隔5秒)向broker发起请求写这个MessageQueue的消息进度,broker是怎么处理进度呢?broker拿到某个topic下的messageQueue消费进度会存到内存,然后定时持久化到文件中。
假如consumerA消费消息出现异常了,抛出异常或者直接返RECONSUME_LATER状态,消费端也会认为这条异常消费已经消费了,消费进度加1,继续向broker发起写消费进度的请求。只不过会在写消费进度前向broker发送retry主题的消息,在这里topic为%RETRY%ConsumerGroupLusong。消费端这么做的主要意义是不阻碍消费进度,有错误先提交进度,出错的消息挪到后面去消费。
那么,消费端消费失败发生在broker宕机、停机时(这里只master)会有什么现象?假定broker-1宕机、停机时,consumerA正在处理的一条消息,是从broker-1上拉取的topic为lusong的消息,那么consumerA消费消息出现异常了,consumerA会向broker-1写消费进度,但是写不成功。consumerA也会向broker-1发送topic为%RETRY%ConsumerGroupLusong的消息,但是写消息又写不成功。没发送成功的消息会被放入processQueue继续消费,但是继续消费后有两种情况,有可能ConsumerA突然消费成功了,也可能还是消费失败。消费成功的情况,虽然消费成功,但提交不了消费进度;消费失败的情况,会再发出%RETRY%ConsumerGroupLusong的消息。通常情况下,是业务一直消费失败,所以会出现无限消费某条消息,无限发送retry主题的消息,无限发送不成功的情况,不眠不休,无限循环。直到broker-1的master启动成功,或者ConsumerA关闭,这种情况才会停止。
另外,因为slave作为master的备份,当master停机时,消费者会从slave上拉取消息进行消费,会提交消费进度到slave上。当然如果消费异常了,也会向slave写retry主题的消息,但是slave是不接受写消息的,也就是写retry主题的消息都是失败的。哈哈,和上面一样了,无限消费失败,无限写retry消息失败,无限循环。直到broker-1的master启动成功,或者ConsumerA关闭,这种情况才会停止。
以上逻辑实现在ConsumeMessageConcurrentlyService,具体在如下代码中:
上面的情况解释完了,下面说说另外一个消费的问题。
我们知道消费消息时,有的消息处理的慢,有的消息处理的快,这就衍生出2个问题,消费进度怎么提交,进度提交失败时会有什么问题。
我们还用到上面的一些假设,假定ConsumerGroupLusong 下面的一台服务器consumerA, 分配到broker-1上topic为lusong的QueueId为1,和3的MessageQueue。又假设ConsumerA一次拉取到Queue=1下面的10条消息,10条消息的QueueOffset是从1到10。
又消费QueueOffset为1的消息耗时10分钟,消费剩下的消息耗时都在几秒内,那么消费进度怎么提交呢?Consumer是拿到这批已处理完的消息的最大QueueOffset作为消费进度提交的,并且比这个Offset小的消息都已经处理完了(处理失败、处理成功都算处理完)。不对,是拿这个QueueOffset加1作为消费进度提交到broker上。所以上面的情况,在处理QueueOffset为1的消息时,消费进度一直没法提交,因为最小QueueOffset没有处理完,就相当于卡死在这里了。
这种消费客户端设计和传统的一条message单独ack的方式有本质的区别,性能是大幅提升了,但是导致可能重复消费的问题。假如在处理QueueOffset为1的消息时,消费端挂掉了,消费进度没有提交也没法提交。这MessageQueue会立即分配给其他消费者,新的消费者会重新拉取到QueueOffset从1到10的消息,重复消费后面的9条消息。
DefaultMQPushConsumer为缓解这种严重的重复消费消息问题加了一个配置consumeConcurrentlyMaxSpan,默认值2000,当消费客户端发现自己已经拉取超过了2000条消息没处理,就会触发流控,减缓拉取消息的速度,但是又有个屁用?:)
所以提交消费进度的问题和retry无限重试的问题,说明业务实现消费幂等性有多重要!窃以为这也是RocketMQ说的投递消息至少会投递一次,RocketMQ不保证不重复的原因。
我想了个解决方案,没实现,只是一个想法。主要想法是这样:Consumer端对每条处理中的消息都建一个定时task,例如定时10分钟后检查,检查消费进度有没有超过这条消息,如果超过说明这条消费成功了或者失败了,至少没有停在这里;如果消费进度没有超过这条消息,那就重新生成一条消息出来(形成重复消息),之所以生成消息是担心这条阻碍进度的消息,随consumer宕机而丢了。先把消费进度提交了再说,再想办法去处理那条生成出来的重复消息(如果前面的那条阻碍进度的消息没有丢,重复消费就过滤掉;如果被consumer丢了,就再消费那条重复消息)。每条消息都建定时task可能不靠谱,还是建定时任务来统一扫描consumer中的所有消息靠谱。
消费失败发生在RocketMQ master停机时会有什么问题?
有的消息处理的慢,有的消息处理的快,消费进度怎么提交,进度提交失败时会有什么问题?
以上是在思考的两个问题,欢迎拍砖指教。谢谢!
领取专属 10元无门槛券
私享最新 技术干货