Spring-AMQP是Spring框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的POJO的消息监听等。
在SpringBoot项目中添加依赖:
<!-- 引入 rabbitmq 集成依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在同一个项目中,可能会出现开发、测试包括上线用的都是同一个消息队列,如果不进行隔离,很可能会出现开发环境不小心把线上环境的消息进行消费了,因此添加虚拟主机,达到一个消息隔离的效果。
http://localhost:15672/#/vhosts
在application.yml中进行配置
server:
port: 9090
spring:
application:
# 微服务系统有意义, 养成好习惯, 先写出来
name: rabbitmq-02-springboot
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /codingce
thymeleaf.cache: false
首先定义交换机和队列的名称,然后使用Bean注入的方式,注入交换机和队列对象,最后再绑定二者关系。
package cn.com.codingce.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/**
* 交换机名称
*/
public static final String EXCHANGE_NAME = "health_hra3_exchange";
/**
* 队列名称
*/
public static final String QUEUE = "health_hra3_queue";
@Bean
public Exchange healthHra3Exchange() {
// 创建交换机,durable 代表持久化,使用Bean注入
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean
public Queue healthHra3Queue() {
// 创建队列,使用Bean注入
return QueueBuilder.durable(QUEUE).build();
}
/**
* 交换机和队列绑定关系
*
* @param queue 上面注入的队列Bean,如果你的项目又多个,记得给Bean取名字
* @param exchange 上面注入的交换机Bean
*/
@Bean
public Binding healthHra3Binding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("health.#").noargs();
}
}
package cn.com.codingce.controller;
import cn.com.codingce.common.utils.R;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("api")
@Slf4j
public class SendController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/testSend")
public R testSend() {
log.info("testSend");
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "health.#", "新HRA3报告来了!!");
return R.ok();
}
@GetMapping(value = "/default", produces = "text/html;charset=utf-8")
public String getDefault() {
return "队列服务运行正常...";
}
}
消息发送使用api/testSend
接口进行发送,消息接收我们创建消息监听类,进行消息接收。
package cn.com.codingce.listener;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConfig.QUEUE) // 监听的队列名称
public class healthHra3MQListener {
/**
* RabbitHandler会自动匹配消息类型(消息自动确认)
*
* @param msg 发送的是String类型,这里用String进行接收,RabbitHandler会自动进行匹配
* @param message
* @throws IOException
*/
@RabbitHandler
public void releaseCouponRecord(String msg, Message message) throws IOException {
log.info("releaseCouponRecord into"); // 监听到消息:消息内容,msg=新HRA3报告来啦!!
long msgTag = message.getMessageProperties().getDeliveryTag();
log.info("监听到消息:消息内容,msg={}", msg); // 监听到消息:消息内容,msg=新HRA3报告来啦!!
log.info("msgTag={}", msgTag); // msgTag=1
log.info("message={}", message.toString()); // message=(Body:'新HRA3报告来啦!!' MessageProperties [headers={}, ……
}
}
2023-11-12 14:19:19.431 INFO 15548 --- [io-9090-exec-17] c.c.codingce.controller.SendController : testSend
2023-11-12 14:19:19.432 INFO 15548 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener : releaseCouponRecord into
2023-11-12 14:19:19.432 INFO 15548 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener : 监听到消息:消息内容,msg=新HRA3报告来了!!
2023-11-12 14:19:19.432 INFO 15548 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener : msgTag=201
2023-11-12 14:19:19.432 INFO 15548 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener : message=(Body:'新HRA3报告来了!!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=health_hra3_exchange, receivedRoutingKey=health.#, deliveryTag=201, consumerTag=amq.ctag-nTiHnBWaCbD0ki3kZtnmFA, consumerQueue=health_hra3_queue])
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。