MQ 的基本用途
MQ 本质:异步通信
RocketMQ 四大核心组件
NameServer 是一个 Broker 与 Topic 路由的注册中心,支持 Broker 的动态注册与发现,主要功能如下:
Broker管理:接收 Broker 集群的注册信息并且保存下来作为路由信息的基本数据,提供心跳监测机制,检查 Broker 是否还存活。
路由信息管理:每个 NameServer中都保存 Broker 集群的整个路由信息可用于客户端查询队列信息,Producer 和 Consumer 通过 NameServer可以获取整个 Broker 集群路由信息从而进行消息投递和消费。
日志文件存储路径配置和启动
启动类: org.apache.rocketmq.namesrv.NamesrvStartup
代码片段如下:
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
通过 main 方法启动 NameServer,分为两大步,先创建 NamesrvController,而后再初始化并启动 NamesrvController。
在 RocketMQ 中,默认情况下,Broker 服务器会每间隔 30 秒向集群中所有的 NameServer 发送心跳信息。
org.apache.rocketmq.broker.BrokerController#start
// 每隔30s发送心跳
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
NameServer 会接收到 Broker 发来的心跳包,DefaultRequestProcessor(默认请求处理器) 根据请求头中的code 码,执行处理心跳包的操作。
具体会调用 RouteInfoManager.registerBroker() 操作。其实 broker 注册路由过程,主要还是操作RouteInfoManager 中的多个映射表
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
RouteInfoManager:用于管理心跳信息以及路由管理
RouteInfoManager 管理的路由元数据:(元数据是什么?)
① topicQueueTable:topic路由信息,broker名称,读队列数,写队列数等
② brokerAddrTable:broker信息,broker集群,broker名称,broker地址等
③ clusterAddrTable:broker集群信息,broker名称列表
④ brokerLiveTable:broker状态信息,broker存活最新上报时间
⑤ filterServerTable:filterServer列表,消息过滤信息
路由元信息Map结构示例如下:
org.apache.rocketmq.namesrv.NamesrvController#initialize
// 每隔 10s 进行扫描判断是否存在失效的Broker,如果存在则移除失效broker
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker
public int scanNotActiveBroker() {
int removeCount = 0;
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
removeCount++;
}
}
return removeCount;
}
备注:Broker 视角触发路由删除,即 Broker 在正常关闭的情况下,会执行 unregisterBroker 指令这两种方式路由删除的方法都是一样的,都是从相关路由表中删除与该 broker 相关的信息。
RocketMQ 的路由发现采用的是 Pull 模型,当 Topic 路由信息发生变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取主题最新的路由,默认让客户端每 30 秒会拉取一次最新的路由。
(Push、Pull、Long Polling 三种模型的优缺点?)
这次源码剖析学到了什么?
本次分享很多不足的地方还请指教。
如下图所示:
本地调试法(启动类,Debug,QuickStart等)
(1)从 GitHub 上 clone 代码到本地
https://github.com/apache/rocketmq.git
(2)导入 IDEA
(3)执行 mvn clean install -DskipTests
(4)启动 NameServer
(5)启动 Broker
(6)启动 Producer
(7)启动 Consumer
org.apache.rocketmq.namesrv.NamesrvStartup#main
打开 【Edit Configuration】配置启动参数:
ROCKET_HOME 环境变量设置完成如下:
再次启动 NameServer
添加 logback_namesrv.xml 文件到指定文件路径下:
打开 logback_namesrv.xml 文件并替换 RocketMQ 运行路径
再次启动,出现“The Name Server boot success …”字样,表示启动成功。
org.apache.rocketmq.broker.BrokerStartup#main
ROCKET_HOME 环境变量设置如下:
再次启动 NameServer
添加 logback_namesrv.xml 文件到指定文件路径下,报错:未找到 broker.conf 文件
打开 broker.conf 文件并添加配置信息
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#追加配置如下:
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#nameServer 地址,分号分割
namesrvAddr=127.0.0.1:9876
#broker 地址
brokerIP1=10.2.67.197
#存储路径
storePathRootDir=D:/xxx/rocketmqserver/store
#commitLog 存储路径
storePathCommitLog=D:/xxx/rocketmqserver/store/commitlog
#消费队列存储路径
storePathConsumeQueue=D:/xxx/rocketmqserver/store/consumequeue
#消息索引存储路径
storePathIndex=D:/xxx/rocketmqserver/store/index
#checkpoint 文件存储路径
storeCheckpoint=D:/xxx/rocketmqserver/store/checkpoint
#abort 文件存储路径
abortFile=D:/xxx/rocketmqserver/store/abort
设置【Programma arguments】项目启动参数如下:
再次启动,出现“The broker[…] boot success …”字样,表示启动成功。
org.apache.rocketmq.example.quickstart.Producer#main
org.apache.rocketmq.example.quickstart.Consumer#main
原文链接:https://mp.weixin.qq.com/s/94DaQVGvZwFQlSUvQmrL1A
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。