前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊jetcache的ProxyCache

聊聊jetcache的ProxyCache

原创
作者头像
code4it
发布2024-06-19 09:26:15
700
发布2024-06-19 09:26:15
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下jetcache的ProxyCache

ProxyCache

jetcache-core/src/main/java/com/alicp/jetcache/ProxyCache.java

代码语言:javascript
复制
public interface ProxyCache<K, V> extends Cache<K, V> {
    Cache<K, V> getTargetCache();

    @Override
    default <T> T unwrap(Class<T> clazz) {
        return getTargetCache().unwrap(clazz);
    }

}

ProxyCache继承了Cache接口,它定义了getTargetCache方法,并默认实现了unwrap方法

SimpleProxyCache

jetcache-core/src/main/java/com/alicp/jetcache/SimpleProxyCache.java

代码语言:javascript
复制
public class SimpleProxyCache<K, V> implements ProxyCache<K, V> {

    protected Cache<K, V> cache;

    public SimpleProxyCache(Cache<K, V> cache) {
        this.cache = cache;
    }

    @Override
    public CacheConfig<K, V> config() {
        return cache.config();
    }

    @Override
    public Cache<K, V> getTargetCache() {
        return cache;
    }

    @Override
    public V get(K key) {
        return cache.get(key);
    }

    @Override
    public Map<K, V> getAll(Set<? extends K> keys) {
        return cache.getAll(keys);
    }

    @Override
    public void put(K key, V value) {
        cache.put(key, value);
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> map) {
        cache.putAll(map);
    }

    @Override
    public boolean putIfAbsent(K key, V value) {
        return cache.putIfAbsent(key, value);
    }

    @Override
    public boolean remove(K key) {
        return cache.remove(key);
    }

    @Override
    public void removeAll(Set<? extends K> keys) {
        cache.removeAll(keys);
    }

    @Override
    public <T> T unwrap(Class<T> clazz) {
        return cache.unwrap(clazz);
    }

    @Override
    public AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) {
        return cache.tryLock(key, expire, timeUnit);
    }

    @Override
    public boolean tryLockAndRun(K key, long expire, TimeUnit timeUnit, Runnable action) {
        return cache.tryLockAndRun(key, expire, timeUnit, action);
    }

    @Override
    public CacheGetResult<V> GET(K key) {
        return cache.GET(key);
    }

    @Override
    public MultiGetResult<K, V> GET_ALL(Set<? extends K> keys) {
        return cache.GET_ALL(keys);
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader) {
        return cache.computeIfAbsent(key, loader);
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull) {
        return cache.computeIfAbsent(key, loader, cacheNullWhenLoaderReturnNull);
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull, long expireAfterWrite, TimeUnit timeUnit) {
        return cache.computeIfAbsent(key, loader, cacheNullWhenLoaderReturnNull, expireAfterWrite, timeUnit);
    }

    @Override
    public void put(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
        cache.put(key, value, expireAfterWrite, timeUnit);
    }

    @Override
    public CacheResult PUT(K key, V value) {
        return cache.PUT(key, value);
    }

    @Override
    public CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
        return cache.PUT(key, value, expireAfterWrite, timeUnit);
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
        cache.putAll(map, expireAfterWrite, timeUnit);
    }

    @Override
    public CacheResult PUT_ALL(Map<? extends K, ? extends V> map) {
        return cache.PUT_ALL(map);
    }

    @Override
    public CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
        return cache.PUT_ALL(map, expireAfterWrite, timeUnit);
    }

    @Override
    public CacheResult REMOVE(K key) {
        return cache.REMOVE(key);
    }

    @Override
    public CacheResult REMOVE_ALL(Set<? extends K> keys) {
        return cache.REMOVE_ALL(keys);
    }

    @Override
    public CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
        return cache.PUT_IF_ABSENT(key, value, expireAfterWrite, timeUnit);
    }

    @Override
    public void close() {
        cache.close();
    }
}

SimpleProxyCache实现了ProxyCache接口,它定义了cache属性,其方法均委托给了cache对象

LoadingCache

jetcache-core/src/main/java/com/alicp/jetcache/LoadingCache.java

代码语言:javascript
复制
public class LoadingCache<K, V> extends SimpleProxyCache<K, V> {

    protected Consumer<CacheEvent> eventConsumer;

    protected CacheConfig<K, V> config;

    public LoadingCache(Cache<K, V> cache) {
        super(cache);
        this.config = config();
        eventConsumer = CacheUtil.getAbstractCache(cache)::notify;
    }

    @Override
    public V get(K key) throws CacheInvokeException {
        CacheLoader<K, V> loader = config.getLoader();
        if (loader != null) {
            return AbstractCache.computeIfAbsentImpl(key, loader,
                    config.isCacheNullValue() ,0, null, this);
        } else {
            return cache.get(key);
        }
    }

    protected boolean needUpdate(V loadedValue, CacheLoader<K, V> loader) {
        if (loadedValue == null && !config.isCacheNullValue()) {
            return false;
        }
        if (loader.vetoCacheUpdate()) {
            return false;
        }
        return true;
    }

    @Override
    public Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException {
        CacheLoader<K, V> loader = config.getLoader();
        if (loader != null) {
            MultiGetResult<K, V> r = GET_ALL(keys);
            Map<K, V> kvMap;
            if (r.isSuccess() || r.getResultCode() == CacheResultCode.PART_SUCCESS) {
                kvMap = r.unwrapValues();
            } else {
                kvMap = new HashMap<>();
            }
            Set<K> keysNeedLoad = new LinkedHashSet<>();
            keys.forEach((k) -> {
                if (!kvMap.containsKey(k)) {
                    keysNeedLoad.add(k);
                }
            });
            if (!config.isCachePenetrationProtect()) {
                if (eventConsumer != null) {
                    loader = CacheUtil.createProxyLoader(cache, loader, eventConsumer);
                }
                Map<K, V> loadResult;
                try {
                    loadResult = loader.loadAll(keysNeedLoad);

                    CacheLoader<K, V> theLoader = loader;
                    Map<K, V> updateValues = new HashMap<>();
                    loadResult.forEach((k,v)->{
                        if (needUpdate(v, theLoader)){
                            updateValues.put(k, v);
                        }
                    });
                    // batch put
                    if (!updateValues.isEmpty()) {
                        PUT_ALL(updateValues);
                    }
                } catch (Throwable e) {
                    throw new CacheInvokeException(e);
                }
                kvMap.putAll(loadResult);
            } else {
                AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
                loader = CacheUtil.createProxyLoader(cache, loader, eventConsumer);
                for(K key : keysNeedLoad) {
                    Consumer<V> cacheUpdater = (v) -> {
                        if(needUpdate(v, config.getLoader())) {
                            PUT(key, v);
                        }
                    };
                    V v = AbstractCache.synchronizedLoad(config, abstractCache, key, loader, cacheUpdater);
                    kvMap.put(key, v);
                }
            }
            return kvMap;
        } else {
            return cache.getAll(keys);
        }

    }
}

LoadingCache继承了SimpleProxyCache,其get方法会判断config.getLoader()是否为null,不为null则执行AbstractCache.computeIfAbsentImpl(key, loader,config.isCacheNullValue() ,0, null, this);getAll方法多了loader以及config.isCachePenetrationProtect()的处理逻辑,对于开启cachePenetrationProtect的会通过CacheUtil.createProxyLoader包装一下loader,然后AbstractCache.synchronizedLoad去加载value

createProxyLoader

jetcache-core/src/main/java/com/alicp/jetcache/CacheUtil.java

代码语言:javascript
复制
public class CacheUtil {

    private interface ProxyLoader<K, V> extends CacheLoader<K, V> {
    }

    public static <K, V> ProxyLoader<K, V> createProxyLoader(Cache<K, V> cache,
                                                             CacheLoader<K, V> loader,
                                                             Consumer<CacheEvent> eventConsumer) {
        if (loader instanceof ProxyLoader) {
            return (ProxyLoader<K, V>) loader;
        }
        return new ProxyLoader<K, V>() {
            @Override
            public V load(K key) throws Throwable {
                long t = System.currentTimeMillis();
                V v = null;
                boolean success = false;
                try {
                    v = loader.load(key);
                    success = true;
                } finally {
                    t = System.currentTimeMillis() - t;
                    CacheLoadEvent event = new CacheLoadEvent(cache, t, key, v, success);
                    eventConsumer.accept(event);
                }
                return v;
            }

            @Override
            public Map<K, V> loadAll(Set<K> keys) throws Throwable {
                long t = System.currentTimeMillis();
                boolean success = false;
                Map<K, V> kvMap = null;
                try {
                    kvMap = loader.loadAll(keys);
                    success = true;
                } finally {
                    t = System.currentTimeMillis() - t;
                    CacheLoadAllEvent event = new CacheLoadAllEvent(cache, t, keys, kvMap, success);
                    eventConsumer.accept(event);
                }
                return kvMap;
            }

            @Override
            public boolean vetoCacheUpdate() {
                return loader.vetoCacheUpdate();
            }
        };
    }

    public static <K, V> ProxyLoader<K, V> createProxyLoader(Cache<K, V> cache,
                                                          Function<K, V> loader,
                                                          Consumer<CacheEvent> eventConsumer) {
        if (loader instanceof ProxyLoader) {
            return (ProxyLoader<K, V>) loader;
        }
        if (loader instanceof CacheLoader) {
            return createProxyLoader(cache, (CacheLoader) loader, eventConsumer);
        }
        return k -> {
            long t = System.currentTimeMillis();
            V v = null;
            boolean success = false;
            try {
                v = loader.apply(k);
                success = true;
            } finally {
                t = System.currentTimeMillis() - t;
                CacheLoadEvent event = new CacheLoadEvent(cache, t, k, v, success);
                eventConsumer.accept(event);
            }
            return v;
        };
    }


    public static <K, V> AbstractCache<K, V> getAbstractCache(Cache<K, V> c) {
        while (c instanceof ProxyCache) {
            c = ((ProxyCache) c).getTargetCache();
        }
        return (AbstractCache) c;
    }

}

CacheUtil提供了两个createProxyLoader,一个loader类型为CacheLoader,一个为Function;其load及loadAll方法委托给了loader,主要是增加了耗时统计并创建CacheLoadEvent,交给eventConsumer.accept(event)去消费

synchronizedLoad

jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java

代码语言:javascript
复制
    static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache,
                                     K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
        ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
        Object lockKey = buildLoaderLockKey(abstractCache, key);
        while (true) {
            boolean create[] = new boolean[1];
            LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
                create[0] = true;
                LoaderLock loaderLock = new LoaderLock();
                loaderLock.signal = new CountDownLatch(1);
                loaderLock.loaderThread = Thread.currentThread();
                return loaderLock;
            });
            if (create[0] || ll.loaderThread == Thread.currentThread()) {
                try {
                    CacheGetResult<V> getResult = abstractCache.GET(key);
                    if (getResult.isSuccess()) {
                        ll.success = true;
                        ll.value = getResult.getValue();
                        return getResult.getValue();
                    } else {
                        V loadedValue = newLoader.apply(key);
                        ll.success = true;
                        ll.value = loadedValue;
                        cacheUpdater.accept(loadedValue);
                        return loadedValue;
                    }
                } finally {
                    if (create[0]) {
                        ll.signal.countDown();
                        loaderMap.remove(lockKey);
                    }
                }
            } else {
                try {
                    Duration timeout = config.getPenetrationProtectTimeout();
                    if (timeout == null) {
                        ll.signal.await();
                    } else {
                        boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
                        if(!ok) {
                            logger.info("loader wait timeout:" + timeout);
                            return newLoader.apply(key);
                        }
                    }
                } catch (InterruptedException e) {
                    logger.warn("loader wait interrupted");
                    return newLoader.apply(key);
                }
                if (ll.success) {
                    return (V) ll.value;
                } else {
                    continue;
                }

            }
        }
    }

AbstractCache提供了synchronizedLoad静态方法,它先通过buildLoaderLockKey来创建lockKey,之后while循环,computeIfAbsent指定lockKey的LoaderLock,对于loaderThread为当前线程的先通过GET获取result,若为success则返回,否则通过loader加载,然后通知cacheUpdater,最后执行ll.signal.countDown(),对于非当前thread的则根据指定的timeout执行ll.signal.await(),若await超时或者InterruptedException则执行newLoader.apply,否则对于success的直接返回value,非success则继续循环处理

RefreshCache

jetcache-core/src/main/java/com/alicp/jetcache/RefreshCache.java

代码语言:javascript
复制
public class RefreshCache<K, V> extends LoadingCache<K, V> {

    private static final Logger logger = LoggerFactory.getLogger(RefreshCache.class);

    public static final byte[] LOCK_KEY_SUFFIX = "_#RL#".getBytes();
    public static final byte[] TIMESTAMP_KEY_SUFFIX = "_#TS#".getBytes();

    private ConcurrentHashMap<Object, RefreshTask> taskMap = new ConcurrentHashMap<>();

    private boolean multiLevelCache;

    public RefreshCache(Cache cache) {
        super(cache);
        multiLevelCache = isMultiLevelCache();
    }

    protected void stopRefresh() {
        List<RefreshTask> tasks = new ArrayList<>();
        tasks.addAll(taskMap.values());
        tasks.forEach(task -> task.cancel());
    }

    @Override
    public void close() {
        stopRefresh();
        super.close();
    }


    private boolean hasLoader() {
        return config.getLoader() != null;
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader) {
        return computeIfAbsent(key, loader, config().isCacheNullValue());
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull) {
        return AbstractCache.computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
                0, null, this);
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
                             long expireAfterWrite, TimeUnit timeUnit) {
        return AbstractCache.computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
                expireAfterWrite, timeUnit, this);
    }

    private boolean isMultiLevelCache() {
        Cache c = getTargetCache();
        while (c instanceof ProxyCache) {
            c = ((ProxyCache) c).getTargetCache();
        }
        return c instanceof MultiLevelCache;
    }

    private Object getTaskId(K key) {
        Cache c = concreteCache();
        if (c instanceof AbstractEmbeddedCache) {
            return ((AbstractEmbeddedCache) c).buildKey(key);
        } else if (c instanceof AbstractExternalCache) {
            byte[] bs = ((AbstractExternalCache) c).buildKey(key);
            return ByteBuffer.wrap(bs);
        } else {
            logger.error("can't getTaskId from " + c.getClass());
            return null;
        }
    }

    @Override
    public V get(K key) throws CacheInvokeException {
        if (config.getRefreshPolicy() != null && hasLoader()) {
            addOrUpdateRefreshTask(key, null);
        }
        return super.get(key);
    }

    @Override
    public Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException {
        if (config.getRefreshPolicy() != null && hasLoader()) {
            for (K key : keys) {
                addOrUpdateRefreshTask(key, null);
            }
        }
        return super.getAll(keys);
    }

    //......

    private byte[] combine(byte[] bs1, byte[] bs2) {
        byte[] newArray = Arrays.copyOf(bs1, bs1.length + bs2.length);
        System.arraycopy(bs2, 0, newArray, bs1.length, bs2.length);
        return newArray;
    }
}

RefreshCache继承了LoadingCache,它定义了taskMap用于存储RefreshTask,它覆盖了computeIfAbsent方法,复用了AbstractCache定义的静态方法computeIfAbsentImpl,传入的cache为this;其get及getAll方法对于config.getRefreshPolicy()及config.getLoader()不为null的会额外执行addOrUpdateRefreshTask

addOrUpdateRefreshTask

代码语言:javascript
复制
    protected void addOrUpdateRefreshTask(K key, CacheLoader<K,V> loader) {
        RefreshPolicy refreshPolicy = config.getRefreshPolicy();
        if (refreshPolicy == null) {
            return;
        }
        long refreshMillis = refreshPolicy.getRefreshMillis();
        if (refreshMillis > 0) {
            Object taskId = getTaskId(key);
            RefreshTask refreshTask = taskMap.computeIfAbsent(taskId, tid -> {
                logger.debug("add refresh task. interval={},  key={}", refreshMillis , key);
                RefreshTask task = new RefreshTask(taskId, key, loader);
                task.lastAccessTime = System.currentTimeMillis();
                ScheduledFuture<?> future = JetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay(
                        task, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS);
                task.future = future;
                return task;
            });
            refreshTask.lastAccessTime = System.currentTimeMillis();
        }
    }

addOrUpdateRefreshTask方法先获取refreshMillis,对于refreshMillis小于等于0的不处理,大于0的先通过getTaskId(key)获取taskId,然后通过taskMap.computeIfAbsent获取或者创建refreshTask,最后更新refreshTask的lastAccessTime;当新创建refreshTask时,会通过JJetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay(task, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS)去调度该refreshTask,即每隔refreshMillis调度执行refreshTask

RefreshTask

代码语言:javascript
复制
    class RefreshTask implements Runnable {
        private Object taskId;
        private K key;
        private CacheLoader<K, V> loader;

        private long lastAccessTime;
        private ScheduledFuture future;

        RefreshTask(Object taskId, K key, CacheLoader<K, V> loader) {
            this.taskId = taskId;
            this.key = key;
            this.loader = loader;
        }

        private void cancel() {
            logger.debug("cancel refresh: {}", key);
            future.cancel(false);
            taskMap.remove(taskId);
        }

        private void load() throws Throwable {
            CacheLoader<K,V> l = loader == null? config.getLoader(): loader;
            if (l != null) {
                l = CacheUtil.createProxyLoader(cache, l, eventConsumer);
                V v = l.load(key);
                if (needUpdate(v, l)) {
                    cache.PUT(key, v);
                }
            }
        }

        private void externalLoad(final Cache concreteCache, final long currentTime)
                throws Throwable {
            byte[] newKey = ((AbstractExternalCache) concreteCache).buildKey(key);
            byte[] lockKey = combine(newKey, LOCK_KEY_SUFFIX);
            long loadTimeOut = RefreshCache.this.config.getRefreshPolicy().getRefreshLockTimeoutMillis();
            long refreshMillis = config.getRefreshPolicy().getRefreshMillis();
            byte[] timestampKey = combine(newKey, TIMESTAMP_KEY_SUFFIX);

            // AbstractExternalCache buildKey method will not convert byte[]
            CacheGetResult refreshTimeResult = concreteCache.GET(timestampKey);
            boolean shouldLoad = false;
            if (refreshTimeResult.isSuccess()) {
                shouldLoad = currentTime >= Long.parseLong(refreshTimeResult.getValue().toString()) + refreshMillis;
            } else if (refreshTimeResult.getResultCode() == CacheResultCode.NOT_EXISTS) {
                shouldLoad = true;
            }

            if (!shouldLoad) {
                if (multiLevelCache) {
                    refreshUpperCaches(key);
                }
                return;
            }

            Runnable r = () -> {
                try {
                    load();
                    // AbstractExternalCache buildKey method will not convert byte[]
                    concreteCache.put(timestampKey, String.valueOf(System.currentTimeMillis()));
                } catch (Throwable e) {
                    throw new CacheException("refresh error", e);
                }
            };

            // AbstractExternalCache buildKey method will not convert byte[]
            boolean lockSuccess = concreteCache.tryLockAndRun(lockKey, loadTimeOut, TimeUnit.MILLISECONDS, r);
            if(!lockSuccess && multiLevelCache) {
                JetCacheExecutor.heavyIOExecutor().schedule(
                        () -> refreshUpperCaches(key), (long)(0.2 * refreshMillis), TimeUnit.MILLISECONDS);
            }
        }

        private void refreshUpperCaches(K key) {
            MultiLevelCache<K, V> targetCache = (MultiLevelCache<K, V>) getTargetCache();
            Cache[] caches = targetCache.caches();
            int len = caches.length;

            CacheGetResult cacheGetResult = caches[len - 1].GET(key);
            if (!cacheGetResult.isSuccess()) {
                return;
            }
            for (int i = 0; i < len - 1; i++) {
                caches[i].PUT(key, cacheGetResult.getValue());
            }
        }

        @Override
        public void run() {
            try {
                if (config.getRefreshPolicy() == null || (loader == null && !hasLoader())) {
                    cancel();
                    return;
                }
                long now = System.currentTimeMillis();
                long stopRefreshAfterLastAccessMillis = config.getRefreshPolicy().getStopRefreshAfterLastAccessMillis();
                if (stopRefreshAfterLastAccessMillis > 0) {
                    if (lastAccessTime + stopRefreshAfterLastAccessMillis < now) {
                        logger.debug("cancel refresh: {}", key);
                        cancel();
                        return;
                    }
                }
                logger.debug("refresh key: {}", key);
                Cache concreteCache = concreteCache();
                if (concreteCache instanceof AbstractExternalCache) {
                    externalLoad(concreteCache, now);
                } else {
                    load();
                }
            } catch (Throwable e) {
                logger.error("refresh error: key=" + key, e);
            }
        }
    }

    protected Cache concreteCache() {
        Cache c = getTargetCache();
        while (true) {
            if (c instanceof ProxyCache) {
                c = ((ProxyCache) c).getTargetCache();
            } else if (c instanceof MultiLevelCache) {
                Cache[] caches = ((MultiLevelCache) c).caches();
                c = caches[caches.length - 1];
            } else {
                return c;
            }
        }
    }    

RefreshTask的run方法先判断stopRefreshAfterLastAccessMillis,若stopRefreshAfterLastAccessMillis大于0且lastAccessTime + stopRefreshAfterLastAccessMillis小于now则执行cancel,然后返回;之后通过concreteCache获取具体的cache,对于AbstractExternalCache类型的执行externalLoad,否则执行load load方法通过loader去加载,对于needUpdate的执行PUT去更新;对于externalLoad的会先获取refreshTimeResult,对于CacheResultCode.NOT_EXISTS的则shouldLoad为true,对于refreshTimeResult.isSuccess()的对于currentTime大于等于Long.parseLong(refreshTimeResult.getValue().toString()) + refreshMillis的则shouldLoad为true,对于非shouldLoad的且是multiLevelCache则执行refreshUpperCaches然后返回;否则使用concreteCache.tryLockAndRun去加锁然后执行load方法,若加锁不成功且是multiLevelCache的,则通过JetCacheExecutor.heavyIOExecutor().schedule去调度执行refreshUpperCaches

SimpleCacheManager.create

jetcache-core/src/main/java/com/alicp/jetcache/SimpleCacheManager.java

代码语言:javascript
复制
    private Cache create(QuickConfig config) {
        Cache cache;
        if (config.getCacheType() == null || config.getCacheType() == CacheType.REMOTE) {
            cache = buildRemote(config);
        } else if (config.getCacheType() == CacheType.LOCAL) {
            cache = buildLocal(config);
        } else {
            Cache local = buildLocal(config);
            Cache remote = buildRemote(config);


            boolean useExpireOfSubCache = config.getLocalExpire() != null;
            cache = MultiLevelCacheBuilder.createMultiLevelCacheBuilder()
                    .expireAfterWrite(remote.config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS)
                    .addCache(local, remote)
                    .useExpireOfSubCache(useExpireOfSubCache)
                    .cacheNullValue(config.getCacheNullValue() != null ?
                            config.getCacheNullValue() : DEFAULT_CACHE_NULL_VALUE)
                    .buildCache();
        }
        if (config.getRefreshPolicy() != null) {
            cache = new RefreshCache(cache);
        } else if (config.getLoader() != null) {
            cache = new LoadingCache(cache);
        }
        cache.config().setRefreshPolicy(config.getRefreshPolicy());
        cache.config().setLoader(config.getLoader());


        boolean protect = config.getPenetrationProtect() != null ? config.getPenetrationProtect()
                : cacheBuilderTemplate.isPenetrationProtect();
        cache.config().setCachePenetrationProtect(protect);
        cache.config().setPenetrationProtectTimeout(config.getPenetrationProtectTimeout());

        for (CacheMonitorInstaller i : cacheBuilderTemplate.getCacheMonitorInstallers()) {
            i.addMonitors(this, cache, config);
        }
        return cache;
    }

SimpleCacheManager的create方法对于config.getRefreshPolicy()不为null的则创建RefreshCache去包装cache,否则再对于config.getLoader()不为null使用LoadingCache去包装cache

小结

ProxyCache继承了Cache接口,它定义了getTargetCache方法,并默认实现了unwrap方法;SimpleProxyCache实现了ProxyCache接口,它定义了cache属性,其方法均委托给了cache对象

LoadingCache继承了SimpleProxyCache,其get方法会判断config.getLoader()是否为null,不为null则执行AbstractCache.computeIfAbsentImpl(key, loader,config.isCacheNullValue() ,0, null, this);getAll方法多了loader以及config.isCachePenetrationProtect()的处理逻辑,对于开启cachePenetrationProtect的会通过CacheUtil.createProxyLoader包装一下loader,然后AbstractCache.synchronizedLoad去加载value RefreshCache继承了LoadingCache,它定义了taskMap用于存储RefreshTask,它覆盖了computeIfAbsent方法,复用了AbstractCache定义的静态方法computeIfAbsentImpl,传入的cache为this;其get及getAll方法对于config.getRefreshPolicy()及config.getLoader()不为null的会额外执行addOrUpdateRefreshTask SimpleCacheManager的create方法根据QuickConfig先构造local或者remote或者multiLevelCache,再根据config的loader或者refreshPolicy去创建包装类,对于config.getRefreshPolicy()不为null的则创建RefreshCache去包装cache,否则再对于config.getLoader()不为null使用LoadingCache去包装cache

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ProxyCache
  • SimpleProxyCache
    • LoadingCache
      • createProxyLoader
      • synchronizedLoad
    • RefreshCache
      • addOrUpdateRefreshTask
      • RefreshTask
  • SimpleCacheManager.create
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档