什么是消息的可靠性投递?即保证消息百分百发送到消息队列中去,消息发送端需要接受到mq服务端接受到消息的确认应答。除此之外还应有完善的消息补偿机制,发送失败的消息可以再感知并二次处理。 生产者到交换机通过
confirmCallback,交换机到队列通过
returnCallback
package cn.com.codingce.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/**
* 交换机名称
*/
public static final String EXCHANGE_NAME = "health_hra3_exchange";
/**
* 队列名称
*/
public static final String QUEUE = "health_hra3_queue";
@Bean
public Exchange healthHra3Exchange() {
// 创建交换机,durable代表持久化,使用Bean注入
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean
public Queue healthHra3Queue() {
// 创建队列,使用Bean注入
return QueueBuilder.durable(QUEUE).build();
}
/**
* 交换机和队列绑定关系
*
* @param queue 上面注入的队列Bean,如果你的项目又多个,记得给Bean取名字
* @param exchange 上面注入的交换机Bean
*/
@Bean
public Binding healthHra3Binding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("health.#").noargs();
}
}
package cn.com.codingce.listener;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConfig.QUEUE) // 监听的队列名称
public class HealthHra3MQListener {
/**
* RabbitHandler会自动匹配消息类型(消息自动确认)
*
* @param msg 发送的是String类型,这里用String进行接收,RabbitHandler会自动进行匹配
* @param message
* @throws IOException
*/
@RabbitHandler
public void releaseCouponRecord(String msg, Message message) throws IOException {
log.info("releaseCouponRecord into"); // 监听到消息:消息内容,msg=新HRA3报告来啦!!
long msgTag = message.getMessageProperties().getDeliveryTag();
log.info("监听到消息:消息内容,msg={}", msg); // 监听到消息:消息内容,msg=新HRA3报告来啦!!
log.info("msgTag={}", msgTag); // msgTag=1
log.info("message={}", message.toString()); // message=(Body:'新HRA3报告来啦!!' MessageProperties [headers={}, ……
}
}
server:
port: 9090
spring:
application:
# 微服务系统有意义, 养成好习惯, 先写出来
name: rabbitmq-02-springboot
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /codingce
# 新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
publisher-confirm-type: correlated
thymeleaf.cache: false
confirmCallback是生产者到交换机,可以理解为确认消息是否发送成功。新版依赖可靠性投递默认是关闭的,使用以下方法开启:
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated
yml
server:
port: 9090
spring:
application:
# 微服务系统有意义, 养成好习惯, 先写出来
name: rabbitmq-02-springboot
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /codingce
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
publisher-confirm-type: correlated
thymeleaf.cache: false
package cn.com.codingce.controller;
import cn.com.codingce.common.utils.R;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("api")
@Slf4j
public class SendController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 可靠性投递confirmCallback
*
* @return
*/
@GetMapping("/confirmCallback")
public R confirmCallback() {
log.info("可靠性投递 confirmCallback");
/*
correlationData:配置
ack:交换机是否收到消息,true是成功,false是失败
cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("confirm==== ack={}", ack);
log.info("confirm==== cause={}", cause);
if (ack) {
log.info("发送成功,{}", cause);
} else {
log.error("发送失败,{}", cause);
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "health.new", "新HRA3报告来了!!");
return R.ok();
}
@GetMapping(value = "/default", produces = "text/html;charset=utf-8")
public String getDefault() {
return "队列服务运行正常...";
}
}
2023-11-12 15:48:31.782 INFO 6840 --- [nio-9090-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-11-12 15:48:31.782 INFO 6840 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2023-11-12 15:48:31.783 INFO 6840 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2023-11-12 15:48:42.146 INFO 6840 --- [nio-9090-exec-5] c.c.codingce.controller.SendController : 可靠性投递 confirmCallback
2023-11-12 15:48:42.155 INFO 6840 --- [nectionFactory1] c.c.codingce.controller.SendController : confirm==== ack=true
2023-11-12 15:48:42.156 INFO 6840 --- [nectionFactory1] c.c.codingce.controller.SendController : confirm==== cause=null
2023-11-12 15:48:42.156 INFO 6840 --- [nectionFactory1] c.c.codingce.controller.SendController : 发送成功,null
2023-11-12 15:48:42.159 INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener : releaseCouponRecord into
2023-11-12 15:48:42.159 INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener : 监听到消息:消息内容,msg=新HRA3报告来了!!
2023-11-12 15:48:42.159 INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener : msgTag=1
2023-11-12 15:48:42.159 INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener : message=(Body:'新HRA3报告来了!!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=health_hra3_exchange, receivedRoutingKey=health.new, deliveryTag=1, consumerTag=amq.ctag-7HDzHnEZ0foZ_MrQmGqFYQ, consumerQueue=health_hra3_queue])
returnCallback交换机到队列,消息从交换器发送到对应队列失败时触发:
spring.rabbitmq.publisher-returns=true #新版
# 为true,则交换机处理消息到路由失败,则会返回给生产者
spring.rabbitmq.template.mandatory=true
yml
server:
port: 9090
spring:
application:
# 微服务系统有意义, 养成好习惯, 先写出来
name: rabbitmq-02-springboot
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /codingce
# 新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
publisher-confirm-type: correlated
#########################################################################################
# 为true,则交换机处理消息到路由失败,则会返回给生产者
publisher-returns: true
thymeleaf.cache: false
package cn.com.codingce.controller;
import cn.com.codingce.common.utils.R;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("api")
@Slf4j
public class SendController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/returnCallback")
public R returnCallback() {
log.info("交换机到队列通过returnCallback 可靠性投递 returnCallback");
// 为true,则交换机处理消息到路由失败,则会返回给生产者,开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
rabbitTemplate.setReturnsCallback(returnedMessage -> {
int code = returnedMessage.getReplyCode();
log.info("returnCallback code={}", code);
log.info("returnCallback returned={}", returnedMessage);
});
// 这个routingKey是不存在的,它找不到这个路由,所以会出现异常从而触发上面的回调方法
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xxx.health.new", "新HRA3报告来了!!");
return R.ok();
}
@GetMapping(value = "/default", produces = "text/html;charset=utf-8")
public String getDefault() {
return "队列服务运行正常...";
}
}
2023-11-12 15:53:22.820 INFO 4848 --- [nio-9090-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-11-12 15:53:22.820 INFO 4848 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2023-11-12 15:53:22.821 INFO 4848 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2023-11-12 15:53:22.831 INFO 4848 --- [nio-9090-exec-1] c.c.codingce.controller.SendController : 交换机到队列通过returnCallback 可靠性投递 returnCallback
2023-11-12 15:53:22.839 INFO 4848 --- [nectionFactory1] c.c.codingce.controller.SendController : returnCallback code=312
2023-11-12 15:53:22.840 INFO 4848 --- [nectionFactory1] c.c.codingce.controller.SendController : returnCallback returned=ReturnedMessage [message=(Body:'新HRA3报告来了!!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=health_hra3_exchange, routingKey=xxx.health.new]
!!!开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议用消息确认机制
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。