上篇文章主要介绍消息中间件并以RocketMQ架构展开描述其核心组件以及MQ运行流程
本篇文章以Product的视角来看看发送消息的核心原理与设计思想,最后以图文并茂的方式描述出发送消息的核心流程
RocketMQ中普通消息提供三种发送方式:同步、异步、单向
上篇文章中我们已经使用封装好的API延时过同步发送
在使用三种方式前,我们先来理解它们的理论知识
同步发送:发送完消息后,需要阻塞直到收到Broker的响应,通常用于数据一致性较高的操作,需要确保消息到达Broker并持久化
同步发送收到响应并不一定就是成功,还需要根据响应状态进行判断
SendResult响应状态包括:
(这些状态与设置的刷盘策略有关,后续保证消息可靠的文章再进行详细展开说明,本篇文章还是回归主线探究发送消息)
异步发送:发送完消息后立即响应,不需要阻塞等待,但需要设置监听器,当消息成功或失败时进行业务处理,可以在失败时进行重试等其他逻辑保,通常用于追求响应时间的场景
异步发送相当于同步发送,需要新增SendCallback回调来进行后续成功/失败的处理,并且异步发送没有返回值
@GetMapping("/asyncSend")
public String asyncSend() {
producer.sendAsyncMsg(topic, "tag", "async hello world!", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功{}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.info("消息发送失败", throwable);
//记录后续重试
}
});
return "asyncSend ok";
}
原生API封装:
public void sendAsyncMsg(String topic, String tag, String jsonBody, SendCallback sendCallback) {
Message message = new Message(topic, tag, jsonBody.getBytes(StandardCharsets.UTF_8));
try {
producer.send(message, sendCallback);
} catch (MQClientException | RemotingException | InterruptedException e) {
throw new RuntimeException(e);
}
}
单向发送:只要发出消息就响应,性能最好,通常用于追求性能,不追求可靠的场景,如:异步日志收集
由于单向发送的特性,即不需要回调也没有返回结果
@GetMapping("/sendOnewayMsg")
public String onewaySend() {
producer.sendOnewayMsg(topic, "tag", "oneway hello world!");
return "sendOnewayMsg ok";
}
原生API封装:
public void sendOnewayMsg(String topic, String tag, String jsonBody) {
Message message = new Message(topic, tag, jsonBody.getBytes(StandardCharsets.UTF_8));
try {
producer.sendOneway(message);
} catch (MQClientException | RemotingException | InterruptedException e) {
throw new RuntimeException(e);
}
}
在研究发送消息的原理前,不妨来思考下,如果让我们实现,我们要思考下需要哪些步骤?
像我们平时进行业务代码编写前的第一步就是进行参数校验,因为要防止参数“乱填”的意外情况
然后由于需要找到对应的Broker,那肯定要获取Topic路由相关信息
这个路由信息前文说过是从NameServer集群定时获取即时更新的,那么客户端的内存里肯定会进行存储
像这样的数据肯定是类似于多级缓存的,先在本地缓存,如果本地没有或者本地是旧数据,那么就网络通信再去远程(NameServer)获取一份后再更新本地缓存
获取完路由信息后,可以通过设置的Topic获取对应的MessageQueue队列信息,因为Topic下可能有很多队列,因此需要负载均衡算法决定要发送的队列
rocketmq发送消息还提供超时、重试等机制,因此在这个过程中需要计算时间、重试次数
最后发送消息会进行网络通信,我们要选择合适的工具进行RPC
总结一下,如果让我们设计起码要有这些流程:参数校验、获取路由信息、根据负载均衡算法选择队列、计算超时,重试次数、选择网络通信RPC工具...
在设计完流程后,如果我们是一位”成熟的设计师“,那么一定会将这些步骤中通用的步骤抽象成模板,模板可以作为三种发送消息通用方式,而那些变动的就是策略,解耦互不影响,并在重要的流程前后留下”钩子“,方便让使用者进行扩展
rocketmq流程与我们设计、思考的流程类似,先准备一张最终的流程图,方便跟着流程图一起阅读源码:
通过三种发送方式,都会来到DefaultMQProducerImpl.sendDefaultImpl
这个就是通用方法的模板
代码块中只展示部分关键代码,流程如下:
在3、4步骤中还会进行重试、超时判断等
private SendResult sendDefaultImpl(
//消息
Message msg,
//方式
final CommunicationMode communicationMode,
//异步的回调
final SendCallback sendCallback,
//超时时间
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//参数校验
Validators.checkMessage(msg, this.defaultMQProducer);
//获取路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
//计算重试次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
//已经重发次数
int times = 0;
//重试循环
for (; times < timesTotal; times++) {
//上次试过的Broker
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//选择一个要发送的MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
try {
beginTimestampPrev = System.currentTimeMillis();
//重发时设置topic
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
//超时退出
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
//发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
//记录延时
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
//最后分情况处理
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
//如果响应状态不成功 如果设置重试其他broker则进行重试
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
}
//...
}
其中CommunicationMode就是发送的方式,分别为:SYNC同步、ASYNC异步、ONEWAY单向
rocketmq中使用大量散列表存储数据,其中存储路由信息的是
ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>()
topicPublishInfoTable中Key为topic,Value为路由信息TopicPublishInfo
TopicPublishInfo中主要包括messageQueueList对应的队列列表、sendWhichQueue后续用来选择哪一个队列、topicRouteData路由数据
在topicRouteData路由数据中主要有brokerDatas、queueDatas
brokerDatas包含所有的Broker信息,queueDatas包含每个broker上对应的数据,比如读写队列数量
在获取路由信息的方法中,先尝试从本地获取 this.topicPublishInfoTable.get
,如果本地不存在则从NameServer获取 this.mQClientFactory.updateTopicRouteInfoFromNameServer
(这里的this.mQClientFactory
实际上是MQClientInstance,生产者、消费者都会用到,用于客户端远程调用服务端,里面也会存对应相关的组件)
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//本地获取
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
//远程获取
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//远程获取
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
选择队列默认情况下会来到这里,会通过线性轮询选择队列 selectOneMessageQueue,重试的区别为本次选择的broker不和上次的相同
(因为上次失败broker可能会存在问题,这次就换一个broker)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
//lastBrokerName:上一次的broker
if (lastBrokerName == null) {
//线性轮询选择队列 selectOneMessageQueue
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
//线性轮询选择队列
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
//找到不和上次一样的broker
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
在发送消息前需要对消息进行封装,如:设置唯一ID、尝试压缩消息、封装消息头等
在发送前还有检查禁止发送的钩子和发送前后执行的钩子,方便扩展
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//获取broker信息
String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
}
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
//不是批量消息就设置唯一ID
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
//尝试压缩消息
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= compressType.getCompressionFlag();
msgBodyCompressed = true;
}
//尝试执行检查禁止发送消息的钩子
if (hasCheckForbiddenHook()) {
//...
this.executeCheckForbiddenHook(checkForbiddenContext);
}
//尝试执行发送消息前的钩子
if (this.hasSendMessageHook()) {
//...
this.executeSendMessageHookBefore(context);
}
//封装消息头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//set...
//根据不同的发送方式调整
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
//...
//获取MQ客户端发送
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
brokerName,
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
//检查超时
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
//获取MQ客户端发送消息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
brokerName,
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
//尝试执行发送完消息的钩子
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
}
//...
}
同步消息最终会调用invokeSync
,这种服务间的网络通信又称为远程调用RPC
在RPC前后也有钩子可以进行扩展
最终调用invokeSyncImpl
会通过netty的channel进行写数据
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
//rpc前的钩子
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
}
//使用netty的channel写数据
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
//rpc后的钩子
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
this.updateChannelLastResponseTime(addr);
return response;
}
}
通过netty的channel写请求,并添加监听器,最后使用结果调用waitResponse
进行同步等待
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
try {
//结果
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
//写请求 并添加监听器
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
//...
});
//同步调用 等待结果
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
异步消息RPC类似,只是不需要最后的同步等待
走完整体的发送消息流程,我们再回过头来查看重试机制
总共尝试发送消息的次数取决于 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1
如果是同步消息则为 1 + retryTimesWhenSendFailed 默认2次 = 3次,其他情况就是1次
也就是说只有同步发送才会重试!异步、单向都不会进行重试?
就在我查找同步最大重试次数 retryTimesWhenSendFailed
时,同时还发现异步的最大重试次数 retryTimesWhenSendAsyncFailed
实际上异步发送重试的代码在异常的catch块中,异常才去执行 onExceptionImpl
如果重试同步发送时,需要去其他broker还要把 retryAnotherBrokerWhenNotStoreOK
设置为true,默认false
至此发送消息的流程算是过了一遍,在查看源码的过程中大部分内容都是见名知意的,这不比公司的”shit mountain“看着舒服?
最后再来总结下流程,便于同学们记忆:
retryAnotherBrokerWhenNotStoreOK
开启后,同步失败新的重试会选择其他broker消息发送的方式有三种:同步、异步、单向,根据顺序可靠性逐渐下降、性能逐渐提升
同步消息能够通过响应判断是否真正成功,常用于需要消息可靠、数据一致的场景,如同步
异步消息通过实现回调处理成功与失败,常用于响应时间敏感的场景,如异步短信
单向消息不需要进行处理,常用于追求性能的场景,如异步日志
消息发送的过程中会先检查消息参数,确保消息无误,再获取路由信息,如果本地不存在则向NameServer获取
路由信息存储topic对应的broker、队列列表、broker上的队列等相关信息
然后通过线性轮询算法选择要发送消息的队列,如果重试则不会选择相同的broker
接着会设置消息的唯一ID、判断是否压缩消息、尝试执行检查禁止发送、发送消息前后的钩子等
最后使用netty写请求进行rpc调用,同时也会有rpc前后的钩子
在此期间同步、异步会根据参数进行超时检查、重试等操作
本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。