前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ在Springboot下的使用

RabbitMQ在Springboot下的使用

作者头像
冬天里的懒猫
发布2021-11-03 14:36:20
3330
发布2021-11-03 14:36:20
举报
文章被收录于专栏:做不甩锅的后端

在springboot下操作rabbitMQ。

1.pom文件配置

pom文件配置如下:

代码语言:javascript
复制
  <!-- Spring Boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- amqp协议 用来连接rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>

2.yml配置

代码语言:javascript
复制
server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.161.114
    port: 5672
    username: root
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 手动ack
        concurrency: 5 #消费端最小并发数
        max-concurrency: 10 #消费端最大并发数
        prefetch: 5 # 一次请求中预处理的消息数量
    cache:
      channel:
        size: 50 #缓存的channel数量

#自定义配置
rabbitmq-demo:
  defaultExchange: amqpExchange
  queue: queue
  routeKey: queue_key

3.java代码

自定义MQ配置:MQProperties:

代码语言:javascript
复制
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "rabbitmq-demo")
@Data
public class MQProperties {

	private String defaultExchange;
	private String routeKey;
	private String queue;
}

RabbitMQ中队列及exchange的配置:

代码语言:javascript
复制
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitMQConfig {
	
	@Autowired
	private MQProperties mqProperties;
	
	@Bean
	public Queue queue() {
		boolean durable = true;
		boolean exclusive = false;
		boolean autoDelete = false;
		return new Queue(mqProperties.getQueue(),durable,exclusive,autoDelete);
	}


	@Bean
	public DirectExchange defaultExchange() {
		boolean durable = true;
		boolean autoDelete = false;
		return new DirectExchange(mqProperties.getDefaultExchange(), durable, autoDelete);
	}

	@Bean
	public Binding binding() {
		return BindingBuilder.bind(queue())
				.to(defaultExchange())
				.with(mqProperties.getRouteKey());
	}
}

生产者Producer:

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class Producer {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Autowired
	private MQProperties mqProperties;

	public void sendMessage(String msg) {
		rabbitTemplate.convertAndSend(mqProperties.getDefaultExchange(),
				mqProperties.getRouteKey(), msg);
	}
}

消费者 Consumer:

代码语言:javascript
复制
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class Consumer {
	
	@RabbitListener(queues = "${rabbitmq-demo.queue}")
	public void  receive(String payload, Channel channel,
	                     @Header(AmqpHeaders.DELIVERY_TAG) long tag){
		log.info("消费者获取消息内容:{}",payload);
		RabbitMQUtils.askMessage(channel, tag);
	}
}

定义一个发送数据的接口 Controller:

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class Controller {
	@Autowired
	Producer  producer;

	@RequestMapping("/sendQueue")
	@ResponseBody
	public String sendQueue(String msg) {
		producer.sendMessage(msg);
		return "success";
	}
}

定义一个工具类RabbitMQUtils:

代码语言:javascript
复制
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

@Slf4j
public class RabbitMQUtils {

	public static void askMessage(Channel channel, long tag) {
		askMessage(channel, tag, false);
	}

	public static void askMessage(Channel channel, long tag, boolean multiple) {
		try {
			channel.basicAck(tag, multiple);
		} catch (IOException e) {
			log.error("RabbitMQ,IO异常,异常原因为:{}", e.getMessage());
		}
	}

	public static void rejectMessage(Channel channel, long tag) {
		rejectMessage(channel, tag, false, false);
	}

	public static void rejectAndBackMQ(Channel channel, long tag) {
		rejectMessage(channel, tag, false, true);
	}

	public static void rejectMessage(Channel channel, long tag, boolean multiple, boolean request) {
		try {
			channel.basicNack(tag, multiple, request);
		} catch (IOException e) {
			log.error("RabbitMQ,IO异常,异常原因为:{}", e.getMessage());
		}
	}
}

4.测试

上述代码在springboot中启动以后,进行测试:

代码语言:javascript
复制
wget http://127.0.0.1:8080/sendQueue?msg=testmsg

测试结果:

代码语言:javascript
复制
2021-11-02 15:36:45.515  INFO 14692 --- [ntContainer#0-4] com.dhb.rabbitmq.demo.Consumer           : 消费者获取消息内容:testmsg

在RabbitMQ的后台界面,队列queue的绑定关系如下:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/11/02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.pom文件配置
  • 2.yml配置
  • 3.java代码
  • 4.测试
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档