消费者中我们注册了一个监听器回调函数,当Consumer获取消息后,就会交给我们的回调函数来进行处理。如果处理完了,就返回一个ConsumeConcurrentlyStatus.CONSUME_SUCCESS,提交这批消息的offset到broker去,然后继续从broker获取下一批消息来进行处理。 如果上面代码回调函数中,对一批消息处理的时候,数据库宕机了就不能再能返回CONSUME_SUCCESS,如果你返回的话,下一次就会处理下一批消息,但是这批消息其实没处理成功,此时必然导致这批消息就丢失了;
consumer.registerMessageListener(
new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
// 后续消息处理逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 比如因为数据库宕机了,没法对消息完成处理
// 此时可以返回一个稍后重试的消费状态
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
}
);
如果消费端消费失败的话,过段时间再次给我这批消息重试;
只有当消费模式为集群模式时,Broker 才会自动进行重试,对于广播消息是不会重试的;
RocketMQ会有一个针对你这个ConsumerGroup的重试队列,如果你返回了RECONSUME_LATER状态,他就会把你这批消息放到你这个消费组的重试队列中去,比如消费组是"WMSConsumerGroup",那么就会有一个“%RETRY%WMSConsumerGroup”,这个名字的重试队列;
然后过一段时间,重试队列中的消息会再次进行处理,如果再次失败,又返回了RECONSUME_LATER,那么会再过一段时间让我们再次进行处理,默认最多重试16次;每次重试之间的间隔时间是不一样的,这个间隔时间可以如下进行配置:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
意思就是,第一次重试是1秒后,第二次重试是5秒后,第三次重试是10秒后,第四次重试是30秒后,第五次重试是1分钟后,以此类推,最多重试16次;
刚才我们说如果你返回了RECONSUME_LATER,消息就会进入重试队列,其实不完全准确;
当MQ接收到RECONSUME_LATER后,首先会完成消息的转换,把消息存到延时队列中,然后再根据消息的延时时间保存到重试队列中;
如果重试了16次之后依然无法处理,就会把这些消费放入死信队列。死信队列中的消息RocketMQ不会再做处理,这部分数据要怎么处理就要看我们的业务场景了,我们可以做一个后台线程去订阅这个死信队列,完成后续消息的处理;
如果在16次重试范围内消息处理成功了,自然就没问题了;但是如果对一批消息重试了16次还是无法成功处理,就需要另外一个队列了,叫做死信队列,死信队列的名字是“%DLQ%WMSConsumerGroup”;
对死信队列中的消息处理,这个就看具体需求,比如可以专门开一个后台线程,订阅“%DLQ%WMSConsumerGroup”这个死信队列,对死信队列中的消息进行不停的重试;