A Look at JetCache's CachePenetrationProtect
Preface
This article takes a look at JetCache's CachePenetrationProtect.
CachePenetrationProtect
com/alicp/jetcache/anno/CachePenetrationProtect.java
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD})
public @interface CachePenetrationProtect {
    boolean value() default true;

    int timeout() default CacheConsts.UNDEFINED_INT;

    TimeUnit timeUnit() default TimeUnit.SECONDS;
}
It defines three attributes: value, timeout, and timeUnit.
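Since the annotation is retained at runtime, its attributes can be read through reflection. The following is a minimal sketch of that idea, not JetCache code: `Protect`, `UserService`, and `timeoutOf` are hypothetical names, the sentinel value `Integer.MIN_VALUE` is an assumption standing in for `CacheConsts.UNDEFINED_INT`, and mapping an undefined timeout to `null` is also an assumption about how a caller might treat it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

final class AnnotationSketch {
    // Hypothetical sentinel standing in for CacheConsts.UNDEFINED_INT (assumed value).
    static final int UNDEFINED_INT = Integer.MIN_VALUE;

    // Hypothetical mirror of the annotation shape shown above.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.FIELD})
    @interface Protect {
        boolean value() default true;

        int timeout() default UNDEFINED_INT;

        TimeUnit timeUnit() default TimeUnit.SECONDS;
    }

    // Hypothetical service with one explicit timeout and one default.
    static class UserService {
        @Protect(timeout = 3)
        public String loadUser(long id) {
            return "user-" + id;
        }

        @Protect
        public String loadOrder(long id) {
            return "order-" + id;
        }
    }

    // Returns the configured wait timeout, or null when the method is not
    // annotated or the timeout attribute was left at its sentinel default.
    static Duration timeoutOf(Class<?> type, String methodName) {
        try {
            Method m = type.getMethod(methodName, long.class);
            Protect p = m.getAnnotation(Protect.class);
            if (p == null || p.timeout() == UNDEFINED_INT) {
                return null;
            }
            return Duration.ofMillis(p.timeUnit().toMillis(p.timeout()));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timeoutOf(UserService.class, "loadUser"));
        System.out.println(timeoutOf(UserService.class, "loadOrder"));
    }
}
```

The sentinel default is what lets a reader of the annotation tell "no timeout configured" apart from an explicit value.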
computeIfAbsentImpl
com/alicp/jetcache/AbstractCache.java
static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
                                    long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
    AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
    CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
    CacheGetResult<V> r;
    if (cache instanceof RefreshCache) {
        RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache);
        r = refreshCache.GET(key);
        refreshCache.addOrUpdateRefreshTask(key, newLoader);
    } else {
        r = cache.GET(key);
    }
    if (r.isSuccess()) {
        return r.getValue();
    } else {
        Consumer<V> cacheUpdater = (loadedValue) -> {
            if (needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
                if (timeUnit != null) {
                    cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
                } else {
                    cache.PUT(key, loadedValue).waitForResult();
                }
            }
        };
        V loadedValue;
        if (cache.config().isCachePenetrationProtect()) {
            loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater);
        } else {
            loadedValue = newLoader.apply(key);
            cacheUpdater.accept(loadedValue);
        }
        return loadedValue;
    }
}
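In AbstractCache's computeIfAbsentImpl, a cache hit returns the cached value directly. On a miss, if cache.config().isCachePenetrationProtect() is true, the value is loaded through synchronizedLoad; otherwise the loader is called directly and cacheUpdater writes the result back to the cache.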
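The unprotected branch of this flow (look up, load on a miss, write back only when the result should be cached) can be sketched without JetCache. This is a simplified illustration under stated assumptions: `ComputeIfAbsentSketch` and its `HashMap` store are hypothetical, the null check is only a stand-in for `needUpdate`, and the class is deliberately not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class ComputeIfAbsentSketch {
    // Hypothetical in-memory store; Optional.empty() marks a cached null.
    private final Map<String, Optional<String>> store = new HashMap<>();

    String computeIfAbsent(String key, Function<String, String> loader, boolean cacheNull) {
        Optional<String> cached = store.get(key);
        if (cached != null) {
            // Hit: return the cached value (possibly a cached null) without loading.
            return cached.orElse(null);
        }
        // Miss: load from the source.
        String loaded = loader.apply(key);
        // Simplified stand-in for needUpdate: skip null results unless asked to cache them.
        if (loaded != null || cacheNull) {
            store.put(key, Optional.ofNullable(loaded));
        }
        return loaded;
    }

    public static void main(String[] args) {
        ComputeIfAbsentSketch cache = new ComputeIfAbsentSketch();
        AtomicInteger loads = new AtomicInteger();
        Function<String, String> loader = k -> {
            loads.incrementAndGet();
            return "value-of-" + k;
        };
        cache.computeIfAbsent("a", loader, false); // miss: the loader runs
        cache.computeIfAbsent("a", loader, false); // hit: the loader is skipped
        System.out.println("loader calls: " + loads.get());
    }
}
```

With `cacheNull` set to false, a loader that keeps returning null is invoked on every call for that key.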
synchronizedLoad
static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache,
                                 K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
    ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
    Object lockKey = buildLoaderLockKey(abstractCache, key);
    while (true) {
        boolean create[] = new boolean[1];
        LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
            create[0] = true;
            LoaderLock loaderLock = new LoaderLock();
            loaderLock.signal = new CountDownLatch(1);
            loaderLock.loaderThread = Thread.currentThread();
            return loaderLock;
        });
        if (create[0] || ll.loaderThread == Thread.currentThread()) {
            try {
                V loadedValue = newLoader.apply(key);
                ll.success = true;
                ll.value = loadedValue;
                cacheUpdater.accept(loadedValue);
                return loadedValue;
            } finally {
                if (create[0]) {
                    ll.signal.countDown();
                    loaderMap.remove(lockKey);
                }
            }
        } else {
            try {
                Duration timeout = config.getPenetrationProtectTimeout();
                if (timeout == null) {
                    ll.signal.await();
                } else {
                    boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
                    if (!ok) {
                        logger.info("loader wait timeout:" + timeout);
                        return newLoader.apply(key);
                    }
                }
            } catch (InterruptedException e) {
                logger.warn("loader wait interrupted");
                return newLoader.apply(key);
            }
            if (ll.success) {
                return (V) ll.value;
            } else {
                continue;
            }
        }
    }
}
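synchronizedLoad registers a LoaderLock for the key through loaderMap.computeIfAbsent. The thread that created the lock (or is already its loaderThread) runs the loader, updates the cache, and then counts down the latch and removes the lock. Any other thread waits on ll.signal.await; if the wait times out or is interrupted, that thread calls newLoader.apply(key) itself. Once the wait ends normally, it returns ll.value if ll.success is true, and otherwise goes around the loop again.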
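The same leader/follower pattern can be reproduced in a small standalone class. This is a sketch under stated assumptions rather than JetCache's implementation: `SingleFlightSketch`, `Flight`, and `countLoads` are hypothetical names, it uses `putIfAbsent` instead of `computeIfAbsent`, it always applies a timeout, it restores the interrupt flag before falling back, and it leaves out the re-entrancy check on `loaderThread` and the cache update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class SingleFlightSketch<K, V> {
    // One in-flight load per key; followers wait on the latch.
    private static final class Flight<V> {
        final CountDownLatch done = new CountDownLatch(1);
        volatile boolean success;
        volatile V value;
    }

    private final ConcurrentHashMap<K, Flight<V>> inFlight = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    SingleFlightSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    V load(K key, Function<K, V> loader) {
        while (true) {
            Flight<V> mine = new Flight<>();
            Flight<V> existing = inFlight.putIfAbsent(key, mine);
            if (existing == null) {
                // This thread registered the flight, so it is the leader and loads.
                try {
                    V loaded = loader.apply(key);
                    mine.value = loaded;
                    mine.success = true;
                    return loaded;
                } finally {
                    // Wake followers first, then unregister the flight.
                    mine.done.countDown();
                    inFlight.remove(key, mine);
                }
            }
            // Follower: wait for the leader, but not longer than the timeout.
            try {
                if (!existing.done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return loader.apply(key); // timed out: load on our own
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return loader.apply(key); // interrupted: load on our own
            }
            if (existing.success) {
                return existing.value;
            }
            // The leader's loader threw; loop and try to become the leader.
        }
    }

    // Demo helper: runs concurrent loads of one key and returns how often the loader ran.
    static int countLoads(int threads) {
        SingleFlightSketch<String, String> sf = new SingleFlightSketch<>(5_000);
        AtomicInteger calls = new AtomicInteger();
        Function<String, String> slowLoader = k -> {
            calls.incrementAndGet();
            try {
                Thread.sleep(300); // simulate a slow back-end query
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "value-of-" + k;
        };
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                tasks.add(() -> sf.load("k", slowLoader));
            }
            pool.invokeAll(tasks); // blocks until every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("loader calls: " + countLoads(8));
    }
}
```

Counting down the latch before removing the map entry means a thread that arrives in between still finds the finished flight and returns its value instead of starting a second load.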
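Summary
JetCache provides the @CachePenetrationProtect annotation. When multiple threads miss the cache for the same key concurrently, only one thread within the JVM loads the value from the source while the others wait, up to the configured timeout. A waiting thread that times out without a result loads the value itself; if the loader thread has already produced a result, the waiting thread returns that result.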
doc
- MethodCache