总结了一下ActiveMQ的关键技术点,以及与spring boot的集成应用,形成一个总结报告,一个是为了指导新手从哪些方面入手,另外是技术人员熟悉关键技术点,用来巩固加深印象。
ActiveMQ 是一个开源消息中间件,支持多种消息协议(如 JMS、AMQP、MQTT)和传输模式,适用于异步通信、系统解耦和流量削峰。核心特性包括:
useAsyncSend=true
提升生产者吞吐量。
引入 Spring Boot Starter:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 连接池支持 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.16.5</version>
</dependency>
配置连接参数:
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: admin
pool:
enabled: true
max-connections: 10 # 连接池最大连接数
使用 JmsTemplate
发送消息:
@Component
public class MessageProducer {
@Autowired
private JmsTemplate jmsTemplate;
// 发送到 Queue
public void sendToQueue(String message) {
jmsTemplate.convertAndSend("my-queue", message);
}
// 发送到 Topic
public void sendToTopic(String message) {
jmsTemplate.setPubSubDomain(true); // 切换为 Topic 模式
jmsTemplate.convertAndSend("my-topic", message);
jmsTemplate.setPubSubDomain(false); // 恢复默认 Queue 模式
}
}
使用 @JmsListener
注解监听消息:
@Component
public class MessageConsumer {
// 监听 Queue
@JmsListener(destination = "my-queue")
public void handleQueueMessage(String message) {
System.out.println("Received from Queue: " + message);
}
// 监听 Topic
@JmsListener(destination = "my-topic", containerFactory = "topicListenerFactory")
public void handleTopicMessage(String message) {
System.out.println("Received from Topic: " + message);
}
}
// 配置 Topic 监听容器工厂
@Configuration
public class JmsConfig {
@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true); // 启用 Topic 模式
return factory;
}
}
通过 @Transactional
注解实现消息事务:
@Transactional
public void processOrder(Order order) {
// 业务逻辑处理
orderService.save(order);
// 发送消息(事务提交后发送)
jmsTemplate.convertAndSend("order-queue", order.getId());
}
消息转换器:自定义 JSON 消息序列化:
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
// 配置 JmsTemplate
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
return jmsTemplate;
}
死信队列处理:
@JmsListener(destination = "DLQ")
public void handleDlqMessage(Message message) {
// 记录日志或重试逻辑
}
启用 SSL:
spring:
activemq:
broker-url: ssl://localhost:61617
监控控制台:访问 ActiveMQ Web Console(默认 http://localhost:8161/admin
)。
idle-timeout
)。
@Transactional
注解应用于 public 方法,并启用事务管理器。
useCompression=true
减少网络传输量。
@JmsListener
的 concurrency
参数提升并发处理能力。
ActiveMQ 作为成熟的消息中间件,与 Spring Boot 集成可快速构建可靠、松耦合的分布式系统。开发者需重点关注:
通过合理配置 JmsTemplate
和 @JmsListener
,可简化消息生产与消费逻辑。实际应用中需结合监控工具跟踪消息堆积和延迟,并定期清理无效队列防止资源浪费。
本篇的分享就到这里了,感谢观看,如果对你有帮助,别忘了点赞+收藏+关注。