@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}")
public class MsgListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
private org.slf4j.Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(MessageExt msg) {
logger.debug("RECEIVE_MSG_BEGIN: " + msg.toString());
logger.debug(String.format("消费消息,消息ID:%s,消息KEY:%s,消息体:%s ", msg.getMsgId(), msg.getKeys(), new String(msg.getBody())));
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setInstanceName("testTopic-tag1");
}
}
原因
多消费组实例的场景下,只配置了一个通用的name server配置,导致有些消费组
consumer.setInstanceName("testTopic-tag1");正常连接到b-name server,但是使用默认配置连到了a-name server上面,导致关系错乱,无法正常消费消息
consumer.setInstanceName("testTopic-tag2");也是被错误的连接到了a-name server
在初始化的时候
问题排查思路
RocketMQ中,如果不同消费组消费同一个Topic,理论上每个消费组应该只消费该Topic的消息一次。然而,确实有可能出现某个消费组偶尔消费不到消息的情况,这可能是由以下原因导致的:
为了解决这个问题,你可以按照以下步骤进行排查和处理:
- 确认消费组的配置和订阅关系是否正确。
- 检查消费者的消费代码,特别是消息过滤和Tag匹配的部分。
- 调整消费者的消费线程数和消费参数,以适应实际的负载和性能需求。
- 监控Broker的状态和网络连接,确保其正常运行。
本篇文章如有帮助到您,请给「翎野君」点个赞,感谢您的支持。