Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?

RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?

作者头像
程序新视界
发布于 2020-12-01 02:55:02
发布于 2020-12-01 02:55:02
5.1K00
代码可运行
举报
文章被收录于专栏:丑胖侠丑胖侠
运行总次数:0
代码可运行

针对以上问题,有两个场景:使用阿里云的云服务器的RocketMQ和使用自己搭建的RocketMQ。但无论采用这两种的任何一种,都是可以在同一个topic下,通过tag来进行业务区分的。

网上有很多分析相关使用方式的文章,虽然分析的结果都是“不可以”,但我们可以通过其他的一些方案来进行解决。

自主搭建的RocketMQ

通过自主搭建RocketMQ,然后通过SpringBoot进行集成实现,可以参考在公众号【程序新视界】中的文章《Spring Boot快速集成RocketMQ实战教程》,可关注公众号搜索,也可以关注公众号之后回复“1003”,完整的实战步骤。

这里我们只摘取其中消费者的部分代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC
        , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED
        , selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED)
public class MqRegisteredListenerDemo implements RocketMQListener<String> {

    private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);

    @Override
    public void onMessage(String message) {
        log.info("received registered message: {}", message);
    }
}

这是其中一个消费者,消费的topic为MqTopicConstant.DEMO_TOPIC,consumerGroup为REGISTERED的,tag便是selectorExpression指定的REGISTERED的tag。

针对同一的topic,另外一个tag的消费者的实现如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC
        , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_MODIFY
        , selectorExpression = MqTopicConstant.DEMO_TAG_MODIFY)
public class MqModifyListenerDemo implements RocketMQListener<String> {

    private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);

    @Override
    public void onMessage(String message) {
        log.info("received modify message: {}", message);
    }
}

我们可以看到topic是同一个,但consumerGroup和tag不同。这说明什么?这说明只要消费者的consumerGroup不同,那么topic相同的情况下,也可以通过tag进行区分的。

关于其他源码就不再这里贴出了,详情可关注公众号看对应文章。

基于云服务的RocketMQ

基于云服务的RocketMQ与自主搭建的基本一致,我们只要确保groupId(阿里云的叫法)不同,那么同一topic下的tag是可以进行区分处理的。

具体处理这里也只贴出部分代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class ConsumerClient {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private EquipmentMessageListener equipmentMessageListener;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        // 配置文件
        Properties properties = mqConfigProperties.getMqProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfigProperties.getGroupId());
        // 将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        // 订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();

        // --------业务板块开始--------
        Subscription subscription = new Subscription();
        // 设置需要消费的消息所属的topic
        subscription.setTopic(MqConfigProperties.getInnerTopic());
        // 设置需要消费的消息所属的tag
        subscription.setExpression(MqConfigProperties.getEquipmentMonitorTag());
        // 实现MessageListener接口,并且在consume方法中实现消费逻辑
        subscriptionTable.put(subscription, equipmentMessageListener);
        //订阅多个topic如上面设置
        // --------业务板块结束--------

        // 将订阅者消息放入consumerBean中,在Spring初始加载该bean时,监听MQ中的Topic和tag下的消息
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}

在上面的代码中,重点是业务板块部分的代码,如果在订阅关系中重新将业务板块内的代码copy一份,然后修改对应的Expression值(也就是tag值),那么基本上是不会成功的。

往往发送大量消息,只能够收到一部分。其他的会被覆盖掉。当然,如果你想采用不同的topic来处理,只需将业务板块中的内容重新修改,添加到subscriptionTable中即可。

那么,如何解决标题中的问题呢?思路与第一种方案一样,阿里云这里只是创建了一个ConsumerBean,而上面的自主搭建时采用了多个Consumer。那么解决方案就是:初始化多个ConsumerBean,每个ConsumerBean中的配置不同的groupId和tag,同时注册不同的监听器。

如此一来,就可以监听一个topic下的不同tag了。

原理分析

两个一样的ConsumerGroup的Consumer订阅同一个Topic,但是是不同的tag,Consumer1订阅Topic的tag1,Consumer2订阅Topic的tag2,然后分别启动。这时候往Topic的tag1里发送10条数据,Topic的tag2里发送10条。目测应该是Consumer1和Consumer2分别收到对应的10条消息。结果却是只有Consumer2收到了消息,而且只收到了4-6条消息,不固定。

这种现象的原因是:消息的分配是Broker决定的,而不是Consumer端,Consumer端发心跳给Broker,Broker收到后存到consumerTable里(就是个Map),key是GroupName,value是ConsumerGroupInfo。ConsumerGroupInfo里面是包含topic等信息的,但是问题就出在上一步骤,key是groupName,同GroupName的话Broker心跳最后收到的Consumer会覆盖前者的。

这样同key,肯定产生了覆盖。所以Consumer1不会收到任何消息,但是Consumer2为什么只收到了一半(不固定)消息呢?

那是因为:集群模式消费,它会负载均衡分配到各个节点去消费,所以一半消息(不固定个数)跑到了Consumer1上,结果Consumer1订阅的是tag1,所以不会任何输出。

如果换成BROADCASTING,那后者会收到全部消息,而不是一半,因为广播是广播全部Consumer。

如果还有其他相关问题,也可关注公众号“程序新视界”,相互沟通学习。

原文链接:《RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/11/26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
重学SpringBoot3-集成RocketMQ(二)
今天介绍下如何在 Spring Boot 3 中与 RocketMQ 整合实现分布式事务。RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA 是一种分布式事务解决方案,一种分布式事务处理模式。下面详细介绍下 RocketMQ 如何实现事务消息。
CoderJia
2024/10/18
1590
重学SpringBoot3-集成RocketMQ(二)
RocketMQ为什么要保证订阅关系的一致性?
前段时间有个朋友向我提了一个问题,他说在搭建 RocketMQ 集群过程中遇到了关于消费订阅的问题,具体问题如下:
张乘辉
2019/07/30
1.9K0
Rocketmq整合Spring中的推消费和litepull消费
RocketMQ整合Spring的一个项目在apache中可以看到是rocketmq-spring。其提供了整合spring的方便使用方式。
路行的亚洲
2023/08/31
8940
Rocketmq整合Spring中的推消费和litepull消费
聊聊rocketmq的订阅关系
org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
code4it
2023/05/08
4101
spring boot使用rocketmq
org.apache.rocketmq.spring.core.RocketMQTemplate.doSent
路过君
2020/06/24
3.5K0
RocketMQ生产消费指南
RocketMQ是一款可靠性非常强的一款消息中间件,概念相比如RabbitMQ来讲也相对简单,只有一个生产消费的概念并不涉及多种消费订阅模式.
姜同学
2022/10/27
4660
RocketMQ生产消费指南
RocketMQ(四):重复消费、消息重试、死信消息的解决方案
冬天vs不冷
2025/01/21
1.1K0
RocketMQ(四):重复消费、消息重试、死信消息的解决方案
RocketMQ 源码分析 —— 集成 Spring Boot
在开始分享 https://github.com/apache/rocketmq-spring 项目(RocketMQ 集成到 Spring Boot 中),我们先恶趣味的看一段历史:
芋道源码
2020/05/19
2.1K0
RocketMQ 源码分析 —— 集成 Spring Boot
Docker 安装 RocketMQ 并结合 SpringBoot 使用实例
在之前的《浅入浅出消息队列》一文中,我们了解了消息队列的作用、优缺点和使用场景,相信你对消息队列已经有了一个大致的概念,文末给自己埋的坑说日后会写一篇实战教程,正好现在实习结束了,也许久没有写实战教程了,于是这就来填坑了。
出其东门
2020/11/03
1.5K0
Docker 安装 RocketMQ 并结合 SpringBoot 使用实例
RocketMQ多个消费组消费同一个topic,其中有一个组正常消费,其余均异常堆积
RocketMQ中,如果不同消费组消费同一个Topic,理论上每个消费组应该只消费该Topic的消息一次。然而,确实有可能出现某个消费组偶尔消费不到消息的情况,这可能是由以下原因导致的:
翎野君
2024/11/21
2650
详解RocketMQ不同类型的消费者
根据使用者对读取操作的控制情况,分为两种类型。一个是DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;另一个是DefaultMQPullConsumer,读取操作中的大部分功能由使用者自主控制。 1.DefaultMQPushConsumer的使用 使用DefaultMQPushConsumer主要是设置好各种参数和传入处理消息的函数。系统收到消息后自动调用处理函数来处理消息,自动保存Offset,而且加入新的DefaultMQPushConsumer后会自动做负载均衡。下面结合org.apache.rocketmq.example.quickstart包中的源码来介绍。 代码清单1-1 DefaultMQPushConsumer示例 public class QuickStart {
全栈程序员站长
2021/06/10
8300
RocketMQ学习-消息发布和订阅
前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。
阿杜
2018/08/06
6K0
RocketMQ学习-消息发布和订阅
重学SpringBoot3-集成RocketMQ(一)
Spring Boot 3 与 RocketMQ 整合,可以通过 Spring Messaging 结合 RocketMQ 的 rocketmq-spring-boot-starter 实现。在这个整合过程中,RocketMQ 作为消息队列系统,Spring Boot 负责提供应用框架,整合可以让开发者更加便捷地使用 RocketMQ 的生产和消费功能。今天就先介绍下SpringBoot3整合RocketMQ5.x,并给出常见消息类型代码示例。
CoderJia
2024/10/18
1.1K0
重学SpringBoot3-集成RocketMQ(一)
RocketMQ消息丢失如何排查?
其实借助RocketMQ-Dashboard就能高效的排查,里面有很多你想象不到的功能
Java识堂
2022/04/06
2.5K0
RocketMQ消息丢失如何排查?
RocketMQ详解(12)——RocketMQ的重试机制
由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。
张申傲
2020/09/03
6.8K0
RocketMQ详解(13)——RocketMQ的消息模式
先启动2个Consumer程序,再启动单个Producer程序,两个Consumer端控制台输出如下:
张申傲
2020/09/03
2.6K0
【RocketMq实战第四篇】不同类型消费者DefaultMQPushConsumerDefaultMQPullConsumer
前言 生产者和消费者是消息队列的两个重要角色,生产者向消息队列写人数据, 消费者从消息队列里读取数据。本篇讲解两种类型的消费者,一个是 DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传人的 处理方法来处理;另 一个是 DefaultMQPullConsumer,读取操作中的大部分功 能由使用者自主控制 。
胖虎
2019/06/26
3.1K0
【RocketMq实战第四篇】不同类型消费者DefaultMQPushConsumerDefaultMQPullConsumer
芋道 Spring Boot 消息队列 RocketMQ 入门
如果胖友还没了解过分布式消息队列 Apache RocketMQ ,建议先阅读下艿艿写的 《芋道 RocketMQ 极简入门》 文章。虽然这篇文章标题是安装部署,实际可以理解成《一文带你快速入门 RocketMQ》,哈哈哈。
芋道源码
2020/05/19
3.3K0
【RocketMQ】004-Spring Boot 集成 RocketMQ
https://juejin.cn/post/7220075270664405052
訾博ZiBo
2025/01/06
3850
【RocketMQ】004-Spring Boot 集成 RocketMQ
Spring boot集成RocketMQ
之前安装好了RocketMQ,这一篇就简单记录一下Spring boot是怎么集成RocketMQ的,如果有需要安装RocketMQ的同学看这一篇,Linux在线安装RocketMQ,如果没有linux环境的同学也可以本地启动,只需要有java环境即可。
余生大大
2022/11/02
1.6K0
相关推荐
重学SpringBoot3-集成RocketMQ(二)
更多 >
领券
💥开发者 MCP广场重磅上线!
精选全网热门MCP server,让你的AI更好用 🚀
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验