RabbitMQ的路由模式是一种消息传递模式,它允许消息生产者将消息发送到一个或多个特定的消息队列。在路由模式中,消息生产者将消息标记为具有特定的路由键,然后消息代理(RabbitMQ)将根据路由键将消息路由到与之匹配的队列。
具体来说,路由模式涉及到一个生产者、一个direct类型的交换机和多个队列。生产者在发送消息到交换机时,会指定一个路由键。交换机接收到生产者的消息后,会根据路由键将消息递交给与之完全匹配的队列。只有当消费者发送消息的交换器、路由与生产者指定的交换器、路由一致时,消费者才能接收到生产者向指定路由的消费者发送的消息。
路由模式与发布订阅模式类似,但发布订阅模式是分发到所有绑定到交换机的队列,而路由模式只分发到绑定在交换机上面指定路由键的队列。因此,路由模式提供了更精确的消息传递控制。
在实际应用中,RabbitMQ的路由模式可以实现各种复杂的消息传递需求,如日志级别过滤、消息过滤等。通过使用路由模式,可以确保消息被准确地发送到特定的队列,从而实现更高效、更灵活的消息传递和处理。
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。
特点:
这里设计一个生产者,创建一个交换机,三个队列,分别是邮件队列,站内消息队列,短信队列,这里站内短信绑定两个路由,其余队列绑定一个路由,然后交换机往这两个路由发送消息;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
// 2.建立连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
/**
* 4.创建交换机
* 参数1:交换机名
* 参数2:交换机类型
* 参数3:交换机持久化
*/
channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
/**
* 5.创建队列,如果队列已存在,则使用该队列
* 参数1:队列名
* 参数2:是否持久化,true表示MQ重启后队列还在。
* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
* 参数4:是否自动删除,true表示不再使用队列时自动删除队列
* 参数5:其他额外参数
*/
channel.queueDeclare("SEND_MAIL",true,false,false,null);
channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
channel.queueDeclare("SEND_STATION",true,false,false,null);
/**
* 6.绑定队列
* 参数1:队列名
* 参数2:交换机名
* 参数3:路由关键字,发布订阅模式写""即可
*/
channel.queueBind("SEND_MAIL","exchange_routing","import");
channel.queueBind("SEND_MESSAGE","exchange_routing","import");
channel.queueBind("SEND_STATION","exchange_routing","import");
channel.queueBind("SEND_STATION","exchange_routing","normal");
// 7.发送消息
/**
* 参数1:交换机名,""表示默认交换机
* 参数2:路由键,简单模式就是队列名
* 参数3:其他额外参数
* 参数4:要传递的消息字节数组
*/
channel.basicPublish("exchange_routing", "import", null, "双十一大促活动!".getBytes());
channel.basicPublish("exchange_routing", "normal", null, "小心促销活动@".getBytes());
// 6.关闭信道和连接
channel.close();
connection.close();
System.out.println("--- 发送成功 ---");
}
}
接下来也编写三个消费者分别获取消息:这里三个消费者的代码基本一致就只写一个消费者了:
public class Consumer_station {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
// 2.建立连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
/**
* 4.监听队列
* 参数1:监听的队列名
* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
*/
channel.basicConsume("SEND_STATION",true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
String message = new String(body,"UTF-8");
System.out.println("发送站内短信,消息为: "+message);
}
});
}
}
执行结果应为,站内消息收到两条消息,其余途径只获取一条消息:如下图:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。