往期推荐
【Spring源码】- 11 Spring AOP之编程式事务
SimpleMessageListenerContainer
是spring
在rabbitmq
原生api
基础上封装实现的一个消费工具类,该类非常强大,可以实现:监听单个或多个队列、自动启动、自动声明,它还支持动态配置,如动态添加监听队列、动态调整并发数等等,基本上对RabbitMQ
消费场景这个类都能满足。如@RabbitListener
、cloud-stream中StreamListener
中底层实现都是基于该类,所以,理解SimpleMessageListenerContainer
原理对理解spring rabbitmq
中消费模型非常关键。
1、SimpleMessageListenerContainer#addQueueNames()
方法可以运行时添加监听队列,removeQueueNames()
方法可以运行时移除监听队列;
2、后置处理器setAfterReceivePostProcessors()
//后置处理器,接收到的消息都添加了Header请求头
container.setAfterReceivePostProcessors(message -> {
message.getMessageProperties().getHeaders().put("desc",10);
return message;
});
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});
3、设置消费者的Consumer_tag和Arguments:container.setConsumerTagStrategy可以设置消费者的 Consumer_tag, container.setConsumerArguments可以设置消费者的 Arguments
container.setConsumerTagStrategy(queue -> "order_queue_"+(++count));
//设置消费者的Arguments
Map<String, Object> args = new HashMap<>();
args.put("module","订单模块");
args.put("fun","发送消息");
container.setConsumerArguments(args);
4、setConcurrentConsumers设置并发消费者:setConcurrentConsumers设置多个并发消费者一起消费,并支持运行时动态修改。setMaxConcurrentConsumers设置最多的并发消费者。
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
container.setConcurrentConsumers(5);
container.setMaxConcurrentConsumers(10);
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});
return container;
}
SimpleMessageListenerContainer
类结构如下:
SimpleMessageListenerContainer
类启动的入口是start()
方法,该方法位于AbstractMessageListenerContainer
类中:
public void start() {
//如果已启动,则什么也不执行,直接退出
if (isRunning()) {
return;
}
//initialized是否执行初始化,没有则执行afterPropertiesSet()方法进行初始化,执行完成后initialized设置成true
if (!this.initialized) {
synchronized (this.lifecycleMonitor) {
if (!this.initialized) {
afterPropertiesSet();
}
}
}
try {
logger.debug("Starting Rabbit listener container.");
//验证RabbitAdmin,mismatchedQueuesFatal=true时,spring context中RabbitAdmin数量不能大于1
configureAdminIfNeeded();
//执行RabbitAdmin#initialize方法,spring context中注入的exchanges, queues and bindings执行声明式创建
/*
总结一下,我们发现,要想自动创建队列,SimpleMessageListenerContainer需要满足这么两点:
mismatchedQueuesFatal属性设置为true
autoDeclare属性也设置为true
*/
checkMismatchedQueues();
//启动核心
doStart();
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
finally {
this.lazyLoad = false;
}
}
SimpleMessageListenerContainer#doStart
方法:
protected void doStart() {
Assert.state(!this.consumerBatchEnabled || getMessageListener() instanceof BatchMessageListener
|| getMessageListener() instanceof ChannelAwareBatchMessageListener,
"When setting 'consumerBatchEnabled' to true, the listener must support batching");
//如果MessageListener是ListenerContainerAware,则进行expectedQueueNames校验
checkListenerContainerAware();
//调用父类doStart()方法,主要是active和running都设置成true
super.doStart();
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
/*
创建BlockingQueueConsumer类型consumer,每个concurrentConsumers并发对应创建一个对象,并存储到Set<BlockingQueueConsumer> consumers集合中,
返回值就是创建consumer对象个数,具体创建逻辑见:SimpleMessageListenerContainer#createBlockingQueueConsumer,主要注意下prefetchCount计算:
int actualPrefetchCount = getPrefetchCount() > this.batchSize ? getPrefetchCount() : this.batchSize;即如果prefetchCount大于batchSize,则其就是实际值,否则prefetchCount等于batchSize值
*/
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " +
"(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
/*
每个并发对应一个BlockingQueueConsumer对象,这里将每个BlockingQueueConsumer对象封装成AsyncMessageProcessingConsumer,这样可以丢到线程池中异步执行
*/
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers) {
//将BlockingQueueConsumer对象封装成AsyncMessageProcessingConsumer进行异步执行
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
//存储到processors集合中
processors.add(processor);
//将AsyncMessageProcessingConsumer丢到线程池中执行
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
//事件发送
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
//判断启动过程中是否存在异常
waitForConsumersToStart(processors);
}
}
上面代码大致逻辑:BlockingQueueConsumer
对象可以看成consumer
,然后将其包装成AsyncMessageProcessingConsumer
异步任务丢入到线程池中运行。
上面分析了BlockingQueueConsumer
类型的consumer
会被封装成AsyncMessageProcessingConsumer
异步任务丢入到线程池中运行。下面主要就来分析下异步任务执行时做了些什么,该逻辑在SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run
方法,它主要做如下几个事情:
1、监听队列判断
//BlockingQueueConsumer.getQueueCount() < 1,表示当前consumer没有设置任何监听队列,则没必要启动
if (this.consumer.getQueueCount() < 1) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer stopping; no queues for " + this.consumer);
}
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
}
this.start.countDown();
return;
}
2、核心逻辑
try {
initialize();
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
}
catch (InterruptedException e) {
省略。。。
}
catch (QueuesNotAvailableException ex) {
省略。。。
}
省略。。。
AsyncMessageProcessingConsumer#run
执行最核心逻辑就是上面try语句中,首先执行initialize()
初始化方法,然后开始无限循环执行mainLoop()
方法。
上面分析的AsyncMessageProcessingConsumer
运行时执行了两个关键操作:initialize()
初始化操作和mainLoop()
无限循环。
首先,来看下initialize()
初始化操作主要干了些什么:
attemptDeclarations()
方法进行声明式exchange
、queue
、bindings
创建,主要通过执行RabbitAdmin#initialize
方法实现;BlockingQueueConsumer#start
方法,该方法主要完成与Rabbit Broker
指令交互:
1、passiveDeclarations()
方法校验监听队列是否存在:channel.queueDeclarePassive(queueName)
,最终会向Rabbit Broker
发送queue.declare
指令,并设置passive=true
,如下图: 2、setQosAndreateConsumers()
方法用于进行客户端流量控制Qos
以及消息订阅
a、Qos流控:channel.basicQos(this.prefetchCount)
,最终会向Rabbit Broker
发送basic.qos
指令,并将prefetch-size
、prefetch-count
和global
参数设置过去,如下图:
b、consumeFromQueue()
方法会使用channel.basicConsume
方法订阅消息,最终会向Rabbit Broker
发送basic.consume
指令,并指定订阅消息的queue
名称等参数消息(如下图),注意:SimpleMessageListenerContainer
可能设置多个监听队列,则BlockingQueueConsumer
这里会给每个监听队列都向Broker发送一个basic.consume
订阅指令,并且是使用同一个channel
:
向Broker
发送指令com.rabbitmq.client.impl.AMQCommand#transmit()
方法中,可以在如下红色框框处代码断点监控:
上面分析initialize()
初始化操作,客户端向Broker
发送basic.qos
和basic.consume
指令就相当于告诉了服务器:我都准备好了,如果监听队列有消息你就把它推送给我,下面就来分析下Broker
消息推送流程。
Rabbit Broker
接收到Basic.consume
指令后,会向客户端反馈Basic.consume-ok
指令,表示服务端一切就绪准备给客户端推送消息,然后就通过Basic.Deliver
指令类型将消息推送给客户端,一条消息对应一个Deliver
反馈,客户端接收到服务端返回过来的指令类型后,在ChannelN#processAsync
方法进行判断处理,它是amqp-client
依赖包中类:
如果是Deliver类型指令,则调用processDelivery()
方法进行处理:
protected void processDelivery(Command command, Basic.Deliver method) {
Basic.Deliver m = method;
//根据Deliver的consumerTag获取到InternalConsumer对象,因为一个Channel上可能存在多个consumer,需要找到Broker是针对哪个consumer进行的响应
Consumer callback = _consumers.get(m.getConsumerTag());
if (callback == null) {
if (defaultConsumer == null) {
// No handler set. We should blow up as this message
// needs acking, just dropping it is not enough. See bug
// 22587 for discussion.
throw new IllegalStateException("Unsolicited delivery -" +
" see Channel.setDefaultConsumer to handle this" +
" case.");
}
else {
callback = defaultConsumer;
}
}
Envelope envelope = new Envelope(m.getDeliveryTag(),
m.getRedelivered(),
m.getExchange(),
m.getRoutingKey());
try {
// call metricsCollector before the dispatching (which is async anyway)
// this way, the message is inside the stats before it is handled
// in case a manual ack in the callback, the stats will be able to record the ack
metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
this.dispatcher.handleDelivery(callback,
m.getConsumerTag(),
envelope,
(BasicProperties) command.getContentHeader(),
command.getContentBody());
} catch (WorkPoolFullException e) {
// couldn't enqueue in work pool, propagating
throw e;
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConsumerException(this,
ex,
callback,
m.getConsumerTag(),
"handleDelivery");
}
}
processDelivery()
处理Broker
返回的Deliver
消息大致流程:
Consumer callback = _consumers.get(m.getConsumerTag())
:根据Deliver
的consumerTag
获取到InternalConsumer
对象,因为一个Channel
上可能存在多个consumer
,需要找到Broker
是针对哪个consumer
进行的响应
封装Deliver
成Envelope
:
Envelope envelope = new Envelope(m.getDeliveryTag(),
m.getRedelivered(),
m.getExchange(),
m.getRoutingKey());
metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag())
:统计数据处理
调用ConsumerDispatcher#handleDelivery
,其会创建任务丢到线程池中执行,任务:将数据交由具体的consumer
处理,即调用InternalConsumer#handleDelivery
this.dispatcher.handleDelivery(callback,
m.getConsumerTag(),
envelope,
(BasicProperties) command.getContentHeader(),
command.getContentBody());
InternalConsumer#handleDelivery()
方法:将Broker
返回的Deliver
数据放入到BlockingQueueConsumer.queue
中:
BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
所以,如果ListenerContainer
监听多个队列,则BlockingQueueConsumer
中则对应多个InternalConsumer
,每个InternalConsumer
映射Broker
上的一个,BlockingQueueConsumer
下所有InternalConsumer
共享同一个queue
。
上面分析了消息订阅以及Broker推送过来的消息数据会被缓存到BlockingQueueConsumer
对象的queue
队列中,下面就来分析下从queue
中提取消息到传递给用户业务逻辑这个流程。这就需要分析AsyncMessageProcessingConsumer#run
方法中另一个非常重要操作:无限循环mainLoop
操作,它主要就是完成从queue
中提取消息数据然后经过一系列操作最终传递给用户逻辑MessageListener
中。
private void mainLoop() throws Exception { // NOSONAR Exception
try {
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
checkAdjust(receivedOk);
}
long idleEventInterval = getIdleEventInterval();
if (idleEventInterval > 0) {
if (receivedOk) {
updateLastReceive();
}
else {
long now = System.currentTimeMillis();
long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
long lastReceive = getLastReceive();
if (now > lastReceive + idleEventInterval
&& now > lastAlertAt + idleEventInterval
&& SimpleMessageListenerContainer.this.lastNoMessageAlert
.compareAndSet(lastAlertAt, now)) {
publishIdleContainerEvent(now - lastReceive);
}
}
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
/*
* These will normally be wrapped by an LEFE if thrown by the
* listener, but we will also honor it if thrown by an
* error handler.
*/
}
}
跟踪下doReceiveAndExecute()
:
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
Channel channel = consumer.getChannel();
List<Message> messages = null;
long deliveryTag = 0;
//batchSize默认是1,用于指定一次从queue中提取消息数量
for (int i = 0; i < this.batchSize; i++) {
logger.trace("Waiting for message from consumer.");
Message message = consumer.nextMessage(this.receiveTimeout);
if (message == null) {
break;
}
//是否批处理
if (this.consumerBatchEnabled) {
Collection<MessagePostProcessor> afterReceivePostProcessors = getAfterReceivePostProcessors();
if (afterReceivePostProcessors != null) {
Message original = message;
deliveryTag = message.getMessageProperties().getDeliveryTag();
for (MessagePostProcessor processor : getAfterReceivePostProcessors()) {
message = processor.postProcessMessage(message);
if (message == null) {
channel.basicAck(deliveryTag, false);
if (this.logger.isDebugEnabled()) {
this.logger.debug(
"Message Post Processor returned 'null', discarding message " + original);
}
break;
}
}
}
if (message != null) {
if (messages == null) {
messages = new ArrayList<>(this.batchSize);
}
if (isDeBatchingEnabled() && getBatchingStrategy().canDebatch(message.getMessageProperties())) {
final List<Message> messageList = messages;
getBatchingStrategy().deBatch(message, fragment -> messageList.add(fragment));
}
else {
messages.add(message);
}
}
}
else {
messages = debatch(message);
if (messages != null) {
break;
}
try {
//执行MessageListener
executeListener(channel, message);
}
catch (ImmediateAcknowledgeAmqpException e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery '"
+ e.getMessage() + "': "
+ message.getMessageProperties().getDeliveryTag());
}
break;
}
catch (Exception ex) {
if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery: "
+ message.getMessageProperties().getDeliveryTag());
}
break;
}
if (getTransactionManager() != null) {
if (getTransactionAttribute().rollbackOn(ex)) {
RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
.getResource(getConnectionFactory());
if (resourceHolder != null) {
consumer.clearDeliveryTags();
}
else {
/*
* If we don't actually have a transaction, we have to roll back
* manually. See prepareHolderForRollback().
*/
consumer.rollbackOnExceptionIfNecessary(ex);
}
throw ex; // encompassing transaction will handle the rollback.
}
else {
if (this.logger.isDebugEnabled()) {
this.logger.debug("No rollback for " + ex);
}
break;
}
}
else {
consumer.rollbackOnExceptionIfNecessary(ex);
throw ex;
}
}
}
}
if (messages != null) {
executeWithList(channel, messages, deliveryTag, consumer);
}
return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
上面对SimpleMessageListenerContainer
核心源码进行分析,比较枯燥不太直观,总结下其最核心就是位于AsyncMessageProcessingConsumer#run
方法中两个操作:initialize()
和无限循环mainLoop()
。
initialize
initialize()
方法主要完成:通过指令方式将需要监听队列信息告诉Rabbit Broker
,Broker
在监听队列中有消息数据时通过Deliver
指令将消息推送给客户端,客户端接收的Deliver
指令后,根据consumerTag
分发(dispatcher
)给具体consumer
,然后consumer
将其放入到其所属BlockingQueueConsumer
对象的队列queue
中,其逻辑可见下图:
BlockingQueueConsumer
、AsyncMessageProcessingConsumer
、监听队列等关系:
1、BlockingQueueConsumer
相当于一个逻辑消费者,通过封装成AsyncMessageProcessingConsumer
异步任务,然后丢到线程池中运行,线程池可以通过SimpleMessageListenerContainer#setTaskExecutor
进行自定义配置,所以,BlockingQueueConsumer
可以看成单独线程运行,且对应一个Channel
;
2、SimpleMessageListenerContainer
可以监听多个队列消息,每个队列又会创建一个InternalConsumer
对象,用于映射Broker
上的consumer
概念,它们是共用同一个channel
,即channel
下存在多个consumer
,它们之间通过consumerTag
区分,另外,Broker
推送消息也是根据consumerTag
识别具体推送给哪个consumer
进行处理;
案例,比如:
container.setQueueNames("test01", "test02");
container.setConcurrentConsumers(3);
container.setConsumerTagStrategy(queue -> "consumer idx:"+consumerIdx.getAndIncrement());
a、根据并发数concurrentConsumers
创建对应数量的BlockingQueueConsumer
,然后封装成AsyncMessageProcessingConsumer
,再分配一个线程进行执行,这里设置成3,所以会有3个线程运行AsyncMessageProcessingConsumer
,每个AsyncMessageProcessingConsumer
对应一个channel,所以会创建3个channel,在Web UI上可以看到对应channel:
b、每个监听队列创建一个InternalConsumer
和Broker的consumer进行映射,这里有两个监听队列,所以每个channel下会存在2个consumer:
AsyncMessageProcessingConsumer
如何订阅:
a、首先发送Basic.Qos
指令约定消息推送速率问题;
b、然后发送Basic.Consume
指令告诉Broker
客户端要开始订阅什么队列上的消息,以及把consumerTag
带上,因为可能存在多个监听队列,则同一个channel
上可能会发送多次Basic.Consume
指令,Broker
向channel
推送消息时需要根据consumerTag
找到对应consumer
处理;
c、Broker通过Deliver指令类型方式向客户端推送消息,客户端接收到消息后,根据consumerTag找到对应consumer交由其进行处理,即分发dispatcher;
d、这里的consumer对应的是InternalConsumer
,它处理逻辑就是放入到它所在的BlockingQueueConsumer
对象中消息队列queue
中;
mainLoop
Broker推送过来的消息放入到了BlockingQueueConsumer对象的消息队列queue中,后续就是从queue中提取消息进行业务处理,逻辑见下图:
a、AsyncMessageProcessingConsumer
被丢入到线程池中执行,则其对应一个线程;
b、这个线程会一直循环执行mainLoop()
方法;
c、mainLoop()
方法中就会从queue中
提取消息,根据batchSize
确定每次提取消息数量,最后回调MessageListener
,实现将消息传递到业务逻辑进行处理;
d、注意:所有的AsyncMessageProcessingConsumer
共用同一个MessageListener
对象,对象状态要注意线程安全问题;
总体流程