主题发布消息有一个前提,即需要有订阅者订阅主题,如果没有订阅者存在,那么主题中的消息不会被投递,此时发布消息这一操作就失去了意义。
1. 创建主题
endpoint='' //CMQ 的域名secretId ='' // 用户的 ID 和 keysecretKey = ''account = Account(endpoint,secretId,secretKey)topicName = 'TopicTest8B'my_topic = account.get_topic(topicName)topic_meta = TopicMeta()my_topic.create(topic_meta)
2. 发布消息
您可以通过 SDK 或控制台两种方式发布消息。
使用 SDK 发送消息
message = Message()message.msgBody = "this is a test message"my_topic.publish_message(message)
使用控制台发布消息
Topic 目前支持消息过滤,即消息标签、消息类型,用来区分某个 CMQ 的 Topic 下的消息分类。MQ 允许消费者按照标签对消息进行过滤,确保消费者最终只消费到他关心的消息类型。该功能默认不开启,未开启时,所有消息向所有订阅者发送;增加标签后,订阅者将仅能收到带该标签的信息。消息过滤标签描述了该订阅中消息过滤的标签(标签一致的消息才会被推送)。单个标签不超过16个字符,单个 Message 可最多添加5个标签。
Topic 目前支持标签过滤和 routingKey 过滤两种方式。过滤规则如下图所示:
批量发布消息:
vmsg = []for i in range(6):message = Message()message.msgBody = "this is a test message"vmsg.append(message)my_topic.batch_publish_message(vmsg)
3. 消息处理
主题发布消息之后,会自动将消息推送给订阅,当推送失败时,有两种重试策略:
退避重试:重试3次,间隔时间为10 - 20s之间的一个随机值,超过3次后,该条消息对于该订阅者丢弃,不会再重试。
衰退指数重试:重试176次,总计重试时间为1天,间隔时间依次为:2^0 , 2^1, …, 512, 512, …, 512秒。默认为衰退指数重试策略。
使用队列处理消息
订阅者可以填写一个 Queue,使用队列来接收发布的消息。
subscription_name = "subsc-test"my_sub = my_account.get_subscription(topic_name, subscription_name)subscription_meta = SubscriptionMeta()# 填写订阅名称,这里填写队列名称subscription_meta.Endpoint = "queue name "subscription_meta.Protocal = "queue"my_sub.create(subscription_meta)
使用其他方式处理消息
4. 路由匹配
Binding key 、Routing key 是组合使用的,兼容 rabbitmq topic 匹配模式。发消息时配的 Routing key 是客户端发消息带的, 必须为字符串,不能有匹配符。创建订阅关系时配的 Binding key 是 topic 和订阅者的绑定关系。
使用限制:
Binding key 的数量不超过5个。单个 binding key 的长度 ≤ 64字节,用于表示发送消息的路由路径,最多含有15个“.”,即最多16个词组。
Routing key 的数量由1个字符串组成。单个 Routing key 的长度 ≤ 64字节,用于表示发送消息的路由路径,最多含有15个“.”,即最多16个词组。
通配符说明:
*(星号):可以替代一个单词(一串连续的字母串)。
#(井号):可以匹配一个或多个字符。
rabbitmq的特例:routing_key为空字符串时,不匹配 * ,但可以匹配 #。
示例:
订阅者是『1.*.0』,此时消息为『1.任意字符.0』,则订阅者都能收到消息。
订阅者是『1.#.0』,此时消息为『1.2.3.4.4.2.2.0』,则订阅者都能收到消息(消息中间元素随意) 。
使用路由匹配功能
endpoint='' //CMQ 的域名secretId ='' // 用户的 ID 和 keysecretKey = ''account = Account(endpoint,secretId,secretKey)topicName = 'TopicTest'my_topic = account.get_topic(topicName)topic_meta = TopicMeta()topic_meta.filterType =2 //表示消息投递给订阅的时候采用路由匹配//若 filterType =1 则表示使用标签过滤my_topic.create(topic_meta)subscription_name = "subsc-test"my_sub = my_account.get_subscription(topic_name, subscription_name)subscription_meta = SubscriptionMeta()//填写订阅名称,这里填写队列名称subscription_meta.Endpoint = "queue name "subscription_meta.Protocal = "queue"subscription_meta.bindingKey=[1.*.0] //若消息的标签为[1.任意字符.0],则该订阅者都能收到该标签my_sub.create(subscription_meta)
发布消息
message = Message()message.msgBody = "this is a test message"message.routingKey = '1.test.0' //该消息会被投递到 my_sub 订阅的地址my_topic.publish_message(message)