上篇博文,我们介绍了什么是RocketMQ,以及如何安装单机版的RocketMQ。在安装的过程了,我们主要安装了两个服务,NameServer和Broker。在发送和接收消息时,又接触了两个概念,生产者和消费者。
那这些又代表什么含义呢?
对于单机版本的RocketMQ架构,如下图所示:
主要分为四部分:
Name Server充当路由消息的提供者。生产者或消费者能够通过Name Server查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
Broker Server负责存储消息、转发消息。Broker在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。
对于上面的学习,我们知道了RocketMQ的核心模块以及相应的概念。那么,RocketMQ都有哪些发送消息的方式呢,又如何使用,使用的场景是什么,又是如何消费的?
在项目中添加MQ客户端依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>x.x.x</version>
</dependency>
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public class SyncProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
//2.指定Nameserver地址
producer.setNamesrvAddr("localhost:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//5.发送同步消息,将消息发送给其中一个broker
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//6.关闭生产者producer
producer.shutdown();
}
}
上面的案例中设计到两个陌生的概念,含义如下所示:
生产者组(Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。 标签(Tag):为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//2.指定Nameserver地址
producer.setNamesrvAddr("localhost:9876");
//3.启动producer
producer.start();
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.发送异步消息,SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
//6.关闭生产者producer
producer.shutdown();
}
}
keys:Message索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息对消息关键字的提取方便查询。
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
public class OneWayProducer {
public static void main(String[] args) throws Exception, MQBrokerException {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//2.指定Nameserver地址
producer.setNamesrvAddr("localhost:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("TopicTest", "TagA", ("Hello World,单向消息" + i).getBytes());
//5.发送单向消息
producer.sendOneway(msg);
//线程睡1秒
TimeUnit.SECONDS.sleep(5);
}
//6.关闭生产者producer
producer.shutdown();
}
}
此时,RocketMQ中已经有我们需要发送的消息了,我们使用RocketMQ来消费队列中的消息。接收消息有两种模式:
启动多个消费者,最直接的区别:模式不同,消费的消息不同。
默认模式,消费者采用负载均衡方式消费消息,相同消费者组的每个消费者共同消费队列中的消息即每个Consumer实例平均分摊消息,每个消费者处理的消息不同。消费进度存储在服务端。
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-14 15:22
*
* 异步消息,同步消息,单向消息 - 消费者 - 负载均衡模式
*/
public class ClusteringConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
//订阅指定 Topic 下的所有消息
consumer.subscribe("TopicTest", "*");
//负载均衡模式,默认
consumer.setMessageModel(MessageModel.CLUSTERING);
// Register callback to execute on arrival of messages fetched from brokers.
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消息消费者已启动");
}
}
消费者采用广播的方式消费消息,相同Consumer Group的每个消费者消费的消息都是相同的。消费进度存储在消费者本地。
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 19:02
*
*
* 异步消息,同步消息,单向消息 - 消费者 - 广播模式
*/
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
//设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//订阅指定 Topic 下的所有消息
consumer.subscribe("TopicTest", "*");
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (msgs != null) {
for (MessageExt ext : msgs) {
try {
System.out.println(new Date() + ext.toString() + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消息消费者已启动");
}
}
消费者组(Consumer Group):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 11:17
*
* 顺序消息-生产者
*/
public class ProducerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = new ProducerInOrder().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < orderList.size(); i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("OrderTopic", tags[i % tags.length], "KEY" + i, body.getBytes());
//自定义消息队列选取规则
// SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
// @Override
// public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// Long id = (Long) arg; //根据订单id选择发送queue
// long index = id % mqs.size();
// return mqs.get((int) index);
// }
// }, orderList.get(i).getOrderId());//订单id
//SelectMessageQueueByHash,官方提供的选取规则,还有其他实现,大家自行发现
SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), orderList.get(i).getOrderId());
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 11:28
*
* 顺序消息-消费者
*/
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("localhost:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("OrderTopic", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消息消费者已启动");
}
}
注意:MessageListenerOrderly是顺序消息监听器,每个队列只有一个线程消费。
普通顺序消息(Normal Ordered Message) 普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。 严格顺序消息(Strictly Ordered Message) 严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
应用场景:
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 14:48
*
* 延时消息 - 生产者
*/
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("DelayTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 14:45
*
*
* 延时消息 - 消费者
*/
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topics
consumer.subscribe("DelayTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消息消费者已启动");
}
}
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18,消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 15:10
*
* 批量消息 - 生产者
*/
public class SimpleBatchProducer {
public static void main(String[] args) throws Exception {
//1、创建一个消息生产者,并设置一个消息生产者组
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
//2、指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
//3.启动producer
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTopic";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 15:12
*
* 批量消息 - 消费者
*/
public class SimpleBatchConsumer {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
//设置 TagFilterConsumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe("BatchTopic", "*");
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new Date() + ext.toString() + " 内容:" + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
}
}
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
public class TagFilterProducer {
public static void main(String[] args) throws Exception{
//创建一个消息生产者,并设置一个消息生产者组
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
//初始化 ProducerInOrder,整个应用生命周期内只需要初始化一次
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
//创建一条消息对象,指定其主题、标签和消息内容
Message msg = new Message(
"FilterTagTopic" /* 消息主题名 */,
tags[i % tags.length] /* 消息标签 */,
("sync producer Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
);
//发送消息并返回结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 15:19
*
* MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
* 需要添加配置
* # 开启对propertyFilter的支持
* enablePropertyFilter = true
*
*/
public class TagFilterConsumer {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
//设置 TagFilterConsumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe("FilterTagTopic", "TagA || TagB");
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new Date() + ext.toString() + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消息消费者已启动");
}
}
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。
使用Tag有一定的局限性,也可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
常量支持类型为:
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
发送消息时,你能通过putUserProperty
来设置消息的属性
public class SqlFilterProducer {
public static void main(String[] args) throws Exception{
//创建一个消息生产者,并设置一个消息生产者组
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
//初始化 ProducerInOrder,整个应用生命周期内只需要初始化一次
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
//创建一条消息对象,指定其主题、标签和消息内容
Message message = new Message(
"FilterSQLTopic" /* 消息主题名 */,
tags[i % tags.length] /* 消息标签 */,
("sync producer Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
);
message.putUserProperty("a", String.valueOf(i));
//发送消息并返回结果
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}
用MessageSelector.bySql来使用sql筛选消息
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 15:19
* MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
* 需要添加配置
* # 开启对propertyFilter的支持
* enablePropertyFilter = true
*/
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
//设置 TagFilterConsumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
//订阅指定 Topic 下的所有消息
// consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("a between 0 and 3"));
// Don't forget to set enablePropertyFilter=true in broker
consumer.subscribe("FilterSQLTopic",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new Date() + ext.toString() + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消息消费者已启动");
}
}
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
事务消息共有三种状态,提交状态、回滚状态、中间状态:
使用 TransactionMQProducer
类创建生产者,并指定唯一的 ProducerGroup
,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。
public class Producer {
public static void main(String[] args) throws Exception{
//创建一个消息生产者,并设置一个消息生产者组
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
//指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
//初始化 ProducerInOrder,整个应用生命周期内只需要初始化一次
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TransTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException e) {
e.printStackTrace();
}
}
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}
当发送半消息成功时,我们使用 executeLocalTransaction
方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation
方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行本地事务");
System.out.println("消息内容 :" + msg.getKeys() + new String(msg.getBody()));
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener
类来修改这个行为。transactionMsgTimeout
参数。我们对于Rocketmq发送、消费消息的方式进行了全方面的解析,并给出了相应的案例,消息生产者和消费者可以进行简单的抽象:
本文示例读者可以通过查看下面仓库的中的rocketmq-simple
项目: