消费者
package com.dance.redis.mq.rabbit.rqueue;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Receiver {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String queueName = "test001";
RabbitMQHelper.queueDeclare(channel,queueName,true);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
try {
System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
/**
* 手工ACK
* 消费失败, 但是消息不重回队列
* channel.basicNack(envelope.getDeliveryTag(), false, false);
* 消费失败, 将消息重新丢回消息队列尾部
* channel.basicNack(envelope.getDeliveryTag(), false, true);
* 消费成功
* channel.basicAck(envelope.getDeliveryTag(), false);
*/
if((Integer)properties.getHeaders().get("flag") == 0) {
//throw new RuntimeException("异常");
// 设置为false表示关闭重回队列
channel.basicNack(envelope.getDeliveryTag(), false, false);
// 设置为true表示开启重回队列 将这条消息重回放入队列
// channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
// 等待回调函数执行完毕之后,关闭资源。
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
生产者
package com.dance.redis.mq.rabbit.rqueue;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
public class Sender {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
//4 声明
String queueName = "test001";
RabbitMQHelper.queueDeclare(channel, queueName, true);
for (int i = 0; i < 5; i++) {
String msg = "Hello World RabbitMQ " + i;
Map<String, Object> headers = new HashMap<>();
headers.put("flag", i);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
channel.basicPublish("", queueName, props, msg.getBytes());
}
}
}
开启重回队列测试
启动消费者
启动生产者
查看消费者
可以看到flag=0的消息, 再一直被重回队列, 当然, 我们可以通过程序去控制这个是不是要重回队列
关闭重回队列测试
启动消费者
启动生产者
查看消费者
可以看到哪怕, 我们手工NACK之后, 消息也没有重回队列