前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ面试必备知识点及实战 - Exchange交换机类型详解

RabbitMQ面试必备知识点及实战 - Exchange交换机类型详解

作者头像
JavaEdge
修改2024-09-10 15:37:05
8450
修改2024-09-10 15:37:05
举报
文章被收录于专栏:Java

0 前言

Exchange:接收消息,并根据路由键转发消息所绑定的队列。交换机并非一个单独进程,而是一个有着“地址”的列表而已。

蓝区 - Send Message:把消息投递到交换机,由 RoutingKey 路由到指定队列。

交换机属性

声明交换机时可附带许多属性:

  • Name 交换机名称
  • Type 交换机类型,direct、topic、 fanout、 headers
  • Durability,是否需要持久化。 如果持久化,则RabbitMQ重启后,交换机还存在
  • Auto-delete 当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange
  • Internal 当前Exchange是否于RabbitMQ内部使用,默认为False

交换机类型

  1. Direct exchange(直连交换机)
  2. Fanout exchange(扇型交换机)
  3. Topic exchange(主题交换机)
  4. Headers exchange(头交换机)
  5. Dead Letter Exchange(死信交换机)

1 默认交换机

amq.* exchanges

1、一个队列对应了多个消费者,

2、默认,由队列对消息进行平均分配,消息会被分到不同的消费者手中。3、消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动

2 Direct Exchange

所有发送到DE的消息被转发到RouteKey中指定的Queue。

Direct模式可用RabbitMQ自带的Exchange:default Exchange,所以无需将Exchange进行任何绑定(binding),消息传递时,RouteKey须完全匹配才会被队列接收,否则该消息被丢弃。

Direct Exchange原理示意图
实战
代码语言:java
复制
/**
 * 直连模式-生产者
 *
 * @author JavaEdge
 */
public class ProducerDirectExchange {
    public static void main(String[] args) throws Exception {
       //1 创建ConnectionFactory
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("localhost");
       connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       
       //2 创建Connection
       Connection connection = connectionFactory.newConnection();

       //3 创建Channel
       Channel channel = connection.createChannel();

       //4 声明
       String exchangeName = "test_direct_exchange";
       // !!!!!!!!!!!!!!!!!!!!!!!!
       String routingKey = "test.direct";

       //5 发送
       String msg = "Hello JavaEdge RabbitMQ Direct Exchange Message ... ";
       channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
}
代码语言:java
复制
/**
 * 直连模式-消费者
 *
 * @author JavaEdge
 */
public class ConsumerDirectExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       // 自动重连(3s)
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // name和Pro中的一致
       String exchangeName = "test_direct_exchange";
       String exchangeType = "direct";
       String queueName = "test_direct_queue";
       String routingKey = "test.direct";

       channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
       channel.queueDeclare(queueName, false, false, false, null);
       //建立绑定关系
       channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        while(true){  
            //获取消息,如果没有消息,该步将会阻塞
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("Get Message:" + msg);
        } 
    }
}

路由key保持一致,分别启动:

看交换机:

看绑定关系:

看队列名:

看队列数据源的交换机:

3 Topic exchange

直接交换的局限性:不能做基于多个标准的路由。

如日志系统,可能不仅要根据严重性订阅日志,还要根据日志源订阅日志。

syslog unix工具根据严重性(info / warn / crit ...)和facility(auth / cron / kern ...)来路由日志。

这更具灵活性 - 可能想监听来自 cron 的关键错误及来自 kern 的所有日志。为了在日志记录系统实现这点,还需了解主题交换机。

  • *可匹配一个单词
  • #可匹配零或多个单词
  • 所有发送到Topic Exchange的消息会被转发到所有关心RouteKey中指定Topic的Queue
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需绑定一个Topic

案例

将发送所有描述动物的消息。消息将与包含三个单词(两个点)的routing key一起发送。

代码语言:bash
复制
routing key中的第一个单词描述速度,第二颜色,第三是物种:“<speed>。<color>。<species>”。

创建三个绑定:

代码语言:bash
复制
Q1绑定了绑定键“* .orange.*”,Q2绑定了“*.*.rabbit”和“lazy.#”

这些绑定可总结为:

  • Q1对所有橙色动物感兴趣
  • Q2希望听到关于兔子的一切,以及关于懒惰动物的一切

routing key设置为“quick.orange.rabbit”的消息将传递到两个队列。消息“lazy.orange.elephant”也将同时发送给他们。另一方面

  • “quick.orange.fox”只会转到第一个队列
  • 而“lazy.brown.fox”只会转到第二个队列
  • “lazy.pink.rabbit”将仅传递到第二个队列一次,即使它匹配两个绑定
  • “quick.brown.fox”与任何绑定都不匹配,因此它将被丢弃。

如果我们违背我们的约定并发送带有一个或四个单词的消息,例如“orange” or “quick.orange.male.rabbit”,会发生什么?好吧,这些消息将不会匹配任何绑定,因此将丢失.

另一方面,“lazy.orange.male.rabbit”,虽然它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

实例图

实战

代码语言:java
复制
/**
 * @author JavaEdge
 */
public class Producer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.javaedge";

        String msg = "Hello JavaEdge RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
        channel.close();
        connection.close();
    }
}
代码语言:java
复制
/**
 * 主题交换机-消费端
 *
 * @author JavaEdge
 */
public class Consumer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
//     String routingKey = "user.#";
        String routingKey = "user.*";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("Get Message:" + msg);
        }
    }
}

启动消费者:

启动生产者:

消费端收到了消息。

修改匹配格式,理论上只能接受前两个消息:

管控台,先将之前的匹配绑定取消:

显然仅能接受前两个消息:

小结

当队列绑定“#”(哈希)绑定key时,它将接收所有消息,而不管routing key,就像fanout交换机。

当特殊字符“*”(星号)和“#”(哈希)未在绑定中使用时,主题交换机的行为就像直接交换机。

4 Fanout Exchange

不处理路由键,只需简单的将队列绑定到交换机。发送到交换机的消息都会被转发到与该交换机绑定的所有队列。Fanout交换机转发消息是最快的:

实战

代码语言:java
复制
/**
 * @author JavaEdge
 */
public class Consumer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
       String exchangeName = "test_fanout_exchange";
       String exchangeType = "fanout";
       String queueName = "test_fanout_queue";
        // 不设置路由键
       String routingKey = "";
       channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
       channel.queueDeclare(queueName, false, false, false, null);
       channel.queueBind(queueName, exchangeName, routingKey);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while(true){
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("Get Message:" + msg);
        } 
    }
}
代码语言:java
复制
/**
 * @author JavaEdge
 */
public class Producer4FanoutExchange {
    public static void main(String[] args) throws Exception {
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("localhost");
       connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       Connection connection = connectionFactory.newConnection();
       Channel channel = connection.createChannel();
       String exchangeName = "test_fanout_exchange";
       for(int i = 0; i < 4; i ++) {
          String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
          channel.basicPublish(exchangeName, "", null , msg.getBytes());           
       }
       channel.close();  
        connection.close();  
    }
}

启动消费端

无需routing key

启动生产者后接收到的消息:

5 Header Exchange

根据消息头信息(headers)来路由消息,而非路由键(routing key)。要在消息头设置一些KV对,交换机会根据这些键值对来决定将消息路由到哪个队列。

代码语言:java
复制
/**
 * @author JavaEdge
 */
public class Producer4HeadersExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_headers_exchange";

        for (int i = 0; i < 4; i++) {
            String msg = "Hello World RabbitMQ 4 HEADERS Exchange Message ...";

            // 设置消息的头部信息
            Map<String, Object> headers = new HashMap<>();
          	// 指定匹配规则
            headers.put("x-match", "any"); // any 表示只要有一个头部信息匹配即可,all 表示所有头部信息都要匹配。
            headers.put("name", "JavaEdge");
            headers.put("age", "30");
						 // 将头部信息作为 BasicProperties 的一部分传递
            channel.basicPublish(exchangeName, "", new com.rabbitmq.client.AMQP.BasicProperties.Builder()
                    .headers(headers)
                    .build(), msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}
代码语言:java
复制
/**
 * @author JavaEdge
 */
public class Consumer4HeadersExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_headers_exchange";
        String exchangeType = "headers";
        String queueName = "test_headers_queue";

        // 声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);

        // 绑定队列到交换机,并设置头部匹配规则
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-match", "any");
        headers.put("name", "JavaEdge");
        headers.put("age", "30");

        channel.queueBind(queueName, exchangeName, "", headers);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("Get Message:" + msg);
        }
    }
}

关注我,紧跟本系列专栏文章,咱们下篇再续!

作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。 各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。 负责: 中央/分销预订系统性能优化 活动&券等营销中台建设 交易平台及数据中台等架构和开发设计 车联网核心平台-物联网连接平台、大数据平台架构设计及优化 LLM Agent应用开发 区块链应用开发 大数据开发挖掘经验 推荐系统项目 目前主攻市级软件项目设计、构建服务全社会的应用系统。

参考:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/11/13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0 前言
    • 交换机属性
      • 交换机类型
        • Direct Exchange原理示意图
        • 实战
    • 1 默认交换机
    • 2 Direct Exchange
    • 3 Topic exchange
      • 案例
        • 实例图
          • 实战
            • 小结
            • 4 Fanout Exchange
              • 实战
              • 5 Header Exchange
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档