同一个程序当中不能定义重复的消费组。
当多个消费者同时订阅了同一个 topic,则接受消息时会跟广播模式类似。
如果后注册的消费者订阅同一个 topic 且不在一个消费组,在程序启动时由于需要同步 offset,后注册的消费者会消费之前的消息直到 offset 同步。
多实例的消息负载机制是通过 queue 来控制的。每多一个订阅同一个 topic,同一个消费组的实例,就会被分配到 queue,消息通过轮训 queue 来达到负载效果,如果实例数超过 queue 的数量,则会有一些实例分配不到queue。
SendResult syncSend(String destination, Message<?> message)
SendResult syncSend(String destination, Message<?> message, long timeout)
SendResult syncSend(String destination, Collection messages)
SendResult syncSend(String destination, Collection messages, long timeout)
SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel)
SendResult syncSend(String destination, Object payload)
SendResult syncSend(String destination, Object payload, long timeout)
同步发送支持以上方法发送:
指定发送的 topic 或者 tag,其他 topic 和 tag 以 : 分割。eg:destination = "testTopic:test";
tag 可省略。
简单构造 message,Message msg = new Message("testTopic","hello".getBytes()); 也可以使用 MessageBuilder 构造消息(这个类型是 org.springframework.messaging.Message)。
同时解释一下其他构造方法中参数的意思:
tags:表示消息的标签,消费者在消费时,可以根据标签进行过滤,需要注意的是,一个生产者,只能指定一个tag
keys:用于建立索引,之后可以通过命令工具/API/或者管理平台查询key,可以为一个消息设置多个key,用空格""进行分割
waitStoreMsgOK:表示发送消息后,是否需要等待消息同步刷新到磁盘上。如果broker配置为ASYNC_MASTER,那么只需要消息在master上刷新到磁盘即可;如果配置为SYNC_MASTER,那么还需要等待slave也刷新到磁盘。需要注意的是,waitStoreMsgOK默认为false,只有将设置为true的情况下,才会等待刷盘成功再返回。
发送超时时间。单位是 毫秒/millis。
延时级别,支持 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,级别从 1 - 18 个级别。
发送内容,可以为任意类型。
void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,int delayLevel)
void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout)
void asyncSend(String destination, Message<?> message, SendCallback sendCallback)
void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
void asyncSend(String destination, Object payload, SendCallback sendCallback)
发送回调,需要实现 onSuccess 和 onException 方法。
void sendOneWay(String destination, Message<?> message)
void sendOneWay(String destination, Object payload)
不关心发送结果。
以上三种方式都有相应的顺序发送 API,分别是 syncSendOrderly、asyncSendOrderly、sendOneWayOrderly。其中多了个 String hashKey 的参数,通过这个选择使用哪个 topic queue,默认的选择算法是根据参数的 hash 值来选择哪个队列的,其中 topic 默认有 4 个 queue。
使用多个 hashKey 发送时,在每个 queue 中是有序的,如果想要全局有序,那么只使用一个 hashkey发送就行。
在单机的情况下,一个消费者同时只会消费一条消息,但是在不同的 queue 时会使用不同的线程去消费。
同时需要消费者的 RocketMQMessageListener 注解配置 CONCURRENTLY 要改成 consumeMode = ConsumeMode.ORDERLY。
同步和异步发送可以通过设置 delayLevel 达到延时发送的效果。
同步发送选择带 Collection messages 参数的 API 发送,批量发送消息大小不应该超过 4M。
RocketMQ 支持 TAG 和 SQL92 两种过滤模式。
tag 模式下,只需要发送的时候指定 tag,然后在消费端指定 selectorExpression 的表达式(tagA || tagB)就可以接收指定的 tag 消息。
而 SQL92 使用起来有些条件,只有在 push 模式下才能使用当前模式,在消息发送时需要在消息的头部指定发送条件,然后消费端使用 SQL92 模式表,broker 必须开启 enablePropertyFilter 。在conf的 broker.conf 增加 enablePropertyFilter = true
一行,然后运行 broker 的时候必须指定 配置文件。-c .\conf\broker.conf
。表达式支持以下语法:
数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
字符,比如:'abc',必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE
eg:
Message<String> build = MessageBuilder.withPayload("haha").setHeader("a", 1).build();
rocketMqTemplate.syncSend("testTopic", build);
@RocketMQMessageListener(consumerGroup = "testGroup", topic = "testTopic",selectorType = SelectorType.SQL92,selectorExpression = "a = 1")
TransactionSendResult sendMessageInTransaction(final String destination,final Message<?> message, final Object arg)
事务消息共有三种状态,提交状态、回滚状态、中间状态:
TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。 TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。 TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
使用限制:
在消费者中的 messageModel 改成 BROADCASTING 广播模式。
我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。