本文主要研究一下jetcache的CacheMonitor
jetcache-core/src/main/java/com/alicp/jetcache/CacheMonitor.java
@FunctionalInterface
public interface CacheMonitor {
void afterOperation(CacheEvent event);
}
CacheMonitor是一个FunctionalInterface,它定义了afterOperation方法用于消费CacheEvent,它有两个实现类,分别是DefaultCacheMonitor、CacheNotifyMonitor
jetcache-core/src/main/java/com/alicp/jetcache/event/CacheEvent.java
public class CacheEvent {
protected Cache cache;
public CacheEvent(Cache cache) {
this.cache = cache;
}
public Cache getCache() {
return cache;
}
}
CacheEvent定义了cache属性,它有CacheGetEvent、CacheGetAllEvent、CacheLoadEvent、CacheLoadAllEvent、CachePutEvent、CachePutAllEvent、CacheRemoveEvent、CacheRemoveAllEvent这几个子类
jetcache-core/src/main/java/com/alicp/jetcache/support/DefaultCacheMonitor.java
public class DefaultCacheMonitor implements CacheMonitor {
private static final Logger logger = LoggerFactory.getLogger(DefaultCacheMonitor.class);
protected CacheStat cacheStat;
private String cacheName;
public DefaultCacheMonitor(String cacheName) {
if (cacheName == null) {
throw new NullPointerException();
}
this.cacheName = cacheName;
resetStat();
}
public String getCacheName() {
return cacheName;
}
public synchronized void resetStat() {
cacheStat = new CacheStat();
cacheStat.setStatStartTime(System.currentTimeMillis());
cacheStat.setCacheName(cacheName);
}
public synchronized CacheStat getCacheStat() {
CacheStat stat = cacheStat.clone();
stat.setStatEndTime(System.currentTimeMillis());
return stat;
}
@Override
public synchronized void afterOperation(CacheEvent event) {
if (event instanceof CacheGetEvent) {
CacheGetEvent e = (CacheGetEvent) event;
afterGet(e.getMillis(), e.getKey(), e.getResult());
} else if (event instanceof CachePutEvent) {
CachePutEvent e = (CachePutEvent) event;
afterPut(e.getMillis(), e.getKey(), e.getValue(), e.getResult());
} else if (event instanceof CacheRemoveEvent) {
CacheRemoveEvent e = (CacheRemoveEvent) event;
afterRemove(e.getMillis(), e.getKey(), e.getResult());
} else if (event instanceof CacheLoadEvent) {
CacheLoadEvent e = (CacheLoadEvent) event;
afterLoad(e.getMillis(), e.getKey(), e.getLoadedValue(), e.isSuccess());
} else if (event instanceof CacheGetAllEvent) {
CacheGetAllEvent e = (CacheGetAllEvent) event;
afterGetAll(e.getMillis(), e.getKeys(), e.getResult());
} else if (event instanceof CacheLoadAllEvent) {
CacheLoadAllEvent e = (CacheLoadAllEvent) event;
afterLoadAll(e.getMillis(), e.getKeys(), e.getLoadedValue(), e.isSuccess());
} else if (event instanceof CachePutAllEvent) {
CachePutAllEvent e = (CachePutAllEvent) event;
afterPutAll(e.getMillis(), e.getMap(), e.getResult());
} else if (event instanceof CacheRemoveAllEvent) {
CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;
afterRemoveAll(e.getMillis(), e.getKeys(), e.getResult());
}
}
private void afterGet(long millis, Object key, CacheGetResult result) {
cacheStat.minGetTime = Math.min(cacheStat.minGetTime, millis);
cacheStat.maxGetTime = Math.max(cacheStat.maxGetTime, millis);
cacheStat.getTimeSum += millis;
cacheStat.getCount++;
parseSingleGet(result);
}
private void parseSingleGet(CacheGetResult result) {
switch (result.getResultCode()) {
case SUCCESS:
cacheStat.getHitCount++;
break;
case NOT_EXISTS:
cacheStat.getMissCount++;
break;
case EXPIRED:
cacheStat.getExpireCount++;
break;
case FAIL:
cacheStat.getFailCount++;
break;
default:
logger.warn("jetcache get return unexpected code: " + result.getResultCode());
}
}
private void afterPut(long millis, Object key, Object value, CacheResult result) {
cacheStat.minPutTime = Math.min(cacheStat.minPutTime, millis);
cacheStat.maxPutTime = Math.max(cacheStat.maxPutTime, millis);
cacheStat.putTimeSum += millis;
cacheStat.putCount++;
switch (result.getResultCode()) {
case SUCCESS:
cacheStat.putSuccessCount++;
break;
case FAIL:
case PART_SUCCESS:
cacheStat.putFailCount++;
break;
case EXISTS:
break;
default:
logger.warn("jetcache PUT return unexpected code: " + result.getResultCode());
}
}
private void afterRemove(long millis, Object key, CacheResult result) {
cacheStat.minRemoveTime = Math.min(cacheStat.minRemoveTime, millis);
cacheStat.maxRemoveTime = Math.max(cacheStat.maxRemoveTime, millis);
cacheStat.removeTimeSum += millis;
cacheStat.removeCount++;
switch (result.getResultCode()) {
case SUCCESS:
case NOT_EXISTS:
cacheStat.removeSuccessCount++;
break;
case FAIL:
case PART_SUCCESS:
cacheStat.removeFailCount++;
break;
default:
logger.warn("jetcache REMOVE return unexpected code: " + result.getResultCode());
}
}
private void afterLoad(long millis, Object key, Object loadedValue, boolean success) {
cacheStat.minLoadTime = Math.min(cacheStat.minLoadTime, millis);
cacheStat.maxLoadTime = Math.max(cacheStat.maxLoadTime, millis);
cacheStat.loadTimeSum += millis;
cacheStat.loadCount++;
if (success) {
cacheStat.loadSuccessCount++;
} else {
cacheStat.loadFailCount++;
}
}
private void afterLoadAll(long millis, Set keys, Map loadedValue, boolean success) {
if (keys == null) {
return;
}
int count = keys.size();
cacheStat.minLoadTime = Math.min(cacheStat.minLoadTime, millis);
cacheStat.maxLoadTime = Math.max(cacheStat.maxLoadTime, millis);
cacheStat.loadTimeSum += millis;
cacheStat.loadCount += count;
if (success) {
cacheStat.loadSuccessCount += count;
} else {
cacheStat.loadFailCount += count;
}
}
private void afterGetAll(long millis, Set keys, MultiGetResult result) {
if (keys == null) {
return;
}
int keyCount = keys.size();
cacheStat.minGetTime = Math.min(cacheStat.minGetTime, millis);
cacheStat.maxGetTime = Math.max(cacheStat.maxGetTime, millis);
cacheStat.getTimeSum += millis;
cacheStat.getCount += keyCount;
Map resultValues = result.getValues();
if (resultValues == null) {
cacheStat.getFailCount += keyCount;
} else {
for (Object singleResult : resultValues.values()) {
CacheGetResult r = ((CacheGetResult) singleResult);
parseSingleGet(r);
}
}
}
private void afterRemoveAll(long millis, Set keys, CacheResult result) {
if (keys == null) {
return;
}
int keyCount = keys.size();
cacheStat.minRemoveTime = Math.min(cacheStat.minRemoveTime, millis);
cacheStat.maxRemoveTime = Math.max(cacheStat.maxRemoveTime, millis);
cacheStat.removeTimeSum += millis;
cacheStat.removeCount += keyCount;
if (result.isSuccess()) {
cacheStat.removeSuccessCount += keyCount;
} else {
cacheStat.removeFailCount += keyCount;
}
}
private void afterPutAll(long millis, Map map, CacheResult result) {
if (map == null) {
return;
}
int keyCount = map.size();
cacheStat.minPutTime = Math.min(cacheStat.minPutTime, millis);
cacheStat.maxPutTime = Math.max(cacheStat.maxPutTime, millis);
cacheStat.putTimeSum += millis;
cacheStat.putCount += keyCount;
if (result.isSuccess()) {
cacheStat.putSuccessCount += keyCount;
} else {
cacheStat.putFailCount += keyCount;
}
}
}
DefaultCacheMonitor实现了CacheMonitor接口,其afterOperation方法根据CacheEvent的具体类型来执行不同的处理逻辑,CacheGetEvent执行afterGet,CachePutEvent执行afterPut,CacheRemoveEvent执行afterRemove,CacheLoadEvent执行afterLoad,CacheGetAllEvent执行afterGetAll,CacheLoadAllEvent执行afterLoadAll,CachePutAllEvent执行afterPutAll,CacheRemoveAllEvent执行afterRemoveAll;这些方法主要是维护CacheStat的相关属性
jetcache-core/src/main/java/com/alicp/jetcache/support/CacheStat.java
public class CacheStat implements Serializable, Cloneable {
private static final long serialVersionUID = -8802969946750554026L;
protected String cacheName;
protected long statStartTime;
protected long statEndTime;
protected long getCount;
protected long getHitCount;
protected long getMissCount;
protected long getFailCount;
protected long getExpireCount;
protected long getTimeSum;
protected long minGetTime = Long.MAX_VALUE;
protected long maxGetTime = 0;
protected long putCount;
protected long putSuccessCount;
protected long putFailCount;
protected long putTimeSum;
protected long minPutTime = Long.MAX_VALUE;
protected long maxPutTime = 0;
protected long removeCount;
protected long removeSuccessCount;
protected long removeFailCount;
protected long removeTimeSum;
protected long minRemoveTime = Long.MAX_VALUE;
protected long maxRemoveTime = 0;
protected long loadCount;
protected long loadSuccessCount;
protected long loadFailCount;
protected long loadTimeSum;
protected long minLoadTime = Long.MAX_VALUE;
protected long maxLoadTime = 0;
//......
}
CacheStat定义了get、put、remove、load这几类动作的相关指标
jetcache-core/src/main/java/com/alicp/jetcache/support/CacheNotifyMonitor.java
public class CacheNotifyMonitor implements CacheMonitor {
private final BroadcastManager broadcastManager;
private final String area;
private final String cacheName;
private final String sourceId;
public CacheNotifyMonitor(CacheManager cacheManager, String area, String cacheName) {
this.broadcastManager = cacheManager.getBroadcastManager(area);
this.area = area;
this.cacheName = cacheName;
if (broadcastManager != null) {
this.sourceId = broadcastManager.getSourceId();
} else {
this.sourceId = null;
}
}
public CacheNotifyMonitor(CacheManager cacheManager, String cacheName) {
this(cacheManager, CacheConsts.DEFAULT_AREA, cacheName);
}
private Object convertKey(Object key, AbstractEmbeddedCache localCache) {
Function keyConvertor = localCache.config().getKeyConvertor();
if (keyConvertor == null) {
return key;
} else {
return keyConvertor.apply(key);
}
}
private AbstractEmbeddedCache getLocalCache(AbstractCache absCache) {
if (!(absCache instanceof MultiLevelCache)) {
return null;
}
for (Cache c : ((MultiLevelCache) absCache).caches()) {
if (c instanceof AbstractEmbeddedCache) {
return (AbstractEmbeddedCache) c;
}
}
return null;
}
@Override
public void afterOperation(CacheEvent event) {
if (this.broadcastManager == null) {
return;
}
AbstractCache absCache = CacheUtil.getAbstractCache(event.getCache());
if (absCache.isClosed()) {
return;
}
AbstractEmbeddedCache localCache = getLocalCache(absCache);
if (localCache == null) {
return;
}
if (event instanceof CachePutEvent) {
CacheMessage m = new CacheMessage();
m.setArea(area);
m.setCacheName(cacheName);
m.setSourceId(sourceId);
CachePutEvent e = (CachePutEvent) event;
m.setType(CacheMessage.TYPE_PUT);
m.setKeys(new Object[]{convertKey(e.getKey(), localCache)});
broadcastManager.publish(m);
} else if (event instanceof CacheRemoveEvent) {
CacheMessage m = new CacheMessage();
m.setArea(area);
m.setCacheName(cacheName);
m.setSourceId(sourceId);
CacheRemoveEvent e = (CacheRemoveEvent) event;
m.setType(CacheMessage.TYPE_REMOVE);
m.setKeys(new Object[]{convertKey(e.getKey(), localCache)});
broadcastManager.publish(m);
} else if (event instanceof CachePutAllEvent) {
CacheMessage m = new CacheMessage();
m.setArea(area);
m.setCacheName(cacheName);
m.setSourceId(sourceId);
CachePutAllEvent e = (CachePutAllEvent) event;
m.setType(CacheMessage.TYPE_PUT_ALL);
if (e.getMap() != null) {
m.setKeys(e.getMap().keySet().stream().map(k -> convertKey(k, localCache)).toArray());
}
broadcastManager.publish(m);
} else if (event instanceof CacheRemoveAllEvent) {
CacheMessage m = new CacheMessage();
m.setArea(area);
m.setCacheName(cacheName);
m.setSourceId(sourceId);
CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;
m.setType(CacheMessage.TYPE_REMOVE_ALL);
if (e.getKeys() != null) {
m.setKeys(e.getKeys().stream().map(k -> convertKey(k, localCache)).toArray());
}
broadcastManager.publish(m);
}
}
}
CacheNotifyMonitor实现了CacheMonitor接口,其构造器通过cacheManager获取broadcastManager,其afterOperation对于broadcastManager为null或者cache是close的或者localCache为null不做处理,否则根据不同的event具体类型来构建不同的type的CacheMessage,最后通过broadcastManager.publish(m)去发布消息
jetcache-core/src/main/java/com/alicp/jetcache/support/BroadcastManager.java
public abstract class BroadcastManager implements AutoCloseable {
private static Logger logger = LoggerFactory.getLogger(BroadcastManager.class);
private final String sourceId = UUID.randomUUID().toString();
private final CacheManager cacheManager;
public BroadcastManager(CacheManager cacheManager) {
this.cacheManager = cacheManager;
}
protected void checkConfig(ExternalCacheConfig config) {
if (config.getBroadcastChannel() == null) {
throw new CacheConfigException("BroadcastChannel not set");
}
if (config.getValueEncoder() == null) {
throw new CacheConfigException("no value encoder");
}
if (config.getValueDecoder() == null) {
throw new CacheConfigException("no value decoder");
}
}
public abstract CacheResult publish(CacheMessage cacheMessage);
public abstract void startSubscribe();
@Override
public void close() throws Exception {
}
//......
}
BroadcastManager是个抽象类,实现了AutoCloseable接口,其close方法默认为空实现,它定义了publish及startSubscribe两个抽象方法;它主要有MockRemoteCacheBuilder的匿名实现、RedisBroadcastManager、LettuceBroadcastManager、SpringDataBroadcastManager、RedissonBroadcastManager这几个实现
jetcache-core/src/main/java/com/alicp/jetcache/template/CacheMonitorInstaller.java
public interface CacheMonitorInstaller {
void addMonitors(CacheManager cacheManager, Cache cache, QuickConfig quickConfig);
}
CacheMonitorInstaller接口定义了addMonitors方法,它有MetricsMonitorInstaller、NotifyMonitorInstaller两个实现类
jetcache-core/src/main/java/com/alicp/jetcache/template/MetricsMonitorInstaller.java
public class MetricsMonitorInstaller extends AbstractLifecycle implements CacheMonitorInstaller {
private final Consumer<StatInfo> metricsCallback;
private final Duration interval;
private DefaultMetricsManager metricsManager;
public MetricsMonitorInstaller(Consumer<StatInfo> metricsCallback, Duration interval) {
this.metricsCallback = metricsCallback;
this.interval = interval;
}
@Override
protected void doInit() {
if (metricsCallback != null && interval != null) {
metricsManager = new DefaultMetricsManager((int) interval.toMinutes(),
TimeUnit.MINUTES, metricsCallback);
metricsManager.start();
}
}
@Override
protected void doShutdown() {
if (metricsManager != null) {
metricsManager.stop();
metricsManager.clear();
metricsManager = null;
}
}
@Override
public void addMonitors(CacheManager cacheManager, Cache cache, QuickConfig quickConfig) {
if (metricsManager == null) {
return;
}
cache = CacheUtil.getAbstractCache(cache);
if (cache instanceof MultiLevelCache) {
MultiLevelCache mc = (MultiLevelCache) cache;
if (mc.caches().length == 2) {
Cache local = mc.caches()[0];
Cache remote = mc.caches()[1];
DefaultCacheMonitor localMonitor = new DefaultCacheMonitor(quickConfig.getName() + "_local");
local.config().getMonitors().add(localMonitor);
DefaultCacheMonitor remoteMonitor = new DefaultCacheMonitor(quickConfig.getName() + "_remote");
remote.config().getMonitors().add(remoteMonitor);
metricsManager.add(localMonitor, remoteMonitor);
}
}
DefaultCacheMonitor monitor = new DefaultCacheMonitor(quickConfig.getName());
cache.config().getMonitors().add(monitor);
metricsManager.add(monitor);
}
}
MetricsMonitorInstaller实现了CacheMonitorInstaller接口,其addMonitors方法会创建DefaultCacheMonitor并添加到cache的config中,对于MultiLevelCache会分别创建localMonitor、remoteMonitor并添加到对应cache的config中
jetcache-core/src/main/java/com/alicp/jetcache/template/NotifyMonitorInstaller.java
public class NotifyMonitorInstaller implements CacheMonitorInstaller {
private final Function<String, CacheBuilder> remoteBuilderTemplate;
public NotifyMonitorInstaller(Function<String, CacheBuilder> remoteBuilderTemplate) {
this.remoteBuilderTemplate = remoteBuilderTemplate;
}
@Override
public void addMonitors(CacheManager cacheManager, Cache cache, QuickConfig quickConfig) {
if (quickConfig.getSyncLocal() == null || !quickConfig.getSyncLocal()) {
return;
}
if (!(CacheUtil.getAbstractCache(cache) instanceof MultiLevelCache)) {
return;
}
String area = quickConfig.getArea();
final ExternalCacheBuilder cacheBuilder = (ExternalCacheBuilder) remoteBuilderTemplate.apply(area);
if (cacheBuilder == null || !cacheBuilder.supportBroadcast()
|| cacheBuilder.getConfig().getBroadcastChannel() == null) {
return;
}
if (cacheManager.getBroadcastManager(area) == null) {
BroadcastManager cm = cacheBuilder.createBroadcastManager(cacheManager);
if (cm != null) {
cm.startSubscribe();
cacheManager.putBroadcastManager(area, cm);
}
}
CacheMonitor monitor = new CacheNotifyMonitor(cacheManager, area, quickConfig.getName());
cache.config().getMonitors().add(monitor);
}
}
NotifyMonitorInstaller实现了CacheMonitorInstaller接口,其addMonitors方法对于BroadcastManager为null的不做处理,否则执行其startSubscribe方法,然后添加到cacheManager,最后创建CacheNotifyMonitor,添加到cache的config中
public class ConfigProvider extends AbstractLifecycle {
private static final Logger logger = LoggerFactory.getLogger(ConfigProvider.class);
@Resource
protected GlobalCacheConfig globalCacheConfig;
protected EncoderParser encoderParser;
protected KeyConvertorParser keyConvertorParser;
private Consumer<StatInfo> metricsCallback;
private CacheBuilderTemplate cacheBuilderTemplate;
public ConfigProvider() {
encoderParser = new DefaultEncoderParser();
keyConvertorParser = new DefaultKeyConvertorParser();
metricsCallback = new StatInfoLogger(false);
}
@Override
protected void doInit() {
cacheBuilderTemplate = new CacheBuilderTemplate(globalCacheConfig.isPenetrationProtect(),
globalCacheConfig.getLocalCacheBuilders(), globalCacheConfig.getRemoteCacheBuilders());
for (CacheBuilder builder : globalCacheConfig.getLocalCacheBuilders().values()) {
EmbeddedCacheBuilder eb = (EmbeddedCacheBuilder) builder;
if (eb.getConfig().getKeyConvertor() instanceof ParserFunction) {
ParserFunction f = (ParserFunction) eb.getConfig().getKeyConvertor();
eb.setKeyConvertor(parseKeyConvertor(f.getValue()));
}
}
for (CacheBuilder builder : globalCacheConfig.getRemoteCacheBuilders().values()) {
ExternalCacheBuilder eb = (ExternalCacheBuilder) builder;
if (eb.getConfig().getKeyConvertor() instanceof ParserFunction) {
ParserFunction f = (ParserFunction) eb.getConfig().getKeyConvertor();
eb.setKeyConvertor(parseKeyConvertor(f.getValue()));
}
if (eb.getConfig().getValueEncoder() instanceof ParserFunction) {
ParserFunction f = (ParserFunction) eb.getConfig().getValueEncoder();
eb.setValueEncoder(parseValueEncoder(f.getValue()));
}
if (eb.getConfig().getValueDecoder() instanceof ParserFunction) {
ParserFunction f = (ParserFunction) eb.getConfig().getValueDecoder();
eb.setValueDecoder(parseValueDecoder(f.getValue()));
}
}
initCacheMonitorInstallers();
}
//......
}
ConfigProvider继承了AbstractLifecycle,它覆盖了doInit方法,该方法先创建cacheBuilderTemplate,之后根据globalCacheConfig.getLocalCacheBuilders()去设置setKeyConvertor,根据globalCacheConfig.getRemoteCacheBuilders()设置setKeyConvertor、setValueEncoder、setValueDecoder,最后执行initCacheMonitorInstallers方法
protected void initCacheMonitorInstallers() {
cacheBuilderTemplate.getCacheMonitorInstallers().add(metricsMonitorInstaller());
cacheBuilderTemplate.getCacheMonitorInstallers().add(notifyMonitorInstaller());
for (CacheMonitorInstaller i : cacheBuilderTemplate.getCacheMonitorInstallers()) {
if (i instanceof AbstractLifecycle) {
((AbstractLifecycle) i).init();
}
}
}
protected CacheMonitorInstaller metricsMonitorInstaller() {
Duration interval = null;
if (globalCacheConfig.getStatIntervalMinutes() > 0) {
interval = Duration.ofMinutes(globalCacheConfig.getStatIntervalMinutes());
}
MetricsMonitorInstaller i = new MetricsMonitorInstaller(metricsCallback, interval);
i.init();
return i;
}
protected CacheMonitorInstaller notifyMonitorInstaller() {
return new NotifyMonitorInstaller(area -> globalCacheConfig.getRemoteCacheBuilders().get(area));
}
initCacheMonitorInstallers会执行metricsMonitorInstaller、notifyMonitorInstaller并添加到cacheBuilderTemplate.getCacheMonitorInstallers(),最后遍历cacheBuilderTemplate.getCacheMonitorInstallers()对于是AbstractLifecycle类型的挨个执行init方法 metricsMonitorInstaller方法创建MetricsMonitorInstaller并执行其init方法;notifyMonitorInstaller方法创建NotifyMonitorInstaller
DefaultCacheMonitor实现了CacheMonitor接口,其afterOperation方法根据CacheEvent的具体类型来执行不同的处理逻辑,主要是维护CacheStat的相关属性 CacheNotifyMonitor实现了CacheMonitor接口,根据不同的event具体类型来构建不同的type的CacheMessage,最后通过broadcastManager.publish(m)去发布消息 BroadcastManager是个抽象类,主要有MockRemoteCacheBuilder的匿名实现、RedisBroadcastManager、LettuceBroadcastManager、SpringDataBroadcastManager、RedissonBroadcastManager这几个实现