操作背景
为了适用不同场景的需求,Pulsar 支持四种订阅模式:Exclusive、Shared、Failover、Key_Shared。
1. Exclusive 独占模式(默认模式):一个 Subscription 只能与一个 Consumer 关联,只有这个 Consumer 可以接收到 Topic 的全部消息,如果该 Consumer 出现故障了就会停止消费。
2. 共享模式(Shared):消息通过 round robin 轮询机制(也可以自定义)分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。
3. 灾备模式(Failover):当存在多个 consumer 时,将会按字典顺序排序,第一个 consumer 被初始化为唯一接受消息的消费者。当第一个 consumer 断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个 consumer。
4. KEY 共享模式(Key_Shared):当存在多个 Consumer 时,将根据消息的 Key 进行分发,Key 相同的消息只会被分发到同一个消费者。
本文以官网提供的 Demo 为例,介绍共享模式(Shared)订阅模式功能与使用方式。
操作步骤
步骤1. 在控制台新建 Pulsar 资源
1. 登录 TDMQ Pulsar 版控制台,创建一个集群和命名空间。
2. 在左侧导航栏选择 Topic 管理页签,当前集群和命名空间分别勾选创建好的集群和命名空间。
3. 单击新建,输入 Topic 名称和说明,其他选项可保持默认,单击保存,创建一个 topic。
4. 单击操作列的新增订阅,为刚新建好的 Topic 创建俩个订阅关系。
5. 单击操作列的更多 > 查看订阅/消费者,可看到刚创建好的订阅。
步骤2. 下载 Demo 并配置相关参数
1. 下载官方 Demo 并解压。
2. 修改 Constant.java 参数。
参数 | 说明 |
SERVICE_URL | 集群接入地址,可以在控制台集群管理页面查看并复制。 |
AUTHENTICATION | 角色的密钥,角色密钥可以在角色管理中复制。 |
步骤3. 生产消息
1. 进入/simple目录下,修改 SimpleProducer.java 参数。
topic:填写创建好的topic3名称,需要填入完整路径,即persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理页面直接复制。
2. 编译并运行 SimpleProducer.java 程序发送消息。运行结果如下,可以看到生产者已经向 topic 中生产了10条消息。
步骤4. 消费消息
1. 进入/submodle目录下,修改SharedConsumer1.java 和 SharedConsumer1.java 程序参数。
topic:填写创建好的 topic 名称,需要填入完整路径,即 persistent://clusterid/namespace/Topic,
clusterid/namespace/topic
的部分可以从控制台上 Topic 管理页面直接复制。
.subscriptionName:填写 topic 的订阅名称,可在 Topic 消费者界面查看。SharedConsumer1 中填写 sub1 订阅名称,SharedConsumer2 中填写sub2 订阅名称。
2. 编译并运行消息重试程序 SharedConsumer1.java 和 SharedConsumer2.java 。运行结果如下: