前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ 大脑 NameServer 赏析

RocketMQ 大脑 NameServer 赏析

原创
作者头像
IT技术小咖
发布2023-11-17 10:43:40
3290
发布2023-11-17 10:43:40
举报
文章被收录于专栏:码上修行

1. 为什么是 RocketMQ

2. 使用场景

MQ 的基本用途

  • 异步处理(如:订单支付后,扣减库存、增加积分,引入 MQ 异步去做,降低响应时间)
  • 系统解耦(如:订单完成后,发送 MQ,下游依赖系统(库存、营销)自行调用)
  • 削峰填谷(如:秒杀活动,请求进入MQ,服务器慢慢处理,多余请求抛弃)
  • 日志收集(如:分布式系统调用日志记录)

MQ 本质:异步通信

3. 工作原理

RocketMQ 四大核心组件

  • NameServer:主要负责对于元数据的管理,包括了对于 Topic 和路由信息的管理。(类比:快递中心
  • Broker:消息中转角色,负责存储消息,转发消息。(类比:快递柜
  • Producer:负责生产消息。(类比:寄件人发货
  • Consumer:负责消费消息。(类比:收件人收货

【来源于RocketMQ官网】
【来源于RocketMQ官网】

4. 演进过程

  • 在 RocketMQ 的早期版本其实不叫 RocketMQ,而是叫 MetaQ,在 MetaQ1.0 和 MetaQ2.0 实际上是依赖的是 Zookeeper,但是从 MetaQ3.0 的时候更名为 RocketMQ,同时也去掉了 Zookeeper。
  • RocketMQ 整体设计思想是来自于 kafka,而 kafka 是使用 Zookeeper 作为注册中心,所以早期版本也是使用的 Zookeeper,到了后期发展,摈弃了 Zookeeper,自己写了一套 NameServer,保证独立性。
  • 采用 Zookeeper 需要保持强一致性,还会导致整体架构就会变得复杂,在维护、搭建成本上都会上升,为了保证高性能,低维护成本,那么就开发了 NameServer。

5. 从 NameServer 起点

5.1 RocketMQ 大脑 —— NameServer

NameServer 是一个 Broker 与 Topic 路由的注册中心,支持 Broker 的动态注册与发现,主要功能如下:

Broker管理:接收 Broker 集群的注册信息并且保存下来作为路由信息的基本数据,提供心跳监测机制,检查 Broker 是否还存活。

路由信息管理:每个 NameServer中都保存 Broker 集群的整个路由信息可用于客户端查询队列信息,Producer 和 Consumer 通过 NameServer可以获取整个 Broker 集群路由信息从而进行消息投递和消费。

5.2 本地调试

日志文件存储路径配置和启动

日志文件存储路径配置和启动
日志文件存储路径配置和启动

5.3 NamesrvStartup 启动类

启动类: org.apache.rocketmq.namesrv.NamesrvStartup

代码片段如下:

代码语言:java
复制
public static NamesrvController main0(String[] args) {

        try {
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

通过 main 方法启动 NameServer,分为两大步,先创建 NamesrvController,而后再初始化并启动 NamesrvController。

5.4 NameServer 启动时序图

NameServer启动时序图
NameServer启动时序图

6. 路由管理

6.1 路由注册

6.1.1 心跳请求

在 RocketMQ 中,默认情况下,Broker 服务器会每间隔 30 秒向集群中所有的 NameServer 发送心跳信息。

org.apache.rocketmq.broker.BrokerController#start

代码语言:java
复制
// 每隔30s发送心跳
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

NameServer 会接收到 Broker 发来的心跳包,DefaultRequestProcessor(默认请求处理器) 根据请求头中的code 码,执行处理心跳包的操作。

具体会调用 RouteInfoManager.registerBroker() 操作。其实 broker 注册路由过程,主要还是操作RouteInfoManager 中的多个映射表

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

路由注册代码片段及时序图
路由注册代码片段及时序图

6.1.2 RouteInfoManager

RouteInfoManager
RouteInfoManager

RouteInfoManager:用于管理心跳信息以及路由管理

RouteInfoManager 管理的路由元数据:(元数据是什么?

① topicQueueTable:topic路由信息,broker名称,读队列数,写队列数等

② brokerAddrTable:broker信息,broker集群,broker名称,broker地址等

③ clusterAddrTable:broker集群信息,broker名称列表

④ brokerLiveTable:broker状态信息,broker存活最新上报时间

⑤ filterServerTable:filterServer列表,消息过滤信息

路由元信息Map结构示例如下:

6.2 路由删除

org.apache.rocketmq.namesrv.NamesrvController#initialize

代码语言:javascript
复制
// 每隔 10s 进行扫描判断是否存在失效的Broker,如果存在则移除失效broker
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker

代码语言:javascript
复制
public int scanNotActiveBroker() {
        int removeCount = 0;
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
                removeCount++;
            }
        }
        return removeCount;
    }

备注:Broker 视角触发路由删除,即 Broker 在正常关闭的情况下,会执行 unregisterBroker 指令这两种方式路由删除的方法都是一样的,都是从相关路由表中删除与该 broker 相关的信息。

6.3 路由发现

RocketMQ 的路由发现采用的是 Pull 模型,当 Topic 路由信息发生变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取主题最新的路由,默认让客户端每 30 秒会拉取一次最新的路由。

Push、Pull、Long Polling 三种模型的优缺点?

路由发现源码片段
路由发现源码片段

7. 小结

7.1 回顾

这次源码剖析学到了什么?

本次分享很多不足的地方还请指教。

如下图所示:

框架示意图
框架示意图

7.2 源码学习的方法

本地调试法(启动类,Debug,QuickStart等)

7.3 NameServer 设计的亮点

  • 读写锁使用,Map 结构存储路由信息;
  • NameServer 之间不通信(AP,最终一致性);
  • NameServer 只管理 Broker 集群等信息。

8. 本地源码调试 —— Step by Step【附录】

(1)从 GitHub 上 clone 代码到本地

https://github.com/apache/rocketmq.git

(2)导入 IDEA

(3)执行 mvn clean install -DskipTests

(4)启动 NameServer

(5)启动 Broker

(6)启动 Producer

(7)启动 Consumer

Step By Step
Step By Step

8.1 启动 NameServer

org.apache.rocketmq.namesrv.NamesrvStartup#main

启动报错:请设置 ROCKET_HOME 环境变量
启动报错:请设置 ROCKET_HOME 环境变量

打开 【Edit Configuration】配置启动参数:

【Edit Configuration】配置
【Edit Configuration】配置
ROCKET_HOME 环境变量设置
ROCKET_HOME 环境变量设置

ROCKET_HOME 环境变量设置完成如下:

ROCKET_HOME 环境变量设置完成
ROCKET_HOME 环境变量设置完成

再次启动 NameServer

启动报错:在如图路径下未找到 logback_namesrv.xml 文件
启动报错:在如图路径下未找到 logback_namesrv.xml 文件

添加 logback_namesrv.xml 文件到指定文件路径下:

打开 logback_namesrv.xml 文件并替换 RocketMQ 运行路径

再次启动,出现“The Name Server boot success …”字样,表示启动成功。

8.2 启动 Broker

org.apache.rocketmq.broker.BrokerStartup#main

启动报错:请设置 ROCKET_HOME 环境变量
启动报错:请设置 ROCKET_HOME 环境变量

ROCKET_HOME 环境变量设置如下:

ROCKET_HOME 环境变量设置
ROCKET_HOME 环境变量设置

再次启动 NameServer

启动报错:在如图路径下未找到 logback_broker.xml 文件
启动报错:在如图路径下未找到 logback_broker.xml 文件

添加 logback_namesrv.xml 文件到指定文件路径下,报错:未找到 broker.conf 文件

打开 broker.conf 文件并添加配置信息

代码语言:javascript
复制
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

#追加配置如下:
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true

#nameServer 地址,分号分割
namesrvAddr=127.0.0.1:9876

#broker 地址
brokerIP1=10.2.67.197

#存储路径
storePathRootDir=D:/xxx/rocketmqserver/store
#commitLog 存储路径
storePathCommitLog=D:/xxx/rocketmqserver/store/commitlog
#消费队列存储路径
storePathConsumeQueue=D:/xxx/rocketmqserver/store/consumequeue
#消息索引存储路径
storePathIndex=D:/xxx/rocketmqserver/store/index
#checkpoint 文件存储路径
storeCheckpoint=D:/xxx/rocketmqserver/store/checkpoint
#abort 文件存储路径
abortFile=D:/xxx/rocketmqserver/store/abort

设置【Programma arguments】项目启动参数如下:

【Programma arguments】项目启动参数
【Programma arguments】项目启动参数

再次启动,出现“The broker[…] boot success …”字样,表示启动成功。

8.3 启动 Producer

org.apache.rocketmq.example.quickstart.Producer#main

8.4 启动 Consumer

org.apache.rocketmq.example.quickstart.Consumer#main

原文链接:https://mp.weixin.qq.com/s/94DaQVGvZwFQlSUvQmrL1A

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 为什么是 RocketMQ
  • 2. 使用场景
  • 3. 工作原理
  • 4. 演进过程
  • 5. 从 NameServer 起点
    • 5.1 RocketMQ 大脑 —— NameServer
      • 5.2 本地调试
        • 5.3 NamesrvStartup 启动类
          • 5.4 NameServer 启动时序图
          • 6. 路由管理
            • 6.1 路由注册
              • 6.1.1 心跳请求
              • 6.1.2 RouteInfoManager
            • 6.2 路由删除
              • 6.3 路由发现
              • 7. 小结
                • 7.1 回顾
                  • 7.2 源码学习的方法
                    • 7.3 NameServer 设计的亮点
                    • 8. 本地源码调试 —— Step by Step【附录】
                      • 8.1 启动 NameServer
                        • 8.2 启动 Broker
                          • 8.3 启动 Producer
                            • 8.4 启动 Consumer
                            相关产品与服务
                            文件存储
                            文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档