点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。
和我们平时给朋友发送短信类似
JMS Pub/Sub模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic
主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。
主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。
非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收到发送某个主题的消息。
如果消费者处于离线状态,生产者发送的主题将会丢失作废,消费者永远不会收到
一句话:先要订阅注册才能接收到发布,只给订阅者发布消息
客户端首先向MQ注册一个自己的身份ID识别号,当客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,
当客户端再次连接到MQ时会根据消费者的ID得到所有当自己处于离线时发送到主题的消息
非持久订阅状态下,不能恢复或重新派送一个未签收的消息。
持久订阅才能恢复或重新派送一个未签收的消息
JMS编码总体架构(类似JDBC编码)
官网:https://activemq.apache.org/
特性:
# 解压
tar -zxvf apache-activemq-5.16.1-bin.tar.gz
# 在根目录下创建文件夹
mkdir my-activeMQ
# 将解压的文件copy到自己新创建的目录下
cp -r /opt/apache-activemq-5.16.1 /my-activeMQ/
# 进入activeMQ文件夹
cd /my-activeMQ/apache-activemq-5.16.1/bin
# 启动 普通启动 默认进程端口 61616
./activemq start
# 带有日志启动
./activemq start > /my-activeMQ/myrunmq.log
ActiveMQ控制台
# 连接之前要将防火墙关闭 或者设置白名单 service iptables stop
http://ip:8161/admin/ #账号 admin 密码 admin
<!--需要引入的依赖-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!--junit/log4j等基础使用配置-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
// 消息生产者的代码
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(Stringp[] args){
//1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码 如果用户名密码改了 也可以传进去 接受3参
ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
//2 通过连接工厂 获得连接connection并启动访问 抛异常
Connection connection = activeMQConenctionFactory.createConnection();
connection.start();
//3 创建会话session
// 两个参数 第一个叫事务 第二个叫签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4 创建目的地 (具体是队列还是主题) 注意这里的包是jms
Queue queue = session.createQueue(QUEUE_NAME);
//5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//6 通过消息生产者messageProducer生产3条消息发送到MQ的队列里面
for (int i = 1; i<=3; i++){
//7 创建消息
TextMessage textMessage = session.createTextMessage("msg ---"+i);//理解为一个字符串
//8 通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9 关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("*******消息发布到MQ完成");
}
控制台说明:
// 消息消费者的代码 Consumer
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args){
System.out.println("***********我是1号消费者")
//1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码 如果用户名密码改了 也可以传进去 接受3参
ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
//2 通过连接工厂 获得连接connection并启动访问 抛异常
Connection connection = activeMQConenctionFactory.createConnection();
connection.start();
//3 创建会话session
// 两个参数 第一个叫事务 第二个叫签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4 创建目的地 (具体是队列还是主题) 注意这里的包是jms
Queue queue = session.createQueue(QUEUE_NAME);
//5 创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
/*
// 1 种
同步阻塞方式receive()
订阅者或接收者调用MessageConsumer的receive()的方法来接受消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞
while(true){
// receive有两种 1 无超时时间 2 超时时间
TextMessage textMessage = (TextMessage)messageConsumer.receive();
// TextMessage textMessage = (TextMessage)messageConsumer.receive(4000L);
if (null != textmessage){
System.out.println("*****消费者接收到消息:"+textMessage.getText());
}else{
break;
}
}
messageConsumer.close();
session.close();
connection.close();
**/
//第2种 通过监听的方式来消费消息 MessageConsumer messageConsumer = session.createConsumer(queue);
// 异步非阻塞方式(监听器onMessage())
//订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器
//当消息到达之后 系统自动调用监听器MessageListener的onMessage(Message message)方法
messageConsumer.setMessageListener(new MessageListener(){
@Override
pubilc void onMessage(Message message){
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
// 这里有异常 try catch
System.out.println("*****消费者接收到消息:"+textMessage.getText());
}
}
});
// 抛异常 保证控制台不灭
System.in.read();
messageConsumer.close();
session.close();
connection.close();
/*
*1 先生产 只 启动1号消费者 问题:1号消费者能消费到消息吗?Yes
*2 先生产 先 启动1号消费者 再启动2号消费者 问题:2号消费者还能消费到消息吗?No
* 1号可以消费
* 2号不可以消费
*3 先启动2个消费者,再生产6条消息 请问消息情况如何?一人一半
*/
}
两种消费方式:
在点对点的消息传递种,目的地被称为队列(queue)
点对点消息传递域的特点如下:
// 消息生产者的代码
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(Stringp[] args){
//1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码 如果用户名密码改了 也可以传进去 接受3参
ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
//2 通过连接工厂 获得连接connection并启动访问 抛异常
Connection connection = activeMQConenctionFactory.createConnection();
connection.start();
//3 创建会话session
// 两个参数 第一个叫事务 第二个叫签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4 创建目的地 (具体是队列还是主题) 注意这里的包是jms
Topic topic = session.createTopic(TOPIC_NAME);
//5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
// messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT) 持久化的topic
// connection.start(); 持久化的topic
//6 通过消息生产者messageProducer生产3条消息发送到MQ的队列里面
for (int i = 1; i<=3; i++){
//7 创建消息
TextMessage textMessage = session.createTextMessage("TOPIC_NAMEmsg ---"+i);//理解为一个字符串
//8 通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9 关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("*******TOPIC_NAME消息发布到MQ完成");
}
// 消息消费者的代码 Consumer
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args){
System.out.println("***********我是1号消费者")
//1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码 如果用户名密码改了 也可以传进去 接受3参
ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
//2 通过连接工厂 获得连接connection并启动访问 抛异常
Connection connection = activeMQConenctionFactory.createConnection();
connection.start();
// 持久化 connection.setClientID("zs")
//3 创建会话session
// 两个参数 第一个叫事务 第二个叫签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4 创建目的地 (具体是队列还是主题) 注意这里的包是jms
Topic topic = session.createTopic(TOPIC_NAME);
//TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark") //主题订阅者
//5 创建消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
//通过监听的方式来消费消息
/*messageConsumer.setMessageListener(new MessageListener(){
@Override
pubilc void onMessage(Message message){
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
// 这里有异常 try catch
System.out.println("*****消费者接收到消息:"+textMessage.getText());
}
}
});*/
// lomdba 表达式
messageConsumer.setMessageListener((Message message) ->{
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
// 这里有异常 try catch
System.out.println("*****消费者接收到消息:"+textMessage.getText());
}
})
// 抛异常 保证控制台不灭
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
// 消息生产者的代码
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(Stringp[] args){
//1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码 如果用户名密码改了 也可以传进去 接受3参
ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
//2 通过连接工厂 获得连接connection并启动访问 抛异常
Connection connection = activeMQConenctionFactory.createConnection();
//3 创建会话session
// 两个参数 第一个叫事务 第二个叫签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4 创建目的地 (具体是队列还是主题) 注意这里的包是jms
Topic topic = session.createTopic(TOPIC_NAME);
//5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT) // 持久化的topic
connection.start(); // 持久化的topic
//6 通过消息生产者messageProducer生产3条消息发送到MQ的队列里面
for (int i = 1; i<=3; i++){
//7 创建消息
TextMessage textMessage = session.createTextMessage("TOPIC_NAMEmsg ---"+i);//理解为一个字符串
//8 通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9 关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("*******TOPIC_NAME消息发布到MQ完成");
}
// 消息消费者的代码 Consumer 持久化的
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args){
System.out.println("***********z3")
//1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码 如果用户名密码改了 也可以传进去 接受3参
ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory();
//2 通过连接工厂 获得连接connection并启动访问 抛异常
Connection connection = activeMQConenctionFactory.createConnection();
// 持久化
connection.setClientID("z3")
//3 创建会话session
// 两个参数 第一个叫事务 第二个叫签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4 创建目的地 (具体是队列还是主题) 注意这里的包是jms
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark") //主题订阅者
connection.start();
Message message = topicSubscriber.receive();
while(null != message){
TextMessage textMessage = TextMessage(message);
System.out.println("******收到的持久化Topic"+textMessage.getText());
message = topicSubscriber.receive(1000L);
}
session.close();
connection.close();
}
一定要先运行一次消费,等于向MQ注册,类似我订阅了这个主题。
然后再运行生产者发送是那个消息,此时
无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。
发布/订阅消息传递域的特点如下:
相当于一个ActiveMQ的服务器实例
说白了,Broker其实就是实现了用代码形式的启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动
再用的时候再去启动这样节省了资源,也保证了可靠性。
./activemq start xbean:file:/my-activeMQ/apache-activemq-5.16.1/conf/activemq02.xml # 指定配置文件启动
引入依赖
<!--不引入该依赖可能报错 ClassNotFoundException 。。。。。。。-->
<dependency>
<groupId>com.faster.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
public class EmbedBroker{
public static void main(String[] args){
// 用ActiveMQ Broker作为独立的消息服务器来构建Java应用
// ActiveMQ也支持在vm中通信基于嵌入式的Broker
BrokerService brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.addConnector("tcp://localhost:61616");
brokerService.start();
}
}
<!--需要引入的依赖-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<!--不引入该依赖可能报错 ClassNotFoundException 。。。。。。。-->
<dependency>
<groupId>com.faster.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
<!--activemq 对jms的支持,整合spring和ActiveMQ-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.23</version>
</dependency>
<!--activemq 所需要的pool包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.9</version>
</dependency>
<!--Spring AOP 相关jar-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib</artifactId>
<version>2.1_2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
applicationContext.xml
<context:component-scan base-package="com.vipbbo.activemq"/>
<!--配置生产者-->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destory-method="stop">
<property name="connectionFactory">
<!--真正可以产生Connection的ConnectionFactory,由对应的JMS服务厂商提供-->
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://ip.....:61616"/>
</bean>
</property>
<property name="maxConnections"value="100"></property>
</bean>
<!--这个队列目的地,点对点的-->
<bean id="destinationQueue" class="org.apache.activeqm.command.ActiveMQQueue">
<construtor-arg index="0" value="spring-active-queue"/>
</bean>
<!--spring 提供的jms工具类 它可以进行消息发送 接收等-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="defaultDestination" ref="destinationQueue"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
// 生产者
@Service
public class SpringMQ_Produe{
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args){
ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml")
SpringMQ_Produe produce = (SpringMQ_Produce)ctx.getBean("springMQ_Produce");
/*
produce.jmsTemplate.send(new MessageCreator(){
@Override
pubilc Message createMessage(Session session) throw JMSException{
TextMessage textMessage = session.createTextMessage("***spring和ActiveMQ的整合case...")
return textMessage;
}
})
*/
//表达式
produce.jmsTemplate.send(new MessageCreator(session -> ){
TextMessage textMessage = session.createTextMessage("***spring和ActiveMQ的整合case...")
return textMessage;
})
System.out.println("***********send task over")
}
}
// 消费者
@Service
public class SpringMQ_Consumer{
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args){
ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml")
SpringMQ_Consumer consumer = (SpringMQ_Produce)ctx.getBean("SpringMQ_Consumer");
String retValue = (String)consumer.jmsTemplate.receiveAndConvert();
System.out.println("*****消费者收到的消息:"+retValue);
}
}
添加Topic在配置文件中
<!--这个主题目的地,点对点的-->
<bean id="destinationTopic" class="org.apache.activeqm.command.ActiveMQTopic">
<construtor-arg index="0" value="spring-active-topic"/>
</bean>
<!--spring 提供的jms工具类 它可以进行消息发送 接收等-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="defaultDestination" ref="destinationTopic"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
修改applicationContext.xml
<!--配置监听程序-->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory"/>
<!--注意这里的ref 要监听的 -->
<property name="destination" ref="destinationTopic"/>
<!-- public class MyMessageListener implements MessageListener-->
<property name="messageListener" ref="myMessageListener"/>
</bean>
<!--自己定义的包名类名 或者在类上使用 @Component注解-->
<!--
<bean id="myMessageListener" class="com.vipbbo.activemq.spring.MyMessageListener"/>
-->
@Component
public class MyMessageListener implements MessageListener{
@Override
pubilc void onMessage(Message message){
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try{
System,out.println("textMessage.getText()");
} catch (JMSException e){
e.printStackTracr();
}
}
}
}
boot 使用的是 2.1.5
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.vipbbo.activemq</groupId>
<artifactId>boot_mq_produce</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.bulid.sourceEncoding>UTF-8</project.bulid.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--activemq 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
</dependencies>
server:
port: 7777
spring:
activemq:
broker-url: tcp://ip:61616 # 自己的mq服务器地址
user: admin
password: admin
jms:
pub-sub-domain: false # false = Queue true = Topic
# 自定义队列名称
myqueque: boot-activemq-queue
@Component
@EnableJms //一定要开启
public class ConfigBean{
@Value("${myqueue}")
private String myQueue;
@Bean // 相当于<bean id ="" class="" >
public Queue queue(){
return new ActiveMQQueue(myQueue)
}
}
生产者
// 生产者
@Component
public class Queue_Produce{
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void produceMsg(){
jmsMessagingTemplate.convertAndSend(queue,UUID.randomUUID().toString().subString(0,6));
}
}
测试类
@SpringBootTest(classes= Main_Produce.class) // 主类
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ{
@Resource
private Queue_Produce queue_produce;
@Test
public void testSend() throw Exception{
queue_produce.produceMsg();
}
}
消费者
pom文件和生产者一样 端口号8888
@Component
public class Queue_Consumer
{
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throw JMSException
{
System.out.println("********消费者收到消息:"+textMessage.getText());
}
}
和Queue 的一样
server:
port: 6666
spring:
activemq:
broker-url: tcp://ip:61616 # 自己的mq服务器地址
user: admin
password: admin
jms:
pub-sub-domain: true # false = Queue true = Topic
# 自定义队列名称
myTopic: boot-activemq-topic
@Component
@EnableJms //一定要开启
public class ConfigBean
{
@Value("${myTopic}")
private String topicName;
@Bean // 相当于<bean id ="" class="" >
public Topic topic()
{
return new ActiveMQTopic(topicName)
}
}
生产者
// 生产者 topic
@Component
public class Topic_Produce{
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
@Schdeuled(fixedDelay = 3000) // 在主启动动类开启 加注解 @EnableScheduling
public void produceTopic()
{
jmsMessagingTemplate.convertAndSend(topic,"主题"+UUID.randomUUID().toString().subString(0,6));
}
}
消费者
pom文件和生产者一样
# 第一个配置
server:
port: 5566
spring:
activemq:
broker-url: tcp://ip:61616 # 自己的mq服务器地址
user: admin
password: admin
jms:
pub-sub-domain: true # false = Queue true = Topic
# 自定义队列名称
myTopic: boot-activemq-topic
# 第二个配置
server:
port: 5566
spring:
activemq:
broker-url: tcp://ip:61616 # 自己的mq服务器地址
user: admin
password: admin
jms:
pub-sub-domain: true # false = Queue true = Topic
# 自定义队列名称
myTopic: boot-activemq-topic
字节流
。默认情况下ActiveMQ把wire protocol叫做OpenWire,它的目的是促使网络上的效率和数据快速交互。如果你不特别指定ActiveMQ的网络监听端口,那么这些端口都将使用BIO网络IO模型,(OpenWire,STOMP,AMQP...)所以为了首先提高单节点的网络吞吐性能,我们需要明确指定Active的网络IO模型
如下所示:URL格式以nio开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型
<broker>
...
<transportConnectors>
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>
</<transportConnectors>
...
</broker>
Java代码
public static final String ACTIVEMQ_URL = "nio://ip:61618";
如何解决让这个端口支持NIO网络IO模型,又让它支持多个协议呢?
解决:
参考:https://activemq.apache.org/auto
使用auto关键字。
使用"+"符号来为端口设置多种特性
如果我们既需要某一个端口支持NIO网络IO模型,又需要它支持多个协议
<transportConnector name="nio" uri="auto+nio://0.0.0.0:61608?maximumConnection=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50"/>
即Advance Message Queuing Protocol,一个提供同意消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
参考地址:https://activemq.apache.org/amqp
STOMP,Stream Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。
参考地址:https://activemq.apache.org/stomp
Secure Sockets Layer Protocol(SSL)
MQTT(Message Queuing Telemetry ,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有互联网和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
参考:https://activemq.apache.org/mqtt
参考资料:https://github.com/fusesource/mqtt-client
官网:https://activemq.apache.org/persistence 可以在conf文件下的注释中找到
为了避免意外宕机后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制
。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。
消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需把消息发送出去。
基于文件的存储方式,是以前的默认消息存储,现在不用了
AMQ Message是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件夹中,文件的默认大小为32M,当一个存储文件中的消息已经全被消费,那么这个文件将被表示为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本
https://activemq.apache.org/kahadb
验证:可以在activeMQ的conf目录下的activemq.xml看见
<kahaDB directory="${activemq.data}/kafadb}"/>
KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。
消息存储使用一个事务日志
和仅仅用一个索引文件
来存储它所有的地址
kahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。
数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
kahadb在消息保存目录中只有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比较这就非常简洁了。
保证索引的连续性没有碎片
)这种文件系统是从ActiveMQ5.8
之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库储存形式,但是它提供比KahaDB更快的持久性
。
但它不使用自定义B-Tree实现来索引预写日志,而是基于LevelDB
的索引
默认配置如下
<persistenceAdapter>
<leveleDBdirectory="activemq-data"/>
</persistenceAdapter>
lib
文件夹下(如果你使用的是其他连接池 需要将连接池的相关jar包拷贝进来)
cp mysql-connector-java-5.1.3.jar /my-activeMQ/apache-activemq-5.16.1/bin/my-activeMQ/apache-activemq-5.16.1/conf
路径下修改activemq.xml配置文件,按照如下修改:<!--修改前-->
<persistenceAdapter>
<kahaDB directory="${activemq-data}/kahadb"/>
</persistenceAdapter>
<!--修改后 mysql-ds createTablesOnStartuo默认是true 新建一套表 第一次是true 第二次要设置是false-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartuo="true"/>
</persistenceAdapter>
数据库连接池配置
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" detroy-methid="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://自己数据库IP:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="自己的数据库账号"/>
<property name="password" value="自己数据库的密码"/>
<property name="maxTotal" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
配置在activemq.xml
的broker
标签之外import
标签的里面
建仓SQL和建表说明
activemq
的数据库ACTIVEMQ_MSGS
ID # 自增的数据库主键
CONTAINER # 消息的Destination
MSGID_PROD # 消息发送者的主键
MSG_SEQ # 是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION # 消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG # 消息本地的Java序列化对象的二进制数据
PRIORITY # 优先级 从0-9 数值越大优先级越高
ACTIVEMQ_ACKS
activemq_acks # 用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。数据字段如下:
CONTAINER # 消息的Destination
SUB_DEST # 如果使用的是static集群,这个字段会有集群其他系统的信息
CLIENT_ID # 每一个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME # 订阅者名称
SELECTOR # 选择器,可以选择之消费满足条件的消息。条件可以用自定义属性来实现,可支持多属性AND和OR操作
LAST_ACKED_ID # 记录消费过的消息ID
ACTIVEMQ_LOCK
表activemq_lock 在集群环境下才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker
注意 : 如果新建数据库OK + 上述配置OK + 代码运行 OK ,3张表会自动生成 万一情况,手动建表SQL(如果配置好不需要手动,应急)
生产者
)_ Queue_messageProducer.setDeliverMode(DeliveryMode.PERSISTENT)
// 消息生产者的代码
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String QUEUE_NAME = "jdbc01";
public static void main(Stringp[] args){
//1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码 如果用户名密码改了 也可以传进去 接受3参
ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory();
//2 通过连接工厂 获得连接connection并启动访问 抛异常
Connection connection = activeMQConenctionFactory.createConnection();
connection.start();
//3 创建会话session
// 两个参数 第一个叫事务 第二个叫签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4 创建目的地 (具体是队列还是主题) 注意这里的包是jms
Queue queue = session.createQueue(QUEUE_NAME);
//5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.setDeliverMode(DeliveryMode.PERSISTENT)
//6 通过消息生产者messageProducer生产3条消息发送到MQ的队列里面
for (int i = 1; i<=3; i++){
//7 创建消息
TextMessage textMessage = session.createTextMessage("msg ---"+i);//理解为一个字符串
//8 通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9 关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("*******消息发布到MQ完成");
}
// 消费者不用动
要开启持久化 Topic
messageProducer.setDeliverMode(DeliveryMode.PERSISTENT)
NON_PERSISTENCE
时,消息被保存在内存中,当DeliveryMode设置为PEREISTENCE
时,消息保存在broker的相应的文件或者数据库中.而且点对点模型中消息一旦被Consemer消费就从Broker中删除 看activemq_msgs
activemq_acks
activemq_msgs
表中,只要有任意一个消费者已经消费过了,消费之后这些消息就会立即被删除
Topic:一般是先启动消费者订阅然后再生成的情况 下会将消息保存到activemq_msgs
并且不会删除
activemq_acks
表记录了订阅者的信息数据库jar包
默认dbcp2
记得需要将使用到的相关jar文件放置到ActiveMQ安装目录下的lib目录。mysql-jdbc驱动的jar包和对应的数据库连接池jar包
createTablesOnStartup
在jdbcPersistenceAdapter标签中设置了createTablesOnStartup属性时在第一次启动ActiveMQ时,ActiveMQ服务节点会自动创建所需要的数据表,启动完成后可以去掉这个属性,或者更改createTablesOnStartup属性为false
下划线
java.lang.IIIegalStateException:BeanFactory not initialized already closed
这是因为你的操作系统的机器名中有"__"符号。请更改机器名并且重启后即可解决。这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库和读库
ActiveMQ Journal,使用高速缓存写入技术
,大大提高了性能
当消费者的消费速度能够及时跟上生产者消息的产生速度时,journal文件就能够大大减少需要写入到DB的消息。
降低了MySQL的操作,降低MySQL负担
在/my-activeMQ/apache-activemq-5.16.1/conf
路径下修改activemq.xml配置文件,按照如下修改:
<!--修改前 mysql-ds createTablesOnStartuo默认是true 新建一套表 第一次是true 第二次要设置是false-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartuo="true"/>
</persistenceAdapter>
<!--修改后 -->
<persistenceAdapter>
<jdbcPersistenceAdapterFactory journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data"/>
</persistenceAdapter>
持久化消息主要是指:
MQ所在的服务器down了消息不会丢失的机制
持久化机制演化过程:
从最初的AMQ Message Store 方案到ActiveMQ V4版本中推出的High performance journal (高性能事务支持)附件并且同步推出了关系型数据库的存储方案。ActiveMQ5.3版本中又推出了KahaDB的支持(V5.4版本后成为ActiveMQ默认的持久化方案),后来ActiveMQ V5.8版本开始支持LevelDB,到现在,V5.9+版本提供了标准的zookeeper+LevelDB集群化方案。本次重点了解KahaDB、LevelDB、和mysql数据库这三种持久化存储方案。
ActiveMQ的消息持久化机制有:
AMQ 基于日志文件 KahaDB 基于日志文件,从ActiveMQ5.4开始默认的持久化插件 JDBC 基于第三方数据库 LevelDB 基于文件的本地数据库储存,从ActiveMQ5.8版本之后又推出了LevelDB的持久化引擎性能高于KahaDB Replicated LevelDB Store 从ActiveMQ5.9 提供了基于LevelDB和zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。
无论使用哪种持久化方式,消息的存储逻辑都是一致的:
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。
官网:https://activemq.apache.org/masterslave
引入消息队列之后如何保证其高可用性 单点故障 (集群)
基于Zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能,避免单点故障。
基于sharedFileSystem共享文件系统(KahaDB) 基于JDBC 基于可复制的LevelDB
官网:https://activemq.apache.org/replicated-leveldb-store
mkdir /mq_cluster/
cd /mq_cluster/
cp -r /opt/apache/-activemq-5.15.9 mq_node01
cp -r mq_node01 mq_node02
cp -r mq_node01 mq_node03
# mq_node01默认不动
# mq_node02 mq_node03和2一样操作
cd /mq_cluster/mq_node02/conf
vim jetty.xml # 里面有一个port
vim /etc/hosts
# 01 02 03 一样的操作 3个节点的brokerName 要求一致
cd /mq_cluster/mq_node01/conf
vim activemq.xml # broker标签的brokerName="zzyymq"
3个节点的持久化配置
参考官网:
1号机:(vi /mq_cluster/mq_node01/conf/activemq.xml)
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq-data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:63631"
zkAddress="localhost:2191,localhost:2192,localhost:2193"
zkPath="/activemq/leveldb-stores"
hostname="zzyymq-server"
sync="local_disk"
/>
</persistenceAdapter>
2号机(vi /mq_cluster/mq_node01/conf/activemq.xml)
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq-data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:63632"
zkAddress="localhost:2191,localhost:2192,localhost:2193"
zkPath="/activemq/leveldb-stores"
hostname="zzyymq-server"
sync="local_disk"
/>
</persistenceAdapter>
3号机(vi /mq_cluster/mq_node01/conf/activemq.xml)
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq-data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:63633"
zkAddress="localhost:2191,localhost:2192,localhost:2193"
zkPath="/activemq/leveldb-stores"
hostname="zzyymq-server"
sync="local_disk"
/>
</persistenceAdapter>
1 号不用动
vi /mq_cluster/mq_node02/conf/activemq.xml # 2号修改tcp端口号为61617 3号修改tcp端口号为61618
# 查看zookeeper的运行数量
ps -ef |grep zookeeper|grep -V grep|wc -l
#!bin/sh
cd /mq_cluster/mq_node01/bin
./activemq start
cd /mq_cluster/mq_node02/bin
./activemq start
cd /mq_cluster/mq_node03/bin
./activemq start
cd /myzookeeper/zk01/bin
./zkCli.sh -server 127.0.0.1:2191 # 查看activemq是否注册成功
ls /activemq/leveldb-stores # 里面应该有3个
get /activemq/leveldb-stores/00000000000 # 查看elected 是否有值 有值的为master null为slaver
// 生产者
public static final String ACTIVEMQ_URL = "failover:(tcp://ip:61616,tcp://ip:61617,tcp://ip:61618)randomize=false";
public static final String QUEUE_NAME = "jdbc01";
// 消费者
public static final String ACTIVEMQ_URL = "failover:(tcp://ip:61616,tcp://ip:61617,tcp://ip:61618)randomize=false";
public static final String QUEUE_NAME = "jdbc01";
ZK+Replicated LevelDB Store
对于一个Slow Consumer,使用同步发送消息可能出现Producer堵塞等情况,慢消费者适合使用异步发送
ActiveMQ支持同步、异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。
ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送。
如果你没有使用事务并且发送的是持久化消息,每一次发送都是同步发送的且会阻塞producer直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来很大的延时。
很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化消息。
异步发送
它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过也带来了额外的问题。
就是需要消耗较多的Client端内存同时也会导致broker端性能消耗增加:
此外它不能有效地确保消息的发送成功。在useAsyncSend=true的情况下客户端需要容忍消息丢失的可能。
参考#:https://activemq.apache.org/async-sends
// Configuring Async Send using a Connection URI
// You can use the Connection Configuration URI to configure async sends as follows
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
// Configuring Async Send at the ConnectionFactory Level
// You can enable this feature on the ActiveMQConnectionFactory object using the property.
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
// Configuring Async Send at the Connection Level
// Configuring the dispatchAsync setting at this level overrides the settings at the connection factory level.
//You can enable this feature on the ActiveMQConnection object using the property.
((ActiveMQConnection)connection).setUseAsyncSend(true);
异步发送丢失消息的场景是:生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息。 由于消息不阻塞,生产者会认为所有的send消息均被成功发送至MQ。 如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。 所以,正确的异步发送方法是需要接收回调的 同步发送和异步发送的区别就在此 同步发送等send不阻塞了就表示一定发送成功了 ,异步发送需要接收回执并由客户端再判断一次是否发送成功。
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);
TextMessage message =null;
for (int i = 1; i<=3; i++){
//7 创建消息
message = session.createTextMessage("TOPIC_NAMEmsg ---"+i);//理解为一个字符串
message.setJMSMessageID(UUID.randomUUID().toString()+"------order--");
String msgID = message.getJMSMessageID();
activeMQMessageProducer.send(message,new AsyncCallback()
{
@Override
public void onSuccess(){
System.out.prinitln(msgID+"has been ok send")
}
@Override
public void onException(JMSException exception){
System.out.prinitln(msgID+"fail to send to mq");
}
})
//8 通过messageProducer发送给mq
messageProducer.send(textMessage);
}
参考官网:https://activemq.apache.org/delay-and-schedule-message-delivery
四大属性:
AMQ_SCHEDULED_DELAY long 延迟投递的时间 AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔 AMQ_SCHEDULED_REPEAT int 重复投递的次数 AMQ_SCHEDULED_CRON String Cron表达式
案例演示:
要在activemq.xml中配置schedulerSupport属性为True
<!--放在broker标签中-->
scheduleSupport="true"
Java代码里面封装的辅助消息类型:ScheduleMessage
代码
// JMSProducer_DelayAndSchedule
// 消息生产者的代码
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String QUEUE_NAME = "queue-delay";
public static void main(Stringp[] args){
//1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码 如果用户名密码改了 也可以传进去 接受3参
ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
//2 通过连接工厂 获得连接connection并启动访问 抛异常
Connection connection = activeMQConenctionFactory.createConnection();
connection.start();
//3 创建会话session
// 两个参数 第一个叫事务 第二个叫签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4 创建目的地 (具体是队列还是主题) 注意这里的包是jms
Queue queue = session.createQueue(QUEUE_NAME);
//5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
Long delay = 3 * 1000;
long period = 4 * 1000;
int repeat = 5;
//6 通过消息生产者messageProducer生产3条消息发送到MQ的队列里面
for (int i = 1; i<=3; i++){
//7 创建消息
TextMessage textMessage = session.createTextMessage("delay msg ---"+i);//理解为一个字符串
message.setLongProperty(ScheduleMessage.AMQ_SCHEDULED_DELAY,delay);
message.setLongtProperty(ScheduleMessage.AMQ_SCHEDULED_PERIOD,period);
message.setIntProperty(ScheduleMessage.AMQ_SCHEDULED_REPEAT,repeat);
//8 通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9 关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("*******消息发布到MQ完成");
}
// JMSConsumer_DelayAndSchedule
具体哪些情况会引起消息的重发:
请说说消息重发时间间隔和重发次数吗?
间隔:1
次数:6
有毒消息Poison ACK谈谈你的理解
参考官网:https://activemq.apache.org/redelivery-policy
一个消息被redelivedred超过默认的最大重试次数(默认6次)时,消费端会给MQ发送一个"poison ack" 表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)
collisionAvoidanceFactor # 设置防止冲突范围的正负百分比,只有启用useCllisionAvoidance参数时才生效,也就是在延迟时间上再加一个时间波动范围。默认值为0.15
maximumRedeliveries # 最大重试次数,达到最大重试次数后抛出异常。为-1时不限制次数,为0时表示不进行重传,默认值为6
maximumRedeliveryDelay # 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔是最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1.
initialRedeliveryDelay # 初始重发延迟时间,默认为1000L
redeliveryDelay # 重发延迟时间,当initialRedeliveryDelay=0时生效,默认为1000L
useCollisionAvoidance # 启用防止冲突功能,默认为false
useExponentialBackOff # 启用指数倍数递增的方式增加延迟时间,默认false
backOffMultiplier # 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效,默认为5
参考官网:https://activemq.apache.org/message-redelivery-and-dlq-handling
ActiveMQ中引入了死信队列(Dead Letter Queue)的概念。即一条消息在被重发了多次后(默认重发6次redeliveryCounter==6),将会被ActiveMQ移入"死信队列"。开发人员可以在这个Queue中查看处理出错的消息,进行人工干预
重新执行发货和配送的逻辑
SharedDeadLetterStrategy(共享的死信队列)
将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略
共享队列默认为"ActiveMQ DLQ",可以通过"deadLetterQueue"属性来设定。
<deadLetterStrategy>
<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>
IndividualDeadLetterStrategy(个性化的死信队列)
把DeadLetter放入各自的死信通道中,
对于Queue而言,死信通道的前缀默认为"ActiveMQ.DLQ.Queue.";
对于Topic而言,死信通道的前缀默认为"ActiveMQ.DLQ.Topic.";
比如队列Order,那么它对应的 死信通道为ActiveMQ.DLQ.Queue.Order。我们使用"queuePrefix" "topicPrefix"来指定上述前缀。
默认情况下,无论是Topic还是Queue ,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue;不过开发者也可以指定Topic。
<policyEntry queue="order">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="false"/>
</deadLetterStrategy>
</policyEntry>
将队列order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic
属性"useQueueForTopicMessages" 此值表示是否将Topic的DeadLetter保存在Queue中默认为true
有时需要直接删除过期的消息而不需要发送到死信队列中,"processExpired"表示是否将过期消息放入到死信队列,默认为true
<policyEntry queue=">">
<deadLetterStrategy>
<sharedDeadLetterStratgy processExpired = "false"/>
</deadLetterStrategy>
</policyEntry>
默认情况下,ActiveMQ不会把非持久的死消息发送到死信队列中。
processNonPersistent 表示是否将"非持久化"消息放入到死信队列中,默认为false
非持久性如果你想要把非持久的消息发送到死信队列中,需要设置属性processNonPersistent= "true"
<!-- queue=">" 中的 > 类似于sql中的* 就是全部-->
<policyEntry queue=">">
<deadLetterStrategy>
<sharedDeadLetterStrategy processNonPersistent="true"/>
</deadLetterStrategy>
</policyEntry>
由于网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。
如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
另一种是token的方式
如果上面两种情况还不行,准备一个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将< id,message>以K-V形式写入redis。那消费者开始消费,先去redis中查询有没有消费记录即可。
以上笔记获取方式:
链接:https://pan.baidu.com/s/1S0sObSiEAVC42FrfYT589g
提取码:ndhk
过期请私信