sendLatencyFaultEnable
设定一个topic -> 根据设定的MessageQueue个数 -> 分不在不同的master Broker里边 -> 每个MessageQueue是由多个 CommitLog组成 -> Commit是采用顺序读写。加上OS PageCache来保证写入性能 -> 首先。OS PageCache是基于内存的缓冲池。采用异步刷盘或者同步刷盘顺序写入磁盘 (异步刷盘宕机是会有可能导致数据丢失的
producer.send(msg);
defaultMQProducerImpl.sendDefaultImpl
this.tryToFindTopicPublishInfo 从 NameService 获取 Topic路由信息(本地有缓存就从缓存中获取)
this.selectOneMessageQueue 选择一个消息队列 queue
this.sendKernelImpl 调用发送核心方法
mQClientFactory.getMQClientAPIImpl().sendMessage 进行发送
MQClientAPIImpl#sendMessageSync
remotingClient.invokeSync 调用netty方法发送 RequestCode.SEND_MESSAGE 消息
NettyServerHandler#channelRead0
NettyRemotingAbstract#processMessageReceived
NettyRemotingAbstract#processRequestCommand 处理客户端的请求消息
processor.asyncProcessRequest 客户端发送的是异步消息,不需要同步返回成功
SendMessageProcessor#asyncProcessRequest 进入消息处理
AbstractSendMessageProcessor#parseRequestHeader 解析请求
SendMessageProcessor#asyncSendMessage 异步保存发送的消息
this.brokerController.getMessageStore().asyncPutMessage(msgInner) MessageStore存储消息
MessageStore#asyncPutMessage 异步保存发送的消息
MessageStore#putMessage 保存发送的消息
DefaultMessageStore#asyncPutMessages DefaultMessageStore保存消息默认实现
this.commitLog.asyncPutMessages(messageExtBatch) 保存发送的消息
CommitLog#asyncPutMessages 保存发送的消息
mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext) mappedFile
CommitLog#submitFlushRequest 提交刷盘 (异步 / 同步)
CommitLog#submitReplicaRequest 将消息同步到从节点。可配置备份多少个
消息保存完毕
RemotingClient#invokeSync
RemotingClient#invokeSyncImpl 发送同步方法的实现
this.responseTable.put(opaque, responseFuture) 这个很关键,将opaque 存到响应的 responseTable里边
然后下方 responseFuture.waitResponse(timeoutMillis) 会阻塞当前请求
NettyRemotingAbstract 然后我们看此处的方法。
RemotingResponseCallback callback = new RemotingResponseCallback() 构建了一个远程
如果请求是同步请求的话,一定会触发 callback.callback(response);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(remoteAddr, cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
response.setOpaque(opaque);
想当于将请求的 opaque 塞入到了response里边。
ctx.writeAndFlush(response);
到调用方
NettyRemotingAbstract#processMessageReceived 检查到是 RESPONSE_COMMAND 响应的请求
responseFuture.putResponse 会设置 responseCommand 并且 countDownLatch.countDown 释放之前阻塞的请求