前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >RabbitMQ基础教程之Spring&JavaConfig使用篇

RabbitMQ基础教程之Spring&JavaConfig使用篇

作者头像
一灰灰blog
发布于 2018-06-04 07:17:46
发布于 2018-06-04 07:17:46
75000
代码可运行
举报
文章被收录于专栏:小灰灰小灰灰
运行总次数:0
代码可运行

RabbitMQ基础教程之Spring使用篇

相关博文,推荐查看:

  1. RabbitMq基础教程之安装与测试
  2. RabbitMq基础教程之基本概念
  3. RabbitMQ基础教程之基本使用篇
  4. RabbitMQ基础教程之使用进阶篇

在实际的应用场景中,将RabbitMQ和Spring结合起来使用的时候可能更加频繁,网上关于Spring结合的博文中,大多都是xml的方式,这篇博文,则主要介绍下利用JavaConfig的结合,又会是怎样的

<!--more-->

I. Spring中RabbitMQ的基本使用姿势

1. 准备

开始之前,首先添加上必要的依赖,主要利用 spring-rabbit 来实现,这个依赖中,内部又依赖的Spring相关的模块,下面统一改成5.0.4版本

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.7.3.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

流程分析

实现主要分为两块,一个是投递服务,一个是消费服务,结合前面RabbitMQ的基本使用姿势中的流程,即便是使用Spring,我们也避免不了下面几步

  • 建立连接
  • 声明Exchange ,声明Queue
  • 建立Queue和Exchange之间的绑定关系
  • 发送消息
  • 消费消息(ack/nak)

2. 基本case

首先借助Spring,来实现一个最基本的最简单的实现方式

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * Created by yihui in 19:53 18/5/30.
 */
public class SimpleProducer {
    public static void main(String[] args) throws InterruptedException {
        CachingConnectionFactory factory = new CachingConnectionFactory("127.0.0.1", 5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        RabbitAdmin admin = new RabbitAdmin(factory);

        // 创建队列
        Queue queue = new Queue("hello", true, false, false, null);
        admin.declareQueue(queue);

        //创建topic类型的交换机
        TopicExchange exchange = new TopicExchange("topic.exchange");
        admin.declareExchange(exchange);

        //交换机和队列绑定,路由规则为匹配"foo."开头的路由键
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));


        //设置监听
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
        Object listener = new Object() {
            public void handleMessage(String foo) {
                System.out.println(" [x] Received '" + foo + "'");
            }
        };
        MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
        container.setMessageListener(adapter);
        container.setQueues(queue);
        container.start();

        //发送消息
        RabbitTemplate template = new RabbitTemplate(factory);
        template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
        Thread.sleep(1000);

        // 关闭
        container.stop();
    }
}

3. 逻辑分析

上面这一段代码中,包含了消息投递和消费两块,从实现而言,基本上逻辑和前面的基础使用没有什么太大的区别,步骤如下:

  1. 建立连接: new CachingConnectionFactory("127.0.0.1", 5672)
  2. 声明Queue: new Queue("hello", true, false, false, null)
  3. 声明Exchange: new TopicExchange("topic.exchange")
  4. 绑定Queue和Exchange: admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));
  5. 投递消息: template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
  6. 消费消息: 设置MessageListenerAdapter

这里面有几个类需要额外注意:

  • RabbitTemplate: Spring实现的发送消息的模板,可以直接发送消息
  • SimpleMessageListenerContainer: 注册接收消息的容器

II. Spring结合JavaConfig使用RabbitMQ使用姿势

1. 公共配置

主要是将公共的ConnectionFactory 和 RabbitAdmin 抽取出来

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
@ComponentScan("com.git.hui.rabbit.spring")
public class SpringConfig {

    private Environment environment;

    @Autowired
    public void setEnvironment(Environment environment) {
        this.environment = environment;
        System.out.println("then env: " + environment);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

2. 消息投递

发送消息的组件就比较简单了,直接利用 AmqpTemplate 即可

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
public class AmqpProducer {

    private AmqpTemplate amqpTemplate;

    @Autowired
    public void amqpTemplate(ConnectionFactory connectionFactory) {
        amqpTemplate = new RabbitTemplate(connectionFactory);
    }

    /**
     * 将消息发送到指定的交换器上
     *
     * @param exchange
     * @param msg
     */
    public void publishMsg(String exchange, String routingKey, Object msg) {
        amqpTemplate.convertAndSend(exchange, routingKey, msg);
    }
}

3. DirectExchange消息消费

根据不同的Exchange类型,分别实现如下

DirectExchange方式

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class DirectConsumerConfig {
    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public DirectExchange directExchange() {
        DirectExchange directExchange = new DirectExchange("direct.exchange");
        directExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return directExchange;
    }

    @Bean
    public Queue directQueue() {
        Queue queue = new Queue("aaa");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding directQueueBinding() {
        Binding binding = BindingBuilder.bind(directQueue()).to(directExchange()).with("test1");
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener directConsumer() {
        return new BasicConsumer("direct");
    }

    @Bean(name = "directMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(directQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(directConsumer());
        return container;
    }
}

从上面的实现,基本上都是重新定义了一个Queue, Exchange, Binding, MessageListenerContainer(用来监听消息),并将消息的消费抽出了一个公共类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
public class BasicConsumer implements ChannelAwareMessageListener {
    private String name;

    public BasicConsumer(String name) {
        this.name = name;
    }

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            byte[] bytes = message.getBody();
            String data = new String(bytes, "utf-8");
            System.out.println(name + " data: " + data + " tagId: " + message.getMessageProperties().getDeliveryTag());
        } catch (Exception e) {
            log.error("local cache rabbit mq localQueue error! e: {}", e);
        }
    }
}

4. 测试

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringConfig.class)
public class SprintUnit {
    @Autowired
    private AmqpProducer amqpProducer;

    @Test
    public void testDirectConsumer() throws InterruptedException {
        String[] routingKey = new String[]{"hello.world", "world", "test1"};
        for (int i = 0; i < 10; i++) {
            amqpProducer
                    .publishMsg("direct.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i);
        }
        System.out.println("-------over---------");

        Thread.sleep(1000 * 60 * 10);
    }
}

这个测试类中,虽然主要是往MQ中投递消息,但在Spring容器启动之后,接收MQ消息并消费的实际任务,是通过前面的MessageListenerContainer托付给Spring容器了,上面测试执行之后,输出为

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
direct data: >>> hello test1>>> 2 tagId: 1
direct data: >>> hello test1>>> 5 tagId: 2
direct data: >>> hello test1>>> 8 tagId: 3

5. Topic & Fanout策略

上面的一个写出来之后,再看这两个就比较相似了

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class TopicConsumerConfig {
    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public TopicExchange topicExchange() {
        TopicExchange topicExchange = new TopicExchange("topic.exchange");
        topicExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return topicExchange;
    }

    @Bean
    public Queue topicQueue() {
        Queue queue = new Queue("bbb");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding topicQueueBinding() {
        Binding binding = BindingBuilder.bind(topicQueue()).to(topicExchange()).with("*.queue");
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener topicConsumer() {
        return new BasicConsumer("topic");
    }

    @Bean(name = "topicMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(topicQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(topicConsumer());
        return container;
    }
}

对应的测试case

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Test
public void testTopicConsumer() throws InterruptedException {
    String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue"};
    for (int i = 0; i < 20; i++) {
        amqpProducer.publishMsg("topic.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i);
    }
    System.out.println("-------over---------");

    Thread.sleep(1000 * 60 * 10);
}

广播方式

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class FanoutConsumerConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public FanoutExchange fanoutExchange() {
        FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
        fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return fanoutExchange;
    }

    @Bean
    public Queue fanoutQueue() {
        Queue queue = new Queue("ccc");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding fanoutQueueBinding() {
        Binding binding = BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener fanoutConsumer() {
        return new BasicConsumer("fanout");
    }

    @Bean(name = "FanoutMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(fanoutQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(fanoutConsumer());
        return container;
    }
}

对应的测试case

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Test
public void testFanoutConsumer() throws InterruptedException {
    String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue", "hello.world", "world", "test1"};
    for (int i = 0; i < 20; i++) {
        amqpProducer
                .publishMsg("fanout.exchange", routingKey[i % 6], ">>> hello " + routingKey[i % 6] + ">>> " + i);
    }
    System.out.println("-------over---------");

    Thread.sleep(1000 * 60 * 10);
}

II. 其他

项目地址

一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

声明

尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
JavaScript闭包实例讲解
闭包是JavaScript语言中的难点,很多刚入行的(包括我在内)一时对他很难理解,于是在网上各种搜罗有关闭包的学习资料,但是无数的文章介绍闭包,但都是了解一个皮毛。说实在的我到现在也不敢和大家百分百的肯定掌握它。所以今天我就把我的整理的学习笔记分享给大家,希望能够对大家有用。
ZEHAN
2020/09/23
6520
从λ演算到函数式编程聊闭包(2):彻底理解JavaScript闭包规则
闭包是很多语言都具备的特性,上篇《从抽象代数漫游函数式编程(1):闭包概念再Java/PHP/JS中的定义》
周陆军
2021/08/24
8180
闭包初识
[详情参考](http://www.cnblogs.com/wangfupeng1988/p/3994065.html);
天天_哥
2018/09/29
2930
js闭包就那么回事
今天了解了一下js闭包这块的内容,还是有点诡异的,将实践结果记录一下,看完只后,我敢说,闭包就那么回事,所谓的闭包,其实就是客户端开发中,其实就是叫做内存泄漏,就是不当引用导致对象没法得到释放,哈哈,玩笑开得有点过了,只是有点像哈,其实并不全是。
老码小张
2020/05/04
6780
js闭包就那么回事
js 闭包 详解
闭包(closure)是Javascript语言的一个难点,也是它的特色,很多高级应用都要依靠闭包实现。 闭包的特性 闭包有三个特性: 1.函数嵌套函数 2.函数内部可以引用外部的参数和变量 3.参数和变量不会被垃圾回收机制回收 闭包的定义及其优缺点 闭包 是指有权访问另一个函数作用域中的变量的函数,创建闭包的最常见的方式就是在一个函数内创建另一个函数,通过另一个函数访问这个函数的局部变量 使用闭包有一个优点,也是它的缺点,就是可以把局部变量驻留在内存中,可以避免使用全局变量。全局变量在每个模块都可调用,这
deepcc
2018/05/16
3K0
理解闭包 js回收机制
为什么要有回收机制?why? 打个比方,我有一个内存卡,这个内存是8G的,我把文件,视频,音乐,都保存到了这个内存卡,随着我的储存的内容越来越多,这个内存卡已经保存不了了,如果我还想再把其他的文件保存到这个内存卡就需要删除一些文件,但是这些被删除的文件是我们自己手动删除的对吧,手动删除就相当于js中的delete。 在这些程序语言中同样也会出现这些问题,对,内存!我们声明的任何变量都需要消耗内存,这些变量越多运行的速度也会越慢。当然不只是变量,代码中的任何东西。这些语言的设计者为了解决这些问题,设计了一套代
庞小明
2018/03/07
1.4K0
JavaScript进阶教程(5)-一文让你搞懂作用域链和闭包
在JS中变量可以分为局部变量和全局变量,对于变量不熟悉的可以看一下我这篇文章:搞懂JavaScript全局变量与局部变量,看这篇文章就够了 作用域就是变量的使用范围,分为局部作用域和全局作用域,局部变量的使用范围为局部作用域,全局变量的使用范围是全局作用域。在 ECMAScript 2015 引入let 关键字之前,js中没有块级作用域---即在JS中一对花括号({})中定义的变量,依然可以在花括号外面使用。
AlbertYang
2020/09/16
3300
JavaScript进阶教程(5)-一文让你搞懂作用域链和闭包
深入理解作用域和闭包
JavaScript中的变量是松散类型的,没有规则定义它必须包含什么数据类型,它的值和数据类型在执行期间是可以改变的。
神奇的程序员
2022/04/10
5610
深入理解作用域和闭包
JavaScript闭包及实现循环绑定事件
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
奋飛
2019/08/15
9400
前端面试题:闭包_前端设计模式面试题
前段时间一直在投一些中小型公司吧,感觉好久都收不到反馈,也不知道是被淘汰了还是没出结果呢,最近开始投一些大一点的公司准备尝试一下,就在昨天接到面试电话的时候,接受到了滴滴的毒打。跟一些面试不一样的是不只是一些基础的基本概念吧,比如说什么是原型和原型链,说一下继承,讲一下this指向之类的。更多的是为什么要这样用,手写算法,预测输出结果之类的面试题。
全栈程序员站长
2022/09/27
3560
前端面试题:闭包_前端设计模式面试题
JS闭包
闭包定义 闭包是一个拥有许多变量和绑定了这些变量的环境的表达式(通常是一个函数),因而这些变量也是该表达式的一部分。就是在另一个作用域中保存了一份它从上一级函数或者作用域得到的变量,而这些变量是不会随上一级函数的执行完成而销毁。 前提条件 计算机中的内存变量如果有被引用着的话,则系统是不会将之回收的。只要我们能够一直持有这个引用,则就可以令局部变量避免被回收——这是闭包概念成立的前提 闭包用途 可以读取到函数内部的变量 可以让函数内部变量保持在内存中 避免全局变量的污染 私有成员的存在 注
苦咖啡
2018/04/28
2.6K0
深入贯彻闭包思想,全面理解JS闭包形成过程
谈起闭包,它可是JavaScript两个核心技术之一(异步和闭包),在面试以及实际应用当中,我们都离不开它们,甚至可以说它们是衡量js工程师实力的一个重要指标。下面我们就罗列闭包的几个常见问题,从回答问题的角度来理解和定义你们心中的闭包。
疯狂的技术宅
2019/03/28
7600
深入贯彻闭包思想,全面理解JS闭包形成过程
匿名函数闭包模仿块级作用域,轻松解决开发中的两大难题
大家都知道在ES6之前,JavaScript是没有块级作用域的,但其实我们是可以通过匿名函数的闭包来模仿实现一个块级作用域,并且可以依靠这样的操作来解决平时开发中的两大难题。
@零一
2021/01/29
7150
从闭包和高阶函数初探JS设计模式
JavaScript是一门完整的面向对象的编程语言,JavaScript在设计之初参考并引入了Lambda表达式、闭包和高阶函数等特性。
小东同学
2022/07/29
5440
从闭包和高阶函数初探JS设计模式
闭包与高阶函数
闭包(closure)指有权访问另一个函数作用域中变量的函数。简单理解就是 ,一个作用域可以访问另外一个函数内部的局部变量。
梨涡浅笑
2020/10/27
3580
JS完美收官之——闭包
在上一篇JS完美收官之作用域中,我们已经知道当函数执行完毕后,它所产生的执行期上下文会被销毁,被世人称之为渣男类型的,用完就丢掉,而今天我们探究的是闭包却与之相反,可以将闭包理解为"痴情的男孩",就是不管怎么打,怎么骂,都紧紧拽着你衣角的那种,不由想起曾小贤那句“好男人就是我,我就是闭包”。好了,我们一起来看看到底什么是闭包?为何被称为好男人呢?
程序员法医
2022/08/11
2220
JS完美收官之——闭包
闭包
闭包(closure)指有权访问另一个函数作用域中变量的函数。简单理解就是 ,一个作用域可以访问另外一个函数内部的局部变量。
清出于兰
2022/01/05
4900
闭包
深入理解立即执行函数
立即执行函数常用于第三方库,它可以用来隔离变量作用域,很多第三方库都会存在大量的变量和函数,在ES5环境下为了避免变量污染,开发者想到的解决办法就是使用立即执行函数。
神奇的程序员
2022/04/10
1.4K0
JavaScript闭包从概念、原理到应用
闭包的概念:有权访问另一个函数作用域中的变量的函数;一般情况就是在一个函数中包含另一个函数。
JanYork_简昀
2022/05/25
7040
JavaScript闭包从概念、原理到应用
JS作用域与闭包
全局变量在函数外定义,HTML中全局变量是window对象,所有数据对象都属于window对象。
用户7353950
2022/06/23
2K0
JS作用域与闭包
相关推荐
JavaScript闭包实例讲解
更多 >
目录
  • RabbitMQ基础教程之Spring使用篇
    • I. Spring中RabbitMQ的基本使用姿势
      • 1. 准备
      • 2. 基本case
      • 3. 逻辑分析
    • II. Spring结合JavaConfig使用RabbitMQ使用姿势
      • 1. 公共配置
      • 2. 消息投递
      • 3. DirectExchange消息消费
      • 4. 测试
      • 5. Topic & Fanout策略
    • II. 其他
      • 项目地址
      • 一灰灰Blog: https://liuyueyi.github.io/hexblog
      • 声明
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档