样例 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("organization"); consumer.setNamesrvAddr...CONSUME_FROM_FIRST_OFFSET 从最早的消息开始消费 CONSUME_FROM_TIMESTAMP 从指定时间开始消费 注:如果使用CONSUME_FROM_TIMESTAMP ,需设置参数 DefaultMQPushConsumer.setConsumeTimestamp...(“20131223171201”) 时间戳字符串格式为yyyyMMddHHmmss DefaultMQPushConsumer.subscribe(String topic, String subExpression...) subExpression参数为tag选择表达式 语法: 不过滤tag:"*" 或者null 根据多个tag过滤:“tag1 || tag2 || tag3” DefaultMQPushConsumer.registerMessageListener
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...()必须大于等于1且小于等于65535;defaultMQPushConsumer.getPullThresholdSizeForQueue()必须大于等于1且小于等于1024 pullMessage...(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest...()必须大于等于1且小于等于65535;defaultMQPushConsumer.getPullThresholdSizeForQueue()必须大于等于1且小于等于1024 cachedMessageCount...若大于defaultMQPushConsumer.getPullThresholdForQueue()或者cachedMessageSizeInMiB大于defaultMQPushConsumer.getPullThresholdSizeForQueue
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...{ this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; } //...... } DefaultMQPushConsumer...private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup...() < 1 || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {...consumeExecutor执行,若出现RejectedExecutionException则将剩余的msg添加到msgThis,然后执行submitConsumeRequestLater doc DefaultMQPushConsumer
序 本文主要研究一下rocketmq的consumeThread th (38).jpeg DefaultMQPushConsumer rocketmq-client-4.5.2-sources.jar.../org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...() < 1 || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) { throw new..."consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") " +...及submitConsumeRequestLater方法均会往consumeExecutor提交consumeRequest doc DefaultMQPushConsumer
序 本文主要研究一下rocketmq的consumeThread DefaultMQPushConsumer rocketmq-client-4.5.2-sources.jar!.../org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...() < 1 || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) { throw new..."consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") " +...及submitConsumeRequestLater方法均会往consumeExecutor提交consumeRequest doc DefaultMQPushConsumer
Pull Consumer Push Consumer DefaultMQPushConsumer DefaultMQPushCOnsumerImpl 通过 start 方法启动 org.apache.rocketmq.client.consumer.DefaultMQPushConsumer...DefaultMQPushConsumer#start 启动代码: public void start() throws MQClientException { setConsumerGroup...(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode...() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID...if (this.defaultMQPushConsumer.getOffsetStore() !
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...()); //...... // pullBatchSize if (this.defaultMQPushConsumer.getPullBatchSize...() 1024) { throw new MQClientException...= null) { if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !...()传递进来的参数对应于maxNums值 小结 DefaultMQPushConsumer定义了pullBatchSize属性,默认为32;DefaultMQPushConsumerImpl的checkConfig
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...= -1) { if (this.defaultMQPushConsumer.getPullThresholdForTopic() 102400) { throw...()在值非-1时必须大于等于1且小于等于6553500;defaultMQPushConsumer.getPullThresholdSizeForTopic()在值非-1时必须大于等于1且小于等于102400...()在值非-1时必须大于等于1且小于等于6553500;defaultMQPushConsumer.getPullThresholdSizeForTopic()在值非-1时必须大于等于1且小于等于102400
(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode...() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID...()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel())...", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING...(), this)返回false的时候,会抛出MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup
consumer/PushConsumerImpl.java public class PushConsumerImpl implements PushConsumer { private final DefaultMQPushConsumer...(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode...() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID...()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel())...", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING
(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode...() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID...()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel())...} else { switch (this.defaultMQPushConsumer.getMessageModel()) {...", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...defaultMQPushConsumer; private final MessageListenerConcurrently messageListener; private final...(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60,...(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); } private void cleanExpireMsg...public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { if (pushConsumer.getDefaultMQPushConsumerImpl
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...()); //...... // pullInterval if (this.defaultMQPushConsumer.getPullInterval...() 65535) { throw new MQClientException...方法会校验defaultMQPushConsumer.getPullInterval()必须大于等于0且小于等于65535;pullCallback的onSuccess方法判断defaultMQPushConsumer.getPullInterval...()),否则这执行executePullRequestImmediately(pullRequest) doc DefaultMQPushConsumer
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...()); //...... // pullInterval if (this.defaultMQPushConsumer.getPullInterval...() 65535) { throw new MQClientException...方法会校验defaultMQPushConsumer.getPullInterval()必须大于等于0且小于等于65535;pullCallback的onSuccess方法判断defaultMQPushConsumer.getPullInterval...()),否则这执行executePullRequestImmediately(pullRequest) doc DefaultMQPushConsumer
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...; } public TraceDispatcher getTraceDispatcher() { return traceDispatcher; } } DefaultMQPushConsumer...defaultMQPushConsumer; private final MessageListenerConcurrently messageListener; private final...(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60,...(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); } private void cleanExpireMsg
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...) { this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; } //...... } DefaultMQPushConsumer...private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup...() < 1 || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {...consumeExecutor执行,若出现RejectedExecutionException则将剩余的msg添加到msgThis,然后执行submitConsumeRequestLater doc DefaultMQPushConsumer
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup...() < 1 || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {...this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan...(),大于则执行executePullRequestLater方法进行流控,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL默认值为50 doc DefaultMQPushConsumer
/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java public class DefaultMQPushConsumer extends...{ this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; } //...... } DefaultMQPushConsumer...timeMillis, TimeUnit.MILLISECONDS); } //...... } submitConsumeRequestLater方法在timeMillis为-1时会读取defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis...如果该值小于10则重置为10,如果该值大于30000则重置为30000;然后使用scheduledExecutorService延时timeMillis执行submitConsumeRequest方法 小结 DefaultMQPushConsumer...如果该值小于10则重置为10,如果该值大于30000则重置为30000;然后使用scheduledExecutorService延时timeMillis执行submitConsumeRequest方法 doc DefaultMQPushConsumer
领取专属 10元无门槛券
手把手带您无忧上云