前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >海豚调度连接Zookeeper超时定位

海豚调度连接Zookeeper超时定位

作者头像
DH镔
发布2023-10-21 11:50:34
2.4K0
发布2023-10-21 11:50:34
举报
文章被收录于专栏:编程从踩坑到跳坑

# 海豚调度连接Zookeeper超时定位

# 过程

在本地启动海豚调度的服务,出现zookeeper connect timeout异常,但是检查zookeeper节点都是正常的。经过一轮分析,发现个大坑!!!

海豚调度的zookeeper配置信息:

代码语言:javascript
复制
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

registry.plugin.name=zookeeper
registry.servers=x.x.x.x:2181
registry.namespace=dolphinscheduler
registry.base.sleep.time.ms=60
registry.max.sleep.ms=300
registry.max.retries=5
registry.session.timeout.ms=30000
registry.connection.timeout.ms=7500
registry.block.until.connected.wait=600
registry.digest=

异常启动日志如下:

代码语言:javascript
复制
Caused by: org.apache.dolphinscheduler.registry.api.RegistryException: zookeeper connect timeout
	at org.apache.dolphinscheduler.plugin.registry.zookeeper.ZookeeperRegistry.start(ZookeeperRegistry.java:111) ~[classes/:na]
	at org.apache.dolphinscheduler.service.registry.RegistryClient.start(RegistryClient.java:291) ~[classes/:na]
	at org.apache.dolphinscheduler.service.registry.RegistryClient.afterConstruct(RegistryClient.java:81) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_332]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_332]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_332]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_332]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389) ~[spring-beans-5.3.12.jar:5.3.12]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157) ~[spring-beans-5.3.12.jar:5.3.12]
	... 45 common frames omitted

在配置文件中其中一个关键的配置:registry.block.until.connected.wait,意思是curator等待zookeeper连接超时,超过这个时间的话,异常退出。

海豚调度连接zookeeper关键代码

代码语言:javascript
复制
    public void start(Map<String, String> config) {
        CuratorFrameworkFactory.Builder builder =
            CuratorFrameworkFactory.builder()
                                   .connectString(SERVERS.getParameterValue(config.get(SERVERS.getName())))
                                   .retryPolicy(buildRetryPolicy(config))
                                   .namespace(NAME_SPACE.getParameterValue(config.get(NAME_SPACE.getName())))
                                   .sessionTimeoutMs(SESSION_TIMEOUT_MS.getParameterValue(config.get(SESSION_TIMEOUT_MS.getName())))
                                   .connectionTimeoutMs(CONNECTION_TIMEOUT_MS.getParameterValue(config.get(CONNECTION_TIMEOUT_MS.getName())));

        String digest = DIGEST.getParameterValue(config.get(DIGEST.getName()));
        if (!Strings.isNullOrEmpty(digest)) {
            buildDigest(builder, digest);
        }
        client = builder.build();

        client.start();
        try {
            // 这行!!!等待连接,超出配置时间异常退出
            if (!client.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT_MS.getParameterValue(config.get(BLOCK_UNTIL_CONNECTED_WAIT_MS.getName())), MILLISECONDS)) {
                client.close();
                throw new RegistryException("zookeeper connect timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RegistryException("zookeeper connect error", e);
        }
    }

那么,既然是因为等待超时导致的异常,尝试调整registry.block.until.connected.wait,从600ms修改到60000ms,也就60s,启动!!!等待大概20秒左右,正常启动了。真见鬼!!!这种情况下,我开始怀疑是不是网络有问题,开始抓包。

发现程序初始化完成,开始连接zookeeper,但迟迟没有发送tcp握手包,在这情况下,我开始怀疑代码的问题了。

通过分析zookeeper的源码,找出了慢的关键代码

代码语言:javascript
复制
        private void startConnect(InetSocketAddress addr) throws IOException {
            // initializing it for new connection
            saslLoginFailed = false;
            state = States.CONNECTING;

            // 在这!!!
            setName(getName().replaceAll("\\(.*\\)",
                    "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
            if (ZooKeeperSaslClient.isEnabled()) {
                try {
                    zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr));
                } catch (LoginException e) {
                    // An authentication error occurred when the SASL client tried to initialize:
                    // for Kerberos this means that the client failed to authenticate with the KDC.
                    // This is different from an authentication error that occurs during communication
                    // with the Zookeeper server, which is handled below.
                    LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
                      + "SASL authentication, if Zookeeper server allows it.");
                    eventThread.queueEvent(new WatchedEvent(
                      Watcher.Event.EventType.None,
                      Watcher.Event.KeeperState.AuthFailed, null));
                    saslLoginFailed = true;
                }
            }
            logStartConnect(addr);

            clientCnxnSocket.connect(addr);
        }

从上面的代码看到setName这个方法,作用是修改线程名称的,其中调用了addr.getHostName(),罪魁祸首就是这个!!!

java.net.InetSocketAddress#getHostName,作用是获取主机名称,如果在初始化这个对象的时候,传入的hostname是一个域名,那么可以直接返回,但是!!!!如果是一个ip地址的话,那问题就来了。

如果是一个ip地址,getHostName可以通过ip地址去反查域名,这个操作不常见,原理是通过DNS查询PTR记录,但一般来说DNS服务器是没有这个记录,所以查询不到,查询不到就会从本地的hosts文件找,在都找不到的情况下,这过程大概会阻塞个20秒,海豚设置的创建连接超时是600ms,所以必定失败。

解决方法:

  1. 如果要用ip的话,在DNS服务器添加该ip的PTR记录。但,这不是很现实
  2. 在本地的host文件中添加解析信息(推荐)

例子:

代码语言:javascript
复制
x.x.x.x my-zookeeper

然后~~~就正常了。问题到这里就解决了,所以在用zookeeper的时候,推荐使用域名的方式,不然就有可能出现连接zookeeper非常慢!!!

# 进阶

getHostName这个方法到底做了什么操作?

getHostName的源码中会调用到java.net.InetAddress#getHostFromNameService获取主机名称

代码语言:javascript
复制
    private static String getHostFromNameService(InetAddress addr, boolean check) {
        String host = null;
        for (NameService nameService : nameServices) {
            try {
                // first lookup the hostname
                // 关键代码在这
                host = nameService.getHostByAddr(addr.getAddress());

                /* check to see if calling code is allowed to know
                 * the hostname for this IP address, ie, connect to the host
                 */
                if (check) {
                    SecurityManager sec = System.getSecurityManager();
                    if (sec != null) {
                        sec.checkConnect(host, -1);
                    }
                }

                /* now get all the IP addresses for this hostname,
                 * and make sure one of them matches the original IP
                 * address. We do this to try and prevent spoofing.
                 */

                InetAddress[] arr = InetAddress.getAllByName0(host, check);
                boolean ok = false;

                if(arr != null) {
                    for(int i = 0; !ok && i < arr.length; i++) {
                        ok = addr.equals(arr[i]);
                    }
                }

                //XXX: if it looks a spoof just return the address?
                if (!ok) {
                    host = addr.getHostAddress();
                    return host;
                }

                break;

            } catch (SecurityException e) {
                host = addr.getHostAddress();
                break;
            } catch (UnknownHostException e) {
                host = addr.getHostAddress();
                // let next provider resolve the hostname
            }
        }

        return host;
    }

最终会调用到java.net.Inet4AddressImpl#getHostByAddr方法

代码语言:javascript
复制
public native String getHostByAddr(byte[] addr) throws UnknownHostException;

native方法,翻jdk源码,定位到一下代码

代码语言:javascript
复制
/*
 * Class:     java_net_Inet6AddressImpl
 * Method:    getHostByAddr
 * Signature: (I)Ljava/lang/String;
 */
JNIEXPORT jstring JNICALL
Java_java_net_Inet6AddressImpl_getHostByAddr(JNIEnv *env, jobject this,
                                            jbyteArray addrArray) {

    jstring ret = NULL;

#ifdef AF_INET6
    char host[NI_MAXHOST+1];
    int error = 0;
    int len = 0;
    jbyte caddr[16];

    struct sockaddr_in him4;
    struct sockaddr_in6 him6;
    struct sockaddr *sa;

    /*
     * For IPv4 addresses construct a sockaddr_in structure.
     */
    if ((*env)->GetArrayLength(env, addrArray) == 4) {
        jint addr;
        (*env)->GetByteArrayRegion(env, addrArray, 0, 4, caddr);
        addr = ((caddr[0]<<24) & 0xff000000);
        addr |= ((caddr[1] <<16) & 0xff0000);
        addr |= ((caddr[2] <<8) & 0xff00);
        addr |= (caddr[3] & 0xff);
        memset((void *) &him4, 0, sizeof(him4));
        him4.sin_addr.s_addr = (uint32_t) htonl(addr);
        him4.sin_family = AF_INET;
        sa = (struct sockaddr *) &him4;
        len = sizeof(him4);
    } else {
        /*
         * For IPv6 address construct a sockaddr_in6 structure.
         */
        (*env)->GetByteArrayRegion(env, addrArray, 0, 16, caddr);
        memset((void *) &him6, 0, sizeof(him6));
        memcpy((void *)&(him6.sin6_addr), caddr, sizeof(struct in6_addr) );
        him6.sin6_family = AF_INET6;
        sa = (struct sockaddr *) &him6 ;
        len = sizeof(him6) ;
    }

    // 关键代码在这。。。。
    error = getnameinfo(sa, len, host, NI_MAXHOST, NULL, 0,
                        NI_NAMEREQD);

    if (!error) {
        ret = (*env)->NewStringUTF(env, host);
    }
#endif /* AF_INET6 */

    if (ret == NULL) {
        JNU_ThrowByName(env, JNU_JAVANETPKG "UnknownHostException", NULL);
    }

    return ret;
}

在Linux中,getnameinfo是glibc的一个函数,作用就是获取主机信息

那么在glibc的代码中,关键的代码如下:

https://github.com/lattera/glibc/blob/master/inet/getnameinfo.copen in new window

代码语言:javascript
复制
/* Convert host name, AF_INET/AF_INET6 case, name only.  */
static int
gni_host_inet_name (struct scratch_buffer *tmpbuf,
		    const struct sockaddr *sa, socklen_t addrlen,
		    char *host, socklen_t hostlen, int flags)
{
  int herrno;
  struct hostent th;
  struct hostent *h = NULL;
  if (sa->sa_family == AF_INET6)
    {
      const struct sockaddr_in6 *sin6p = (const struct sockaddr_in6 *) sa;
      while (__gethostbyaddr_r (&sin6p->sin6_addr, sizeof(struct in6_addr),
				AF_INET6, &th, tmpbuf->data, tmpbuf->length,
				&h, &herrno))
	if (herrno == NETDB_INTERNAL && errno == ERANGE)
	  {
	    if (!scratch_buffer_grow (tmpbuf))
	      {
		__set_h_errno (herrno);
		return EAI_MEMORY;
	      }
	  }
	else
	  break;
    }
  else
    {
      const struct sockaddr_in *sinp = (const struct sockaddr_in *) sa;
      // 关键代码在这
      while (__gethostbyaddr_r (&sinp->sin_addr, sizeof(struct in_addr),
				AF_INET, &th, tmpbuf->data, tmpbuf->length,
				&h, &herrno))
	if (herrno == NETDB_INTERNAL && errno == ERANGE)
	    {
	      if (!scratch_buffer_grow (tmpbuf))
		{
		  __set_h_errno (herrno);
		  return EAI_MEMORY;
		}
	    }
	else
	  break;
    }

好了,不追下去了,看到while语句块,大概是在这里阻塞了10几20秒。。。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • # 海豚调度连接Zookeeper超时定位
    • # 过程
      • # 进阶
      相关产品与服务
      云服务器
      云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档