vivo 互联网服务器团队- Li Wanghong
本文通过分析Dubbo中ZooKeeper注册中心的实现ZooKeeperResitry的继承体系结构,自顶向下分析了AbstractRegistry(提供了服务数据的本地缓存)、FailbackRegistry(服务注册订阅相关的异常重试)、CacheableFailbackRegistry(Dubbo在URL推送模型做的优化)、ZooKeeperRegistry(ZooKeeper注册中心实现原理)的源码,详细介绍了Dubbo中ZooKeeper注册中心的实现原理。
当服务提供者启动时,服务提供者将自己提供的服务信息注册到注册中心,注册中心将这些信息记录下来。服务消费者启动时,向注册中心发起订阅,将自己感兴趣的服务提供者的信息拉取到本地并缓存起来,用于后续远程调用。另外,注册中心能够感知到服务提供者新增或者下线,并更新自己的服务信息,同时通知服务消费者告知服务提供者新增/下线,从而消费者也可以动态感知到服务提供者的变化。
ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议。
ZooKeeper 的数据模型是一个树形结构的文件系统。Dubbo服务在ZooKeeper中的数据结构(旧版模型,Dubbo2.7版本后路由数据、配置数据不再写到该目录下)如下图所示。
ZooKeeper中的节点分为持久节点、持久顺序节点、临时节点、临时顺序节点。Dubbo使用ZooKeeper作为注册中心时,不关心节点的创建顺序,只会创建持久节点和临时节点。
客户端和服务器维持数据交互通常有两种形式:
ZooKeeper采用的是方式2,主动向客户端推送数据。客户端注册监听它关心的节点(注册Watcher监听指定节点),当节点状态发生变化(数据变化、子节点增减变化)时,则相应的Watcher被触发,ZooKeeper 服务会通知客户端,这就是Watcher机制。
Watcher有以下特点:
ZooKeeper 客户端通过 TCP 长连接连接到 ZooKeeper 服务集群。会话 (Session) 从第一次连接开始就已经建立,之后通过心跳检测机制来保持有效的会话状态。通过这个连接,客户端可以发送请求并接收响应,同时也可以接收到 Watch 事件的通知。一旦一台客户端与一台服务器建立连接,这台服务器会为这个客户端创建一个新的会话。
每个会话都会有一个超时时间。若由于服务器压力过大、网络故障等各种原因导致客户端连接断开时,只要在会话超时时间之内能够重新连接上ZooKeeper服务器,那么之前创建的会话仍然有效。若服务器在超时时间内没有收到任何请求,则相应会话被视为过期。一旦会话过期,就无法再重新打开,且任何与该会话相关的临时 节点都会被删除。
通常来说,会话应该长期存在,而这需要由客户端来保证。客户端可以通过心跳方式来保持会话不过期。
如下图所示,服务提供者(集成了ZK客户端)在服务启动时,会通过ZK客户端与ZK服务端建立连接,将服务提供者信息(提供者的IP地址、端口、服务接口信息等)注册到ZooKeeper服务端,这时会在ZooKeeper服务端生成一个临时节点来存储这些服务信息,这就是服务提供者的注册操作。
服务消费者(集成了ZK客户端)在服务启动时,会通过ZK客户端与ZK服务端建立连接,拉取自己感兴趣的服务提供者信息并缓存到本地,同时也会对自己感兴趣的节点(比如服务提供节点)注册Watcher监听。后续发起远程调用时,会从本地缓存的服务提供者信息通过负载均衡等策略选择一台服务提供者发起服务调用。
如果服务提供者扩容新增了机器,服务提供者会向ZK发起新的注册操作,在对应的目录下创建临时节点(代表这台新增的服务提供者信息),同时会触发之前服务消费者注册的Watcher监听,ZooKeeper服务端会将变更信息推送到服务消费,服务消费者在接收到变更后会更新自己本地的服务提供者信息,这样就完成了服务的自动扩容。同样的,当某台服务提供者下线,它与ZooKeeper服务端的连接会断掉,因为服务注册时创建的是临时节点,因此当连接断掉后,该临时节点会被删除,同时会触发之前服务消费者注册的Watcher监听,相应的服务消费者会收到通知刷新自己本地的服务提供者信息缓存。
Node,Dubbo中用Node来表示抽象节点,Dubbo中的核心概念Registry、Invoker等都实现了接口Node。
RegistryService定义了注册服务需要提供的基本能力,即增删改查。Registry接口继承了Node和Registry接口,它表示拥有注册中心能力的节点。其中定义了4个默认方法,其中的reExportRegister和reExportUnRegiser方法都是委托给RegisterService的相应方法。
AbstractRegistry实现了Registry接口中的方法,它在内存中实现了注册数据的读写改动,实现了本地缓存的功能。
FailbackRegistry继承了AbstractRegistry,结合时间轮,提供了失败重试的能力。
CacheableFailbackRegistry针对URL推送模型做了优化,减少了URL的创建。
ZooKeeperRegistry提供了基于ZooKeeper的注册中心实现。
下面重点分析AbstractRegistry、FailbackRegistry、CacheableFailbackRegistry和ZooKeeperRegistry。
AbstractRegistry实现了Registry接口中的方法,它在内存中实现了注册数据的读写改动,从而可以就降低注册中心的压力。从前文继承体系结构可以看出,Dubbo中的注册中心实现都继承了该类。
AbstractRegistry的核心是通过Properties类型的properties与File类型的file字段来记录服务数据。properties基于内存存储了服务相关的数据,file则对应磁盘文件,两者的数据是同步的。在创建AbstractRegistry对象时,会根据传入的URL中的file.cache参数来决定是否开启本地缓存,默认开启。
如果开启,会将磁盘文件file中的数据加载到properties当中。当properties中的数据发生变化时,会同步到磁盘文件file中。如果传入的URL的save.file参数为false(默认是false),会通过线程池来异步同步properties的数据到file,如果是true则是在当前线程同步。
FailbackRegistry处理的是注册中心相关的逻辑处理异常时如何处理,它会使用时间轮来处理异常重试。像ZooKeeperRegistry、NacosRegistry等注册中心实现,都继承了FailbackRegistry,因此也都有了异常重试的能力。
FailbackRegistry继承了AbstractRegistry,复写了register/unregister、subscribe/unsubscribe、notify这5个核心方法,在这些方法中首先会调用父类AbstractRegistry对应的方法,然后真正的注册订阅等逻辑还是通过模板方法模式委托给了子类实现,重试逻辑则交由时间轮来处理。
FailbackRegistry的核心字段如下:
1)ConcurrentMap < URL,FailedRegisteredTask > failedRegistered,注册失败的URL集合。key是注册失败的URL,value是FailedRegisteredTask,也就是重试注册时要执行的逻辑,该注册重试逻辑会交给时间轮执行。当注册失败时,会调用方法addFailedRegistered添加注册失败的URL。
2)ConcurrentMap < URL,FailedUnregisteredTask > failedUnregistered,取消注册时发生失败的URL集合。key是取消注册失败的URL,value是FailedUnregisteredTask,也就是重试取消注册时要执行的逻辑。
3)ConcurrentMap < Holder,FailedSubscribedTask > failedSubscribed,订阅失败的URL集合。key是Holder(封装了订阅失败的URL以及对应的监听器NotifyListener),value是FailedSubscribedTask,也就是重试订阅时要执行的逻辑。
4)ConcurrentMap < Holder,FailedUnsubscribedTask >failedUnsubscribed,取消订阅发生失败的URL集合。key是Holder(封装了取消订阅失败的URL以及对应的监听器NotifyListener),value是FailedUnsubscribedTask,也就是重试取消订阅时要执行的逻辑。
5)int retryPeriod,重试操作时间间隔。
6)HashedWheelTimer retryTimer,时间轮,用于执行重试操作。
以subscribe为例来分析FailbackRegistry中是如何处理重试的(其余方法类似)。如下图所示,首先FailbackRegistry的subscribe方法会调用父类AbstractRegistry的subcribe方法,将订阅数据添加到内存中进行维护,接着会从订阅失败/取消订阅失败的集合中移除该URL相关的订阅数据。
然后调用子类的doSubscribe方法将真正的订阅逻辑交给子类实现,这是典型的模板方法设计模式。如果发生异常,会调用getCacheUrls方法获取缓存的服务提供者数据。如果缓存的服务数据非空,因为这边订阅失败了,所以需要手动触发下notify方法,回调相关的NotifyListener方法,刷新消费者本地的服务提供者列表、路由、配置的数据。如果缓存数据为空,则需要判断下,是否检测订阅失败以及订阅失败的异常是否可以跳过,根据这个来判断是否需要抛出异常还是忽略仅打印日志。
最后会调用addFailedSubscribed方法将订阅失败的信息添加到failedSubscribed集合,以及将任务添加到时间轮中,这样当时间到了,时间轮就可以处理该重试任务了。
这边有一点需要注意,如果任务已经存在failedSubscribed集合中,就不需要重复添加了。failedSubscribed是Map结构,通过key来判断数据是否存在,因此这边的Holder作为key时,需要复写hashCode和equals方法。
@Override
public void subscribe(URL url, NotifyListener listener) {
// 调用父类方法, 在内存中记录下订阅相关的数据
super.subscribe(url, listener);
// 从订阅失败集合、取消订阅失败集合中移除该URL相关的数据, 如果存在需要取消相关任务的执行
removeFailedSubscribed(url, listener);
try {
// 模板方法设计模式, 委托给子类实现真正的订阅逻辑
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
// 订阅发生了异常, 则使用本地缓存的服务提供者数据
List<URL> urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
// 调用notify方法回调相关的listener方法, 通知消费者刷新自己本地
// 的服务提供者、路由、配置等数据
notify(url, listener, urls);
} else {
// 根据check以及是否是可忽略的异常来判断是否需要抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
// 抛异常
} else {
// 仅打印错误日志
}
}
// 添加订阅失败的URL信息
addFailedSubscribed(url, listener);
}
}
protected void addFailedSubscribed(URL url, NotifyListener listener) {
// 封装成Holder
Holder h = new Holder(url, listener);
// 判断该Holder是否已经存在, 如果已经存在则直接返回, 不需要多次添加
FailedSubscribedTask oldOne = failedSubscribed.get(h);
if (oldOne != null) {
return;
}
// 创建订阅失败的重试任务
FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener);
// 向订阅失败的集合里面添加该任务, 如果返回null说明之前不存在该任务,
// 这次需要往时间轮中注册该任务
oldOne = failedSubscribed.putIfAbsent(h, newTask);
if (oldOne == null) {
// 将该任务添加到时间轮中
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
在2.7.8版本中并没有CacheableFailbackRegistry这个类。在Dubbo3.0中,针对服务数据的推送做了一系列的优化,CacheableFailbackRegistry正是其中一个改动,下面来进行具体的讲解。
下图所示是Dubbo2.7.8中的URL推送模型,消费者启动后会向ZooKeeper服务端订阅感兴趣的服务,当有新的消费者感兴趣的服务提供者节点(提供者3)加入服务时,该服务(提供者3)会把自己的服务信息注册到ZooKeeper服务端,接着ZooKeeper服务端会把providers节点下的所有服务实例信息(提供者1、2、3)全量推送给消费者,消费者收到后根据推送的数据全量生成URL列表,该URL列表后续会被消费者用来构建自己本地的可选的服务提供者列表数据,后续发起远程调用时,就可以从该服务提供者列表中选择一个节点。
可以看到,当服务提供者实例数量较小时,推送的数据量较小,消费者端构建URL的压力就小一些,但是当某个接口有大量的服务提供者时,当发生服务扩容/缩容,就会有大量的URL被创建。Dubbo3.0中的对该URL推送模型做了一系列的优化,主要是对URL的类结构做了一定调整,引入了多级缓存,下面具体分析。
如下两图分别是Dubbo2.7.8和Dubbo3.0.7中的URL结构,可以看到在3.0.7中新增了URLAddress和URLParam类,原本URL中的host、port等信息移到了URLAddress中,原本URL的参数信息则移到了URLParam中。为什么要这么做?很大程度上是为了利用缓存,避免重复创建URL对象。比如将URL中的host、port等信息抽取到URLAddress对象中,当发生服务数据变更推送时,根据host、port等信息很大程度上能够从缓存中找到已有的URLAddress对象,这样就避免了一些不必要的URL创建。
如下图所示,当消费者接收到ZooKeeper注册中心推送过来的全量服务提供者列表信息(providers)时,会调用到该方法进行ServiceAddressURL列表的创建。首先,会根据消费者URL尝试从缓存stringUrls中获取服务提供者信息对应的Map(key是服务提供者信息的字符串表示,value是对应的ServiceAddressURL)。如果该Map为空,则providers对应的ServiceAddressURL都需要创建。如果Map不为空,则先从该Map中根据provider的字符串key尝试获取缓存,如果存在则不需要调用createURL创建。
createURL也是依次从缓存stringAddress和stringParam获取对应的URLAddress和URLParam,如果缓存中不存在则创建。接着,利用刚刚获取到的URLAddress和URLParam以及consumerURL创建ServiceAddressURL。
创建完成后会判断该ServiceAddressURL是否和当前消费者匹配(version、group这些信息是否匹配,前文已经分析过)。如果不匹配则返回空,后续也不会被更新到stringUrls缓存中。
创建完成后会调用evictURLCache,将待清理的URL信息放入waitForRemove中,cacheRemovalScheduler会定时清理缓存数据。
protected List<URL> toUrlsWithoutEmpty(URL consumer, Collection<String> providers) {
// 根据消费者URL信息从stringUrls中获取对应的服务提供者信息
Map<String, ServiceAddressURL> oldURLs = stringUrls.get(consumer);
// create new urls
Map<String, ServiceAddressURL> newURLs;
// 去除consumerURL中的release、dubbo、methods、时间戳、dubbo.tag这些参数
URL copyOfConsumer = removeParamsFromConsumer(consumer);
// 如果缓存中没有, 肯定都是需要新建的
if (oldURLs == null) {
newURLs = new HashMap<>((int) (providers.size() / 0.75f + 1));
for (String rawProvider : providers) {
// 去除掉rawProvider中的时间戳、PID参数
rawProvider = stripOffVariableKeys(rawProvider);
// 调用createURL创建ServiceAddressURL
ServiceAddressURL cachedURL =
createURL(rawProvider, copyOfConsumer, getExtraParameters());
// 如果创建的为空则忽略不放入newURLs
if (cachedURL == null) {
continue;
}
newURLs.put(rawProvider, cachedURL);
}
} else {
newURLs = new HashMap<>((int) (providers.size() / 0.75f + 1));
// maybe only default , or "env" + default
for (String rawProvider : providers) {
rawProvider = stripOffVariableKeys(rawProvider);
// 从缓存中获取, 如果缓存中存在就不需要再创建了
ServiceAddressURL cachedURL = oldURLs.remove(rawProvider);
if (cachedURL == null) {
cachedURL = createURL(rawProvider, copyOfConsumer, getExtraParameters());
if (cachedURL == null) {
continue;
}
}
newURLs.put(rawProvider, cachedURL);
}
}
// 清除老的缓存数据
evictURLCache(consumer);
// 更新stringUrls缓存
stringUrls.put(consumer, newURLs);
return new ArrayList<>(newURLs.values());
}
protected ServiceAddressURL createURL(String rawProvider, URL consumerURL,
Map<String, String> extraParameters) {
boolean encoded = true;
int paramStartIdx = rawProvider.indexOf(ENCODED_QUESTION_MARK);
if (paramStartIdx == -1) {
encoded = false;
}
// 解析rawProvider, 一分为二, 一个用来创建URLAddress、一个用来创建URLParam
String[] parts = URLStrParser.parseRawURLToArrays(rawProvider, paramStartIdx);
if (parts.length <= 1) {
return DubboServiceAddressURL.valueOf(rawProvider, consumerURL);
}
String rawAddress = parts[0];
String rawParams = parts[1];
boolean isEncoded = encoded;
// 从缓存stringAddress缓存中获取URLAddress, 不存在则创建
URLAddress address =
stringAddress.computeIfAbsent(rawAddress,
k -> URLAddress.parse(k, getDefaultURLProtocol(), isEncoded));
address.setTimestamp(System.currentTimeMillis());
// 从缓存stringParam缓存中获取URLParam, 不存在则创建
URLParam param = stringParam.computeIfAbsent(rawParams,
k -> URLParam.parse(k, isEncoded, extraParameters));
param.setTimestamp(System.currentTimeMillis());
// 用获取到的URLAddress、URLParam直接new创建ServiceAddressURL
ServiceAddressURL cachedURL = createServiceURL(address, param, consumerURL);
// 判断创建出来的ServiceAddressURL是否和当前消费者匹配, 不匹配返回null
if (isMatch(consumerURL, cachedURL)) {
return cachedURL;
}
return null;
}
根据传入的URL生成该节点要在ZooKeeper服务端上注册的节点路径,值为如下形式:/dubbo/org.apache.dubbo.springboot.demo.DemoService/providers/服务信息,/dubbo是根路径,接下来是服务的接口名,然后/providers是类型信息(如果是消费者则是/consumers节点,路由信息则是/routers),最后是URL的字符串表示。得到节点路径后,会根据URL的dynamic参数(默认是true)决定在ZooKeeper服务端上创建的是临时节点,还是持久节点,默认是临时节点。
注册后数据结构如下图所示。
订阅的核心是通过ZooKeeperClient在指定的节点的添加ChildListener,当该节点的子节点数据发生变化时,ZooKeeper服务端会通知到该ChildListener的notify方法,然后调用到对应的NotifyListener方法,刷新消费者本地的服务提供者列表等信息。
doSubscribe方法主要分为两个分支:URL的interface参数明确指定了为*(订阅所有,通常实际使用中不会这么使用,Dubbo的控制后台会订阅所有)或者就订阅某个服务接口,接下来分析订阅某个指定服务接口这个分支代码。这块涉及到三个监听器(它们是一对一的):
1)NotifyListener,Dubbo中定义的通用的订阅相关的监听器。它是定义在dubbo-registry-api模块中的,不仅仅在ZooKeeper注册中心模块中使用。
2)ChildListener,Dubbo中定义的针对ZooKeeper注册中心的监听器,用来监听指定节点子节点的数据变化。
3)CuratorWatcher,Curator框架中的监听器,Curator是常用的ZooKeeper客户端,如果Dubbo采用其它ZooKeeper客户端工具,这块就是其它相关的监听器逻辑了。
当订阅的服务数据发生变化时,最先会触发到CuratorWatcher的process方法,process方法中会调用ChildListener的childChanged方法,在childChanged方法会继续触发调用到ZooKeeperRegistry的notify方法。这里有两点需要注意:
1)因为doSubscribe方法中通过ZooKeeperClient添加Watcher监听器时,使用的是usingWatcher,这是一次性的,所以在CuratorWatcher的实现CuratorWatcherImpl的process方法中,当收到ZooKeeper的变更数据推送时,会再次在path上注册Watcher。
2)在ChildListener的实现RegistryChildListenerImpl的doNotify方法中,会调用ZooKeeperRegistry的toUrlsWithEmpty将传入的字符串形式的服务提供者列表等数据转换成对应的ServiceAddressURL列表数据,以供后面使用。
明确了三个监听器的含义之后,接下来分析doSubscribe的逻辑就简单了。首先会调用toCategoriesPath方法获取三个path路径,分别是
表示当前消费者(url参数代表)需要订阅这三个节点,当任意一个节点的数据发生变化时,ZooKeeper服务端都会通知到当前注册中心客户端,更新本地的服务数据。依次遍历这三个path路径,分别在这三个path上注册监听器。
ZooKeeperRegistry通过zkListeners这个Map维护了所有消费者(URL)的所有监听器数据,首先根据url参数获取当前消费者对应的监听器listeners,这是一个Map,key是NotifyListener,value是对应的ChildListener,如果不存在则需要创建ChildListener的实现RegistryChildListenerImpl。当创建完成后,会在ZooKeeper服务端创建持久化的path路径,并且在该path路径上注册监听器。首次订阅注册监听器时,会获取到该路径下的所有服务数据的字符串形式,并调用toUrlsWithEmpty转成URL形式,接着调用notify方法刷新消费者本地的服务相关数据。
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
CountDownLatch latch = new CountDownLatch(1);
try {
List<URL> urls = new ArrayList<>();
// 获取三个路径, 分别是/providers、/routers、/configurators
for (String path : toCategoriesPath(url)) {
// 获取该消费者URL对应的监听器Map
ConcurrentMap<NotifyListener, ChildListener> listeners =
zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// 查找NotifyListener对应的ChildListener是否存在, 不存在则创建
ChildListener zkListener = listeners.computeIfAbsent(listener,
k -> new RegistryChildListenerImpl(url, path, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
// 在ZooKeeper服务端创建持久节点
zkClient.create(path, false);
// 添加ChildListener监听器, 并全量拉取下相关服务数据
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// 将字符串形式的服务数据转换成URL列表数据
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 首次订阅后触发下notify逻辑, 刷新消费者本地的服务提供者列表等数据
notify(url, listener, urls);
} finally {
latch.countDown();
}
}
接下来看下AbstractZooKeeperClient的addChildListener方法,逻辑也比较简单,在指定path路径上添加一次性的Watcher。取消订阅的逻辑则是会将传入的UR和NotifyListener对应的ChildListener移除,不再监听。
public List<String> addChildListener(String path, final ChildListener listener) {
ConcurrentMap<ChildListener, TargetChildListener> listeners =
childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
TargetChildListener targetListener = listeners.computeIfAbsent(listener,
k -> createTargetChildListener(path, k));
return addTargetChildListener(path, targetListener);
}
@Override
public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
try {
return client.getChildren().usingWatcher(listener).forPath(path);
} catch (NoNodeException e) {
return null;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
本文通过分析Dubbo3.0中AbstractRegistry、FailbackRegistry、ZooKeeperRegistry,详细介绍了Dubbo中ZooKeeper注册中心的实现原理,包括服务数据本地缓存、服务注册订阅异常重试等。另外,网上大部分文章是基于2.x版本的,本文基于Dubbo3.0,重点分析了Dubbo3.0中新引入的CacheableFailbackRegistry,详细介绍了Dubbo在URL推送模型上所做的优化。
参考资料:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。