RabbitMQ的高级特性包括以下几个方面:
除了以上高级特性外,RabbitMQ还支持多种消息队列模式,如点对点队列、工作队列模式等,以满足不同场景下的需求。
接下来详细说一下上面的这些高级特性
之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
消费端限流的写法如下:
生产者批量发送消息
// 批量发送消息
@Test
public void testSendBatch(){
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","我们已经第"+i+"天没有互发过消息了...");
}
}
消费端配置限流机制
spring:
rabbitmq:
host: 192.168.66.100
port: 5672
username: guest
password: guest
virtual-host: /
# 开启手动签收
listener:
simple:
acknowledge-mode: manual
# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
prefetch: 5
@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
// 1.获取信息
System.out.println(new String(message.getBody()));
// 2.模拟业务处理
Thread.sleep(2000);
// 3.签收消息
//channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
执行结果:
如果没有签收的话是最多只能获取五条信息,一旦签收,也就是把注释去掉,那就一直接收消息。
在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。使用方法如下:
消费端配置不公平分发
spring:
rabbitmq:
host: 192.168.66.100
port: 5672
username: guest
password: guest
virtual-host: /
# 开启手动签收
listener:
simple:
acknowledge-mode: manual
# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
#prefetch: 5
# 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
prefetch: 1
设置两个消费者,一个接收慢,一个接受快:
// 消费者1
@RabbitListener(queues = "my_queue")
public void listenMessage1(Message message, Channel channel) throws InterruptedException, IOException {
// 1.获取信息
System.out.println("消费者1接收: "+new String(message.getBody()));
// 2.模拟业务处理(接收消息慢)
Thread.sleep(2000);
// 3.签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
// 消费者2
@RabbitListener(queues = "my_queue")
public void listenMessage2(Message message, Channel channel) throws InterruptedException, IOException {
// 1.获取信息
System.out.println("消费者2接收: "+new String(message.getBody()));
// 2.签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
执行结果如下,消费者2不用等待消费者1处理。
RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。
在创建队列时设置其存活时间:
// 创建队列
@Bean(QUEUE_NAME)
public Queue getMessageQueue(){
return QueueBuilder
.durable(QUEUE_NAME)
// 队列每条消息只能存活10s
.ttl(10000)
.build();
}
运行批量发送消息:执行效果如下:
这里截图慢了,过了十秒钟消息其实就已经丢弃了。
代码如下:
@Test
public void testMessage2(){
// 1.设置消息属性
MessageProperties messageProperties = new MessageProperties();
// 2.设置存活时间
messageProperties.setExpiration("10000");
// 3.创建消息
Message message = new Message("我们已经3天没有互发过消息了...".getBytes(StandardCharsets.UTF_8),messageProperties);
// 4.发送消息
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing",message);
}
执行效果如下:
这里打开管控台还可以看得到消息
过了时间之后就看不到消息了
注意:
假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。优先级队列用法如下:
创建队列时:
// 创建队列
@Bean(QUEUE_NAME)
public Queue getMessageQueue(){
return QueueBuilder
.durable(QUEUE_NAME)
// 设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
.maxPriority(10)
.build();
}
编写发送消息,这里第三条消息优先级设置比别的消息高
@Test
public void testPriority(){
for (int i=0;i<10;i++){
if(i==3){
// 1.设置消息属性
MessageProperties messageProperties = new MessageProperties();
// 2.设置存活时间
messageProperties.setPriority(9);
// 3.创建消息
Message message = new Message("我们已经3天没有互发过消息了...".getBytes(StandardCharsets.UTF_8),messageProperties);
// 4.发送消息
rabbitTemplate.convertAndSend("priority_exchange","my_routing",message);
}
else{
rabbitTemplate.convertAndSend("priority_exchange","my_routing","我们已经"+i+"天没有互发过消息了...");
}
}
}
编写消费者
@Component
public class PriorityConsumer {
@RabbitListener(queues = "priority_queue")
public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
System.out.println(new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
}
执行结果如下:
OK,确实也是先获取第三条消息。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。