生产者发送消息之后,到达消费端之后,可能会有以下情况:

RabbitMQ 向消费者发送消息之后,就会把这条消息删除掉
如何确保消费端已经成功接收,并正确处理了呢?
RabbitMQ 提供了消息确认机制(messageacknowledgement)消费者在订阅队列的时候,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种:
autoAck=true,RabbitMQ 会自动把发出去的消息设置为确认,然后从内存(或磁盘) 中删除,而不管消费者是否真正地消费到了这些消息 autoAck=false,RabbitMQ 会等待消费者显式地调用 Basic.Ack 命令,恢复确认信号后才从内存(硬盘) 中移去消息 当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分
RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者(也有可能还是原来那个消费者)
从 RabbitMQ 的 Web 管理平台上,也可以但看当前队列中 Ready 状态和 Unacked 状态的消息数

Ready:等待投递给消费者的消息数Unacked:已经投递给消费者,但是未收到消费者确认信号的消息数消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ 也提供了不同的确认应答的的方式,消费者客户端可以调用与其对应的 channel 的先关方法,共有三种
肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了delivertTag 64 位的长整型值。Channel) 独立维护的,所以在每个通道上都是唯一的ack) 一条消息时,必须使用对应的通道上进行确认multiple: deliveryTag 进行批量确认true,则会一次性 ack 所有小于或等于指定 deliveryTag 的消息false,则只确认当前指定 deliveryTag 的消息
deliveryTag是RabbitMQ中消息确认机制的一个重要组成部分,它确保了消息传递的可靠性和顺序性
否定确认:Channel.basicReject(long deliveryTag, boolean requeue)
RabbitMQ 在 2.0.0 版本开始引入了 Basic.Reject 这个命令,消费者客户端可以调用 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息deliveryTag:参考 channel.basicAckrequeue requeue 参数设为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者requeue 参数设为 false,则 RabbitMQ 会把消息从队列中移除,而不会把它发送给新的消费者否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject 一次只能拒绝一条消息,如果想要批量拒绝,则可以使用 Basic.Nack 这个命令channel.basicNack 方法来实现multiple 参数设置为 true ,则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息我们基于 Spring Boot 来演示消息的确认机制,使用方式和使用 RabbitMQ Java Client 有一定差异
Spring-AMQP 对消息确认机制提供了三种策略
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}AcknowledgeMode.NONE RabbitMQ 都会自动确认消息,从 RabbitMQ 队列中移除消息AcknowledgeMode.AUTO(默认) AcknowledgeMode.MANUAL basicAck 方法来确认消息RabbitMQ 会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息主要流程:
RabbitMQ 都会自动确认消息,从 RabbitMQ 队列中移除消息spring:
rabbitmq:
addresses: amqp://guest:guest@127.0.0.1:5672/coding
listener:
simple:
acknowledge-mode: none队列,交换机配置
public class Constant {
public static final String ACK_EXCHANGE_NAME = "ack_exchange";
public static final String ACK_QUEUE = "ack_queue";
}import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
public class RabbitMQConfig {
// 1. 交换机
@Bean("ackExchange")
public Exchange ackExchange() {
return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
}
// 2. 队列
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
// 3. 队列和交换机绑定 Binding
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange, @Qualifier("ackQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();
}
}通过接口发送消息:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProductController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");
return "发送成功!";
}
}import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {
// 指定监听队列的名称
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
// 模拟处理失败
int num = 3 / 0;
System.out.println("处理完成");
}
}这个代码运行结果是正常的,运行后消息会被签收:Ready 为 0,unacked 为 0
调用接口,发送消息: http://127.0.0.1:8080/producer/ack
开启消费者,控制台输出:

管理界面:

RabbitMQ 中移除配置确认机制
spring:
rabbitmq:
addresses: amqp://guest:guest@127.0.0.1:5672/coding
listener:
simple:
acknowledge-mode: auto调用接口,发送消息: http://127.0.0.1:8080/producer/ack
可以看到队列中有一条消息,unacked 的为 0(需要先把消费者注掉,注掉相关注解即可)

开启消费者,控制台不断输出错误信息

管理界面

RabbitMQ 会不断重发nack,就一直是 unacked 状态,导致消息积压配置确认机制
spring:
rabbitmq:
addresses: amqp://guest:guest@127.0.0.1:5672/coding
listener:
simple:
acknowledge-mode: manualimport com.example.rabbit_features.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {
// 指定监听队列的名称
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 接收消息
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
// 2. 处理业务逻辑
System.out.println("处理业务逻辑");
// 手动设置一个异常,来测试异常拒绝机制
// int num = 3 / 0;
// 3. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 4. 异常了就拒绝签收
// 第三个参数 requeue,是否重新发送,如果为 true,则会重发;若为 false,则直接丢弃
channel.basicNack(deliveryTag, true, true);
}
}
}Read 为 0,unacked 为 0控制台输出:

管理界面:

主动设置异常
@Component
public class AckQueueListener {
// 指定监听队列的名称
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 接收消息
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
// 2. 处理业务逻辑
System.out.println("处理业务逻辑");
// 手动设置一个异常,来测试异常拒绝机制
int num = 3 / 0;
// 3. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 4. 异常了就拒绝签收
// 第三个参数 requeue,是否重新发送,如果为 true,则会重发;若为 false,则直接丢弃
channel.basicNack(deliveryTag, true, true);
}
}
}运行结果:消费异常时不断重试,deliveryTag 从 1 递增
控制台日志:

管理界面:unacked 也变成了 1
