先看下Dubbo源码中demo启动类
@SpringBootApplication
@EnableDubbo(scanBasePackages = {"org.apache.dubbo.springboot.demo.provider"})
public class ProviderApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(ProviderApplication.class, args);
System.out.println("dubbo service started");
new CountDownLatch(1).await();
}
}
通过@EnableDubbo注解启动,该注解整合了@EnableDubboConfig和@DubboCompoentScan,这两个注解分别import了DubboConfigConfigurationRegistrar和DubboComponentScanRegistrar
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Import(DubboConfigConfigurationRegistrar.class)
public @interface EnableDubboConfig {......}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {......}
DubboConfigConfigurationRegistrar和DubboComponentScanRegistrar都是在BeanDefinitionRegistry加载BeanDefinition的过程中向BeanDefinitionRegistry注册自定义的BeanDefinition,代码如下,都调用了DubboSpringInitializer.initialize方法初始化DubboSpringInitContext(如果存在就不再初始化)
public class DubboConfigConfigurationRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// initialize dubbo beans
DubboSpringInitializer.initialize(registry);BeanDefinitionRegistry
}
}
public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// initialize dubbo beans
DubboSpringInitializer.initialize(registry);
Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
registerServiceAnnotationPostProcessor(packagesToScan, registry);
}
}
DubboSpringInitializer.initialize最终调用DubboBeanUtils.registerCommonBeans注册一系列Dubbo的基础服务类,如下
static void registerCommonBeans(BeanDefinitionRegistry registry) {
registerInfrastructureBean(registry, ServicePackagesHolder.BEAN_NAME, ServicePackagesHolder.class);
registerInfrastructureBean(registry, ReferenceBeanManager.BEAN_NAME, ReferenceBeanManager.class);
// Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean
registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
ReferenceAnnotationBeanPostProcessor.class);
// TODO Whether DubboConfigAliasPostProcessor can be removed ?
// Since 2.7.4 [Feature] https://github.com/apache/dubbo/issues/5093
registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME,
DubboConfigAliasPostProcessor.class);
// register ApplicationListeners
registerInfrastructureBean(registry, DubboDeployApplicationListener.class.getName(), DubboDeployApplicationListener.class);
registerInfrastructureBean(registry, DubboConfigApplicationListener.class.getName(), DubboConfigApplicationListener.class);
// Since 2.7.6 Register DubboConfigDefaultPropertyValueBeanPostProcessor as an infrastructure Bean
registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME,
DubboConfigDefaultPropertyValueBeanPostProcessor.class);
// Dubbo config initializer
registerInfrastructureBean(registry, DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);
// register infra bean if not exists later
registerInfrastructureBean(registry, DubboInfraBeanRegisterPostProcessor.BEAN_NAME, DubboInfraBeanRegisterPostProcessor.class);
}
这里主要先提下这两个类,后面详细分析
ReferenceAnnotationBeanPostProcessor:处理服务消费者的Processor
DubboDeployApplicationListener:Provider服务发布和Consumer引用的Listener
上面的所有内容在之前的文章中已经提到过,这样再说明一下,以便知道入口在哪里。
ReferenceAnnotationBeanPostProcessor实现了BeanFactoryPostProcessor接口,在BeanDefinition加载完了之后会调用其postProcessBeanFactory方法完成ReferenceBean的BeanDefinition注册,代码如下
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
String[] beanNames = beanFactory.getBeanDefinitionNames();
for (String beanName : beanNames) {
Class<?> beanType;
if (beanFactory.isFactoryBean(beanName)) {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
if (isReferenceBean(beanDefinition)) {
continue;
}
if (isAnnotatedReferenceBean(beanDefinition)) {
// 处理@DubboReference和@Bean一起使用的场景,如下
// @Bean
// @DubboReference(scope = "remote", version = "1.0.0", group = "group1")
// public ReferenceBean<DemoService> demoService() {
// return new ReferenceBean<>();
// }
// 这种情况已经注册了该ReferenceBean,所以processReferenceAnnotatedBeanDefinition就
// 不需要再重复注册,只需要做一些其他的信息设置的工作
processReferenceAnnotatedBeanDefinition(beanName, (AnnotatedBeanDefinition) beanDefinition);
continue;
}
String beanClassName = beanDefinition.getBeanClassName();
beanType = ClassUtils.resolveClass(beanClassName, getClassLoader());
} else {
beanType = beanFactory.getType(beanName);
}
if (beanType != null) {
// 下面处理通过字段注入的场景,如下
// @DubboReference(scope = "remote", version = "1.0.0", group = "group1")
// private DemoService demoService;
// 这种场景需要找到带有@DubboReference注解的字段信息,在prepareInjection方法中注册一个
// ReferenceBean
AnnotatedInjectionMetadata metadata = findInjectionMetadata(beanName, beanType, null);
try {
prepareInjection(metadata);
} catch (BeansException e) {
throw e;
} catch (Exception e) {
throw new IllegalStateException("Prepare dubbo reference injection element failed", e);
}
}
}
......
try {
// 发布一个DubboConfigInitEvent事件
applicationContext.publishEvent(new DubboConfigInitEvent(applicationContext));
} catch (Exception e) {
logger.warn(CONFIG_DUBBO_BEAN_INITIALIZER, "", "", "publish early application event failed, please upgrade spring version to 4.2.x or later: " + e);
}
}
另外ReferenceAnnotationBeanPostProcessor同样实现了InstantiationAwareBeanPostProcessor接口,并且通过实现postProcessProperties方法来实现@DubboReference的自动注入,如下,这点和原生的@Autowired实现原理一模一样
public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName)
throws BeansException {
try {
AnnotatedInjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
prepareInjection(metadata);
metadata.inject(bean, beanName, pvs);
} catch (BeansException ex) {
throw ex;
} catch (Throwable ex) {
throw new BeanCreationException(beanName, "Injection of @" + getAnnotationType().getSimpleName()
+ " dependencies is failed", ex);
}
return pvs;
}
注册完成了之后发布一个DubboConfigInitEvent事件,看看这个事件对应的Listener的逻辑
public class DubboConfigApplicationListener implements ApplicationListener<DubboConfigInitEvent>, ApplicationContextAware {
private ApplicationContext applicationContext;
private ModuleModel moduleModel;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
this.moduleModel = DubboBeanUtils.getModuleModel(applicationContext);
}
@Override
public void onApplicationEvent(DubboConfigInitEvent event) {
if (nullSafeEquals(applicationContext, event.getSource())) {
// It's expected to be notified at org.springframework.context.support.AbstractApplicationContext.registerListeners(),
// before loading non-lazy singleton beans. At this moment, all BeanFactoryPostProcessor have been processed,
if (initialized.compareAndSet(false, true)) {
initDubboConfigBeans();
}
}
}
private void initDubboConfigBeans() {
// load DubboConfigBeanInitializer to init config beans
if (applicationContext.containsBean(DubboConfigBeanInitializer.BEAN_NAME)) {
applicationContext.getBean(DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);
} else {
logger.warn(CONFIG_DUBBO_BEAN_NOT_FOUND, "", "", "Bean '" + DubboConfigBeanInitializer.BEAN_NAME + "' was not found");
}
// All infrastructure config beans are loaded, initialize dubbo here
moduleModel.getDeployer().prepare();
}
}
最终是通过DubboConfigBeanInitializer来做初始化,DubboConfigBeanInitializer实现了InitializingBean接口,afterPropertiesSet方法中的Init方法最终调用initReferenceBean将referenceBean添加到ConfigManager中
private synchronized void initReferenceBean(ReferenceBean referenceBean) throws Exception {
......
// 获取referenceKey
String referenceKey = getReferenceKeyByBeanName(referenceBean.getId());
if (StringUtils.isEmpty(referenceKey)) {
referenceKey = ReferenceBeanSupport.generateReferenceKey(referenceBean, applicationContext);
}
......
ReferenceConfig referenceConfig = referenceConfigMap.get(referenceKey);
if (referenceConfig == null) {
// 放入缓存
referenceConfigMap.put(referenceKey, referenceConfig);
// 和provider一样,将referenceBean添加到ConfigManager中
moduleModel.getConfigManager().addReference(referenceConfig);
}
......
}
DubboDeployApplicationListener主要用来处理Provider的服务发布和消费端的服务引用(上面提到过),其实现了ApplicationListener<ApplicationContextEvent>接口,主要监听ApplicationContext相关的各种事件,比如Spring启动完成后会发布一个ContextRefreshedEvent事件
public void onApplicationEvent(ApplicationContextEvent event) {
if (nullSafeEquals(applicationContext, event.getSource())) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
}
同样DubboDeployApplicationListener也实现了ApplicationContextAware接口,实例化后通过setApplicationContext方法将ModuleModel设置到自己的属性当中用户后续服务发布
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
......
this.moduleModel = DubboBeanUtils.getModuleModel(applicationContext);
......
}
DubboDeployApplicationListener监听到ContextRefreshedEvent事件之后,调用onContextRefreshedEvent方法进行服务发布
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
// 通过ModuleModel获取发布器
ModuleDeployer deployer = moduleModel.getDeployer();
Assert.notNull(deployer, "Module deployer is null");
// 开始服务发布的流程
Future future = deployer.start();
......
}
最终调用DefaultModuleDeployer.startSync方法处理
private synchronized Future startSync() throws IllegalStateException {
......
try {
// new了一个CompletableFuture来以获取start的结果,并且发布一个start事件
onModuleStarting();
......
// 发布服务
exportServices();
// 服务引用
referServices();
......
// 下面都是根据不同的发布结果,发布不同的事件的处理
if (asyncExportingFutures.isEmpty() && asyncReferringFutures.isEmpty()) {
onModuleStarted();
} else {
frameworkExecutorRepository.getSharedExecutor().submit(() -> {
try {
// wait for export finish
waitExportFinish();
// wait for refer finish
waitReferFinish();
} catch (Throwable e) {
logger.warn(CONFIG_FAILED_WAIT_EXPORT_REFER, "", "", "wait for export/refer services occurred an exception", e);
} finally {
onModuleStarted();
}
});
}
} catch (Throwable e) {
onModuleFailed(getIdentifier() + " start failed: " + e, e);
throw e;
}
return startFuture;
}
其中exportServices是发布服务,之前的文章中已经分析过,referServices是服务引用,可异步可同步,如下
private void referServices() {
configManager.getReferences().forEach(rc -> {
try {
ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
if (!referenceConfig.isRefreshed()) {
referenceConfig.refresh();
}
if (rc.shouldInit()) {
if (referAsync || rc.shouldReferAsync()) {
ExecutorService executor = executorRepository.getServiceReferExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
referenceCache.get(rc);
} catch (Throwable t) {
logger.error(CONFIG_FAILED_EXPORT_SERVICE, "", "", "Failed to async export service config: " + getIdentifier() + " , catch error : " + t.getMessage(), t);
}
}, executor);
asyncReferringFutures.add(future);
} else {
referenceCache.get(rc);
}
}
} catch (Throwable t) {
logger.error(CONFIG_FAILED_REFERENCE_MODEL, "", "", "Model reference failed: " + getIdentifier() + " , catch error : " + t.getMessage(), t);
referenceCache.destroy(rc);
throw t;
}
});
}
本质来说就是调用referenceCache.get方法,看看实现
public <T> T get(ReferenceConfigBase<T> rc) {
String key = generator.generateKey(rc);
Class<?> type = rc.getInterfaceClass();
boolean singleton = rc.getSingleton() == null || rc.getSingleton();
T proxy = null;
// 先从缓存里面去取,其实就是根据key到referenceKeyMap中获取
if (singleton) {
proxy = get(key, (Class<T>) type);
} else {
logger.warn(CONFIG_API_WRONG_USE, "", "", "Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. " +
"Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");
}
if (proxy == null) {
// 缓存中没有,再调用rc.get方法获取
List<ReferenceConfigBase<?>> referencesOfType = ConcurrentHashMapUtils.computeIfAbsent(referenceTypeMap, type, _t -> Collections.synchronizedList(new ArrayList<>()));
referencesOfType.add(rc);
List<ReferenceConfigBase<?>> referenceConfigList = ConcurrentHashMapUtils.computeIfAbsent(referenceKeyMap, key, _k -> Collections.synchronizedList(new ArrayList<>()));
referenceConfigList.add(rc);
proxy = rc.get();
}
return proxy;
}
先从缓存里面去取,其实就是根据key到referenceKeyMap中获取,缓存中没有,再调用rc.get方法获取,看下rc.get方法
public T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
// ensure start module, compatible with old api usage
getScopeModel().getDeployer().start();
init();
}
return ref;
}
protected synchronized void init() {
if (initialized && ref != null) {
return;
}
try {
......
ref = createProxy(referenceParameters);
......
} catch (Throwable t) {
// 异常处理逻辑
......
}
initialized = true;
}
rc.get方法中调用init方法,然后调用createProxy方法生成代理对象,
private T createProxy(Map<String, String> referenceParameters) {
......
createInvoker();
......
// 创建代理对象,默认使用javassist
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
而代理的invoker则是由createInvoker方法生成
private void createInvoker() {
if (urls.size() == 1) {
URL curUrl = urls.get(0);
invoker = protocolSPI.refer(interfaceClass, curUrl);
// registry url, mesh-enable and unloadClusterRelated is true, not need Cluster.
if (!UrlUtils.isRegistry(curUrl) &&
!curUrl.getParameter(UNLOAD_CLUSTER_RELATED, false)) {
List<Invoker<?>> invokers = new ArrayList<>();
invokers.add(invoker);
invoker = Cluster.getCluster(getScopeModel(), Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
}
} else {
List<Invoker<?>> invokers = new ArrayList<>();
URL registryUrl = null;
for (URL url : urls) {
// For multi-registry scenarios, it is not checked whether each referInvoker is available.
// Because this invoker may become available later.
invokers.add(protocolSPI.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
// use last registry url
registryUrl = url;
}
}
if (registryUrl != null) {
// registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker
// (RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);
} else {
// not a registry url, must be direct invoke.
if (CollectionUtils.isEmpty(invokers)) {
throw new IllegalArgumentException("invokers == null");
}
URL curUrl = invokers.get(0).getUrl();
String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);
invoker = Cluster.getCluster(getScopeModel(), cluster).join(new StaticDirectory(curUrl, invokers), true);
}
}
}
和provider类似,通过protocolSPI.refer方法来处理,protocolSPI第一步获取类型为registry的protocol,获取到的其实是一个链表,链表中的每个元素依次进行refer
直接看最后一个元素InterfaceCompatibleRegistryProtocol的refer方法,这个类继承自RegistryProtocol,其实是调用RegistryProtocol的refer方法
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = getRegistryUrl(url);
Registry registry = getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
String group = qs.get(GROUP_KEY);
if (StringUtils.isNotEmpty(group)) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
}
}
Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url, qs);
}
核心代码就最后两行,获取Cluster,默认是FailoverCluster,调用deRefer方法
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());
consumerAttribute.remove(REFER_KEY);
String p = isEmpty(parameters.get(PROTOCOL_KEY)) ? CONSUMER : parameters.get(PROTOCOL_KEY);
URL consumerUrl = new ServiceConfigURL(
p,
null,
null,
parameters.get(REGISTER_IP_KEY),
0, getPath(parameters, type),
parameters,
consumerAttribute
);
url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
return interceptInvoker(migrationInvoker, url, consumerUrl);
}
doRefer方法中首先获取Invoker,获取到的是一个MigrationInvoker的实例,属性详情如下
然后调用interceptInvoker方法对Invoker做后续处理
protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, invoker, consumerUrl, url);
}
return invoker;
}
在interceptInvoker方法中找到可用的RegistryProtocolListener列表,依次执行RegistryProtocolListener的onRefer方法
可用的就一个MigrationRuleListener,看下它的onRefer方法
public void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker, URL consumerUrl, URL registryURL) {
MigrationRuleHandler<?> migrationRuleHandler = ConcurrentHashMapUtils.computeIfAbsent(handlers, (MigrationInvoker<?>) invoker, _key -> {
((MigrationInvoker<?>) invoker).setMigrationRuleListener(this);
return new MigrationRuleHandler<>((MigrationInvoker<?>) invoker, consumerUrl);
});
migrationRuleHandler.doMigrate(rule);
}
以MigrationInvoker和consumerUrl为入参构造了一个migrationRuleHandler,然后调用doMigrate方法,doMigrate
调用refreshInvoker方法处理
private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
if (step == null || threshold == null) {
throw new IllegalStateException("Step or threshold of migration rule cannot be null");
}
MigrationStep originStep = currentStep;
if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
boolean success = true;
switch (step) {
case APPLICATION_FIRST:
migrationInvoker.migrateToApplicationFirstInvoker(newRule);
break;
case FORCE_APPLICATION:
success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
break;
case FORCE_INTERFACE:
default:
success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
}
if (success) {
setCurrentStepAndThreshold(step, threshold);
logger.info("Succeed Migrated to " + step + " mode. Service Name: " + consumerURL.getDisplayServiceKey());
report(step, originStep, "true");
} else {
// migrate failed, do not save new step and rule
logger.warn(INTERNAL_ERROR, "unknown error in registry module", "", "Migrate to " + step + " mode failed. Probably not satisfy the threshold you set "
+ threshold + ". Please try re-publish configuration if you still after check.");
report(step, originStep, "false");
}
return success;
}
// ignore if step is same with previous, will continue override rule for MigrationInvoker
return true;
最后调用了migrationInvoker.migrateToApplicationFirstInvoker方法
public void migrateToApplicationFirstInvoker(MigrationRule newRule) {
CountDownLatch latch = new CountDownLatch(0);
refreshInterfaceInvoker(latch);
refreshServiceDiscoveryInvoker(latch);
// directly calculate preferred invoker, will not wait until address notify
// calculation will re-occurred when address notify later
calcPreferredInvoker(newRule);
}
详细看看refreshInterfaceInvoker方法的逻辑
protected void refreshInterfaceInvoker(CountDownLatch latch) {
clearListener(invoker);
if (needRefresh(invoker)) {
if (logger.isDebugEnabled()) {
logger.debug("Re-subscribing interface addresses for interface " + type.getName());
}
if (invoker != null) {
invoker.destroy();
}
invoker = registryProtocol.getInvoker(cluster, registry, type, url);
}
setListener(invoker, () -> {
latch.countDown();
if (reportService.hasReporter()) {
reportService.reportConsumptionStatus(
reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "interface"));
}
if (step == APPLICATION_FIRST) {
calcPreferredInvoker(rule);
}
});
}
该方法中主要就是设置了migrationInvoker的invoker属性,通过registryProtocol.getInvoker获取,这里的registryProtocol就是
上文中提到过了InterfaceCompatibleRegistryProtocol
public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
DynamicDirectory<T> directory = new RegistryDirectory<>(type, url);
return doCreateInvoker(directory, cluster, registry, type);
}
getInvoker中又继续调用父类RegistryProtocol.doCreateInvoker方法
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
URL urlToRegistry = new ServiceConfigURL(
parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),
parameters.remove(REGISTER_IP_KEY),
0,
getPath(parameters, type),
parameters
);
urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());
urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());
if (directory.isShouldRegister()) {
// 将消费者信息写入注册中心
directory.setRegisteredConsumerUrl(urlToRegistry);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(urlToRegistry);
// 生成所有提供者的FilterInvoker,此处的Filter是针对单个invoker的Filter(最底层的是DubboInvoker)
directory.subscribe(toSubscribeUrl(urlToRegistry));
// 生成集群级别的FilterInvoker,包含了大部分默认的Filter
return (ClusterInvoker<T>) cluster.join(directory, true);
}
详细看看生成提供者的FilterInvoker的过程,通过directory.subscribe方法
public void subscribe(URL url) {
ApplicationModel applicationModel = url.getApplicationModel();
MetricsEventBus.post(RegistryEvent.toSubscribeEvent(applicationModel),() ->
{
super.subscribe(url);
return null;
}
);
if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, org.apache.dubbo.registry.Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
consumerConfigurationListener.addNotifyListener(this);
referenceConfigurationListener = new ReferenceConfigurationListener(moduleModel, this, url);
}
}
核心逻辑是通过调用super.subscribe方法来处理
public void subscribe(URL url) {
setSubscribeUrl(url);
registry.subscribe(url, this);
}
接着看registry.subscribe方法,入参listener为最初的调用者directory
public void subscribe(URL url, NotifyListener listener) {
try {
if (registry != null) {
// 在这里生成所有提供者的FilterInvoker,并且监听提供者的的变化
// 这里的registry是一个ZookeeperRegistry实例
registry.subscribe(url, listener);
}
} finally {
listenerEvent(serviceListener -> serviceListener.onSubscribe(url, registry));
}
}
最后调用ZookeeperRegistry类的doSubscribe方法
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
checkDestroyed();
if (ANY_VALUE.equals(url.getServiceInterface())) {
// 由于我本地跑的都是具体的interface,所以不会走到这个分支
} else {
CountDownLatch latch = new CountDownLatch(1);
try {
List<URL> urls = new ArrayList<>();
/*
Iterate over the category value in URL.
With default settings, the path variable can be when url is a consumer URL:
/dubbo/[service name]/providers,
/dubbo/[service name]/configurators
/dubbo/[service name]/routers
*/
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = ConcurrentHashMapUtils.computeIfAbsent(zkListeners, url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = ConcurrentHashMapUtils.computeIfAbsent(listeners, listener, k -> new RegistryChildListenerImpl(url, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
// create "directories".
zkClient.create(path, false, true);
// 添加提供者的监听器,并且获取到所有的提供者添加到urls中
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// The invocation point that may cause 1-1.
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 生成提供者的Invoker
notify(url, listener, urls);
} finally {
// tells the listener to run only after the sync notification of main thread finishes.
latch.countDown();
}
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
先注册了一个zkListener,zkListener为一个RegistryChildListenerImpl的实例
private class RegistryChildListenerImpl implements ChildListener {
......
@Override
public void childChanged(String path, List<String> children) {
// Notify 'notifiers' one by one.
try {
latch.await();
} catch (InterruptedException e) {
logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread.");
}
notifier.notify(path, children);
}
}
当提供者发生变化时,通过childChanged方法调用notifier.notify最终调用到以下的notify方法更新invoker,并且在注册完zkListener后也是调用这个方法生成提供者的invoker
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list
logger.error(REGISTRY_FAILED_NOTIFY_EVENT, "", "", "Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);
}
}
最终又调回listener的notify方法,并且通过listener的refreshOverrideAndInvoker方法刷新invoker
protected synchronized void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
this.directoryUrl = overrideWithConfigurator(getOriginalConsumerUrl());
refreshInvoker(urls);
}
至此directory.subscribe(toSubscribeUrl(urlToRegistry))方法就执行完了,最后再通过cluster.join(directory,true)方法生成ClusterInvoker,本质是一个ClusterFilterInvoker,在其构造方法中构造集群级别的Filter链
public ClusterFilterInvoker(AbstractClusterInvoker<T> invoker) {
List<FilterChainBuilder> builders = ScopeModelUtil.getApplicationModel(invoker.getUrl().getScopeModel()).getExtensionLoader(FilterChainBuilder.class).getActivateExtensions();
if (CollectionUtils.isEmpty(builders)) {
filterInvoker = invoker;
} else {
ClusterInvoker<T> tmpInvoker = invoker;
for (FilterChainBuilder builder : builders) {
tmpInvoker = builder.buildClusterInvokerChain(tmpInvoker, REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
filterInvoker = tmpInvoker;
}
}
最后再来看看生成单个provider的invoker的逻辑
private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {
Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
......
Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);
if (invoker == null) { // Not in the cache, refer again
try {
......
if (enabled) {
// 核心代码就这一行
invoker = protocol.refer(serviceType, url);
}
} catch (Throwable t) {
// 异常处理逻辑省略
......
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(url, invoker);
}
} else {
newUrlInvokerMap.put(url, invoker);
}
}
return newUrlInvokerMap;
}
核心代码就一行调用protocol.refer方法处理,这里的protocol其实是也是一个适配工具类,即上文中提到的protocolSPI,所以第一步也是去获取protocol的处理链表,只是这次获取的是类型为dubbo的protocol,如下
最后到DubboProtocol的refer方法处理
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
checkDestroyed();
return protocolBindingRefer(type, url);
}
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
checkDestroyed();
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
新建了一个DubboInvoker,其构造函数第三个入参是根据url生成的一个NettyClient(具体生成的逻辑就不看了,比较简单)
最后再来看看最终生成的Invoker的结构,最外层是一个MigrationInvoker,最里层是一个带有Filter的集群级别的FilterInvoker
FilterInvoker有一个originalInvoker属性,看看它的结构
FilterInvoker的directory属性中包含了所有提供者的invoker,最里层是一个DubboInvoker
自此Consume端的启动完成
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。