近期很多客户在咨询如何从RocketMQ企业版迁移到标准的Apache RocketMQ。基于此,我做了一下的第一版的Java代码Demo,来尝试总结一些迁移的注意事项和两者在客户端的主要差别。后期再逐步整理其他语言的Demo案例,比如我自己很喜欢的Scala和常见语言 Python/Golang/Nodejs。
第一篇文章会针对最基础的代码做迁移对比,之后会逐步增加高阶功能的迁移。
说明了动机,也许有些人觉得我说的有些主观,至少这是我真实的看法。我不会喜欢用只有demo的SDK。
我们在项目里选择使用org.apache.rocketmq的客户端。当前比较新的版本是4.8.0,完全没有问题。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
public interface MessageListenerConcurrently extends MessageListener {
/**
* It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if
* consumption failure
*
* @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* @return The consume status
*/
ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeConcurrentlyContext context);
}
这个场景下如果想达到针对每个message 消费的回掉接口,需要确认下面这个配置:
consumeMessageBatchMaxSize = 1
之后你会发现,在List<MessageExt> msgs
里只会有1个message,这样就确保了回掉Listener每次只处理一个信息,并且返回的Status只针对这个message。
for(int i = 0 ; i < x ; i++){
// 这里topic会被累计叠加subscribe
consumer.subscribe(topic, tagsString); // tagString in format "tagA||tagB||tagC"
//Listener会被覆盖掉
consumer.registerMessageListener(new UnifiedConcurrentlyMessageListener(listenerMap));
}
源代码如下:
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
// 针对每个topic 做一个subscriptionData,然后put到一个Map当中去,所以之后可以被轮巡
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
而MessageListener就不一样了
public void registerMessageListener(MessageListener messageListener) {
//直接覆盖了~ 好像也可以改名叫setMessageListener
this.messageListenerInner = messageListener;
}
所以这里如果需要监听多个topic和使用不同MessageListener的场景里,需要用类似如下的代码实现:
List<DefaultMQPushConsumer> consumers = new ArrayList<>();
public void init() {
try {
for (String topic : topicList) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(namespace, groupId);
consumer.setNamesrvAddr(nameSrvAddr);
// Optional, default value is CONSUME_FROM_LAST_OFFSET
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeThreadMin(consumeThreadNums);
// default max thread number is 20
consumer.setConsumeThreadMax(consumeThreadNums * 2);
// max retry times
consumer.setMaxReconsumeTimes(3);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setConsumeMessageBatchMaxSize(1);
// subscribe to a topic and append a topic message listener.
consumer.subscribe(topic, tagsStr);
consumer.registerMessageListener(new MyMessageListener());
// store the consumer to the list
consumers.add(consumer);
// start consumer
consumer.start();
}
} catch (Throwable e) {
e.printStackTrace();
throw new RuntimeException("canot initialize consumer");
}
}
MessageListenerConcurrently
和MessageListenerOrderly
。在企业版里,创建Consumer的时候就定了,Consumer分为Consumer
,BatchConsumer
和OrderConsumer
。ConsumeOrderlyStatus
和ConsumeConcurrentlyStatus
,分别对应企业版的Action
和OrderAction
。Apache RocketMQ 的生产者函数send(msg, callback)
vs 企业版生产者函数sendAsync(msg, callback)
源码如下:
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
// 这里用异步的线程调用 - 非阻塞
executor.submit(new Runnable() {
@Override
public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
} catch (Exception e) {
sendCallback.onException(e);
}
} else {
sendCallback.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e);
}
}
// 最终调用的是
// MQClientAPIImpl::sendMessageAsync
代码调用如下:
//异步模式
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n", sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
//或者同步模式
SendResult sendResult = producer.send(msg);
当前腾讯云TDMQ的Pulsar已经支持了RocketMQ的协议兼容,并且贡献给了社区 - RoP
当前产品还在内测期,开白可以使用。
想要享受开源便利,又不希望自己运维的同学们可以开始试用了~
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。