前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spring Boot】集成RabbitMQ

【Spring Boot】集成RabbitMQ

原创
作者头像
后端码匠
发布2023-11-12 14:22:15
2970
发布2023-11-12 14:22:15
举报
文章被收录于专栏:后端码匠

【Spring Boot】集成RabbitMQ

Spring-AMQP是Spring框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的POJO的消息监听等。

  • 提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发
  • 总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter
  • 我们使用 spring-boot-starter-amqp 进行开发

在SpringBoot项目中添加依赖:

代码语言:html
复制
<!-- 引入 rabbitmq 集成依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

项目目录

添加虚拟主机

在同一个项目中,可能会出现开发、测试包括上线用的都是同一个消息队列,如果不进行隔离,很可能会出现开发环境不小心把线上环境的消息进行消费了,因此添加虚拟主机,达到一个消息隔离的效果。

代码语言:shell
复制
http://localhost:15672/#/vhosts

SpringBoot配置RabbitMQ

在application.yml中进行配置

代码语言:yaml
复制
server:
  port: 9090

spring:
  application:
    # 微服务系统有意义, 养成好习惯, 先写出来
    name: rabbitmq-02-springboot
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /codingce

  thymeleaf.cache: false

创建配置类RabbitMQConfig

首先定义交换机和队列的名称,然后使用Bean注入的方式,注入交换机和队列对象,最后再绑定二者关系。

代码语言:java
复制
package cn.com.codingce.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    /**
     * 交换机名称
     */
    public static final String EXCHANGE_NAME = "health_hra3_exchange";

    /**
     * 队列名称
     */
    public static final String QUEUE = "health_hra3_queue";


    @Bean
    public Exchange healthHra3Exchange() {
        // 创建交换机,durable 代表持久化,使用Bean注入
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    @Bean
    public Queue healthHra3Queue() {
        // 创建队列,使用Bean注入
        return QueueBuilder.durable(QUEUE).build();
    }

    /**
     * 交换机和队列绑定关系
     *
     * @param queue    上面注入的队列Bean,如果你的项目又多个,记得给Bean取名字
     * @param exchange 上面注入的交换机Bean
     */
    @Bean
    public Binding healthHra3Binding(Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("health.#").noargs();
    }

}

发送消息与接收消息

消息生产者发送消息

代码语言:java
复制
package cn.com.codingce.controller;


import cn.com.codingce.common.utils.R;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("api")
@Slf4j
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/testSend")
    public R testSend() {
        log.info("testSend");
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "health.#", "新HRA3报告来了!!");
        return R.ok();
    }

    @GetMapping(value = "/default", produces = "text/html;charset=utf-8")
    public String getDefault() {
        return "队列服务运行正常...";
    }

}

消息消费者监听消息

消息发送使用api/testSend接口进行发送,消息接收我们创建消息监听类,进行消息接收。

代码语言:java
复制
package cn.com.codingce.listener;

import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
@RabbitListener(queues = RabbitMQConfig.QUEUE) // 监听的队列名称
public class healthHra3MQListener {

    /**
     * RabbitHandler会自动匹配消息类型(消息自动确认)
     *
     * @param msg     发送的是String类型,这里用String进行接收,RabbitHandler会自动进行匹配
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {
        log.info("releaseCouponRecord into"); // 监听到消息:消息内容,msg=新HRA3报告来啦!!
        long msgTag = message.getMessageProperties().getDeliveryTag();
        log.info("监听到消息:消息内容,msg={}", msg); // 监听到消息:消息内容,msg=新HRA3报告来啦!!
        log.info("msgTag={}", msgTag); // msgTag=1
        log.info("message={}", message.toString()); // message=(Body:'新HRA3报告来啦!!' MessageProperties [headers={}, ……
    }

}

LOG

代码语言:shell
复制
2023-11-12 14:19:19.431  INFO 15548 --- [io-9090-exec-17] c.c.codingce.controller.SendController   : testSend
2023-11-12 14:19:19.432  INFO 15548 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : releaseCouponRecord into
2023-11-12 14:19:19.432  INFO 15548 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : 监听到消息:消息内容,msg=新HRA3报告来了!!
2023-11-12 14:19:19.432  INFO 15548 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : msgTag=201
2023-11-12 14:19:19.432  INFO 15548 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : message=(Body:'新HRA3报告来了!!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=health_hra3_exchange, receivedRoutingKey=health.#, deliveryTag=201, consumerTag=amq.ctag-nTiHnBWaCbD0ki3kZtnmFA, consumerQueue=health_hra3_queue])

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 【Spring Boot】集成RabbitMQ
    • 项目目录
      • 添加虚拟主机
        • SpringBoot配置RabbitMQ
          • 创建配置类RabbitMQConfig
            • 发送消息与接收消息
              • 消息生产者发送消息
              • 消息消费者监听消息
            • LOG
            相关产品与服务
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档