1、Topic交换器(主题,规则匹配),Topic交换器也称为主题交换器,特点是根据规则进行匹配,可以根据模糊进行匹配(即根据路由key进行模糊匹配),决定将那个信息放入到指定的队列里面去。
项目的结构如下所示:
2、由于使用的是SpringBoot项目结合Maven项目构建的,pom.xml的配置文件,如下所示,生产者和消费者的配置文件一致,这里只贴一份了。
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
5 https://maven.apache.org/xsd/maven-4.0.0.xsd">
6 <modelVersion>4.0.0</modelVersion>
7 <parent>
8 <groupId>org.springframework.boot</groupId>
9 <artifactId>spring-boot-starter-parent</artifactId>
10 <version>2.1.1.RELEASE</version>
11 <relativePath /> <!-- lookup parent from repository -->
12 </parent>
13 <groupId>com.bie</groupId>
14 <artifactId>rabbitmq-topic-provider</artifactId>
15 <version>0.0.1-SNAPSHOT</version>
16 <name>rabbitmq-topic-provider</name>
17 <description>Demo project for Spring Boot</description>
18
19 <properties>
20 <java.version>1.8</java.version>
21 </properties>
22
23 <dependencies>
24 <dependency>
25 <groupId>org.springframework.boot</groupId>
26 <artifactId>spring-boot-starter</artifactId>
27 </dependency>
28 <dependency>
29 <groupId>org.springframework.boot</groupId>
30 <artifactId>spring-boot-starter-web</artifactId>
31 </dependency>
32 <dependency>
33 <groupId>org.springframework.boot</groupId>
34 <artifactId>spring-boot-starter-test</artifactId>
35 <scope>test</scope>
36 </dependency>
37 <dependency>
38 <groupId>org.springframework.boot</groupId>
39 <artifactId>spring-boot-starter-amqp</artifactId>
40 </dependency>
41 </dependencies>
42
43 <build>
44 <plugins>
45 <plugin>
46 <groupId>org.springframework.boot</groupId>
47 <artifactId>spring-boot-maven-plugin</artifactId>
48 </plugin>
49 </plugins>
50 </build>
51
52 </project>
3、配置好pom.xml配置文件,就可以进行开发了,这里先约束一下配置文件,体现一下SpringBoot的魔力,约定大于配置。
1 # 给当前项目起名称.
2 spring.application.name=rabbitmq-topic-provider
3
4 # 配置端口号
5 server.port=8081
6
7 # 配置rabbitmq的参数.
8 # rabbitmq服务器的ip地址.
9 spring.rabbitmq.host=192.168.110.133
10 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
11 spring.rabbitmq.port=5672
12 # rabbitmq的账号.
13 spring.rabbitmq.username=guest
14 # rabbitmq的密码.
15 spring.rabbitmq.password=guest
16
17 # 设置交换器的名称,方便修改.
18 # 生产者和消费者的交换器的名称是一致的,这样生产者生产的消息发送到交换器,消费者可以从这个交换器中消费.
19 rabbitmq.config.exchange=log.exchange.topic
模拟三个服务,用户服务、商品服务,订单服务,产生的各种日志信息,包含info、debug、trace、warn、error日志信息。不同的日志级别信息指定好路由键,将发送的消息绑定到交换器上面,发送消息。
1 package com.example.bie.provider;
2
3 import org.springframework.amqp.core.AmqpTemplate;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.beans.factory.annotation.Value;
6 import org.springframework.stereotype.Component;
7
8 /**
9 *
10 * @author biehl
11 *
12 * 生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.
13 *
14 * 这里使用的交换器类型使用的是topic主题模式,根据规则匹配。
15 *
16 */
17 @Component
18 public class RabbitMqUserLogProduce {
19
20 @Autowired
21 private AmqpTemplate rabbitmqAmqpTemplate;
22
23 // 交换器的名称Exchange
24 @Value(value = "${rabbitmq.config.exchange}")
25 private String exchange;
26
27 // 路由键routingkey
28 private String routingKeyInfo = "user.log.info";
29 private String routingKeyDebug = "user.log.debug";
30 private String routingKeyTrace = "user.log.trace";
31 private String routingKeyWarn = "user.log.warn";
32 private String routingKeyError = "user.log.error";
33
34 /**
35 * 发送消息的方法
36 *
37 * @param msg
38 */
39 public void producer(String msg) {
40 // 向消息队列发送消息
41 // 参数1,交换器的名称
42 // 参数2,路由键
43 // 参数3,消息
44 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "user.log.info......" + msg);
45 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "user.log.debug......" + msg);
46 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "user.log.trace......" + msg);
47 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "user.log.warn......" + msg);
48 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "user.log.error......" + msg);
49 }
50
51 }
1 package com.example.bie.provider;
2
3 import org.springframework.amqp.core.AmqpTemplate;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.beans.factory.annotation.Value;
6 import org.springframework.stereotype.Component;
7
8 /**
9 *
10 * @author biehl
11 *
12 * 生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.
13 *
14 * 这里使用的交换器类型使用的是topic主题模式,根据规则匹配。
15 *
16 */
17 @Component
18 public class RabbitMqProductLogProduce {
19
20 @Autowired
21 private AmqpTemplate rabbitmqAmqpTemplate;
22
23 // 交换器的名称Exchange
24 @Value(value = "${rabbitmq.config.exchange}")
25 private String exchange;
26
27 // 路由键routingkey
28 private String routingKeyInfo = "product.log.info";
29 private String routingKeyDebug = "product.log.debug";
30 private String routingKeyTrace = "product.log.trace";
31 private String routingKeyWarn = "product.log.warn";
32 private String routingKeyError = "product.log.error";
33
34 /**
35 * 发送消息的方法
36 *
37 * @param msg
38 */
39 public void producer(String msg) {
40 // 向消息队列发送消息
41 // 参数1,交换器的名称
42 // 参数2,路由键
43 // 参数3,消息
44 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "product.log.info......" + msg);
45 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "product.log.debug......" + msg);
46 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "product.log.trace......" + msg);
47 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "product.log.warn......" + msg);
48 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "product.log.error......" + msg);
49 }
50
51 }
1 package com.example.bie.provider;
2
3 import org.springframework.amqp.core.AmqpTemplate;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.beans.factory.annotation.Value;
6 import org.springframework.stereotype.Component;
7
8 /**
9 *
10 * @author biehl
11 *
12 * 生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.
13 *
14 * 这里使用的交换器类型使用的是topic主题模式,根据规则匹配。
15 *
16 */
17 @Component
18 public class RabbitMqOrderLogProduce {
19
20 @Autowired
21 private AmqpTemplate rabbitmqAmqpTemplate;
22
23 // 交换器的名称Exchange
24 @Value(value = "${rabbitmq.config.exchange}")
25 private String exchange;
26
27 // 路由键routingkey
28 private String routingKeyInfo = "order.log.info";
29 private String routingKeyDebug = "order.log.debug";
30 private String routingKeyTrace = "order.log.trace";
31 private String routingKeyWarn = "order.log.warn";
32 private String routingKeyError = "order.log.error";
33
34 /**
35 * 发送消息的方法
36 *
37 * @param msg
38 */
39 public void producer(String msg) {
40 // 向消息队列发送消息
41 // 参数1,交换器的名称
42 // 参数2,路由键
43 // 参数3,消息
44 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "order.log.info......" + msg);
45 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "order.log.debug......" + msg);
46 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "order.log.trace......" + msg);
47 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "order.log.warn......" + msg);
48 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "order.log.error......" + msg);
49 }
50
51 }
这里使用web工程,浏览器访问调用,方便测试。你也可以使用单元测试的方法。
1 package com.example.bie.controller;
2
3 import org.springframework.beans.factory.annotation.Autowired;
4 import org.springframework.stereotype.Controller;
5 import org.springframework.web.bind.annotation.RequestMapping;
6 import org.springframework.web.bind.annotation.ResponseBody;
7
8 import com.example.bie.provider.RabbitMqOrderLogProduce;
9 import com.example.bie.provider.RabbitMqProductLogProduce;
10 import com.example.bie.provider.RabbitMqUserLogProduce;
11
12 /**
13 *
14 * @author biehl
15 *
16 */
17 @Controller
18 public class RabbitmqController {
19
20 @Autowired
21 private RabbitMqUserLogProduce rabbitMqUserLogProduce;
22
23 @Autowired
24 private RabbitMqProductLogProduce rabbitMqProductLogProduce;
25
26 @Autowired
27 private RabbitMqOrderLogProduce rabbitMqOrderLogProduce;
28
29 @RequestMapping(value = "/userLogInfo")
30 @ResponseBody
31 public String rabbitmqSendUserLogInfoMessage() {
32 String msg = "生产者===>生者的UserLogInfo消息message: ";
33 for (int i = 0; i < 50000; i++) {
34 rabbitMqUserLogProduce.producer(msg + i);
35 }
36 return "生产===> UserLogInfo消息message ===> success!!!";
37 }
38
39 @RequestMapping(value = "/productLogInfo")
40 @ResponseBody
41 public String rabbitmqSendProductLogErrorMessage() {
42 String msg = "生产者===>生者的ProductLogInfo消息message: ";
43 for (int i = 0; i < 50000; i++) {
44 rabbitMqProductLogProduce.producer(msg + i);
45 }
46 return "生产===> ProductLogInfo消息message ===> success!!!";
47 }
48
49 @RequestMapping(value = "/orderLogInfo")
50 @ResponseBody
51 public String rabbitmqSendOrderLogInfoMessage() {
52 String msg = "生产者===>生者的OrderLogInfo消息message: ";
53 for (int i = 0; i < 50000; i++) {
54 rabbitMqOrderLogProduce.producer(msg + i);
55 }
56 return "生产===> OrderLogInfo消息message ===> success!!!";
57 }
58
59 }
生产者的启动类如下所示:
1 package com.example;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5
6 @SpringBootApplication
7 public class RabbitmqProducerApplication {
8
9 public static void main(String[] args) {
10 SpringApplication.run(RabbitmqProducerApplication.class, args);
11 }
12
13 }
4、生产者开发完毕就可以进行消费者的开发,也是先约束一下配置文件application.properties。
1 # 给当前项目起名称.
2 spring.application.name=rabbitmq-topic-consumer
3
4 # 配置端口号
5 server.port=8080
6
7 # 配置rabbitmq的参数.
8 # rabbitmq服务器的ip地址.
9 spring.rabbitmq.host=192.168.110.133
10 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
11 spring.rabbitmq.port=5672
12 # rabbitmq的账号.
13 spring.rabbitmq.username=guest
14 # rabbitmq的密码.
15 spring.rabbitmq.password=guest
16
17 # 设置交换器的名称,方便修改.
18 # 路由键是将交换器和队列进行绑定的,队列通过路由键绑定到交换器.
19 rabbitmq.config.exchange=log.exchange.topic
20
21 # info级别的队列名称.
22 rabbitmq.config.queue.info=log.info.queue
23
24 # error级别的队列名称.
25 rabbitmq.config.queue.error=log.error.queue
26
27 # 全日志log级别的队列名称.
28 rabbitmq.config.queue.log=log.all.queue
约束好配置文件就可以进行消费者的开发了,这里是将用户服务、商品服务、订单服务产生的info、debug、trace、warn、error不同级别的日志信息,使用rabbitmq的主题topic模式进行规则配置,即,消费者可以专一消费info级别的消息,error级别的消息,或者全部级别的日志消息。
1 package com.example.bie.consumer;
2
3 import org.springframework.amqp.core.ExchangeTypes;
4 import org.springframework.amqp.rabbit.annotation.Exchange;
5 import org.springframework.amqp.rabbit.annotation.Queue;
6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
9 import org.springframework.stereotype.Component;
10
11 /**
12 *
13 * @author biehl
14 *
15 * 消息接收者
16 *
17 * 1、@RabbitListener bindings:绑定队列
18 *
19 * 2、@QueueBinding
20 * value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21 *
22 * 3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23 *
24 * 4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25 *
26 *
27 */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30
31 value = @Queue(value = "${rabbitmq.config.queue.info}", autoDelete = "true"),
32
33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC),
34
35 key = "*.log.info"))
36 public class LogInfoConsumer {
37
38 /**
39 * 接收消息的方法,采用消息队列监听机制.
40 *
41 * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42 *
43 * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44 *
45 * @param msg
46 */
47 @RabbitHandler
48 public void consumer(String msg) {
49 // 打印消息
50 System.out.println("All消费者===>消费: " + msg);
51 }
52
53 }
1 package com.example.bie.consumer;
2
3 import org.springframework.amqp.core.ExchangeTypes;
4 import org.springframework.amqp.rabbit.annotation.Exchange;
5 import org.springframework.amqp.rabbit.annotation.Queue;
6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
9 import org.springframework.stereotype.Component;
10
11 /**
12 *
13 * @author biehl
14 *
15 * 消息接收者
16 *
17 * 1、@RabbitListener bindings:绑定队列
18 *
19 * 2、@QueueBinding
20 * value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21 *
22 * 3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23 *
24 * 4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25 *
26 *
27 */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30
31 value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),
32
33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC),
34
35 key = "*.log.error"))
36 public class LogErrorConsumer {
37
38 /**
39 * 接收消息的方法,采用消息队列监听机制.
40 *
41 * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42 *
43 * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44 *
45 * @param msg
46 */
47 @RabbitHandler
48 public void consumer(String msg) {
49 // 打印消息
50 System.out.println("Error消费者===>消费: " + msg);
51 }
52
53 }
1 package com.example.bie.consumer;
2
3 import org.springframework.amqp.core.ExchangeTypes;
4 import org.springframework.amqp.rabbit.annotation.Exchange;
5 import org.springframework.amqp.rabbit.annotation.Queue;
6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
9 import org.springframework.stereotype.Component;
10
11 /**
12 *
13 * @author biehl
14 *
15 * 消息接收者
16 *
17 * 1、@RabbitListener bindings:绑定队列
18 *
19 * 2、@QueueBinding
20 * value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21 *
22 * 3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23 *
24 * 4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25 *
26 *
27 */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30
31 value = @Queue(value = "${rabbitmq.config.queue.log}", autoDelete = "true"),
32
33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC),
34
35 key = "*.log.*"))
36 public class LogAllConsumer {
37
38 /**
39 * 接收消息的方法,采用消息队列监听机制.
40 *
41 * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42 *
43 * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44 *
45 * @param msg
46 */
47 @RabbitHandler
48 public void consumer(String msg) {
49 // 打印消息
50 System.out.println("Info消费者===>消费: " + msg);
51 }
52
53 }
消费者的启动类,如下所示:
1 package com.example;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5
6 @SpringBootApplication
7 public class RabbitmqConsumerApplication {
8
9 public static void main(String[] args) {
10 SpringApplication.run(RabbitmqConsumerApplication.class, args);
11 }
12
13 }
5、运行效果如下所示: