前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ消息的发布确认机制详解

RabbitMQ消息的发布确认机制详解

作者头像
九转成圣
发布2024-06-05 14:18:53
1870
发布2024-06-05 14:18:53
举报
文章被收录于专栏:csdncsdn

RabbitMQ发布确认机制确保消息从生产者成功传输到交换机和队列,提高系统可靠性。在Spring Boot项目中,通过配置publisher-confirm-typepublisher-returns,启用发布确认和消息返回机制。配置RabbitTemplate的确认回调和返回回调,可以捕捉消息传输状态,处理不同传输结果。测试场景包括消息无法到达交换机、消息到达交换机但无法到达队列以及消息成功到达队列。通过合理设置和优化,可以确保高并发环境下的消息可靠传输,适用于金融支付、电商系统等对消息传输可靠性要求高的场景。

1. RabbitMQ发布确认机制概述

发布确认(Publisher Confirms)是RabbitMQ提供的一种机制,用于确保消息从生产者发送到RabbitMQ服务器并被成功处理。与事务机制不同,发布确认的性能开销更小,非常适合高吞吐量的场景。发布确认机制提供了两种类型的确认:

  • 消息到达交换机(Exchange)后的确认
  • 消息从交换机路由到队列(Queue)后的确认

2. 配置文件中添加发布确认相关配置

在Spring Boot项目中,通过配置文件来启用发布确认机制非常方便。以下是需要添加到application.propertiesapplication.yml中的配置:

代码语言:javascript
复制
# 消息到达交换机后会回调发送者
spring.rabbitmq.publisher-confirm-type=correlated
# 消息无法路由到队列时回调发送者
spring.rabbitmq.publisher-returns=true

配置解释:

  • publisher-confirm-type:设置为correlated表示使用CorrelationData来关联确认与发送的消息。
  • publisher-returns:设置为true表示启用消息返回机制,当消息无法路由到队列时会触发回调。

3. 发布确认类型

在Spring AMQP中,发布确认类型通过ConfirmType枚举类来定义:

代码语言:javascript
复制
public enum ConfirmType {
    SIMPLE,     // 使用 RabbitTemplate#waitForConfirms() 或 waitForConfirmsOrDie()
    CORRELATED, // 使用 CorrelationData 关联确认与发送的消息
    NONE        // 不启用发布确认
}

4. 配置RabbitTemplate

为了使用发布确认机制,需要配置RabbitTemplate,包括设置确认回调和返回回调:

代码语言:javascript
复制
@Slf4j
@Configuration
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);

        // 设置mandatory为true,当找不到队列时,broker会调用basic.return方法将消息返还给生产者
        rabbitTemplate.setMandatory(true);

        // 设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息已经到达Exchange");
            } else {
                log.info("消息没有到达Exchange");
            }
            if (correlationData != null) {
                log.info("相关数据:" + correlationData);
            }
            if (cause != null) {
                log.info("原因:" + cause);
            }
        });

        // 设置返回回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息无法到达队列时触发");
            log.info("ReturnCallback:     " + "消息:" + message);
            log.info("ReturnCallback:     " + "回应码:" + replyCode);
            log.info("ReturnCallback:     " + "回应信息:" + replyText);
            log.info("ReturnCallback:     " + "交换机:" + exchange);
            log.info("ReturnCallback:     " + "路由键:" + routingKey);
        });

        return rabbitTemplate;
    }
}

5. 配置测试交换机和队列

为了测试发布确认机制,我们需要配置相应的交换机和队列:

代码语言:javascript
复制
@Slf4j
@Configuration
public class ConfirmConfig {

    @Bean
    public Queue confirmQueue() {
        return new Queue(Constant.CONFIRM_QUEUE, false);
    }

    @Bean
    DirectExchange confirmExchange() {
        DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, false, false);
        directExchange.addArgument("alternate-exchange", Constant.CONFIRM_BACKUP_EXCHANGE);
        return directExchange;
    }

    @Bean
    Binding bindingConfirm() {
        return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(Constant.CONFIRM_ROUTING_KEY);
    }

    @Bean
    FanoutExchange backupExchange() {
        return new FanoutExchange(Constant.CONFIRM_BACKUP_EXCHANGE, false, false);
    }

    @Bean
    public Queue backupQueue() {
        return new Queue(Constant.CONFIRM_BACKUP_QUEUE, false);
    }

    @Bean
    public Queue warningQueue() {
        return new Queue(Constant.CONFIRM_WARNING_QUEUE, false);
    }

    @Bean
    Binding bindingConfirmBackup() {
        return BindingBuilder.bind(backupQueue()).to(backupExchange());
    }

    @Bean
    Binding bindingConfirmWarning() {
        return BindingBuilder.bind(warningQueue()).to(backupExchange());
    }
}

6. 测试场景及分析

6.1 消息无法到达交换机

测试代码:

代码语言:javascript
复制
@Autowired
RabbitTemplate rabbitTemplate;
String msg = "一条用于发布确认的消息";

@GetMapping("/noExchange")
public void noExchange() {
    rabbitTemplate.convertAndSend("noExchange", "noExchange", msg);
}

配置了rabbitTemplate.setMandatory(true),当消息无法到达交换机时会回调:

代码语言:javascript
复制
ConfirmCallback 消息没有到达Exchange
ConfirmCallback 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', class-id=60, method-id=40)
6.2 消息到达交换机但无法到达队列

测试代码:

代码语言:javascript
复制
@GetMapping("/toExchange")
public void toExchange() {
    rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, "xxx.xxx.xxx", msg);
}

输出:

代码语言:javascript
复制
ConfirmCallback 消息已经到达Exchange

没有收到无法到达队列的消息,是因为配置了备份队列,消息被路由到了备份队列。

6.3 注掉备份队列再试

修改配置:

代码语言:javascript
复制
@Bean
DirectExchange confirmExchange() {
    DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, true, false);
    return directExchange;
}

测试结果:

代码语言:javascript
复制
消息无法到达队列时触发
ReturnCallback:     消息:(Body:'一条用于发布确认的消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback:     回应码:312
ReturnCallback:     回应信息:NO_ROUTE
ReturnCallback:     交换机:myConfirmExchange
ReturnCallback:     路由键:xxx.xxx.xxx
ConfirmCallback 消息已经到达Exchange

此时,ConfirmCallbackReturnCallback都被调用了。

6.4 成功到达队列

测试代码:

代码语言:javascript
复制
@GetMapping("/toQueue")
public void toQueue() {
    rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, Constant.CONFIRM_ROUTING_KEY, msg);
}

输出:

代码语言:javascript
复制
ConfirmCallback 消息已经到达Exchange

7. 发布确认流程

下图展示了RabbitMQ发布确认流程:

8. 深入解析RabbitMQ发布确认机制

8.1 事务机制与发布确认机制的对比

事务机制和发布确认机制都是确保消息可靠投递的手段,但它们在实现和性能方面有明显区别:

  • 事务机制:通过txSelecttxCommittxRollback实现,性能开销较大,不适合高并发场景。
  • 发布确认机制:通过异步确认消息是否成功到达交换机和队列,性能开销小,适合高并发场景。
8.2 发布确认机制的优缺点
优点
  1. 性能高:相比事务机制,发布确认机制对性能的影响较小。
  2. 异步处理:使用回调函数处理确认结果,不阻塞消息发送。
  3. 可靠性高:确保消息成功到达交换机和队列,提高系统可靠性。
缺点
  1. 实现复杂:需要配置和处理回调函数,增加了代码复杂度。
  2. 延迟高:确认机制引入了额外的网络延迟。
8.3 发布确认机制的应用场景
  1. 金融支付系统:确保支付消息的可靠传输,避免重复支付或支付丢失。
  2. 电商系统:确保订单消息的可靠传输,避免订单丢失或重复处理。
  3. 日志系统:确保日志消息的可靠传输,避免日志丢

失。

8.4 发布确认机制的最佳实践
  1. 合理设置超时时间:在高并发场景下,设置合理的超时时间,避免消息发送阻塞。
  2. 优化回调函数:回调函数中避免复杂逻辑,确保回调处理快速完成。
  3. 监控和报警:建立监控机制,及时发现和处理消息投递失败问题。

9. 总结

本文详细介绍了RabbitMQ消息的发布确认机制,包括配置、实现及其在不同场景下的表现。通过合理配置和使用发布确认机制,可以有效提高消息传输的可靠性,确保消息在高并发环境下的可靠投递。希望本文能够帮助读者深入理解并应用RabbitMQ的发布确认机制,提高系统的可靠性和性能。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-06-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. RabbitMQ发布确认机制概述
  • 2. 配置文件中添加发布确认相关配置
  • 3. 发布确认类型
  • 4. 配置RabbitTemplate
  • 5. 配置测试交换机和队列
  • 6. 测试场景及分析
    • 6.1 消息无法到达交换机
      • 6.2 消息到达交换机但无法到达队列
        • 6.3 注掉备份队列再试
          • 6.4 成功到达队列
          • 7. 发布确认流程
          • 8. 深入解析RabbitMQ发布确认机制
            • 8.1 事务机制与发布确认机制的对比
              • 8.2 发布确认机制的优缺点
                • 优点
                • 缺点
              • 8.3 发布确认机制的应用场景
                • 8.4 发布确认机制的最佳实践
                • 9. 总结
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档