配置共享订阅

最近更新时间:2026-01-09 11:25:01

我的收藏
共享订阅是一种负载均衡的消费方式,属于同一个共享订阅组的多个客户端以轮询方式分配并接收订阅到的消息。因此,通常也称为消费者负载均衡。
例如下图中,发布者发布了4条消息:M1、M2、M3、M4。客户端 Client-0 和 Client-1 以共享订阅的方式消费订阅的主题消息。这与默认的 MQTT Pub/Sub 不同,Client-0 仅消费到了 M1, M3; Client-1 仅消费到了 M2, M4

在标准的 MQTT 协议中,共享订阅(Shared Subscription)是 MQTT 5.0 新增的功能,但是腾讯云消息队列 MQTT 版在服务端做了功能增强,除了 5.0 版本的客户端外,同时支持 3.1 和 3.1.1 版本的客户端,3.1 和 3.1.1 版本的客户端直接按照共享订阅的使用规范进行订阅即可。
在使用共享订阅前,需要在页面先创建共享订阅组(ShareName),否则客户端在订阅时会订阅失败。

约束与限制

单集群下最多支持创建20个共享订阅组;单个共享订阅组中订阅表达式数量上限为10个;单个共享订阅 Group 订阅和取消订阅请求上限总和 TPS 为10;单个共享订阅组下的客户端数量为1024个;达到限额后,客户端订阅消息请求 (SUBSCRIBE) 报错。若铂金版集群对于限额有特殊需求,可以提交工单联系我们。

前提条件

使用共享订阅方式订阅消息

步骤1:创建共享订阅组

1. 登录 MQTT 控制台
2. 在左侧导航栏单击 资源管理 > 集群管理,选择好地域后,单击目标集群的“ID”,进入集群基本信息页面。
3. 选择共享订阅组页签,单击 新建 创建共享订阅组,根据要求在弹窗中填写如下字段:
​​参数项​​
​​说明​​
​​示例值​​
共享订阅组名称
设置共享订阅组名称,需符合命名规则:不能为空,1-64个字符,支持字母、数字。
order_processing_group
负载均衡策略
○ 随机(默认策略,即 random):在不同的订阅客户端间随机分发负载。
○ 分区哈希:在一定程度上保证消息负载时的消息顺序性。选择该模式后,需要另外填写负载均衡生效的时间。即新的消费者客户端加入后,要在经过指定时延后,才会加入到负载均衡策略中。(当前默认实现 Topic-hash,即共享订阅中优先保证同一Topic下的消息顺序,若您需要 ClientID-hash请提交工单联系我们)
随机
描述
填写共享订阅组的说明信息,选填
订单处理多客户端负载
4. 单击 提交 后创建完成。


步骤2:在客户端配置共享订阅

使用方式

当期望以共享订阅的方式订阅消息时,订阅 topic-filter 以如下方式配置:
$share/{ShareName}/{TopicFilter}
参数
说明
$share
协议指定的使用共享订阅时的标记,固定字符串。
{ShareName}
在控制台创建的共享订阅组的名称,不能包含“ / ”,“ + ” , “ # ”。
{TopicFilter}
客户端正常订阅时使用的 Topic Filter,与 MQTT TopicFilter 要求 和语义相同。

示例代码

package org.apache.rocketmq.mqtt.example.quickstart;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SharedSubscriptionQuickStart {
public static void main(String[] args) throws MqttException, InterruptedException {
String serverUri = "tcp://127.0.0.1:1883";
String clientId = "shared-sub-0";
try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {
client.setTimeToWait(3000);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("YOUR-USERNAME");
options.setPassword("YOUR-PASSWORD".toCharArray());
options.setCleanSession(true);
options.setAutomaticReconnect(true);
client.connect(options);
int total = 1;
CountDownLatch latch = new CountDownLatch(total);

client.setCallback(new MqttCallback() {
public void messageArrived(String topic, MqttMessage message) {
System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n", topic, message.getQos(),
new String(message.getPayload()));
latch.countDown();
}

public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
}

public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete: " + token.isComplete());
}
});
// 共享订阅表达式
// {ShareName} 这里为 Group0
// {topic-filter} 这里为 home/#
String topic = "$share/Group0/home/#";

// Subscribe
client.subscribe(topic, 1);
TimeUnit.HOURS.sleep(1);
client.disconnect();
}
}
}

共享订阅离线消息保留策略

${ShareName}下如果存在有效的Session订阅${TopicFilter}, 符合${TopicFilter}的离线消息就会保留,订阅者再次上线时,会从上次进度恢复消费。

Session 有效期

MQTT 3.1、3.1.1 通过clean-session定义Session的生命周期。 当 clean-session = true, Session 的生命周期与传输层生命周期一致。 当 clean-session = false, Session 与传输层生命周期无关。为避免资源浪费,产品定义传输层断开后,Session 最大存续 3 天。


按照 MQTT 5.0 协议,有以下等价语义:
MQTT 3.1、3.1.1
MQTT 5
clean-session = true
clean-start = true
session-expiry-interval = 0
clean-session = false
clean-start = false
session-expiry-interval = 259200

查看共享订阅组

1. 登录 MQTT 控制台
2. 在左侧导航栏单击资源管理 > 集群管理,选择好地域后,单击目标集群的“ID”,进入集群基本信息页面。
3. 选择共享订阅管理页签,即可查看和管理当前集群的共享订阅组(ShareName)列表。
说明:
为了便于客户使用,列表页会自动展示当前集群已存在的共享订阅组的信息,即如果客户端使用了$share/{ShareName}/{topicfilter}进行共享订阅,则 ShareName 会自动出现在共享订阅组的列表,默认的负载均衡策略为随机策略。
4. 单击具体的共享订阅组名称进入查看共享订阅组的详情,如下图所示。详情页面会展示当前共享订阅的详情,当前共享订阅组(ShareName)下的订阅表达式(filter)以及点开后展示对应 filter 下的客户端和客户端状态。
说明:
在保证共享订阅消息负载的顺序性时,即负载均衡模式选择为“ 分区哈希”,客户端的在线状态会引入负载均衡时延的影响。如果客户端 session 已经不存在,只要在负载均衡的时效内,依然会展示为“在线”状态;客户端连接断开时间超出负载均衡生效时延后会展示为“离线”状态。


编辑共享订阅组

为了避免策略过渡期间出现消息重复或顺序性无法保证的问题,共享订阅组创建成功后,不支持修改共享订阅组名称和负载均衡策略。如果需要修改一个共享订阅组的负载均衡策略,可以在暂停客户端并删除共享订阅组后,重新创建并配置不同负载均衡策略的共享订阅组。
1. 登录 MQTT 控制台
2. 在左侧导航栏单击资源管理 > 集群管理,选择好地域后,单击目标集群的“ID”,进入集群基本信息页面。
3. 选择共享订阅管理页签,即可查看和管理当前集群的共享订阅组(ShareName)列表。
4. 在共享订阅组列表的操作列,单击编辑进行共享订阅组信息的修改。
如果负载均衡策略是“随机”,则仅能修改描述部分。
如果负载均衡策略是“分区哈希”,则可以修改描述和负载均衡生效时间。

删除共享订阅组

删除共享订阅组时,需谨慎评估对线上业务的影响。MQTT 会对共享订阅组是否存在有效订阅进行校验。为避免意外中断服务,建议确保所有相关客户端 Session 均已下线后再执行删除操作。
1. 登录 MQTT 控制台
2. 在左侧导航栏单击资源管理 > 集群管理,选择好地域后,单击目标集群的“ID”,进入集群基本信息页面。
3. 选择共享订阅管理页签,即可查看和管理当前集群的共享订阅组(ShareName)列表。
4. 在共享订阅组列表的操作列,单击删除,在弹窗中二次确认删除后即可完成删除。