前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >apollo源码

apollo源码

原创
作者头像
eeaters
修改2021-09-17 17:22:50
修改2021-09-17 17:22:50
1.1K00
代码可运行
举报
文章被收录于专栏:阿杰阿杰
运行总次数:0
代码可运行

结论

启动流程

  • Spring启动
    • 调用 ApolloApplicationContextInitializer
      • 通过spi方式开始初始化
      • ConfigService ->ConfigManager ->ConfigFactory ->Config ->ConfigRepository
        • 首次初始化时会同步获取一次资源 RemoteConfigRepository
          • 同步调用一次 this.trySync();
          • 开启异步定时任务 this.schedulePeriodicRefresh();
          • 开启长轮询,及时获取配置调整 this.scheduleLongPollingRefresh();
    • bean初始化前阶段 - SpringValueProcessor
      • 将配置与bean的关系注册到SpringValueRegistry中

配置发生变更流程

  • RemoteConfigRepository - sync (长轮询收到变更也会调用该方法)
    • 加载远端最新的配置
      • 将配置与本地进行对比,可能是新增/删除/更新,归并后发送变更的事件
        • 监听器从SpringValueRegistry中获取出所有使用配置的bean,通过反射将属性进行替换

JDK及Spring一些基本知识

远程配置

远程配置的更新分为推和拉两种模式, apollo属于推拉结合的

推: 配置中心将变更信息主动发送给服务器 拉: 服务器会定时调用配置中心获取最新的配置

JDK SPI

全称Service Provider Interface, jdk自带的服务发现, 在资源目录中的META-INF/services下的文件

Spring SPI

spring的spi, 在资源目录下的META-INF/spring.factories文件,Spring Boot的自动装配机制的源头; 与spring boot进行整合的jar包入口类大多在这里

EnvironmentPostProcessor

资源加载后置处理器, 在资源准备阶段可以自定义加载外部化配置;

调用入口
  • prepareEnvironment
    • listeners.environmentPrepared(environment);

ApplicationContextInitializer

可以参考一下: ServletContainerInitializer、ServletContextInitializer有什么区别

调用入口
  • prepareContext
    • applyInitializers

BeanPostProcessor

Spring的Bean存在一系列的生命周期; 不同阶段可以通过处理器来增加一些额外的处理

Apollo类梳理

ApolloApplicationContextInitializer

因为apollo是做配置管理的, 这个类是在SpringBoot的启动阶段来初始化apollo的入口, 我个人倾向于对这个有了解即可, 这个类同时实现了ApplicationContextInitializer和EnvironmentPostProcessor; 对应的也就是初始化的时机

Injector

jdk的标准实现可能会把ConfigManager,ConfigFactoryManager等这些东西全部作为spi,而apollo则是只管理Injector

这个类可以获取到具体类的作用域,以及具体的实现类

代码语言:javascript
代码运行次数:0
复制
public class DefaultInjector implements Injector {
  private com.google.inject.Injector m_injector;
​
  public DefaultInjector() {
      m_injector = Guice.createInjector(new ApolloModule());
  }
  @Override
  public <T> T getInstance(Class<T> clazz) {
      return m_injector.getInstance(clazz);
  }
  private static class ApolloModule extends AbstractModule {
    @Override
    protected void configure() {
      bind(ConfigManager.class).to(DefaultConfigManager.class).in(Singleton.class);
      bind(ConfigFactoryManager.class).to(DefaultConfigFactoryManager.class).in(Singleton.class);
      bind(ConfigRegistry.class).to(DefaultConfigRegistry.class).in(Singleton.class);
      bind(ConfigFactory.class).to(DefaultConfigFactory.class).in(Singleton.class);
      bind(ConfigUtil.class).in(Singleton.class);
      bind(HttpUtil.class).in(Singleton.class);
      bind(ConfigServiceLocator.class).in(Singleton.class);
      bind(RemoteConfigLongPollService.class).in(Singleton.class);
      bind(YamlParser.class).in(Singleton.class);
    }
  }
}

DefaultConfigFactory

在apollo中,不同的namespace会生成与之对应的Config,该类可以看到配置的创建过程,这个地方可以获取到一个信息

小知识:

-Denv=LOCAL 启动可以跳过获取远成配置,直接读取本地配置,在连接不到远程服务器上,可以本地文件启动项目

代码语言:javascript
代码运行次数:0
复制
public class DefaultConfigFactory implements ConfigFactory {
  @Override
  public Config create(String namespace) {
    ConfigFileFormat format = determineFileFormat(namespace);
    if (ConfigFileFormat.isPropertiesCompatible(format)) {
      return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
    }
    return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
  }
​
  LocalFileConfigRepository createLocalConfigRepository(String namespace) {
    if (m_configUtil.isInLocalMode()) {
      return new LocalFileConfigRepository(namespace);
    }
    return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
  }
}

RemoteConfigRepository

一般使用apollo的两个功能:

  • 外部化配置
  • 属性动态刷新

而这两个功能都与这个类有关系,

代码语言:javascript
代码运行次数:0
复制
public class RemoteConfigRepository extends AbstractConfigRepository {    
    public RemoteConfigRepository(String namespace) {
        m_namespace = namespace;
        m_configCache = new AtomicReference<>();
        m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
        m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
        m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
        remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
        m_longPollServiceDto = new AtomicReference<>();
        m_remoteMessages = new AtomicReference<>();
        m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
        m_configNeedForceRefresh = new AtomicBoolean(true);
        m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
            m_configUtil.getOnErrorRetryInterval() * 8);
        gson = new Gson();
        //构造器中同步调用获取一次配置
        this.trySync();
        //启动一个job, 定时轮询获取远端服务配置
        this.schedulePeriodicRefresh();
        //启动一个长链接,可以实时获取到修改的配置
        this.scheduleLongPollingRefresh();
      }
    
    //默认5分钟拉取一次的定时job
    private void schedulePeriodicRefresh() {
        m_executorService.scheduleAtFixedRate(()-> trySync() },
                 m_configUtil.getRefreshInterval(),     
                 m_configUtil.getRefreshInterval(),
                m_configUtil.getRefreshIntervalTimeUnit());
     }
   //启动一个长链接
  private void doLongPollingRefresh(String appId, String cluster, String dataCenter) {
    while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
        //...
        List<ServiceDTO> configServices = getConfigServices();
        lastServiceDto = configServices.get(random.nextInt(configServices.size()));
​
        logger.debug("Long polling from {}", url);
        HttpRequest request = new HttpRequest(url);
        //90S
        request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);  
        final HttpResponse<List<ApolloConfigNotification>> response =
            m_httpUtil.doGet(request, m_responseType);
​
        if (response.getStatusCode() == 200 && response.getBody() != null) {
          updateNotifications(response.getBody());
          updateRemoteNotifications(response.getBody());
          //内部依旧会调用sync方法
          notify(lastServiceDto, response.getBody());
        }
    }
  }
  //部分删减
  @Override
  protected synchronized void sync() {
      ApolloConfig previous = m_configCache.get();
      //方法代码多,但是没有复杂逻辑,
      ApolloConfig current = loadApolloConfig();
      m_configCache.set(current);
      //本地事件发布
      this.fireRepositoryChange(m_namespace, this.getConfig());
  }
}

DefaultConfig

类的功能就是管理配置,事件监听, 然后计算出发生变更的配置进行修改,发送配置变更事件 在DefaultConfigFactory中创建的Config是DefaultConfig

在RemoteConfigRepository中本地事件发布的监听器也是DefaultConfig本身, @see initialize()

代码语言:javascript
代码运行次数:0
复制
public class DefaultConfig extends AbstractConfig implements RepositoryChangeListener {
      public DefaultConfig(String namespace, ConfigRepository configRepository) {
    m_namespace = namespace;
    m_resourceProperties = loadFromResource(m_namespace);
    m_configRepository = configRepository;
    m_configProperties = new AtomicReference<>();
    m_warnLogRateLimiter = RateLimiter.create(0.017); // 1 warning log output per minute
    initialize();
  }
    
  private void initialize() {
      updateConfig(m_configRepository.getConfig(), m_configRepository.getSourceType());
      m_configRepository.addChangeListener(this);
  }
    
  @Override
  public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
    if (newProperties.equals(m_configProperties.get())) {
      return;
    }
​
    ConfigSourceType sourceType = m_configRepository.getSourceType();
    Properties newConfigProperties = new Properties();
    newConfigProperties.putAll(newProperties);
​
    //将每一个配置进行对比,配置变更有三种: 新增,修改,删除  ,变更的类型放到ConfigChange中
    Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties, sourceType);
​
    //check double checked result
    if (actualChanges.isEmpty()) {
      return;
    }
    this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges));
  }
    
  protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
        for (final ConfigChangeListener listener : m_listeners) {
            //代码简化了一下, 就是异步发送一下事件
            m_executorService.submit(()-> listener.onChange(changeEvent)});
        }
  }
}

AutoUpdateConfigChangeListener

DefaultConfig发布的配置变更事件的监听器,实现动态刷新的地方 动态刷新的方式简单粗暴,就是SpringValueRegistry中已经存放好了配置对应的Bean对象, 通过反射强制修改注入字段的值

代码语言:javascript
代码运行次数:0
复制
public class AutoUpdateConfigChangeListener implements ConfigChangeListener{
   //...
  private final SpringValueRegistry springValueRegistry;
    
  @Override
  public void onChange(ConfigChangeEvent changeEvent) {
    Set<String> keys = changeEvent.changedKeys();
    if (CollectionUtils.isEmpty(keys)) {
      return;
    }
    for (String key : keys) {
      // 1. check whether the changed key is relevant
      Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
      if (targetValues == null || targetValues.isEmpty()) {
        continue;
      }
​
      // 2. update the value
      for (SpringValue val : targetValues) {
        updateSpringValue(val);
      }
    }
  }
    
  private void updateSpringValue(SpringValue springValue) {
      Object value = resolvePropertyValue(springValue);
      springValue.update(value);
    }
}

SpringValueRegistry

SpringValue的注册中心,配置与bean之间的关系的注册器,registry这个字段已经能够看出他的功能

注册的时机,见后面的SpringValueProcessor

代码语言:javascript
代码运行次数:0
复制
public class SpringValueRegistry {
      private final Map<BeanFactory, Multimap<String, SpringValue>> registry = Maps.newConcurrentMap();
}

SpringValue

就是属性所属的bean对象,为了呼应一下AutoUpdateConfigChangeListener中的updateSpringValue方法而写一下

代码语言:javascript
代码运行次数:0
复制
public class SpringValue {
  private Field field;
  private WeakReference<Object> beanRef;
  private String beanName;
  private String key;
  private String placeholder;
  private Class<?> targetType;
  private Type genericType;
  private boolean isJson;
  public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {
    if (isField()) {
      injectField(newVal);
    } else {
      injectMethod(newVal);
    }
  }
  private void injectField(Object newVal) throws IllegalAccessException {
    Object bean = beanRef.get();
    if (bean == null) {
      return;
    }
    boolean accessible = field.isAccessible();
    field.setAccessible(true);
    field.set(bean, newVal);
    field.setAccessible(accessible);
  }
  private void injectMethod(Object newVal)
      throws InvocationTargetException, IllegalAccessException {
    Object bean = beanRef.get();
    if (bean == null) {
      return;
    }
    methodParameter.getMethod().invoke(bean, newVal);
  }
}

SpringValueProcessor

总感觉Apollo中类的职责还是比较多的; 该类同时是BeanFactoryPostProcessor和BeanPostProcessor

这个类的话就是在Bean的初始化前阶段(postProcessBeforeInitialization) 将属性->Bean的对应关系存储到springValueRegistry注册器中

代码语言:javascript
代码运行次数:0
复制
public class SpringValueProcessor extends ApolloProcessor implements BeanFactoryPostProcessor, BeanFactoryAware {
​
  private final ConfigUtil configUtil;
  private final PlaceholderHelper placeholderHelper;
  private final SpringValueRegistry springValueRegistry;
​
  private BeanFactory beanFactory;
  private Multimap<String, SpringValueDefinition> beanName2SpringValueDefinitions;
    
      @Override
  public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
      throws BeansException {
    if (configUtil.isAutoUpdateInjectedSpringPropertiesEnabled() && beanFactory instanceof BeanDefinitionRegistry) {
      beanName2SpringValueDefinitions = SpringValueDefinitionProcessor
          .getBeanName2SpringValueDefinitions((BeanDefinitionRegistry) beanFactory);
    }
  }
​
  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName)
      throws BeansException {
    if (configUtil.isAutoUpdateInjectedSpringPropertiesEnabled()) {
      super.postProcessBeforeInitialization(bean, beanName);
        //  --- super()
          Class clazz = bean.getClass();
            for (Field field : findAllField(clazz)) {
              processField(bean, beanName, field);
            }
            for (Method method : findAllMethod(clazz)) {
                processMethod(bean, beanName, method);
            }
            return bean;
        //--- super end
      processBeanPropertyValues(bean, beanName);
    }
    return bean;
  }
​
  @Override
  protected void processField(Object bean, String beanName, Field field) {
    // register @Value on field
    Value value = field.getAnnotation(Value.class);
    if (value == null) {
      return;
    }
    Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());
​
    if (keys.isEmpty()) {
      return;
    }
​
    for (String key : keys) {
      SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);
      springValueRegistry.register(beanFactory, key, springValue);
    }
  }
​
  @Override
  protected void processMethod(Object bean, String beanName, Method method) {
    //register @Value on method
    Value value = method.getAnnotation(Value.class);
    if (value == null) {
      return;
    }
    //skip Configuration bean methods
    if (method.getAnnotation(Bean.class) != null) {
      return;
    }
    if (method.getParameterTypes().length != 1) {
      return;
    }
​
    Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());
    if (keys.isEmpty()) {
      return;
    }
      SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, method, false);
      springValueRegistry.register(beanFactory, key, springValue);
    }
  }
​
  private void processBeanPropertyValues(Object bean, String beanName) {
    Collection<SpringValueDefinition> propertySpringValues = beanName2SpringValueDefinitions
        .get(beanName);
    if (propertySpringValues == null || propertySpringValues.isEmpty()) {
      return;
    }
​
    for (SpringValueDefinition definition : propertySpringValues) {
        PropertyDescriptor pd = BeanUtils
            .getPropertyDescriptor(bean.getClass(), definition.getPropertyName());
        Method method = pd.getWriteMethod();
        if (method == null) {
          continue;
        }
        SpringValue springValue = new SpringValue(definition.getKey(), definition.getPlaceholder(),
            bean, beanName, method, false);
        springValueRegistry.register(beanFactory, definition.getKey(), springValue);
        logger.debug("Monitoring {}", springValue);
    }
    beanName2SpringValueDefinitions.removeAll(beanName);
  }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 结论
    • 启动流程
    • 配置发生变更流程
  • JDK及Spring一些基本知识
    • 远程配置
    • JDK SPI
    • Spring SPI
    • EnvironmentPostProcessor
    • ApplicationContextInitializer
    • BeanPostProcessor
  • Apollo类梳理
    • ApolloApplicationContextInitializer
    • Injector
    • DefaultConfigFactory
      • 小知识:
    • RemoteConfigRepository
    • DefaultConfig
    • AutoUpdateConfigChangeListener
    • SpringValueRegistry
    • SpringValue
    • SpringValueProcessor
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档