消费者
package com.dance.redis.mq.rabbit.returnlistener;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Receiver4ReturnListener {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_returnlistener_exchange";
String queueName = "test_returnlistener_queue";
String routingKey = "return.#";
RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
RabbitMQHelper.queueDeclare(channel,queueName);
channel.queueBind(queueName, exchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, false, consumer);
//等待回调函数执行完毕之后,关闭资源。
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
生产者
package com.dance.redis.mq.rabbit.returnlistener;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import java.io.IOException;
public class Sender4ReturnListener {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_returnlistener_exchange";
String routingKey1 = "abcd.save";
String routingKey2 = "return.save";
String routingKey3 = "return.delete.abc";
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
System.out.println("**************handleReturn**********");
System.out.println("replyCode : " + replyCode);
System.out.println("replyText : " + replyText);
System.out.println("exchange : " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("body : " + new String(body));
});
String msg = "Hello World RabbitMQ 4 Return Listener Message ...";
boolean mandatory = true;
channel.basicPublish(exchangeName, routingKey1, mandatory, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, mandatory, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, mandatory, null, msg.getBytes());
}
}
启动消费者
启动生产者
可以看到, 没有匹配到路由键的消息会被监听回来
查看消费者
有匹配到的路由键, 已经被消费了