本系列课程主要针对于Ehcache缓存框架功能的开发实践全流程技术指南!
在高并发的情况下,使用Ehcache缓存时,由于并发的读与写,我们读的数据有可能是错误的,我们写的数据也有可能意外的被覆盖。所幸的是Ehcache为我们提供了针对于缓存元素Key的Read(读)、Write(写)锁。当一个线程获取了某一Key的Read锁之后,其它线程获取针对于同一个Key的Read锁不会受到限制,但其它线程(包括获取了该Key的Read锁的线程)如果想获取针对同一个Key的Write锁就不行,它需要等到针对于该Key的Read锁释放后才能获取其Write锁;当一个线程获取了某一Key的Write锁之后,其它线程获取同一个Key的Read锁或者Write锁的请求将等待针对于该Key的Write锁释放后才能继续进行,但是同一个线程获取该Key对应的Read锁或者Write锁将不需要等待。获取了对应的锁之后,记得在不再需要该锁后释放该锁。并且需要注意不要引起死锁。
在Ehcache接口中为我们定义了几个与Read、Write锁相关的方法,具体方法如下所示:
public interface Ehcache {
/**
* 获取给定Key的Read锁
* @param key
*/
public void acquireReadLockOnKey(Object key);
/**
* 获取给定Key的Write锁
* @param key
*/
public void acquireWriteLockOnKey(Object key);
/**
* 尝试着获取给定Key的Read锁,如果在给定timeout时间内还没有获取到对应的Read锁,则返回false,否则返回true。
* @param key
* @param timeout 超时时间,单位是毫秒
* @return表示是否获取到了对应的Read锁
* @throws InterruptedException
*/
public boolean tryReadLockOnKey(Object key, long timeout) throws InterruptedException;
/**
* 尝试着获取给定Key的Write锁,如果在给定timeout时间内还没有获取到对应的Write锁,则返回false,否则返回true。
* @param key
* @param timeout 超时时间,单位是毫秒
* @return表示是否获取到了对应的Write锁
* @throws InterruptedException
*/
public boolean tryWriteLockOnKey(Object key, long timeout) throws InterruptedException;
/**
* 释放所持有的给定Key的Read锁
* @param key
*/
public void releaseReadLockOnKey(Object key);
/**
* 释放所持有的给定Key的Write锁
* @param key
*/
public void releaseWriteLockOnKey(Object key);
}
我们常用的Cache类已经为我们实现了这些方法,我们可以直接在程序中进行使用。以下是直接在程序中使用锁的一个简单示例。
@Test
public void test() {
CacheManager cacheManager = CacheManager.create();
cacheManager.addCache("test");
Cache cache = cacheManager.getCache("test");
final String key = "abc";
cache.acquireWriteLockOnKey(key);
try {
cache.put(new Element(key, "123"));
} finally {
System.out.println(cache.get(key));
cache.releaseWriteLockOnKey(key);
}
}
记得需要在合适的时候释放所获取的锁。
在上一节我们提到了显示使用Ehcache锁的问题,其实我们还可以隐式的来使用Ehcache的锁,那就是通过BlockingCache。BlockingCache是Ehcache的一个封装类,可以让我们对Ehcache进行并发操作。其内部的锁机制是使用的net.sf.ehcache.concurrent.ReadWriteLockSync,与显示锁调用是一样的实现,而ReadWriteLockSync内部使用的是Java的java.util.concurrent.locks.ReadWriteLock。
BlockingCache拥有两个构造函数,它们都接收一个Ehcache对象,其中一个还接收一个指定并发数量的参数numberOfStripes,另一个没有numberOfStripes参数,但其将使用默认值,默认值为2048。numberOfStripes的值必须大于0,且为2的指数。接收的参数cache表示真正进行操作的Ehcache对象,BlockingCache只是对其进行了封装,使其支持并发操作。
public BlockingCache(final Ehcache cache, int numberOfStripes) throws CacheException {
super(cache);
this.stripes = numberOfStripes;
this.cacheLockProviderReference = new AtomicReference<CacheLockProvider>();
}
public BlockingCache(final Ehcache cache) throws CacheException {
this(cache, StripedReadWriteLockSync.DEFAULT_NUMBER_OF_MUTEXES);
}
虽然我们的Ehcache是支持锁调用的,但BlockingCache与显示锁调用不同的是它不是直接通过所持有的Ehcache来获取锁和释放锁的,而是由其内部完成的。
在从BlockingCache中get元素时,是支持并发读的,这没有问题,但如果对应key对应的元素不存在,则线程将被阻塞,直到调用了对应的put()方法存放了一个对应的key的元素为止。这是怎么做到的呢?我们来看一下BlockingCache的源码。
public Element get(final Object key) throws RuntimeException, LockTimeoutException {
getObserver.begin();
Sync lock = getLockForKey(key);
acquiredLockForKey(key, lock, LockType.READ);
Element element;
try {
element = underlyingCache.get(key);
} finally {
lock.unlock(LockType.READ);
}
if (element == null) {
acquiredLockForKey(key, lock, LockType.WRITE);
element = underlyingCache.get(key);
if (element != null) {
lock.unlock(LockType.WRITE);
getObserver.end(GetOutcome.HIT);
} else {
getObserver.end(GetOutcome.MISS_AND_LOCKED);
}
return element;
} else {
getObserver.end(GetOutcome.HIT);
return element;
}
}
上述是BlockingCache的核心get()方法。我们可以看到首先我们获取到了对应key的Sync,Sync是一个接口,其实现类通过持有的Lock对象可以对对应的key进行Read Lock或Write Lock。另外有一点需要注意的是对于同一个key而言,我们使用的是同一个Lock对象。通过上一节对Ehcache显示锁介绍,我们知道Read Lock之间是不会阻塞的。所以当我们在试图get一个元素时:
通过上面的代码和分析我们知道,如果在利用BlockingCache的get()方法获取一个元素时,如果对应的元素不存在,则除最终获取到Write锁的线程以外的线程都将被阻塞,而获取到了对应key的Write锁的线程该如何释放其Write锁呢?这是通过往BlockingCache中put一个对应key的元素来释放的。BlockingCache是实现了Ehcache接口的,所以Ehcache拥有的put*()方法,BlockingCache都有,但是在BlockingCache的put*()方法中都加入了一个doAndReleaseWriteLock的逻辑。我们先来看一个put()方法的实现。
public void put(final Element element) {
doAndReleaseWriteLock(new PutAction<Void>(element) {
@Override
public Void put() {
if (element.getObjectValue() != null) {
underlyingCache.put(element);
} else {
underlyingCache.remove(element.getObjectKey());
}
returnnull;
}
});
}
我们可以看到在该put()方法内部调用了一个doAndReleaseWriteLock()方法,从该方法名以及其接收的参数我们可以看出,doAndReleaseWriteLock()方法的作用就是执行接收的参数PutAction的put()方法,然后释放对应key的Write锁,而且PutAction的构造是接收一个Element参数的,这样在PutAction中的put()方法中我们就可以使用该Element对象了。
doAndReleaseWriteLock()方法的实现如下所示。
1.private <V> V doAndReleaseWriteLock(PutAction<V> putAction) {
2.
3. if (putAction.element == null) {
4. returnnull;
5. }
6. Object key = putAction.element.getObjectKey();
7. Sync lock = getLockForKey(key);
8. if (!lock.isHeldByCurrentThread(LockType.WRITE)) {
9. lock.lock(LockType.WRITE);
10. }
try {
return putAction.put();
} finally {
//Release the writelock here. This will have been acquired in the get, where the element was null
lock.unlock(LockType.WRITE);
}
}
从源代码我们可以看到,其内部实现跟我们设想的差不多。在PutAction所持有的Element不为null的情况下会判断当前线程是否持有对应key的Write锁,如果没有对应key的Write锁,则将试图获取其Write锁,这个时候如果该key的Write锁已经被别的线程获取了,则在这里将进行阻塞。拥有了Write锁之后就可以执行PutAction对象的put()方法了,执行完后就可以释放对应key的Write锁了。
回过头来看,之前从BlockingCache中get元素时,如果对应元素不存在,则该线程将获取到对应key的Write锁(并发情况下,究竟是哪一个线程会获取到该key的Write锁是不定的),将使其它试图获取该key的Write锁或Read锁的线程阻塞。如果该线程此时往BlockingCache中put一个对应key的元素,则该线程所持有的Write锁将会释放,其它线程可以顺利的获取该key的Read锁和Write锁,即可以顺利的调用BlockingCache的get()方法获取对应的元素。