前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spring JMS各组件详解

Spring JMS各组件详解

作者头像
颇忒脱
发布于 2018-10-19 06:42:54
发布于 2018-10-19 06:42:54
1.6K00
代码可运行
举报
运行总次数:0
代码可运行

Github

在上一篇文章使用Spring/Spring Boot集成JMS的陷阱中讲到了在使用Spring JMS组件时存在这一些性能陷阱,本文会着该文讲解一下Spring JMS的各个组件、它们的作用以及正确使用的方法。

JmsTemplate

用来send、receive消息(receive是同步式的,会block)。每个JmsTemplate实例拥有自己的配置,比如:connectionFactorysessionTransactedsessionAcknowledgeModedeliveryModetimeToLive等等,所以需根据不同场景配置提供不同的JmsTemplate Bean而不是一个Singleton Bean通吃所有JMS操作。

下面是类图(只包含了部分关键属性):

ConnectionFactory

Spring提供了两个javax.jms.ConnectionFactory的实现:SingleConnectionFactoryCachingConnectionFactory。它们实际上是一种Wrapper,用来缓存如:ConnectionSessionMessageProducerMessageConsumer

事实上JmsTemplate的Javadoc提到过:

NOTE: The ConnectionFactory used with this template should return pooled Connections (or a single shared Connection) as well as pooled Sessions and MessageProducers. Otherwise, performance of ad-hoc JMS operations is going to suffer.

在Spring JMS文档的Caching Messaging Resources中也提到了需要优化资源使用以提升性能:

The standard API involves creating many intermediate objects. To send a message the following 'API' walk is performed ConnectionFactory->Connection->Session->MessageProducer->send Between the ConnectionFactory and the Send operation there are three intermediate objects that are created and destroyed. To optimise the resource usage and increase performance two implementations of ConnectionFactory are provided.

下面是类图(只包含了部分关键属性):

SingleConnectionFactory

SingleConnectionFactory顾名思义,无论调用多少次createConnection(..)都返回同一个Connection实例。但是它并不缓存Session,也就是说调用一次createSession(...)就会创建一个新的实例。

可以通过SingleConnectionFactoryTest了解详情。

所以在大多数情况下,不推荐使用SingleConnectionFactory

CachingConnectionFactory

CachingConnectionFactory继承自SingleConnectionFactory,除了依然保留缓存同一个Connection实例的特性外,还增加了对于SessionMessageProducerMessageConsumer的缓存。

CachingConnectionFactory其内部维护了一个Acknowledge Mode -> List<Session>的Map,sessionCacheSize实际上指的是List<Session>的大小,所以最多会有4 * sessionCacheSize数量的Session被缓存(因为JMS规定了四种Acknowledge Mode)。 并且CachingConnectionFactory其本质不是一个Object Pool,所以不会因为实际请求Session数量超出sessionCacheSize导致block或者返回null,可以放心使用。

CachingConnectionFactory返回的每个Session内部都有ConsumerCacheKey -> MessageConsumer以及DestinationCacheKey -> MessageProducer的Map,用来缓存MessageProducerMessageConsumer

可以通过CachingConnectionFactory了解详情。

MessageListenerContainer

Spring JMS中有一个特性MessageListenerContainer,按照官方文档的说法:

A message listener container is used to receive messages from a JMS message queue and drive the MessageListener that is injected into it.

上面提到的MessageListener就是javax.jms.MessageListener,第一次看到这个东西感觉有点奇怪,因为MessageListener的正规用法应该MessageConsumer.setMessageListener()就行了。

因为MessageListenerContainer继承自SmartLifeCycle,所以它提供了程序启动时开启connection、session,程序关闭是关闭session、connection的功能,能够让你不用操心资源回收问题。

下面介绍一下两个实现SimpleMessageListenerContainerDefaultMessageListenerContainer

下面是类图(只包含了部分关键属性):

SimpleMessageListenerContainer

SimpleMessageListenerContainer使用MessageConsumer.setMessageListener()来监听消息,它不支持参与外部事务(比如PlatformTransactionManager)。

它是可以持有多个MessageConsumer实例的。代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// ...

private int concurrentConsumers = 1;
private Set<Session> sessions;
private Set<MessageConsumer> consumers;

// ...
protected void initializeConsumers() throws JMSException {
  // Register Sessions and MessageConsumers.
  synchronized (this.consumersMonitor) {
    if (this.consumers == null) {
      this.sessions = new HashSet<Session>(this.concurrentConsumers);
      this.consumers = new HashSet<MessageConsumer>(this.concurrentConsumers);
      Connection con = getSharedConnection();
      for (int i = 0; i < this.concurrentConsumers; i++) {
        Session session = createSession(con);
        MessageConsumer consumer = createListenerConsumer(session);
        this.sessions.add(session);
        this.consumers.add(consumer);
      }
    }
  }
}

其处理消息的方式有两种:1)传统的MessageConsumer.setMessageListener();2)使用Executor

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if (this.taskExecutor != null) {
  consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(final Message message) {
      taskExecutor.execute(new Runnable() {
        @Override
        public void run() {
          processMessage(message, session);
        }
      });
    }
  });
}
else {
  consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
      processMessage(message, session);
    }
  });
}

DefaultMessageListenerContainer

DefaultMessageListenerContainerSimpleMessageListenerContainer不同,它使用MessageConsumer.receive()来处理消息,并且支持XA transaction。

因为receive()是同步的、blocking方法,其性能没有setMessageListener()好,所以它非常依赖多线程(TaskExecutor),这也也带来来dynamic scaling的好处。

请注意不要对Topic采用多线程,否则会收到重复的消息,详情见官方文档

异步接收消息

同步接收消息的方式有JmsTemplate.receive*()MessageConsumer.receive*(),这里不多讲,重点讲异步接收消息的几种方式。

MessageListener & MessageListenerContainer

MessageListener包装到MessageListenerContainer里接收消息,例子参见官方文档Asynchronous Reception - Message-Driven POJOs

SessionAwareMessageListener

SessionAwareMessageListener是Spring提供和MessageListener类似的接口,MessageListenerContainer支持这个接口,用法和MessageListener一样。

MessageListenerAdapter

MessageListenerAdapter是Spring提供的另一个异步接收消息的方式,它MessageListenerSessionAwareMessageListener更灵活,因为它采用反射机制来把消息传递到你的接收消息的方法上。

使用方法见官方文档the SessionAwareMessageListener interface

@JmsListener

@JmsListener是另一种接收消息的方法,怎么使用它可以看官方文档Annotation-driven listener endpoints

@JmsListenerMessageListenerSessionAwareMessageListenerMessageListenerAdapter一样也需要一个Container,用户可以通过@JmsListener.containerFactory属性来指定JmsListenerContainerFactory

Spring提供了两种JmsListenerContainerFactory实现:

  1. DefaultJmsListenerContainerFactory,用来生产DefaultMessageListenerContainer,Spring Boot提供DefaultJmsListenerContainerFactoryConfigurer作为配置工具
  2. SimpleJmsListenerContainerFactory,用来生产SimpleMessageListenerContainer

所以在使用@JmsListener需要仔细的选择正确的JmsListenerContainerFactory,而不是全局采用一种配置。

总结

使用Spring JMS时有需要注意以下三点:

  1. 根据实际情况,配置合适的ConnectionFactory Bean,如有需要可以有多个ConnectionFactory Bean。
  2. JmsTemplate, MessageListenerContainer, JmsListenerContainerFactory需根据实际情况配置不同Bean,避免全局使用一套。
  3. JmsTemplate, MessageListenerContainer, JmsListenerContainerFactory选择合适的ConnectionFactory。
  4. 设定好合适的Executor/线程池大小,避免大量Thread block。

下面是一张各个组件的关系图。

参考资料

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
ActiveMQ进阶学习
本文主要讲述ActiveMQ与spring整合的方案。介绍知识点包括spring,jms,activemq基于配置文件模式管理消息,消息监听器类型,消息转换类介绍,spring对JMS事物管理。 1. spring整合activemq配置文件说明 1.1 配置ConnectionFactory ConnectionFactory是用于产生到JMS服务器的链接的,Spring提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFac
企鹅号小编
2018/02/02
1.1K1
ActiveMQ进阶学习
干货--JMS(java消息服务)整合Spring项目案例
Sprng-jms消息服务小项目 所需的包: spring的基础包 spring-jms-xx包 spring-message–xx包 commons-collection-xx包 commons-pool2-xx包 aop切面的包: spring-aop,spring-aspect,aopalliance,aspectjrt.jar,aspectjweaver.jar 配置: 1.配置ConnectionFactory 2.
汤高
2018/01/11
1.9K0
干货--JMS(java消息服务)整合Spring项目案例
Spring JMS的使用
我们都知道使用Spring可以简化我们的开发,同样的使用Spring也可以集成JMS来连接ActiveMQ,这里说明一下几个需要用到的类:
端碗吹水
2020/09/23
9370
Spring JMS的使用
深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
第一篇博文深入浅出JMS(一)–JMS基本概念,我们介绍了JMS的两种消息模型:点对点和发布订阅模型,以及消息被消费的两个方式:同步和异步,JMS编程模型的对象,最后说了JMS的优点。
程序猿小亮
2021/01/29
6260
Spring JMS---三种消息监听器
链接:https://blog.csdn.net/u012562943/article/details/51424232
用户5224393
2019/08/13
2.5K0
spring 整合 ActiveMQ
JMS即Java Message Service,Java消息服务。主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。
WindWant
2020/09/11
4810
JMS学习之路(一):整合activeMQ到SpringMVC
JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
肖哥哥
2019/02/22
1.6K1
JMS学习之路(一):整合activeMQ到SpringMVC
Jms规范学习
1、什么是消息中间件?   关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。
别先生
2019/10/08
8800
Jms规范学习
spring 整合JMS
(1)创建工程springjms_producer,在POM文件中引入SpringJms 、activeMQ以及单元测试相关依赖
一点博客
2019/07/24
6420
消息队列之ActiveMQ
分布式中 service之间相互调用的时候 存在耦合 比如这边添加完商品后做同步索引库处理,添加商品就是在数据库中插入一条数据,而同步索引库这个功能一般写在solr的service层里,这时候就会有出现服务间的耦合 因此我们需要一个中间商来赚差价。。 是需要一个中间件来传递信息。
周杰伦本人
2023/10/12
3140
JMS--ActiveMQ的简单使用
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有 Producer(生产者)、Consumer(消费者)。消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
宋先生
2019/07/18
1.1K0
ActiveMQ学习总结------Spring整合ActiveMQ 04
注:本文将省略一部分与ActiveMQ无关的spring、mvc等代码,学习者需有SSM框架基础
Arebirth
2020/06/19
6280
一篇文章让你了解JMS以及中间件之ActiveMQ
点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。
@派大星
2023/07/15
1.6K0
一篇文章让你了解JMS以及中间件之ActiveMQ
activitemq整合spring
image.png activitemq整合spring 一.activmq的点对点模型 image.png pom.xml: <?xml version="1.0" encoding="UTF-8"
Dream城堡
2019/01/28
4930
activitemq整合spring
【消息队列 MQ 专栏】消息队列之 ActiveMQ
ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通信中间件。ActiveMQ 实现了 JMS 1.1 并提供了很多附加的特性,比如 JMX 管理、主从管理、消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化、消息队列监控等等。其主要特性有:
芋道源码
2018/07/31
6.8K1
【消息队列 MQ 专栏】消息队列之 ActiveMQ
JMS消息中间件之ActiveMQ学习
本文介绍了在Java中如何使用ActiveMQ进行消息发布和订阅。主要包括了创建生产者、创建消费者、发布和订阅消息等步骤。同时,还介绍了如何通过代码进行消息的发布和订阅。
用户1141560
2017/12/26
9450
JMS消息中间件之ActiveMQ学习
ActiveMQ的安装与使用。
4、ActiveMQ的安装。官方网址:http://activemq.apache.org/
别先生
2019/09/29
1.8K0
ActiveMQ的安装与使用。
ActiveMQ基础学习简单记录
JMS即Java Message Service,是JavaEE的消息服务接口。JMS主要有两个版本:1.1和2.0。2.0和1.1相比,主要是简化了收发消息的代码。
大忽悠爱学习
2023/05/23
1.7K0
ActiveMQ基础学习简单记录
ActiveMQ—基于Java的消息传递服务器
内聚: 标志一个模块内各个元素彼此结合的紧密程度;内聚从功能角度衡量模块内的联系,好的内聚模块应当恰好做一件事。 耦合: 是对一个软件结构内各个模块之间相互依赖程度的度量;耦合的强弱取决于模块间接口的复杂程度、进入或访问一个模块的点以及通过接口的数据。 需求的原则是:高内聚,低耦合。然而在实际需求过程中,往往会因为技术驱动,导致需求间耦合很紧,不利于后期有效地迭代开发。有效的解决办法是按流程、和业务梳理需求。
时间静止不是简史
2020/07/25
1.4K1
JMS中间件ActiveMQ详解
Java Message Service(JMS)是SUN提出的旨在统一各种MOM(Message-Oriented Middleware )系统接口的规范,它包含点对点(Point to Point,PTP)和发布/订阅(Publish/Subscribe,pub/sub)两种消息模型,提供可靠消息传输、事务和消息过滤等机制。
烂猪皮
2018/08/03
1.6K0
JMS中间件ActiveMQ详解
相关推荐
ActiveMQ进阶学习
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档