Spring和RabbitMQ消息队列(AMQP)整合详解 官方主页 Spring AMQP 一、概述 消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。...上一篇《Spring和ActiveMq消息队列整合详解》介绍了ActiveMq的整合 本篇通过介绍下RabbitMQ的整合过程。 建议访问首发地址查看,自动生成目录树方便查看章节。...本项目将RabbitMQ的exchange三种模式的生产者和消费者都放在一个项目中,通过调用web接口发送消息,并监听每个队列的消息。 2.2.1 maven依赖 队列、exchange的管理,这个过程是由Spring自动完成的。 队列:rabbit:queue定义了一个队列,队列只是负责接收消息。...容器:jmsContainer是将消息队列和监听bean整合起来,这样就保证用的时候能找到对应的bean。
交换器(exchange):负责将生产者发来的消息按照特定的路由关键字(routing key)投递到相应的队列。 队列(queue):代理节点中存储将要被消费的消息的载体。...绑定(binding):交换器与队列之间的映射关系,可以理解为消息的路由规则。 AMQP实体(AMQP entity):交换器、队列和绑定三者合起来就称为一个AMQP实体,图中未示出。...扇出交换器(fanout exchange) 扇出交换器比直连交换器更简单,它会直接将消息路由到所有与它绑定的队列中。 ?...如果交换器无法将消息路由到队列该怎么办呢?AMQP给出了几种解决方法,一是直接丢弃,二是返还给生产者,三是放入死信队列中等待进一步处理。这由消息头部中的属性来决定。...AMQP规范下的队列和消费者都同时支持推模式和拉模式消费。前者即AMQP实体将消息投递到消费者,后者即消费者主动地从队列中获取消息。无论推模式还是拉模式,每个消费者也有一个标识,称为tag。
前言 消息队列在现今数据量超大,并发量超高的系统中是十分常用的。本文将会对现时最常用到的几款消息队列框架 ActiveMQ、RabbitMQ、Kafka 进行分析对比。...目录 一、RabbitMQ 与 AMQP 的关系 二、RabbitMQ 的实现原理 三、RabbitMQ 应用实例 四、Producer 端的消息发送与监控 五、Consumer 端的消息接收与监控 六...、死信队列 七、持久化操作 一、RabbitMQ 与 AMQP 的关系 1.1 AMQP简介 AMQP(Advanced Message Queue Protocol 高级消息队列协议)是一个消息队列协议...2.2 交换器(Exchange)、队列(Queue)、信道(Channel)、绑定(Binding)的概念 2.2.1 交换器 Exchange Producer 建立连接后,并非直接将消息投递到队列...也就是说,用于路由的属性是取自于消息 Header 属性,当消息 Header 的值与队列绑定时指定的值相同时,消息就会路由至相应的队列中。
前言 上面章节已为大家介绍 RabbitMQ 在 Spring 框架下的结构及实现原理,这章里将(从Producer 端的事务、回调函数(ConfirmCallback / ReturnCallback...目录 一、RabbitMQ 与 AMQP 的关系 二、RabbitMQ 的实现原理 三、RabbitMQ 应用实例 四、Producer 端的消息发送与监控 五、Consumer 端的消息接收与监控 六...、死信队列 七、持久化操作 四、Producer 端的消息发送与监控 前面一节已经介绍了RabbitMQ的基本使用方法,这一节将从更深入的层面讲述 Producer 的应用。...在第四节主要介绍了 Producer 端的队列发送与监控,它只能管理 Producer 与 Broker Server 之间的通信,但并不能确认 Consumer 是否能成功接收到队列,在这节内容将介绍...server 将暂停向 consumer 发送消息,待消息处理后再继续。
前言 消息队列在现今数据量超大,并发量超高的系统中是十分常用的。本文将会对现时最常用到的几款消息队列框架 ActiveMQ、RabbitMQ、Kafka 进行分析对比。...目录 一、RabbitMQ 与 AMQP 的关系 二、RabbitMQ 的实现原理 三、RabbitMQ 应用实例 四、Producer 端的消息发送与监控 五、Consumer 端的消息接收与监控 六...当消息在一个队列中变成死信后,它能被重新被发送到特定的交换器中,这个交换器就是DLX ,绑定DLX 的队列就称之为死信队列。...7.2 Message 持久化 设置了Queue 持久化以后,当 RabbitMQ 服务重启之后,队列依然存在,但消息已经消失,可见单单设置队列的持久化而不设置消息持久化显得毫无意义,所以通常列队持久化会与消息持久化共同使用...而在 Spring 框架下,由于在使用回调函数时需要把 Message 重新返回队列再进行处理,所以 Message 默认已经是持久化的。 ?
组件分享之后端组件——基于Golang实现的高级消息队列协议 (AMQP) 的消息代理garagemq 背景 近期正在探索前端、后端、系统端各类常用组件与工具,对其一些常见的组件进行再次整理一下,形成标准化组件专题...,后续该专题将包含各类语言中的一些常用组件。...组件基本信息 组件:garagemq 开源协议:MIT license 内容 本节我们分享一个基于Golang实现的高级消息队列协议 (AMQP) 的消息代理garagemq。...run --name garagemq -p 5672:5672 -p 15672:15672 amplitudo/garagemq 2、配置参数 # Proto name to implement (amqp-rabbit...or amqp-0-9-1) proto: amqp-rabbit # User list users: - username: guest password: 084e0343a0486ff05530df6c705c8bb4
个节点都是存活的,然后我又让运维确认了下队列的消费者情况,结果发现消费者列表中只有 2 个节点的消费者,其他 4 个节点的消费者不见了,所以消息消费不过来,导致了消息积压!...-1 收到消息,业务处理的时候 OOM 了,Spring 中止该线程,消息未被手动确认,回到队列等待被消费 消费者线程 taskMessageListenerContainer-2 收到消息,业务处理的时候又...OOM,Spring 中止该线程,消息未被手动确认,回到队列等待被消费 消费者线程 taskMessageListenerContainer-3 收到消息,业务处理的时候扔 OOM,Spring 中止该线程...,消息未被手动确认,回到队列等待被消费 全部的 3 个消费者线程都被 Spring 中止了,对应的 3 个队列消费者也就都无了,消息最终回到队列,等待下一个就绪的消费者消费 我们不是 catch 了...,并没有 Spring 的错误日志,此时队列消费者情况如下 当然,这只是缓兵之计,最终解决方案还是要分析 OOM 的原因,然后对症下药 总结 示例代码:spring-boot-rabbitmq OOM
1、前言 消息队列(Message Queue,简称 MQ)是一种异步的消息传递中间件,它解耦了应用程序之间的通信。应用程序可以将消息发送到队列,而无需知道谁会接收这些消息。...接收应用程序可以从队列中检索消息,而无需知道谁发送了这些消息。消息队列是一种重要的中间件,它可以帮助应用程序之间进行异步、可靠、可扩展的通信。...2、什么是RabbitMQ RabbitMQ 是一个开源的消息队列服务器,它实现了 AMQP (高级消息队列协议) 标准。...* deliveryTag:表示消息投递序号。 * requeue:值为 true 消息将重新入队列。...* requeue:值为 true 消息将重新入队列。
String message = "hello, spring amqp!"...修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常: 我们打个断点用 Debug运行。再回到浏览器查看,发现消息已经没了。 ...我们回到代码区往下执行。 测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。 4.2.演示auto模式 再次把确认机制修改为auto: 然后我们再往队列发送一条消息。 ...本地重试 我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。...比较好的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
:需要设置类型为direct的交换机,交换机和队列进⾏绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列 通配符模式 Topic: 需要设置类型为...topic的交换机,交换机和队列进⾏绑定,并且指定通配符⽅式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。...6、Exchange三种模式 Exchange有常⻅以下3种类型: Fanout ⼴播:将消息交给所有绑定到交换机的队列, 不处理路由键。只需要简单的将队列绑定到交换机上。...相当于mysql的db。Virtual Name⼀般以/开头。 1....⽣产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
(内容来源:Spring中国教育管理中心) 本指南将引导您完成设置发布和订阅消息的 RabbitMQ AMQP 服务器以及创建 Spring Boot 应用程序以与该 RabbitMQ 服务器交互的过程...方法中定义的 beanlistenerAdapter()被注册为容器中的消息监听器(定义在 中container())。它侦听spring-boot队列中的消息。...JMS 队列和 AMQP 队列具有不同的语义。例如,JMS 仅将排队的消息发送给一个消费者。虽然 AMQP 队列做同样的事情,但 AMQP 生产者并不直接将消息发送到队列。...相反,一条消息被发送到一个交换器,该交换器可以发送到单个队列或扇出到多个队列,模拟 JMS 主题的概念。 消息侦听器容器和接收器 bean 是您侦听消息所需的全部内容。...在这种情况下,我们使用主题交换,并且队列与路由键绑定foo.bar.#,这意味着以 开头的路由键发送的任何消息都会foo.bar.被路由到队列。
RabbitMq -JCccc 黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是 rabbitMq的服务器,然后经过服务器里面的交换机、队列等各种关系(后面会详细讲)将数据处理入列后,...常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种: Direct Exchange 直连型交换机,根据消息携带的路由键将消息投递给对应队列...和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding...和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding...第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。 同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。
// 拒绝签收,第三个参数重回队列,如果设置为true,则消息重新回到队列 } public void onMessage(Message message) {...打开这个管理面板,可以看到没有队列,这里提前已经删除掉之前的创建好的队列和交换机了,为的是为了是运行展示后的效果比较明显一些。 交换机和队列都是可以在程序中创建和绑定的。...现在我们去面板看,可以看到这里就自动创建出来队列和生产了一条消息,当然交换机的创建和队列的绑定也是执行了。 现在我们在消费者去消费,执行的话,我们就去执行启动类就好。...因为我们这个类加上了这个注解,其实就是已经实例化给spring了。表明了已经成为spring的一个组件,所以直接去启动启动运行类就好了。...我们这里出现异常,第二个参数为true,代表不确认,第三个代表重新让它回到队列,设置为true该行消息重新回到队列,但是我们这里会持续接收进行接收消费,于是来来回回就形成了死循环。
AMQP定义了这些特性: 消息方向 消息队列 消息路由(包括:点到点和发布-订阅模式) 可靠性 安全性 RabbitMQ 本文要介绍的RabbitMQ就是以AMQP协议实现的一种中间件产品,它可以支持多种操作系统...,用户可以回到上面的安装内容,在管理页面中创建用户。...通过注入 AmqpTemplate接口的实例来实现消息的发送, AmqpTemplate接口定义了一套针对AMQP协议的基础操作。在Spring Boot中会根据配置来注入其具体实现。...通过 @RabbitListener注解定义该类对 hello队列的监听,并用 @RabbitHandler注解来指定对消息的处理方法。...所以,该消费者实现了对 hello队列的消费,消费操作为输出消息的字符串内容。
该参数的作用是,当消息的mandatory设置为true时,消息投递到Exchange之后,如果Exchange无法将该消息路由到任何一个队列,那么该消息将返回给生产者。...当设置为false,RabbitMQ将直接丢弃该消息。 在了解了这个背景之后,分为使用和不使用spring-boot-starter-amqp两种场景。...在未使用spring-boot-starter-amqp的场景下,我们直接给channel设置监听器并且将消息的mandatory设置为true,即可实现消息无法路由之后通过该channel将消息return...而在使用spring-boot-starter-amqp的场景下,除了设置mandatory,还需要设置spring.rabbitmq.publisher-returns,这个参数的作用是什么呢。...在添加回调监听器的地方打上断点 回到SpringBoot环境下Debug: 分析监听器是如何被加入到集合的。
#,那么可以匹配到例如:topic.message、topic.message.detail等,以topic.开头的路由键都可以匹配到。 ...#,凡是topic.开头的routingKey消息都发送到此队列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind...RabbitMQ消息的确认机制 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的生产者在将消息发送出去之后...:是否批量. true:将一次性ack所有小于deliveryTag的消息。 ...multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。 requeue:是否重新入队列。
,然后经过服务器里面的交换机、队列等各种关系(后面会详细讲)将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。...常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种: Direct Exchange 直连型交换机,根据消息携带的路由键将消息投递给对应队列...和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding...和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding...第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。 同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。
confirm机制 confirm模式需要基于channel进⾏设置, ⼀旦某条消息被投递到队列之后,消息队列就会发送⼀个确认信息给⽣产者,如果队列与消息是可持久化的, 那么确认消息会等到消息成功写...confirm的性能⾼,主要得益于它是异步的.⽣产者在将第⼀条消息发出之后等待确认消息的同时也可以继续发送后续的消息.当确认消息到达之后,就可以通过回调⽅法处理这条确认消息....手动应答机制: 只有在消息消费者将消息处理完,才会通知消息服务器将该条消息删除 消费者发起成功通知 DeliveryTag: 消息的唯⼀标识 channel+消息编号 第⼆个参数:是否开启批量处理。...第⼆个boolean: true当前消息会进⼊到死信队列。false重新回到原有队列中,默认回到头部。...,false代表只有当前消费者拒绝 * 第⼆个boolean true当前消息会进⼊到死信队列,false重新回到原有队列中,默认回到头部 */
五、TTL 1.业务场景 2.定义 3.实现步骤 4.通过RabbitMQ管理控制台页面实现Demo 1.创建消息 2.创建交换机 3.将交换机和消息绑定 4.发送消息 5.通过代码实现TTL 六、死信队列...如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端 */ System.out.println("=====业务处理异常...4.通过RabbitMQ管理控制台页面实现Demo 1.创建消息 2.创建交换机 3.将交换机和消息绑定 4.发送消息 超过5秒没有消费者消费,就自动失效了。...说明:死信交换机和死信队列和普通的没有区别,当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。...2.消息成为死信的三种情况 队列消息长度到达限制; 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false; 原队列存在消息过期设置,
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp SpringAMQP提供了三个功能: 自动声明队列、交换机及其绑定关系 基于注解的监听器模式...一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。...Exchange有以下3种类型: Fanout:广播,将消息交给所有绑定到交换机的队列 Direct:定向,把消息交给符合指定routing key 的队列 Topic:通配符,把消息交给符合routing...接收publisher发送的消息 将消息按照规则路由到与之绑定的队列 不能缓存消息,路由失败,消息丢失 FanoutExchange的会将消息路由到每个绑定的队列 声明队列、交换机、绑定关系的Bean是什么...Fanout交换机将消息路由给每一个与之绑定的队列 Direct交换机根据RoutingKey判断路由给哪个队列 如果多个队列具有相同的RoutingKey,则与Fanout功能类似 基于@RabbitListener
领取专属 10元无门槛券
手把手带您无忧上云