Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >SpringBoot 整合 RabbitMQ(包含三种消息确认机制以及消费端限流)

SpringBoot 整合 RabbitMQ(包含三种消息确认机制以及消费端限流)

作者头像
海向
发布于 2019-09-23 06:55:26
发布于 2019-09-23 06:55:26
1.9K00
代码可运行
举报
文章被收录于专栏:海向海向
运行总次数:0
代码可运行

目录


说明

本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式

同时消费端也采取了限流的措施,如果对限流细节有兴趣请参照之前的文章阅读:消费端限流

生产端

首先引入 maven 依赖

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.1.4.RELEASE</version>
    </dependency>

Application.properties 中进行设置,开启 confirm 确认机制,开启 return 确认模式,设置 mandatory属性 为 true,当设置为 true 的时候,路由不到队列的消息不会被自动删除,从而才可以被 return 消息模式监听到。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

#开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
#开启 return 确认机制
spring.rabbitmq.publisher-returns=true
#设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
spring.rabbitmq.template.mandatory=true

创建队列和交换机,此处不应该创建 ConnectionFactory 和 RabbitAdmin,应该在 application.properties 中设置用户名、密码、host、端口、虚拟主机即可。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {
//    @Bean
//    public ConnectionFactory connectionFactory(){
//        return new CachingConnectionFactory();
//    }
//
//    @Bean
//    public RabbitAdmin rabbitAdmin(){
//        return new RabbitAdmin(connectionFactory());
//    }
    @Bean
    public Exchange bootExchange(){
        return new TopicExchange("BOOT-EXCHANGE-1", true, false);
    }

    @Bean
    public Queue bootQueue(){
        return new Queue("boot.queue1", true);
    }
}

如果程序有特殊的设置要求,追求更灵活的设置可以参考以下方式进行编码设置,从而不用在application.properties 指定。例如我们在测试环境和生产环境中配置的虚拟主机、密码不同、我们可以在程序中判断处于哪种环境,灵活切换设置。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        if("生产环境"){
          connectionFactory.set.....
        } else {
          ......
        }
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(){
        RabbitAdmin rabbitAdmin = new RabbitAdmin();
        rabbitAdmin.setAutoStartup(true);
        return new RabbitAdmin(connectionFactory());
    }

MQSender代码如下,包含发送消息以及添加 confirm 监听、添加 return 监听。如果消费端要设置为手工 ACK ,那么生产端发送消息的时候一定发送 correlationData ,并且全局唯一,用以唯一标识消息。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import com.anqi.mq.bean.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.Map;

@Component
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback confirmCallback= new RabbitTemplate.ConfirmCallback() {

        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("correlationData: " + correlationData);
            System.out.println("ack: " + ack);
            if(!ack){
                System.out.println("异常处理....");
            }
        }

    };

    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {

        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageProperties mp = new MessageProperties();
        //在生产环境中这里不用Message,而是使用 fastJson 等工具将对象转换为 json 格式发送
        Message msg = new Message(message.toString().getBytes(),mp);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一
        CorrelationData correlationData = new CorrelationData("1234567890"+new Date());
        rabbitTemplate.convertAndSend("BOOT-EXCHANGE-1", "boot.save", msg, correlationData);
    }
    //发送消息方法调用: 构建Message消息
    public void sendUser(User user) throws Exception {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一
        CorrelationData correlationData = new CorrelationData("1234567890"+new Date());
        rabbitTemplate.convertAndSend("BOOT-EXCHANGE-1", "boot.save", user, correlationData);
    }
}

消费端

在实际生产环境中,生产端和消费端一般都是两个系统,我们在此也将拆分成两个项目。

以下为消费端的 application.properties 中的配置,首先配置手工确认模式,用于 ACK 的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理。我们也可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况。我们要开启限流,指定每次处理消息最多只能处理两条消息。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spring.rabbitmq.host=localhost
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest


#设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#消费者最小数量
spring.rabbitmq.listener.simple.concurrency=1
#消费之最大数量
spring.rabbitmq.listener.simple.max-concurrency=10

#在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
spring.rabbitmq.listener.simple.prefetch=2

我们可以使用 @RabbitListener@RabblitHandler组合来监听队列,当然@RabbitListener 也可以加在方法上。我们这里是创建了两个方法用来监听同一个队列,具体调用哪个方法是通过匹配方法的入参来决定的,自定义类型的消息需要标注@Payload,类要实现序列化接口。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.anqi.mq.receiver;

import com.anqi.mq.bean.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;


@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "boot.queue1", durable = "true"),
                exchange = @Exchange(value = "BOOT-EXCHANGE-1", type = "topic", durable = "true", ignoreDeclarationExceptions = "true"),
                key = "boot.*"
        )
)
@Component
public class MQReceiver {

    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws IOException {

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //手工ack
        channel.basicAck(deliveryTag,true);
        System.out.println("receive--1: " + new String(message.getBody()));
    }

   @RabbitHandler
    public void onUserMessage(@Payload User user, Channel channel, @Headers Map<String,Object> headers) throws IOException {

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ack
        channel.basicAck(deliveryTag,true);
        System.out.println("receive--11: " + user.toString());
    }
}

消息的序列化与反序列化由内部转换器完成,如果我们要采用其他类型的消息转换器,我们可以对其进行设置SimpleMessageListenerContainer

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setMessageConverter(new Jackson2JsonMessageConverter());
        // 默认采用下面的这种转换器
        // container.setMessageConverter(new SimpleMessageConverter());
        return container;
    }

单元测试

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import com.anqi.mq.bean.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

    @Autowired
    private MQSender mqSender;

    @Test
    public void send() {
        String msg = "hello spring boot";
        try {
            for (int i = 0; i < 15; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //mqSender.send(msg + ":" + i, null);
                mqSender.sendUser(new User("anqi", 25));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

测试结果如下,我们在消费方法使用了Thread.sleep(5000)来模拟消息的处理过程,故意的延长了消息的处理时间,从而更好的观察限流效果。我们可以发现Unacked一直是 2, 代表正在处理的消息数量为 2,这与我们限流的数量一致,说明了限流的目的已经实现。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-06-01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
手把手带你Springboot整合RabbitMq ,一篇讲完
该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct、Topic、Fanout的使用,消息回调、手动确认等。(但是关于rabbitMq的安装,就不介绍了)
java进阶架构师
2020/06/16
1.7K0
手把手带你Springboot整合RabbitMq ,一篇讲完
RabbitMQ如何保证消息的可靠投递?
github地址: https://github.com/erlieStar/rabbitmq-examples
Java识堂
2020/10/23
5820
快速尝鲜:RabbitMQ 搭建完就得用起来
上文我们已经完成了RabbitMQ的安装,安完就要让它发挥点作用,今天就在SpringBoot项目里集成一下子,尝尝鲜!
阿Q说代码
2022/04/07
2330
快速尝鲜:RabbitMQ 搭建完就得用起来
RabbitMQ与SpringBoot2.0整合
@RabbitListener注解如果没有存在exchange和queue会自动创建
用户1212940
2022/04/13
2890
​SpringBoot连接多RabbitMQ源
在实际开发中,很多场景需要异步处理,这时就需要用到RabbitMQ,而且随着场景的增多程序可能需要连接多个RabbitMQ。SpringBoot本身提供了默认的配置可以快速配置连接RabbitMQ,但是只能连接一个RabbitMQ,当需要连接多个RabbitMQ时,默认的配置就不太适用了,需要单独编写每个连接。
java之旅
2020/01/07
3K0
springboot使用rabbitMQ(带回调)
配置文件2:RabbitConstants(主要用于用户名、密码等值从配置文件获取,也可以用@Value方式)
小尘哥
2018/08/15
9370
springboot使用rabbitMQ(带回调)
一文搞懂Spring-AMQP
12//设置消息发送ack,默认noneconnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
爱撒谎的男孩
2020/03/11
1.2K0
SpringBoot与RabbitMQ详解与整合
Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
码哥字节
2021/07/27
7690
RabbitMQ集群整合SpringBoot2.x
RabbitMQ相信大家已经再熟悉不过了,作为业界四大主流消息中间件之一(Apache RocketMQ、Apache Kafka、Apache ActiveMQ、RabbitMQ),它具有非常好的性能和可靠性的集群模式,不仅仅在各大互联网大厂中广泛使用(比如同程艺龙、美团点评等),而且在互联网金融行业也常常被作为首选!
Criss@陈磊
2019/08/01
2.1K0
SpringBoot-RabbitMQ发送消息的监控
刚才我们发送消息,不管成功还是失败,都不报错,结果看效果时,发现有的没有发进去,那么如何知道消息是否发送成功呢,RabbitMQ提供了一个消费监视的功能。注意:RabbitMQ发送消息分为2个阶段,消息发送到交互机里面,可以监视,消息由交互机到队列里面,也可以监视。
程序员 NEO
2023/09/27
2920
SpringBoot-RabbitMQ发送消息的监控
RabbitMQ消息的发布确认机制详解
RabbitMQ发布确认机制确保消息从生产者成功传输到交换机和队列,提高系统可靠性。在Spring Boot项目中,通过配置publisher-confirm-type和publisher-returns,启用发布确认和消息返回机制。配置RabbitTemplate的确认回调和返回回调,可以捕捉消息传输状态,处理不同传输结果。测试场景包括消息无法到达交换机、消息到达交换机但无法到达队列以及消息成功到达队列。通过合理设置和优化,可以确保高并发环境下的消息可靠传输,适用于金融支付、电商系统等对消息传输可靠性要求高的场景。
九转成圣
2024/06/05
1.1K0
RabbitMQ消息的发布确认机制详解
RabbitMQ高级特性之消费端限流
假设我们现在有这么一个场景,我们的消费端由于某些原因导致全部宕机等不可用,导致RabbitMQ服务器队列中存储了大量消息未被消费掉,那么等恢复消费端服务器后,就会有巨大量的消息全部推送过来,但是我们单个客户端无法同事处理这么多消息,就是导致消费端一些不可预测错误,甚至又会重复发生宕机,所以在实际业务场景中,限流保护还是非常重要的。
黎明大大
2021/03/08
7740
RabbitMQ学习笔记(四)——RabbitMQ与SpringBoot适配
◆ 异步消息监听容器 ◆ 原生提供RabbitTemplate,方便收发消息 ◆ 原生提供RabbitAdmin,方便队列、交换机声明 ◆ Spring Boot Config原生支持RabbitMQ
不愿意做鱼的小鲸鱼
2022/09/26
1.6K0
RabbitMQ学习笔记(四)——RabbitMQ与SpringBoot适配
SpringBoot整合RabbitMQ(九)
在前面的系列文章中详细的介绍了RabbitMQ的技术栈知识,本文章主要详细的介绍SpringBoot整合RabbitMQ。
无涯WuYa
2022/04/27
3870
SpringBoot整合RabbitMQ(九)
RabbitMQ与Spring的框架整合之Spring Boot实战
首先创建maven项目的RabbitMQ的消息生产者rabbitmq-springboot-provider项目,配置pom.xml配置文件,如下所示:
别先生
2019/12/02
5270
rabbitmq实际使用案例_沉默的螺旋案例
大家好,又见面了,我是你们的朋友全栈君。 一.发布与订阅模式(队列–>交换机) yml配置: server: port: 8088 spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest publisher-confirm-type: correlated #消息确认方式,通过 correlated 来确认(将来的消息中才会带 correlation_
全栈程序员站长
2022/11/10
6790
rabbitmq实际使用案例_沉默的螺旋案例
Springboot整合Rabbitmq,Direct、Fanout、Topic
https://www.rabbitmq.com/install-windows.html 注意安装Erlang,没有安装的话安装rabbitmq也会提示你跳转安装Erlang
鱼找水需要时间
2023/02/16
6920
Springboot整合Rabbitmq,Direct、Fanout、Topic
SpringACK对RabbitMQ消息的确认(消费)
之前已经简单介绍了基本是从发送方去确认的,我们需要在配置文件当中开启发送方确认模式,共育两种,一种是相对于交换机一个是相对于队列。
兰舟千帆
2022/08/11
6640
SpringACK对RabbitMQ消息的确认(消费)
整合RabbitMQ&Spring
RabbitAdmin类可以很好的操作RabbitMQ,在spring中直接进行注入即可
用户1212940
2022/04/13
2860
RabbitMQ 延迟队列,消息延迟推送
在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:
海向
2019/09/23
2.2K0
RabbitMQ 延迟队列,消息延迟推送
相关推荐
手把手带你Springboot整合RabbitMq ,一篇讲完
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验