// Spring Boot 示例
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
// 消息未到达 Broker,触发重试或记录日志
System.out.println("消息发送失败: " + cause);
}
});
return template;
}
}mandatory 参数:当消息无法路由到队列时,Broker 会返回消息给生产者。// 声明备份交换机和队列
@Bean
public DirectExchange mainExchange() {
return ExchangeBuilder.directExchange("main-exchange")
.durable(true)
.withArgument("alternate-exchange", "backup-exchange") // 绑定备份交换机
.build();
}durable=true。deliveryMode=2(持久化模式)。 // 发送持久化消息
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("Hello World".getBytes(), props);
rabbitTemplate.send("exchange", "routingKey", message);rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'@RabbitListener(queues = "my-queue")
public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 处理业务逻辑...
channel.basicAck(tag, false); // 处理成功,确认消息
} catch (Exception e) {
channel.basicNack(tag, false, true); // 处理失败,拒绝并重新入队
}
}spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
max-attempts: 3
initial-interval: 3000@Bean
public Queue mainQueue() {
return QueueBuilder.durable("main-queue")
.withArgument("x-dead-letter-exchange", "dlx-exchange") // 绑定死信交换机
.build();
}生产者 --Confirm机制--> Broker --持久化+镜像队列--> 消费者 --手动ACK+重试--> 处理完成
(失败重发) (磁盘/集群备份) (失败重试或记录)通过以上措施,可最大程度减少消息丢失风险,实现可靠的端到端消息传递。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。