前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ignite 2.11.0 节点发现原理及源码分析

ignite 2.11.0 节点发现原理及源码分析

作者头像
lovelife110
发布2021-12-08 17:17:40
6940
发布2021-12-08 17:17:40
举报
文章被收录于专栏:爱生活爱编程

ignite 2.11.0 节点发现原理

节点发现介绍

发现机制的主要目标是创建 Ignite 节点的拓扑结构,并在每个节点上构建并维护一致的内存视图。 例如,此视图包含集群中的节点数及节点顺序。

发现机制由 DiscoverySpi 接口表示,默认实现是TcpDiscoverySpi。 其他实现,如 ZookeeperDiscoverySpi,本文重点描述的TcpDiscoverySpi。

拓扑结构由特定的 DiscoverySpi 实现定义,例如,TcpDiscoverySpi 定义了环形拓扑。

在描述集群拓扑时,我们谈论的是仅存在于“发现”级别的逻辑布局。 例如,当查询驻留在缓存中的数据时,集群可能使用与本文描述的拓扑不同的拓扑。

环形拓扑

TcpDiscoverySpi 将集群的所有服务器节点组织成环形结构,其中每个节点只能向单个节点(称为“邻居”)发送发现消息。客户端节点位于环之外,并连接到一个服务端。 此代码逻辑分别包含在服务器节点ServerImpl类和客户端节点ClientImpl 类中。

节点加入过程

概述

当一个新节点启动时,它尝试通过探测TcpDiscoveryIpFinder提供的地址列表来找到一个现有的集群。如果所有地址都不可用,则节点认为自己是唯一的节点,从自己形成一个集群,并成为此集群的协调者。否则,将执行如下节点加入过程。

节点加入过程包括以下几个阶段:

  1. 加入节点向集群中的随机节点发送TcpDiscoveryJoinRequestMessage,该节点会把消息被转发到协调器。
  2. 协调器将新节点放置在最后一个节点和自身之间,并通过在环中发送TcpDiscoveryNodeAddedMessage来传播拓扑更改消息。
  3. 在集群的所有成员接收到TcpDiscoveryNodeAddedMessage 之后,发送TcpDiscoveryNodeAddFinishedMessage 来完成更改。

创建连接

客户端创建连接代码跟踪如下:

代码语言:javascript
复制
 Ignite ignite = Ignition.start(cfg);//客户端启动代码 -->IgnitionEx$IgniteNamedInstance grid0.start(2112行) --> … -->ClientImpl spiStart方法启动IgniteSpiThread线程
IgniteSpiThread线程,run:58, IgniteSpiThread, body();方法–>…–>joinTopology:629, ClientImpl–>sendJoinRequest:734, ClientImpl–>…–>TcpDiscoverySpi opensocket()方法,openSocket:1592, TcpDiscoverySpi

opensocket方法里,创建socket连接,并发0x00004747到服务端

代码语言:javascript
复制
    /** Network packet header. */
    public static final byte[] IGNITE_HEADER = intToBytes(0x00004747);

    protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
        throws IOException, IgniteSpiOperationTimeoutException {
            ...
            writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
            ...

Wireshark抓包如下:

TcpDiscoveryJoinRequestMessage

节点通过调用 ServerImpl#joinTopology(对于服务器节点)或 ClientImpl#joinTopology(对于客户端节点)开始节点加入过程,然后调用 TcpDiscoverySpi#collectExchangeData 来收集所有必要的 discovery data(例如来自 GridCacheProcessor 的缓存配置,请参阅不同的 GridComponent# collectJoiningNodeData 实现)。该数据被打包到加入请求(TcpDiscoveryJoinRequestMessage)中并发送给协调器。

客户端ClientImpl作为代码示例: ClientImpl类,joinTopology方法里调用sendJoinRequest方法,如下 (代码路径:run:58, IgniteSpiThread–>body:317, ClientImpl–>…–>tryJoin:2108, ClientImpl$MessageWorker–>…–>joinTopology:629, ClientImpl–>sendJoinRequest, ClientImpl)

代码语言:javascript
复制
    @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon,
        InetSocketAddress addr) {
                ...
                // 向服务端发送TcpDiscoveryHandshakeRequest 消息
                TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
                req.client(true);
                spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
                  ...       
                    // collectExchangeData 来收集所有必要的discovery data
                    if (discoveryData == null)
                        discoveryData = spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId()));
                    // 该数据被打包到加入请求中并发送给协调器
                    TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(node, discoveryData);

discovery data结构如下:

当协调器收到请求时,它会验证消息并生成 TcpDiscoveryNodeAddedMessage,如果验证成功(请参阅 ServerImpl.RingMessageWorker#processJoinRequestMessage)。这条信息随后通过环发送出去。

服务端ClientImpl 处理TcpDiscoveryJoinRequestMessage代码示例:

代码语言:javascript
复制
        @Override protected void body() throws InterruptedException {
                       ...
                        else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                            TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;

                            if (!req.responded()) {
                                boolean ok = processJoinRequestMessage(req, clientMsgWrk);

                                if (clientMsgWrk != null && ok)
                                    continue;
                                else
                                    // Direct join request - no need to handle this socket anymore.
                                    break;
                            }
                        }
                        ...

生成 TcpDiscoveryNodeAddedMessage代码示例:

代码语言:javascript
复制
        private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg) {
                ...
                //生成 TcpDiscoveryNodeAddedMessage
                TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
                    node, data, spi.gridStartTime);

                nodeAddedMsg = tracing.messages().branch(nodeAddedMsg, msg);

                nodeAddedMsg.client(msg.client());

                processNodeAddedMessage(nodeAddedMsg);

                tracing.messages().finishProcessing(nodeAddedMsg);
                ...

TcpDiscoveryNodeAddedMessage

在处理 TcpDiscoveryNodeAddedMessage 时,集群中的每个节点将加入节点的discovery data应用于component,收集其本地discovery data,并将其添加到消息中(详情请参阅ServerImpl.RingMessageWorker#processNodeAddedMessage)。然后通过调用ServerImpl.RingMessageWorker#sendMessageAcrossRing 将消息进一步传播到环上。 当 TcpDiscoveryNodeAddedMessage 完成整个循环并再次到达协调器时,它将被协调器消费并且协调器发出TcpDiscoveryNodeAddFinishedMessage消息。

TcpDiscoveryNodeAddedMessage 也被传递到加入节点,它在所有其他节点已经处理它之后,在最后接收消息。

代码示例:

代码语言:javascript
复制
        private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
       				 ...
                    DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
      				  ...          
                    if (dataPacket.hasJoiningNodeData()) {
                        if (spiState == CONNECTED) {
                            // Node already connected to the cluster can apply joining nodes' disco data immediately
                            //加入节点的discovery data应用于component
                            spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));

                            if (!node.isDaemon())
                                spi.collectExchangeData(dataPacket);
                        } 
 					...

TcpDiscoveryNodeAddFinishedMessage

TcpDiscoveryNodeAddFinishedMessage 完成节点加入过程。 收到此消息时,每个节点都会触发 NODE_JOINED 事件以通知discovery manager有关新加入的节点的信息。

NodeAddFinished 和额外加入请求 如果加入节点没有及时收到TcpDiscoveryNodeAddFinishedMessage,将发送一个额外的加入请求。 该时间由 TcpDiscoverySpi#networkTimeout 定义,默认值为 5 秒 (TcpDiscoverySpi#DFLT_NETWORK_TIMEOUT)。

代码语言:javascript
复制
        private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
                ...
                if (state == CONNECTED) {
                //触发 NODE_JOINED 事件
                    boolean notified = notifyDiscovery(EVT_NODE_JOINED, topVer, node, msg.spanContainer());
                ...
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/10/28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ignite 2.11.0 节点发现原理
  • 节点发现介绍
  • 环形拓扑
  • 节点加入过程
    • 概述
      • 创建连接
        • TcpDiscoveryJoinRequestMessage
          • TcpDiscoveryNodeAddedMessage
            • TcpDiscoveryNodeAddFinishedMessage
            相关产品与服务
            云服务器
            云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档