前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ高级特性

RabbitMQ高级特性

原创
作者头像
会洗碗的CV工程师
发布2024-05-02 09:50:01
2130
发布2024-05-02 09:50:01
举报
文章被收录于专栏:消息中间件

RabbitMQ的高级特性包括以下几个方面:

  1. 消息的可靠投递:RabbitMQ提供了两种方式用来控制消息的投递可靠性模式,包括confirm确认模式和return退回模式。这确保了消息的发送方能够避免任何消息丢失或投递失败的情况。
  2. Consumer Ack:消费者手动应答模式。当消费者接收到消息并处理完成后,会向RabbitMQ发送一个ack,RabbitMQ只有接收到ack后,才会从队列中删除该消息。这种机制确保了消息的消费可靠性。
  3. 消费端限流:在某些情况下,消费者可能由于某些原因(如系统维护、服务停止等)无法及时处理消息,导致大量消息在MQ中累积。消费端限流机制可以帮助控制这种情况,防止消息过载。
  4. TTL(存活时间/过期时间):RabbitMQ可以为队列或消息设置TTL,一旦超过这个时间,消息就会被自动删除。这有助于避免消息在队列中长时间滞留。
  5. 死信队列:当消息在队列中因为某些原因(如消费者拒绝消息、消息TTL过期等)无法被正常消费时,它们会被发送到死信队列中。这有助于对无法处理的消息进行集中处理。
  6. 延迟队列:RabbitMQ支持延迟队列,可以在消息中设置延迟时间,让消息在指定的时间后被消费。这可以用于实现定时任务、延迟重试等功能。
  7. 日志与监控:RabbitMQ提供了丰富的日志和监控功能,可以实时查看队列的状态、消息的数量、消费者的连接情况等,帮助开发者更好地管理RabbitMQ。
  8. 消息追踪:RabbitMQ支持消息追踪功能,可以跟踪消息的整个生命周期,包括消息的发送、接收、处理等过程。这有助于开发者快速定位问题并解决问题。
  9. 消息可靠性保障——消息补偿:在生产端,RabbitMQ通过消息落库、延迟投递、回调检查等方式实现消息的可靠性投递。同时,通过完善的消息补偿机制,确保在消息投递失败或丢失时能够重新发送消息。

除了以上高级特性外,RabbitMQ还支持多种消息队列模式,如点对点队列、工作队列模式等,以满足不同场景下的需求。

接下来详细说一下上面的这些高级特性

一、消费端限流

之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

消费端限流的写法如下:

生产者批量发送消息

代码语言:javascript
复制
// 批量发送消息
@Test
public void testSendBatch(){
    for (int i=0;i<10;i++){
        rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","我们已经第"+i+"天没有互发过消息了...");
    }
}

消费端配置限流机制

代码语言:javascript
复制
spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 开启手动签收
    listener:
      simple:
        acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
        prefetch: 5
代码语言:javascript
复制
@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
    // 1.获取信息
    System.out.println(new String(message.getBody()));
    // 2.模拟业务处理
    Thread.sleep(2000);
    // 3.签收消息
    //channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}

执行结果:

如果没有签收的话是最多只能获取五条信息,一旦签收,也就是把注释去掉,那就一直接收消息。

二、利用限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。使用方法如下:

消费端配置不公平分发

代码语言:javascript
复制
spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 开启手动签收
    listener:
      simple:
        acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
        #prefetch: 5
        # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
        prefetch: 1

设置两个消费者,一个接收慢,一个接受快:

代码语言:javascript
复制
// 消费者1
@RabbitListener(queues = "my_queue")
public void listenMessage1(Message message, Channel channel) throws InterruptedException, IOException {
    // 1.获取信息
    System.out.println("消费者1接收: "+new String(message.getBody()));
    // 2.模拟业务处理(接收消息慢)
    Thread.sleep(2000);
    // 3.签收消息
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}

// 消费者2
@RabbitListener(queues = "my_queue")
public void listenMessage2(Message message, Channel channel) throws InterruptedException, IOException {
    // 1.获取信息
    System.out.println("消费者2接收: "+new String(message.getBody()));
    // 2.签收消息
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}

执行结果如下,消费者2不用等待消费者1处理。

三、消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

3.1 设置队列所有消息存活时间

在创建队列时设置其存活时间:

代码语言:javascript
复制
// 创建队列
@Bean(QUEUE_NAME)
public Queue getMessageQueue(){
    return QueueBuilder
            .durable(QUEUE_NAME)
            // 队列每条消息只能存活10s
            .ttl(10000)
            .build();
}

运行批量发送消息:执行效果如下:

这里截图慢了,过了十秒钟消息其实就已经丢弃了。

3.2 设置单条消息存活时间

代码如下:

代码语言:javascript
复制
@Test
public void testMessage2(){
    // 1.设置消息属性
    MessageProperties messageProperties = new MessageProperties();
    // 2.设置存活时间
    messageProperties.setExpiration("10000");
    // 3.创建消息
    Message message = new Message("我们已经3天没有互发过消息了...".getBytes(StandardCharsets.UTF_8),messageProperties);
    // 4.发送消息
    rabbitTemplate.convertAndSend("my_topic_exchange","my_routing",message);
}

执行效果如下:

这里打开管控台还可以看得到消息

过了时间之后就看不到消息了

注意:

  1. 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。
  2. 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

四、优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。优先级队列用法如下:

创建队列时:

代码语言:javascript
复制
// 创建队列
@Bean(QUEUE_NAME)
public Queue getMessageQueue(){
    return QueueBuilder
            .durable(QUEUE_NAME)
            // 设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
            .maxPriority(10)
            .build();
}

编写发送消息,这里第三条消息优先级设置比别的消息高

代码语言:javascript
复制
@Test
public void testPriority(){
    for (int i=0;i<10;i++){
        if(i==3){
            // 1.设置消息属性
            MessageProperties messageProperties = new MessageProperties();
            // 2.设置存活时间
            messageProperties.setPriority(9);
            // 3.创建消息
            Message message = new Message("我们已经3天没有互发过消息了...".getBytes(StandardCharsets.UTF_8),messageProperties);
            // 4.发送消息
            rabbitTemplate.convertAndSend("priority_exchange","my_routing",message);
        }
        else{
            rabbitTemplate.convertAndSend("priority_exchange","my_routing","我们已经"+i+"天没有互发过消息了...");
        }
    }
}

编写消费者

代码语言:javascript
复制
@Component
public class PriorityConsumer {
    @RabbitListener(queues = "priority_queue")
    public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
        System.out.println(new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
}

执行结果如下:

OK,确实也是先获取第三条消息。

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、消费端限流
  • 二、利用限流实现不公平分发
  • 三、消息存活时间
    • 3.1 设置队列所有消息存活时间
      • 3.2 设置单条消息存活时间
      • 四、优先级队列
      相关产品与服务
      消息队列
      腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档