发现机制的主要目标是创建 Ignite 节点的拓扑结构,并在每个节点上构建并维护一致的内存视图。 例如,此视图包含集群中的节点数及节点顺序。
发现机制由 DiscoverySpi 接口表示,默认实现是TcpDiscoverySpi。 其他实现,如 ZookeeperDiscoverySpi,本文重点描述的TcpDiscoverySpi。
拓扑结构由特定的 DiscoverySpi 实现定义,例如,TcpDiscoverySpi 定义了环形拓扑。
在描述集群拓扑时,我们谈论的是仅存在于“发现”级别的逻辑布局。 例如,当查询驻留在缓存中的数据时,集群可能使用与本文描述的拓扑不同的拓扑。
TcpDiscoverySpi 将集群的所有服务器节点组织成环形结构,其中每个节点只能向单个节点(称为“邻居”)发送发现消息。客户端节点位于环之外,并连接到一个服务端。 此代码逻辑分别包含在服务器节点ServerImpl类和客户端节点ClientImpl 类中。
当一个新节点启动时,它尝试通过探测TcpDiscoveryIpFinder提供的地址列表来找到一个现有的集群。如果所有地址都不可用,则节点认为自己是唯一的节点,从自己形成一个集群,并成为此集群的协调者。否则,将执行如下节点加入过程。
节点加入过程包括以下几个阶段:
客户端创建连接代码跟踪如下:
Ignite ignite = Ignition.start(cfg);//客户端启动代码 -->IgnitionEx$IgniteNamedInstance grid0.start(2112行) --> … -->ClientImpl spiStart方法启动IgniteSpiThread线程
IgniteSpiThread线程,run:58, IgniteSpiThread, body();方法–>…–>joinTopology:629, ClientImpl–>sendJoinRequest:734, ClientImpl–>…–>TcpDiscoverySpi opensocket()方法,openSocket:1592, TcpDiscoverySpi
opensocket方法里,创建socket连接,并发0x00004747到服务端
/** Network packet header. */
public static final byte[] IGNITE_HEADER = intToBytes(0x00004747);
protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
throws IOException, IgniteSpiOperationTimeoutException {
...
writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
...
Wireshark抓包如下:
节点通过调用 ServerImpl#joinTopology(对于服务器节点)或 ClientImpl#joinTopology(对于客户端节点)开始节点加入过程,然后调用 TcpDiscoverySpi#collectExchangeData 来收集所有必要的 discovery data(例如来自 GridCacheProcessor 的缓存配置,请参阅不同的 GridComponent# collectJoiningNodeData 实现)。该数据被打包到加入请求(TcpDiscoveryJoinRequestMessage)中并发送给协调器。
以客户端ClientImpl作为代码示例: ClientImpl类,joinTopology方法里调用sendJoinRequest方法,如下 (代码路径:run:58, IgniteSpiThread–>body:317, ClientImpl–>…–>tryJoin:2108, ClientImpl$MessageWorker–>…–>joinTopology:629, ClientImpl–>sendJoinRequest, ClientImpl)
@Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon,
InetSocketAddress addr) {
...
// 向服务端发送TcpDiscoveryHandshakeRequest 消息
TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
req.client(true);
spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
...
// collectExchangeData 来收集所有必要的discovery data
if (discoveryData == null)
discoveryData = spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId()));
// 该数据被打包到加入请求中并发送给协调器
TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(node, discoveryData);
discovery data结构如下:
当协调器收到请求时,它会验证消息并生成 TcpDiscoveryNodeAddedMessage,如果验证成功(请参阅 ServerImpl.RingMessageWorker#processJoinRequestMessage)。这条信息随后通过环发送出去。
服务端ClientImpl 处理TcpDiscoveryJoinRequestMessage代码示例:
@Override protected void body() throws InterruptedException {
...
else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
if (!req.responded()) {
boolean ok = processJoinRequestMessage(req, clientMsgWrk);
if (clientMsgWrk != null && ok)
continue;
else
// Direct join request - no need to handle this socket anymore.
break;
}
}
...
生成 TcpDiscoveryNodeAddedMessage代码示例:
private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg) {
...
//生成 TcpDiscoveryNodeAddedMessage
TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
node, data, spi.gridStartTime);
nodeAddedMsg = tracing.messages().branch(nodeAddedMsg, msg);
nodeAddedMsg.client(msg.client());
processNodeAddedMessage(nodeAddedMsg);
tracing.messages().finishProcessing(nodeAddedMsg);
...
在处理 TcpDiscoveryNodeAddedMessage 时,集群中的每个节点将加入节点的discovery data应用于component,收集其本地discovery data,并将其添加到消息中(详情请参阅ServerImpl.RingMessageWorker#processNodeAddedMessage)。然后通过调用ServerImpl.RingMessageWorker#sendMessageAcrossRing 将消息进一步传播到环上。 当 TcpDiscoveryNodeAddedMessage 完成整个循环并再次到达协调器时,它将被协调器消费并且协调器发出TcpDiscoveryNodeAddFinishedMessage消息。
TcpDiscoveryNodeAddedMessage 也被传递到加入节点,它在所有其他节点已经处理它之后,在最后接收消息。
代码示例:
private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
...
DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
...
if (dataPacket.hasJoiningNodeData()) {
if (spiState == CONNECTED) {
// Node already connected to the cluster can apply joining nodes' disco data immediately
//加入节点的discovery data应用于component
spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
if (!node.isDaemon())
spi.collectExchangeData(dataPacket);
}
...
TcpDiscoveryNodeAddFinishedMessage 完成节点加入过程。 收到此消息时,每个节点都会触发 NODE_JOINED 事件以通知discovery manager有关新加入的节点的信息。
NodeAddFinished 和额外加入请求 如果加入节点没有及时收到TcpDiscoveryNodeAddFinishedMessage,将发送一个额外的加入请求。 该时间由 TcpDiscoverySpi#networkTimeout 定义,默认值为 5 秒 (TcpDiscoverySpi#DFLT_NETWORK_TIMEOUT)。
private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
...
if (state == CONNECTED) {
//触发 NODE_JOINED 事件
boolean notified = notifyDiscovery(EVT_NODE_JOINED, topVer, node, msg.spanContainer());
...