前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析

【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析

作者头像
Reactor2020
发布2023-03-22 19:05:46
3K0
发布2023-03-22 19:05:46
举报
文章被收录于专栏:【云原生 • Prometheus】

往期推荐

【sharding-sphere】 - 01 SQL路由

【Nacos源码分析】- 02 获取配置流程

【Java并发编程】- 03 MESI、内存屏障

【Spring源码】- 11 Spring AOP之编程式事务

【编程开发】- 01 日志框架

概述

SimpleMessageListenerContainerspringrabbitmq原生api基础上封装实现的一个消费工具类,该类非常强大,可以实现:监听单个或多个队列、自动启动、自动声明,它还支持动态配置,如动态添加监听队列、动态调整并发数等等,基本上对RabbitMQ消费场景这个类都能满足。如@RabbitListener、cloud-stream中StreamListener中底层实现都是基于该类,所以,理解SimpleMessageListenerContainer原理对理解spring rabbitmq中消费模型非常关键。

基本使用

1、SimpleMessageListenerContainer#addQueueNames()方法可以运行时添加监听队列,removeQueueNames()方法可以运行时移除监听队列;

2、后置处理器setAfterReceivePostProcessors()

代码语言:javascript
复制
//后置处理器,接收到的消息都添加了Header请求头
container.setAfterReceivePostProcessors(message -> {
    message.getMessageProperties().getHeaders().put("desc",10);
    return message;
});
container.setMessageListener((MessageListener) message -> {
    System.out.println("====接收到消息=====");
    System.out.println(message.getMessageProperties());
    System.out.println(new String(message.getBody()));
});

3、设置消费者的Consumer_tag和Arguments:container.setConsumerTagStrategy可以设置消费者的 Consumer_tag, container.setConsumerArguments可以设置消费者的 Arguments

代码语言:javascript
复制
container.setConsumerTagStrategy(queue -> "order_queue_"+(++count));
//设置消费者的Arguments
Map<String, Object> args = new HashMap<>();
args.put("module","订单模块");
args.put("fun","发送消息");
container.setConsumerArguments(args);

4、setConcurrentConsumers设置并发消费者:setConcurrentConsumers设置多个并发消费者一起消费,并支持运行时动态修改。setMaxConcurrentConsumers设置最多的并发消费者。

代码语言:javascript
复制
  @Bean
   public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
       SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
       container.setConnectionFactory(connectionFactory);
       container.setQueueNames("zhihao.miao.order");
       container.setConcurrentConsumers(5);
       container.setMaxConcurrentConsumers(10);
       container.setMessageListener((MessageListener) message -> {
           System.out.println("====接收到消息=====");
           System.out.println(message.getMessageProperties());
           System.out.println(new String(message.getBody()));
       });
       return container;
   }

核心原理

API结构

SimpleMessageListenerContainer类结构如下:

源码分析

方法入口

SimpleMessageListenerContainer类启动的入口是start()方法,该方法位于AbstractMessageListenerContainer类中:

代码语言:javascript
复制
public void start() {
    //如果已启动,则什么也不执行,直接退出
 if (isRunning()) {
  return;
 }
    //initialized是否执行初始化,没有则执行afterPropertiesSet()方法进行初始化,执行完成后initialized设置成true
 if (!this.initialized) {
  synchronized (this.lifecycleMonitor) {
   if (!this.initialized) {
    afterPropertiesSet();
   }
  }
 }
 try {
  logger.debug("Starting Rabbit listener container.");
        //验证RabbitAdmin,mismatchedQueuesFatal=true时,spring context中RabbitAdmin数量不能大于1
  configureAdminIfNeeded();
        //执行RabbitAdmin#initialize方法,spring context中注入的exchanges, queues and bindings执行声明式创建
        /*
        总结一下,我们发现,要想自动创建队列,SimpleMessageListenerContainer需要满足这么两点:
         mismatchedQueuesFatal属性设置为true
         autoDeclare属性也设置为true
        */
  checkMismatchedQueues();
        //启动核心
  doStart();
 }
 catch (Exception ex) {
  throw convertRabbitAccessException(ex);
 }
 finally {
  this.lazyLoad = false;
 }
}

SimpleMessageListenerContainer#doStart方法:

代码语言:javascript
复制
protected void doStart() {
 Assert.state(!this.consumerBatchEnabled || getMessageListener() instanceof BatchMessageListener
   || getMessageListener() instanceof ChannelAwareBatchMessageListener,
   "When setting 'consumerBatchEnabled' to true, the listener must support batching");
 //如果MessageListener是ListenerContainerAware,则进行expectedQueueNames校验
    checkListenerContainerAware();
    //调用父类doStart()方法,主要是active和running都设置成true
 super.doStart();
 synchronized (this.consumersMonitor) {
  if (this.consumers != null) {
   throw new IllegalStateException("A stopped container should not have consumers");
  }
        /*
        创建BlockingQueueConsumer类型consumer,每个concurrentConsumers并发对应创建一个对象,并存储到Set<BlockingQueueConsumer> consumers集合中,
        返回值就是创建consumer对象个数,具体创建逻辑见:SimpleMessageListenerContainer#createBlockingQueueConsumer,主要注意下prefetchCount计算:
        int actualPrefetchCount = getPrefetchCount() > this.batchSize ? getPrefetchCount() : this.batchSize;即如果prefetchCount大于batchSize,则其就是实际值,否则prefetchCount等于batchSize值
        */
  int newConsumers = initializeConsumers();
  if (this.consumers == null) {
   logger.info("Consumers were initialized and then cleared " +
     "(presumably the container was stopped concurrently)");
   return;
  }
  if (newConsumers <= 0) {
   if (logger.isInfoEnabled()) {
    logger.info("Consumers are already running");
   }
   return;
  }
        /*
        每个并发对应一个BlockingQueueConsumer对象,这里将每个BlockingQueueConsumer对象封装成AsyncMessageProcessingConsumer,这样可以丢到线程池中异步执行
        */
  Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
  for (BlockingQueueConsumer consumer : this.consumers) {
            //将BlockingQueueConsumer对象封装成AsyncMessageProcessingConsumer进行异步执行
   AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            //存储到processors集合中
   processors.add(processor);
            //将AsyncMessageProcessingConsumer丢到线程池中执行
   getTaskExecutor().execute(processor);
   if (getApplicationEventPublisher() != null) {
                //事件发送
    getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
   }
  }
        //判断启动过程中是否存在异常
  waitForConsumersToStart(processors);
 }
}

上面代码大致逻辑:BlockingQueueConsumer对象可以看成consumer,然后将其包装成AsyncMessageProcessingConsumer异步任务丢入到线程池中运行。

异步任务

上面分析了BlockingQueueConsumer类型的consumer会被封装成AsyncMessageProcessingConsumer异步任务丢入到线程池中运行。下面主要就来分析下异步任务执行时做了些什么,该逻辑在SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run方法,它主要做如下几个事情:

1、监听队列判断

代码语言:javascript
复制
//BlockingQueueConsumer.getQueueCount() < 1,表示当前consumer没有设置任何监听队列,则没必要启动
if (this.consumer.getQueueCount() < 1) {
  if (logger.isDebugEnabled()) {
   logger.debug("Consumer stopping; no queues for " + this.consumer);
  }
  SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
  if (getApplicationEventPublisher() != null) {
   getApplicationEventPublisher().publishEvent(
     new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
  }
  this.start.countDown();
  return;
}

2、核心逻辑

代码语言:javascript
复制
try {
 initialize();
 while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
  mainLoop();
 }
}
catch (InterruptedException e) {
    省略。。。
}
catch (QueuesNotAvailableException ex) {
 省略。。。
}
省略。。。

AsyncMessageProcessingConsumer#run执行最核心逻辑就是上面try语句中,首先执行initialize()初始化方法,然后开始无限循环执行mainLoop()方法。

初始化

上面分析的AsyncMessageProcessingConsumer运行时执行了两个关键操作:initialize()初始化操作和mainLoop()无限循环。

首先,来看下initialize()初始化操作主要干了些什么:

  • 调用attemptDeclarations()方法进行声明式exchangequeuebindings创建,主要通过执行RabbitAdmin#initialize方法实现;
  • 调用BlockingQueueConsumer#start方法,该方法主要完成与Rabbit Broker指令交互: 1、passiveDeclarations()方法校验监听队列是否存在:channel.queueDeclarePassive(queueName),最终会向Rabbit Broker发送queue.declare指令,并设置passive=true,如下图:

2、setQosAndreateConsumers()方法用于进行客户端流量控制Qos以及消息订阅

a、Qos流控:channel.basicQos(this.prefetchCount),最终会向Rabbit Broker发送basic.qos指令,并将prefetch-sizeprefetch-countglobal参数设置过去,如下图:

b、consumeFromQueue()方法会使用channel.basicConsume方法订阅消息,最终会向Rabbit Broker发送basic.consume指令,并指定订阅消息的queue名称等参数消息(如下图),注意:SimpleMessageListenerContainer可能设置多个监听队列,则BlockingQueueConsumer这里会给每个监听队列都向Broker发送一个basic.consume订阅指令,并且是使用同一个channel

Broker发送指令com.rabbitmq.client.impl.AMQCommand#transmit()方法中,可以在如下红色框框处代码断点监控:

响应处理

上面分析initialize()初始化操作,客户端向Broker发送basic.qosbasic.consume指令就相当于告诉了服务器:我都准备好了,如果监听队列有消息你就把它推送给我,下面就来分析下Broker消息推送流程。

Rabbit Broker接收到Basic.consume指令后,会向客户端反馈Basic.consume-ok指令,表示服务端一切就绪准备给客户端推送消息,然后就通过Basic.Deliver指令类型将消息推送给客户端,一条消息对应一个Deliver反馈,客户端接收到服务端返回过来的指令类型后,在ChannelN#processAsync方法进行判断处理,它是amqp-client依赖包中类:

如果是Deliver类型指令,则调用processDelivery()方法进行处理:

代码语言:javascript
复制
protected void processDelivery(Command command, Basic.Deliver method) {
    Basic.Deliver m = method;

    //根据Deliver的consumerTag获取到InternalConsumer对象,因为一个Channel上可能存在多个consumer,需要找到Broker是针对哪个consumer进行的响应
    Consumer callback = _consumers.get(m.getConsumerTag());
    if (callback == null) {
        if (defaultConsumer == null) {
            // No handler set. We should blow up as this message
            // needs acking, just dropping it is not enough. See bug
            // 22587 for discussion.
            throw new IllegalStateException("Unsolicited delivery -" +
                    " see Channel.setDefaultConsumer to handle this" +
                    " case.");
        }
        else {
            callback = defaultConsumer;
        }
    }

    Envelope envelope = new Envelope(m.getDeliveryTag(),
                                     m.getRedelivered(),
                                     m.getExchange(),
                                     m.getRoutingKey());
    try {
        // call metricsCollector before the dispatching (which is async anyway)
        // this way, the message is inside the stats before it is handled
        // in case a manual ack in the callback, the stats will be able to record the ack
        metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
        this.dispatcher.handleDelivery(callback,
                                       m.getConsumerTag(),
                                       envelope,
                                       (BasicProperties) command.getContentHeader(),
                                       command.getContentBody());
    } catch (WorkPoolFullException e) {
        // couldn't enqueue in work pool, propagating
        throw e;
    } catch (Throwable ex) {
        getConnection().getExceptionHandler().handleConsumerException(this,
            ex,
            callback,
            m.getConsumerTag(),
            "handleDelivery");
   }
}

processDelivery()处理Broker返回的Deliver消息大致流程:

Consumer callback = _consumers.get(m.getConsumerTag()):根据DeliverconsumerTag获取到InternalConsumer对象,因为一个Channel上可能存在多个consumer,需要找到Broker是针对哪个consumer进行的响应

封装DeliverEnvelope

代码语言:javascript
复制
Envelope envelope = new Envelope(m.getDeliveryTag(),
                                         m.getRedelivered(),
                                         m.getExchange(),
                                         m.getRoutingKey());

metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag()):统计数据处理

调用ConsumerDispatcher#handleDelivery,其会创建任务丢到线程池中执行,任务:将数据交由具体的consumer处理,即调用InternalConsumer#handleDelivery

代码语言:javascript
复制
this.dispatcher.handleDelivery(callback,
                                           m.getConsumerTag(),
                                           envelope,
                                           (BasicProperties) command.getContentHeader(),
                                           command.getContentBody());

InternalConsumer#handleDelivery()方法:将Broker返回的Deliver数据放入到BlockingQueueConsumer.queue中:

代码语言:javascript
复制
BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));

所以,如果ListenerContainer监听多个队列,则BlockingQueueConsumer中则对应多个InternalConsumer,每个InternalConsumer映射Broker上的一个,BlockingQueueConsumer下所有InternalConsumer共享同一个queue

业务处理

上面分析了消息订阅以及Broker推送过来的消息数据会被缓存到BlockingQueueConsumer对象的queue队列中,下面就来分析下从queue中提取消息到传递给用户业务逻辑这个流程。这就需要分析AsyncMessageProcessingConsumer#run方法中另一个非常重要操作:无限循环mainLoop操作,它主要就是完成从queue中提取消息数据然后经过一系列操作最终传递给用户逻辑MessageListener中。

代码语言:javascript
复制
private void mainLoop() throws Exception { // NOSONAR Exception
 try {
  boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
  if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
   checkAdjust(receivedOk);
  }
  long idleEventInterval = getIdleEventInterval();
  if (idleEventInterval > 0) {
   if (receivedOk) {
    updateLastReceive();
   }
   else {
    long now = System.currentTimeMillis();
    long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
    long lastReceive = getLastReceive();
    if (now > lastReceive + idleEventInterval
      && now > lastAlertAt + idleEventInterval
      && SimpleMessageListenerContainer.this.lastNoMessageAlert
      .compareAndSet(lastAlertAt, now)) {
     publishIdleContainerEvent(now - lastReceive);
    }
   }
  }
 }
 catch (ListenerExecutionFailedException ex) {
  // Continue to process, otherwise re-throw
  if (ex.getCause() instanceof NoSuchMethodException) {
   throw new FatalListenerExecutionException("Invalid listener", ex);
  }
 }
 catch (AmqpRejectAndDontRequeueException rejectEx) {
  /*
   *  These will normally be wrapped by an LEFE if thrown by the
   *  listener, but we will also honor it if thrown by an
  *  error handler.
  */
 }
}

跟踪下doReceiveAndExecute():

代码语言:javascript
复制
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR

 Channel channel = consumer.getChannel();

 List<Message> messages = null;
 long deliveryTag = 0;

    //batchSize默认是1,用于指定一次从queue中提取消息数量
 for (int i = 0; i < this.batchSize; i++) {
  logger.trace("Waiting for message from consumer.");
  Message message = consumer.nextMessage(this.receiveTimeout);
  if (message == null) {
   break;
  }
        //是否批处理
  if (this.consumerBatchEnabled) {
   Collection<MessagePostProcessor> afterReceivePostProcessors = getAfterReceivePostProcessors();
   if (afterReceivePostProcessors != null) {
    Message original = message;
    deliveryTag = message.getMessageProperties().getDeliveryTag();
    for (MessagePostProcessor processor : getAfterReceivePostProcessors()) {
     message = processor.postProcessMessage(message);
     if (message == null) {
      channel.basicAck(deliveryTag, false);
      if (this.logger.isDebugEnabled()) {
       this.logger.debug(
         "Message Post Processor returned 'null', discarding message " + original);
      }
      break;
     }
    }
   }
   if (message != null) {
    if (messages == null) {
     messages = new ArrayList<>(this.batchSize);
    }
    if (isDeBatchingEnabled() && getBatchingStrategy().canDebatch(message.getMessageProperties())) {
     final List<Message> messageList = messages;
     getBatchingStrategy().deBatch(message, fragment -> messageList.add(fragment));
    }
    else {
     messages.add(message);
    }
   }
  }
  else {
   messages = debatch(message);
   if (messages != null) {
    break;
   }
   try {
                //执行MessageListener
    executeListener(channel, message);
   }
   catch (ImmediateAcknowledgeAmqpException e) {
    if (this.logger.isDebugEnabled()) {
     this.logger.debug("User requested ack for failed delivery '"
       + e.getMessage() + "': "
       + message.getMessageProperties().getDeliveryTag());
    }
    break;
   }
   catch (Exception ex) {
    if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
     if (this.logger.isDebugEnabled()) {
      this.logger.debug("User requested ack for failed delivery: "
        + message.getMessageProperties().getDeliveryTag());
     }
     break;
    }
    if (getTransactionManager() != null) {
     if (getTransactionAttribute().rollbackOn(ex)) {
      RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
        .getResource(getConnectionFactory());
      if (resourceHolder != null) {
       consumer.clearDeliveryTags();
      }
      else {
       /*
       * If we don't actually have a transaction, we have to roll back
       * manually. See prepareHolderForRollback().
       */
       consumer.rollbackOnExceptionIfNecessary(ex);
      }
       throw ex; // encompassing transaction will handle the rollback.
      }
      else {
       if (this.logger.isDebugEnabled()) {
        this.logger.debug("No rollback for " + ex);
       }
       break;
      }
     }
     else {
      consumer.rollbackOnExceptionIfNecessary(ex);
      throw ex;
     }
    }
   }
  }
  if (messages != null) {
   executeWithList(channel, messages, deliveryTag, consumer);
  }

  return consumer.commitIfNecessary(isChannelLocallyTransacted());

 }

总结

上面对SimpleMessageListenerContainer核心源码进行分析,比较枯燥不太直观,总结下其最核心就是位于AsyncMessageProcessingConsumer#run方法中两个操作:initialize()和无限循环mainLoop()

initialize


initialize()方法主要完成:通过指令方式将需要监听队列信息告诉Rabbit BrokerBroker在监听队列中有消息数据时通过Deliver指令将消息推送给客户端,客户端接收的Deliver指令后,根据consumerTag分发(dispatcher)给具体consumer,然后consumer将其放入到其所属BlockingQueueConsumer对象的队列queue中,其逻辑可见下图:

BlockingQueueConsumerAsyncMessageProcessingConsumer、监听队列等关系:

1、BlockingQueueConsumer相当于一个逻辑消费者,通过封装成AsyncMessageProcessingConsumer异步任务,然后丢到线程池中运行,线程池可以通过SimpleMessageListenerContainer#setTaskExecutor进行自定义配置,所以,BlockingQueueConsumer可以看成单独线程运行,且对应一个Channel

2、SimpleMessageListenerContainer可以监听多个队列消息,每个队列又会创建一个InternalConsumer对象,用于映射Broker上的consumer概念,它们是共用同一个channel,即channel下存在多个consumer,它们之间通过consumerTag区分,另外,Broker推送消息也是根据consumerTag识别具体推送给哪个consumer进行处理;

案例,比如:

代码语言:javascript
复制
container.setQueueNames("test01", "test02");
container.setConcurrentConsumers(3);
container.setConsumerTagStrategy(queue -> "consumer idx:"+consumerIdx.getAndIncrement());

a、根据并发数concurrentConsumers创建对应数量的BlockingQueueConsumer,然后封装成AsyncMessageProcessingConsumer,再分配一个线程进行执行,这里设置成3,所以会有3个线程运行AsyncMessageProcessingConsumer,每个AsyncMessageProcessingConsumer对应一个channel,所以会创建3个channel,在Web UI上可以看到对应channel:

b、每个监听队列创建一个InternalConsumer和Broker的consumer进行映射,这里有两个监听队列,所以每个channel下会存在2个consumer:

AsyncMessageProcessingConsumer如何订阅:

a、首先发送Basic.Qos指令约定消息推送速率问题;

b、然后发送Basic.Consume指令告诉Broker客户端要开始订阅什么队列上的消息,以及把consumerTag带上,因为可能存在多个监听队列,则同一个channel上可能会发送多次Basic.Consume指令,Brokerchannel推送消息时需要根据consumerTag找到对应consumer处理;

c、Broker通过Deliver指令类型方式向客户端推送消息,客户端接收到消息后,根据consumerTag找到对应consumer交由其进行处理,即分发dispatcher;

d、这里的consumer对应的是InternalConsumer,它处理逻辑就是放入到它所在的BlockingQueueConsumer对象中消息队列queue中;

mainLoop


Broker推送过来的消息放入到了BlockingQueueConsumer对象的消息队列queue中,后续就是从queue中提取消息进行业务处理,逻辑见下图:

a、AsyncMessageProcessingConsumer被丢入到线程池中执行,则其对应一个线程;

b、这个线程会一直循环执行mainLoop()方法;

c、mainLoop()方法中就会从queue中提取消息,根据batchSize确定每次提取消息数量,最后回调MessageListener,实现将消息传递到业务逻辑进行处理;

d、注意:所有的AsyncMessageProcessingConsumer共用同一个MessageListener对象,对象状态要注意线程安全问题;

总体流程


本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Reactor2020 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 基本使用
  • 核心原理
    • API结构
      • 源码分析
        • 方法入口
        • 异步任务
        • 初始化
        • 响应处理
        • 业务处理
    • 总结
    相关产品与服务
    消息队列
    腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档