前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ发布订阅模式

RabbitMQ发布订阅模式

原创
作者头像
会洗碗的CV工程师
发布2024-04-27 15:46:59
2160
发布2024-04-27 15:46:59
举报
文章被收录于专栏:消息中间件

一、概念

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)

特点:

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

RabbitMQ的发布订阅模式是一种消息传递的方式,用于在分布式系统中实现消息的广播和接收。在这种模式下,生产者(发布者)将消息发送到交换机(Exchange),然后交换机根据一定的规则将消息路由到一个或多个队列(Queue)。消费者(订阅者)则从队列中接收并处理这些消息。

发布订阅模式的核心特点是消息的多播性,即一条消息可以被多个消费者接收。这使得该模式特别适用于需要将消息广播给多个接收者的场景,如实时数据分析、日志收集、消息通知等。

在RabbitMQ中,发布订阅模式的组成元素主要包括生产者、交换机、队列和消费者。生产者负责将消息发送到交换机,交换机则根据配置的路由规则将消息分发给相应的队列。每个队列可以有多个消费者,它们从队列中取出消息并进行处理。

RabbitMQ提供了多种类型的交换机,包括直接交换机(Direct)、主题交换机(Topic)、扇形交换机(Fanout)和头部交换机(Headers)。不同类型的交换机具有不同的路由规则,可以根据实际需求选择合适的交换机类型。

发布订阅模式的优点在于其灵活性和可扩展性。生产者可以灵活地发送消息到交换机,而无需关心具体的消费者是谁或有多少。同时,消费者可以动态地添加或移除,不会影响到整个系统的运行。此外,通过增加队列和消费者的数量,可以轻松地实现系统的水平扩展,提高消息的处理能力。

然而,发布订阅模式也存在一些潜在的问题。由于消息会被广播给所有匹配的队列,因此可能会导致不必要的消息冗余和浪费。此外,如果消费者处理消息的速度跟不上生产者发送消息的速度,可能会导致队列的堆积和消息的延迟处理。

总的来说,RabbitMQ的发布订阅模式是一种强大的消息传递机制,适用于需要将消息广播给多个接收者的场景。在实际应用中,需要根据具体需求选择合适的交换机类型、配置路由规则,并合理管理队列和消费者的数量,以确保系统的稳定性和高效性。

二、编写生产者

这里编写消费者主要是通过交换机绑定多个队列,向客户发送消息提醒如下:

代码语言:javascript
复制
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建链接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        // 2.建立连接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();

        /**
         * 4.创建交换机
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
        channel.exchangeDeclare("exchange_lyl-lsj", BuiltinExchangeType.FANOUT,true);

        /**
         * 5.创建队列,如果队列已存在,则使用该队列
         * 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数
         */
        channel.queueDeclare("GET_LOVE",true,false,false,null);
        channel.queueDeclare("SEND_SORRY",true,false,false,null);
        channel.queueDeclare("GET_REFUSE",true,false,false,null);

        /**
         * 6.绑定队列
         * 参数1:队列名
         * 参数2:交换机名
         * 参数3:路由关键字,发布订阅模式写""即可
         */
        channel.queueBind("GET_LOVE","exchange_lyl-lsj","");
        channel.queueBind("SEND_SORRY","exchange_lyl-lsj","");
        channel.queueBind("GET_REFUSE","exchange_lyl-lsj","");

        // 7.发送消息
        for (int i=1; i<10; i++) {
            /**
             * 参数1:交换机名,""表示默认交换机
             * 参数2:路由键,简单模式就是队列名
             * 参数3:其他额外参数
             * 参数4:要传递的消息字节数组
             */
            channel.basicPublish("exchange_lyl-lsj", "", null, ("对不起我错了,第"+i+"次").getBytes());
        }
        // 6.关闭信道和连接
        channel.close();
        connection.close();
        System.out.println("--- 发送成功 ---");
    }
}

运行成功之后,如下图出现新的交换机且成功绑定了三个队列:

三、编写消费者

这里设置了三个消费者,一个是求爱消费者,一个是道歉消费者,一个是拒绝消费者,三个同时都能接收生产者发送的消息,

求爱消费者代码:

代码语言:javascript
复制
public class Consumer_love {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建链接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        // 2.建立连接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        /**
         * 4.监听队列
         * 参数1:监听的队列名
         * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
         * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
         */
        channel.basicConsume("GET_LOVE",true, new DefaultConsumer(channel){
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
               String message = new String(body,"UTF-8");
               System.out.println("接受求爱,消息为: "+message);
           }
        });

    }
}

道歉消费者代码:

代码语言:javascript
复制
public class Consumer_sorry {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建链接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        // 2.建立连接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        /**
         * 4.监听队列
         * 参数1:监听的队列名
         * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
         * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
         */
        channel.basicConsume("SEND_SORRY",true, new DefaultConsumer(channel){
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
               String message = new String(body,"UTF-8");
               System.out.println("发送道歉,消息为: "+message);
           }
        });

    }
}

拒绝消费者代码:

代码语言:javascript
复制
public class Consumer_refuse {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建链接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        // 2.建立连接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        /**
         * 4.监听队列
         * 参数1:监听的队列名
         * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
         * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
         */
        channel.basicConsume("GET_REFUSE",true, new DefaultConsumer(channel){
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
               String message = new String(body,"UTF-8");
               System.out.println("得到拒绝,消息为: "+message);
           }
        });

    }
}

运行结果分别如下图:

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概念
  • 二、编写生产者
  • 三、编写消费者
相关产品与服务
短信
腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档