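Remote configuration updates come in two modes, push and pull, and Apollo combines both.
Push: the config center proactively sends change notifications to the application servers. Pull: each application server periodically calls the config center to fetch the latest configuration.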
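SPI stands for Service Provider Interface, the JDK's built-in service discovery mechanism. Providers are declared in files under META-INF/services in the resources directory.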
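As a rough illustration of the JDK mechanism described above, the sketch below looks up a provider with `java.util.ServiceLoader` and falls back to a built-in default when nothing is registered. `Greeter`, `DefaultGreeter` and `loadGreeter` are hypothetical names invented for this example; they are not Apollo classes.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

final class SpiLookupSketch {
    /** Hypothetical service interface (an assumption for this example). */
    interface Greeter {
        String greet(String name);
    }

    /** Hypothetical built-in implementation, used when no provider is registered. */
    static final class DefaultGreeter implements Greeter {
        @Override
        public String greet(String name) {
            return "hello, " + name;
        }
    }

    /**
     * Returns the first provider declared in a META-INF/services file named
     * after the interface, or the default when no such provider is found.
     */
    static Greeter loadGreeter() {
        Iterator<Greeter> providers = ServiceLoader.load(Greeter.class).iterator();
        return providers.hasNext() ? providers.next() : new DefaultGreeter();
    }

    public static void main(String[] args) {
        System.out.println(loadGreeter().greet("apollo"));
    }
}
```

A provider would be registered by adding a file under META-INF/services whose name is the interface's binary class name and whose lines list implementation classes.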
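Spring has its own SPI, declared in META-INF/spring.factories in the resources directory. This file is where Spring Boot's auto-configuration mechanism starts; most JARs that integrate with Spring Boot register their entry classes here.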
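The environment post-processor (EnvironmentPostProcessor) lets you load custom externalized configuration while the environment is being prepared.
For a related comparison, see: what is the difference between ServletContainerInitializer and ServletContextInitializer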
Spring beans go through a series of lifecycle phases, and processors can hook into different phases to add extra handling.
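Because Apollo manages configuration, it has to be initialized during Spring Boot startup, and this class (ApolloApplicationContextInitializer) is the entry point for that. In my opinion a general understanding of it is enough. The class implements both ApplicationContextInitializer and EnvironmentPostProcessor, which correspond to the two points at which initialization can happen.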
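A conventional JDK SPI design might expose ConfigManager, ConfigFactoryManager and the like each as a separate SPI; Apollo instead manages only Injector through SPI.
The class below shows the scope of each managed type and its concrete implementation.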
public class DefaultInjector implements Injector {
private com.google.inject.Injector m_injector;
public DefaultInjector() {
m_injector = Guice.createInjector(new ApolloModule());
}
@Override
public <T> T getInstance(Class<T> clazz) {
return m_injector.getInstance(clazz);
}
private static class ApolloModule extends AbstractModule {
@Override
protected void configure() {
bind(ConfigManager.class).to(DefaultConfigManager.class).in(Singleton.class);
bind(ConfigFactoryManager.class).to(DefaultConfigFactoryManager.class).in(Singleton.class);
bind(ConfigRegistry.class).to(DefaultConfigRegistry.class).in(Singleton.class);
bind(ConfigFactory.class).to(DefaultConfigFactory.class).in(Singleton.class);
bind(ConfigUtil.class).in(Singleton.class);
bind(HttpUtil.class).in(Singleton.class);
bind(ConfigServiceLocator.class).in(Singleton.class);
bind(RemoteConfigLongPollService.class).in(Singleton.class);
bind(YamlParser.class).in(Singleton.class);
}
}
}
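To make the "bind a type to an implementation in singleton scope" idea concrete without pulling in Guice, here is a minimal hand-rolled sketch. It is not how Guice works internally; `MiniInjectorSketch`, `bindSingleton` and the nested `ConfigManager` types are hypothetical stand-ins that only mimic the behaviour visible in `ApolloModule`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class MiniInjectorSketch {
    // Hypothetical stand-ins for the interface/implementation pairs bound above.
    interface ConfigManager {
        String describe();
    }

    static final class DefaultConfigManager implements ConfigManager {
        @Override
        public String describe() {
            return "default";
        }
    }

    private final Map<Class<?>, Supplier<?>> bindings = new HashMap<>();
    private final Map<Class<?>, Object> singletons = new HashMap<>();

    /** Registers how to build the implementation for a type. */
    synchronized <T> void bindSingleton(Class<T> type, Supplier<? extends T> factory) {
        bindings.put(type, factory);
    }

    /** Creates the instance on first use and returns the same object afterwards. */
    synchronized <T> T getInstance(Class<T> type) {
        Object instance = singletons.get(type);
        if (instance == null) {
            Supplier<?> factory = bindings.get(type);
            if (factory == null) {
                throw new IllegalStateException("No binding for " + type.getName());
            }
            instance = factory.get();
            singletons.put(type, instance);
        }
        return type.cast(instance);
    }
}
```

Because every lookup goes through one registry, callers only depend on the interface type, and swapping the registry swaps all implementations at once.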
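In Apollo, each namespace gets its own Config, and this class shows how that Config is created. One useful detail can be read from it: starting the application with
-Denv=LOCAL
skips fetching the remote configuration and reads the local configuration directly, so the project can still start from local files when the remote server is unreachable.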
public class DefaultConfigFactory implements ConfigFactory {
@Override
public Config create(String namespace) {
ConfigFileFormat format = determineFileFormat(namespace);
if (ConfigFileFormat.isPropertiesCompatible(format)) {
return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
}
return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
}
LocalFileConfigRepository createLocalConfigRepository(String namespace) {
if (m_configUtil.isInLocalMode()) {
return new LocalFileConfigRepository(namespace);
}
return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
}
}
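The layering in `createLocalConfigRepository` (a local repository that optionally wraps a remote one) can be sketched roughly as follows. This is a simplified illustration, not Apollo's implementation: `Repository`, `LocalCacheRepository` and `create` are hypothetical names, and the "local file" is just an in-memory `Properties` object.

```java
import java.util.Properties;

final class LocalFallbackSketch {
    /** Hypothetical stand-in for something that can load a namespace's properties. */
    interface Repository {
        Properties load();
    }

    /** Serves the local copy, refreshing it from upstream when one is configured. */
    static final class LocalCacheRepository implements Repository {
        private final Repository upstream; // null in local mode
        private Properties cached;

        LocalCacheRepository(Properties localCopy, Repository upstream) {
            this.cached = localCopy;
            this.upstream = upstream;
        }

        @Override
        public synchronized Properties load() {
            if (upstream != null) {
                try {
                    cached = upstream.load();
                } catch (RuntimeException e) {
                    // Remote unreachable: keep serving the last local copy.
                }
            }
            return cached;
        }
    }

    /** In local mode the remote repository is never attached, so it is never called. */
    static Repository create(boolean localMode, Properties localCopy, Repository remote) {
        return new LocalCacheRepository(localCopy, localMode ? null : remote);
    }
}
```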
Two Apollo features are typically used:
and both of them are tied to this class, RemoteConfigRepository.
public class RemoteConfigRepository extends AbstractConfigRepository {
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
gson = new Gson();
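// Fetch the configuration once, synchronously, inside the constructor
this.trySync();
// Start a job that periodically polls the remote service for configuration
this.schedulePeriodicRefresh();
// Start a long-polling connection so changed configuration arrives in near real time
this.scheduleLongPollingRefresh();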
}
// Scheduled job that pulls every 5 minutes by default
private void schedulePeriodicRefresh() {
m_executorService.scheduleAtFixedRate(() -> trySync(),
m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshIntervalTimeUnit());
}
// Runs the long-polling loop
private void doLongPollingRefresh(String appId, String cluster, String dataCenter) {
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
//...
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
// 90-second read timeout
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpUtil.doGet(request, m_responseType);
if (response.getStatusCode() == 200 && response.getBody() != null) {
updateNotifications(response.getBody());
updateRemoteNotifications(response.getBody());
// Internally this still ends up calling sync()
notify(lastServiceDto, response.getBody());
}
}
}
// Abridged
@Override
protected synchronized void sync() {
ApolloConfig previous = m_configCache.get();
// The method is long, but contains no complicated logic
ApolloConfig current = loadApolloConfig();
m_configCache.set(current);
// Publish a local change event
this.fireRepositoryChange(m_namespace, this.getConfig());
}
}
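The combination shown above (one synchronous fetch at construction, a periodic pull, and a notification-triggered sync) can be reduced to the following sketch. It assumes the remote side is just a `Supplier<String>`; `PushPullSketch`, `onChangeNotification` and the other names are hypothetical and leave out HTTP, rate limiting and retry back-off.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class PushPullSketch {
    private final Supplier<String> remote; // hypothetical remote fetch
    private final AtomicReference<String> cache = new AtomicReference<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "periodic-pull");
            t.setDaemon(true); // do not keep the JVM alive just for polling
            return t;
        });

    PushPullSketch(Supplier<String> remote) {
        this.remote = remote;
        trySync(); // initial synchronous fetch
    }

    /** Pull path: refreshes on a fixed interval as a safety net. */
    void startPeriodicPull(long interval, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::trySync, interval, interval, unit);
    }

    /** Push path: called when the server reports that something changed. */
    boolean onChangeNotification() {
        return trySync();
    }

    /** Swallows failures so one bad fetch does not cancel the scheduled task. */
    private synchronized boolean trySync() {
        try {
            cache.set(remote.get());
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    String current() {
        return cache.get();
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

The periodic pull covers the case where a notification is lost, while the notification path keeps the usual delay well below the pull interval.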
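This class manages configuration and listens for events: it works out which entries actually changed, applies them, and publishes a config change event. The Config created in DefaultConfigFactory is a DefaultConfig.
The listener for the local event published by RemoteConfigRepository is also DefaultConfig itself; see initialize().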
public class DefaultConfig extends AbstractConfig implements RepositoryChangeListener {
public DefaultConfig(String namespace, ConfigRepository configRepository) {
m_namespace = namespace;
m_resourceProperties = loadFromResource(m_namespace);
m_configRepository = configRepository;
m_configProperties = new AtomicReference<>();
m_warnLogRateLimiter = RateLimiter.create(0.017); // 1 warning log output per minute
initialize();
}
private void initialize() {
updateConfig(m_configRepository.getConfig(), m_configRepository.getSourceType());
m_configRepository.addChangeListener(this);
}
@Override
public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
if (newProperties.equals(m_configProperties.get())) {
return;
}
ConfigSourceType sourceType = m_configRepository.getSourceType();
Properties newConfigProperties = new Properties();
newConfigProperties.putAll(newProperties);
// Compare every entry; a change is one of three kinds (added, modified, deleted), and the kind is recorded in ConfigChange
Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties, sourceType);
//check double checked result
if (actualChanges.isEmpty()) {
return;
}
this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges));
}
protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
for (final ConfigChangeListener listener : m_listeners) {
// Simplified: the event is simply dispatched asynchronously
m_executorService.submit(() -> listener.onChange(changeEvent));
}
}
}
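The three kinds of change mentioned in `onRepositoryChange` can be computed with a plain comparison of two `Properties` objects. The sketch below is an assumption about the general shape of such a diff, not a copy of `updateAndCalcConfigChanges`; `ConfigDiffSketch`, `ChangeType` and `diff` are hypothetical names.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class ConfigDiffSketch {
    enum ChangeType { ADDED, MODIFIED, DELETED }

    /** Returns only the keys whose value differs between the two snapshots. */
    static Map<String, ChangeType> diff(Properties previous, Properties current) {
        Map<String, ChangeType> changes = new TreeMap<>();
        for (String key : current.stringPropertyNames()) {
            String oldValue = previous.getProperty(key);
            if (oldValue == null) {
                changes.put(key, ChangeType.ADDED);
            } else if (!oldValue.equals(current.getProperty(key))) {
                changes.put(key, ChangeType.MODIFIED);
            }
        }
        for (String key : previous.stringPropertyNames()) {
            if (current.getProperty(key) == null) {
                changes.put(key, ChangeType.DELETED);
            }
        }
        return changes;
    }
}
```

An empty result means nothing really changed, which matches the early return before `fireConfigChange`.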
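This is the listener for the config change events published by DefaultConfig, and it is where dynamic refresh happens. The approach is blunt: SpringValueRegistry already holds the bean objects associated with each configuration key, and the injected field values are overwritten through reflection.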
public class AutoUpdateConfigChangeListener implements ConfigChangeListener{
//...
private final SpringValueRegistry springValueRegistry;
@Override
public void onChange(ConfigChangeEvent changeEvent) {
Set<String> keys = changeEvent.changedKeys();
if (CollectionUtils.isEmpty(keys)) {
return;
}
for (String key : keys) {
// 1. check whether the changed key is relevant
Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
if (targetValues == null || targetValues.isEmpty()) {
continue;
}
// 2. update the value
for (SpringValue val : targetValues) {
updateSpringValue(val);
}
}
}
private void updateSpringValue(SpringValue springValue) {
Object value = resolvePropertyValue(springValue);
springValue.update(value);
}
}
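SpringValueRegistry is the registry for SpringValue: it records the relationship between configuration keys and beans, and its registry field alone makes the purpose clear.
For when registration happens, see SpringValueProcessor later on.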
public class SpringValueRegistry {
private final Map<BeanFactory, Multimap<String, SpringValue>> registry = Maps.newConcurrentMap();
}
SpringValue wraps the bean that owns a property. It is shown here to tie back to the updateSpringValue method in AutoUpdateConfigChangeListener.
public class SpringValue {
private Field field;
private WeakReference<Object> beanRef;
private String beanName;
private String key;
private String placeholder;
private Class<?> targetType;
private Type genericType;
private boolean isJson;
public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {
if (isField()) {
injectField(newVal);
} else {
injectMethod(newVal);
}
}
private void injectField(Object newVal) throws IllegalAccessException {
Object bean = beanRef.get();
if (bean == null) {
return;
}
boolean accessible = field.isAccessible();
field.setAccessible(true);
field.set(bean, newVal);
field.setAccessible(accessible);
}
private void injectMethod(Object newVal)
throws InvocationTargetException, IllegalAccessException {
Object bean = beanRef.get();
if (bean == null) {
return;
}
methodParameter.getMethod().invoke(bean, newVal);
}
}
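The reflective update in `injectField` boils down to the pattern below: hold the bean weakly, and write the new value straight into the field. This is a reduced sketch with hypothetical names (`FieldRefreshSketch`, `InjectedField`, `TimeoutHolder`); it skips method injection and type conversion, and wraps reflection errors in an unchecked exception for brevity.

```java
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;

final class FieldRefreshSketch {
    /** Hypothetical bean whose field would normally be filled by @Value. */
    static final class TimeoutHolder {
        private int timeout = 100;

        int getTimeout() {
            return timeout;
        }
    }

    /** Remembers which field of which bean a configuration key was injected into. */
    static final class InjectedField {
        // Weak reference so the registry does not keep a discarded bean alive.
        private final WeakReference<Object> beanRef;
        private final Field field;

        InjectedField(Object bean, String fieldName) {
            try {
                this.field = bean.getClass().getDeclaredField(fieldName);
            } catch (NoSuchFieldException e) {
                throw new IllegalArgumentException("Unknown field: " + fieldName, e);
            }
            this.beanRef = new WeakReference<>(bean);
        }

        /** Returns false when the bean has already been garbage collected. */
        boolean update(Object newValue) {
            Object bean = beanRef.get();
            if (bean == null) {
                return false;
            }
            try {
                field.setAccessible(true);
                field.set(bean, newValue);
                return true;
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot update " + field.getName(), e);
            }
        }
    }
}
```

Note that this bypasses any setter logic, so code that derives other state from the field at startup will not see the new value.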
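Classes in Apollo tend to carry quite a few responsibilities; this one acts as both a BeanFactoryPostProcessor and a BeanPostProcessor.
In the phase just before bean initialization (postProcessBeforeInitialization), it stores the property -> bean mapping in the springValueRegistry.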
public class SpringValueProcessor extends ApolloProcessor implements BeanFactoryPostProcessor, BeanFactoryAware {
private final ConfigUtil configUtil;
private final PlaceholderHelper placeholderHelper;
private final SpringValueRegistry springValueRegistry;
private BeanFactory beanFactory;
private Multimap<String, SpringValueDefinition> beanName2SpringValueDefinitions;
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
throws BeansException {
if (configUtil.isAutoUpdateInjectedSpringPropertiesEnabled() && beanFactory instanceof BeanDefinitionRegistry) {
beanName2SpringValueDefinitions = SpringValueDefinitionProcessor
.getBeanName2SpringValueDefinitions((BeanDefinitionRegistry) beanFactory);
}
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
if (configUtil.isAutoUpdateInjectedSpringPropertiesEnabled()) {
super.postProcessBeforeInitialization(bean, beanName);
// --- what the super call does, shown for reference only:
// Class clazz = bean.getClass();
// for (Field field : findAllField(clazz)) {
//   processField(bean, beanName, field);
// }
// for (Method method : findAllMethod(clazz)) {
//   processMethod(bean, beanName, method);
// }
// return bean;
// --- end of super
processBeanPropertyValues(bean, beanName);
}
return bean;
}
@Override
protected void processField(Object bean, String beanName, Field field) {
// register @Value on field
Value value = field.getAnnotation(Value.class);
if (value == null) {
return;
}
Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());
if (keys.isEmpty()) {
return;
}
for (String key : keys) {
SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);
springValueRegistry.register(beanFactory, key, springValue);
}
}
@Override
protected void processMethod(Object bean, String beanName, Method method) {
//register @Value on method
Value value = method.getAnnotation(Value.class);
if (value == null) {
return;
}
//skip Configuration bean methods
if (method.getAnnotation(Bean.class) != null) {
return;
}
if (method.getParameterTypes().length != 1) {
return;
}
Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());
if (keys.isEmpty()) {
return;
}
for (String key : keys) {
SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, method, false);
springValueRegistry.register(beanFactory, key, springValue);
}
}
private void processBeanPropertyValues(Object bean, String beanName) {
Collection<SpringValueDefinition> propertySpringValues = beanName2SpringValueDefinitions
.get(beanName);
if (propertySpringValues == null || propertySpringValues.isEmpty()) {
return;
}
for (SpringValueDefinition definition : propertySpringValues) {
PropertyDescriptor pd = BeanUtils
.getPropertyDescriptor(bean.getClass(), definition.getPropertyName());
Method method = pd.getWriteMethod();
if (method == null) {
continue;
}
SpringValue springValue = new SpringValue(definition.getKey(), definition.getPlaceholder(),
bean, beanName, method, false);
springValueRegistry.register(beanFactory, definition.getKey(), springValue);
logger.debug("Monitoring {}", springValue);
}
beanName2SpringValueDefinitions.removeAll(beanName);
}
}
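Both `processField` and `processMethod` depend on turning a `@Value` expression into configuration keys. The sketch below shows one simplified way to do that with a regular expression. It is an assumption for illustration only, not the logic of Apollo's `PlaceholderHelper`: it does not handle nested placeholders such as `${a:${b}}` or SpEL expressions, and `PlaceholderKeySketch` and `extractKeys` are hypothetical names.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderKeySketch {
    // Matches ${key} or ${key:default}; group 1 is the key.
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([^:${}]+)(?::[^}]*)?\\}");

    /** Returns the keys in order of appearance, without duplicates. */
    static Set<String> extractKeys(String expression) {
        Set<String> keys = new LinkedHashSet<>();
        Matcher matcher = PLACEHOLDER.matcher(expression);
        while (matcher.find()) {
            keys.add(matcher.group(1).trim());
        }
        return keys;
    }
}
```

Each extracted key would then be registered against the bean, so that a later change to that key can be routed back to the right field or setter.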
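Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
For infringement concerns, contact cloudcommunity@tencent.com to request removal.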