消费者
package com.dance.redis.mq.rabbit.dlx;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class Receiver4DLXtExchange {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
// 声明正常的 exchange queue 路由规则
String queueName = "test_dlx_queue";
String exchangeName = "test_dlx_exchange";
String routingKey = "group.*";
RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
// 注意在这里要加一个特殊的属性arguments: x-dead-letter-exchange
Map<String, Object> arguments = new HashMap<>();
// 指定死信队列
arguments.put("x-dead-letter-exchange", "dlx.exchange");
// 指定死信队列的路由规则
arguments.put("x-dead-letter-routing-key", "dlx.*");
RabbitMQHelper.queueDeclare(channel,queueName,true,arguments);
channel.queueBind(queueName, exchangeName, routingKey);
// 声明死信队列
// dlx declare:
RabbitMQHelper.exchangeDeclare(channel,"dlx.exchange",RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
channel.queueDeclare("dlx.queue", false, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
// 拒绝并且不重回队列, 这样就会进入死信队列
channel.basicReject(envelope.getDeliveryTag(),false);
}
};
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
//等待回调函数执行完毕之后,关闭资源。
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
生产者
package com.dance.redis.mq.rabbit.dlx;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class Sender4DLXExchange {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_dlx_exchange";
String routingKey = "group.dlx";
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
String msg = "Hello World RabbitMQ 4 DLX Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
}
}
启动消费者
启动生产者
查看消费者
应为直接拒绝了, 所以没有消费, 查看控制台
已经被加入到死信队列中了, 为啥是3呢, 应为我之前测试了两次, 这个时候, 如果是写业务的话, 就可以通过消费死信队列的消息, 完成消费失败的, 或者过期的补偿了~
我这里只是用了拒绝策略, TTL过期和队列满, 都会进入, 可以自己试一下