前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >08-RabbitMQ核心API-Topic Exchange

08-RabbitMQ核心API-Topic Exchange

作者头像
彼岸舞
发布2022-10-06 08:36:16
1700
发布2022-10-06 08:36:16
举报

Topic Exchange

简介

  • 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
  • Exchange将RouteKey和某Topic进行模糊匹配, 此时队列需要绑定一个Topic
  • 可以使用通配符进行模糊匹配
  • #: 匹配一个或多个词
  • *: 匹配不多不少一个词
    • log.# : 能够匹配 log.info.oa
    • log.* : 只会匹配 log.error

代码实现

消费者1

代码语言:javascript
复制
package com.dance.redis.mq.rabbit.topic;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
 
public class Receiver4TopicExchange1 {
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_topic_exchange";
        String queueName = "test_topic_queue";
//        String routingKey = "user.*";
        String routingKey = "user.#";
        RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
        RabbitMQHelper.queueDeclare(channel, queueName);
        channel.queueBind(queueName, exchangeName, routingKey);
        channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                System.err.println("consumer1 start.. ");
                System.out.println("recvive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, consumer);
        //等待回调函数执行完毕之后,关闭资源。
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
 
    }
}

消费者2

代码语言:javascript
复制
package com.dance.redis.mq.rabbit.topic;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
 
public class Receiver4TopicExchange2 {
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.*";
//        String routingKey = "user.#";
        RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
        RabbitMQHelper.queueDeclare(channel,queueName);
        channel.queueBind(queueName, exchangeName, routingKey);
        channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                System.err.println("consumer2 start.. ");
                System.out.println("recvive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, consumer);
        //等待回调函数执行完毕之后,关闭资源。
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
}

生产者

代码语言:javascript
复制
package com.dance.redis.mq.rabbit.topic;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
public class Sender4TopicExchange {
 
    
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());     
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
        channel.close();  
        RabbitMQHelper.closeConnection();
    }
    
}

测试

测试#规则

确定目前是木有binding规则的, 如果没有启动过, 就不用看

启动消费者1

启动生产者

查看消费者1

三条消息全部被消费, 可以看到#是可以匹配多个词的

测试*规则

先去把之前的规则解绑

恢复到什么都没有的状态

启动消费者2

启动生产者

查看消费者2

可以看到3条消息只有2条消息被消费了, 所以*是只能匹配一个词的, 多个词的没有匹配到

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-10-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Topic Exchange
    • 简介
      • 代码实现
        • 测试
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档