现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁(读多写少)。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源(读读可以并发);但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了(读写,写读,写写互斥)。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它内部,维护了 一对相关的锁,一个用于只读操作,称为读锁;一个用于写入操作,称为写锁。
线程进入读锁的前提条件:
线程进入写锁的前提条件:
读写锁有以下三个重要的特性:
看源码需要了解三个核心问题:
读写锁是怎样实现分别记录读写状态的?
写锁时怎么获取和释放的?
读锁时怎么获取和释放的?
首先看它的类信息:
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
}
可以发现该类实现了ReadWriteLock这个接口
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
该接口就实现了读锁和写锁的规范,以下就是相关的类图
下面看看ReentrantReadWriteLock的读写锁的实现逻辑,首先看写锁:
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
然后看看写锁是怎么实现的
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock( ) {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
下面我们需要思考一个核心问题,读写锁的状态是怎么用AQS底层的state状态来维护的。其实这里的核心问题就是,如何用一个变量维护多种状态。在 ReentrantLock 中,使用 Sync ( 实际是 AQS )的 int 类型的 state 来表示同步状态,表示锁被一个线程重复获取的次数。但是,读写锁 ReentrantReadWriteLock 内部维护着一对读写锁,如果要用一个变量维护多种状态,需要采用“按位切割使用”的方式来维护这个变量,将其切分为两部分:高16为表示读,低16为表示写。分割之后,读写锁是如何迅速确定读锁和写锁的状态呢? 其实底层是通过位运算来实现的。假如当前同步状态为S, 那么写状态,等于 S & 0x0000FFFF(将高 16 位全部抹去)。 当写状态加1,等于S+1。读状态,等于 S >>> 16 (无符号补 0 右移 16 位)。当读状态加1,等于 S+(1<<16),也就是S+0x00010000。根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。
源码如下:
//该部分在Sync类中
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
HoldCounter计数器
读锁的内在机制其实就是一个共享锁。一次共享锁的操作就相当于对HoldCounter 计数器的操作。获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
写锁的获取
写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程, 则当前线程进入等待状态。
protected final boolean tryAcquire(int acquires) {
//获取当前线程
Thread current = Thread.currentThread();
//获取当前state的值
int c = getState();
//获取写锁的重入次数
int w = exclusiveCount(c);
//state!=0则表示当前有写锁或读锁
if (c != 0) {
//判断是否是重入
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//获取重入锁
setState(c + acquires);
return true;
}
// writerShouldBlock有公平与非公平的实现, 非公平返回false,会尝试通过cas加锁,c==0 写锁未被任何线程获取,当前线程是否阻塞或者cas尝试获取锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置锁由当前线程独占
setExclusiveOwnerThread(current);
return true;
}
上面简单的代码就实现了下面的逻辑:
写锁的释放
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//设置状态
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//如果state为0,表示释放写锁
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
读锁的获取
protected final int tryAcquireShared(int unused) {
//获取当前线程
Thread current = Thread.currentThread();
//获取state
int c = getState();
//exclusiveCount(c) != 0 判断是否有写锁
//getExclusiveOwnerThread() != current),判断当前线程是否是写锁的持有者
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//cas加读锁
compareAndSetState(c, c + SHARED_UNIT)) {
//r==0表示第一次获取读锁
if (r == 0) {
//设置第一个读为当前线程
firstReader = current;
//设置当前读锁的重入次数
firstReaderHoldCount = 1;
} else if (firstReader == current) { //第一个读的重入
firstReaderHoldCount++;
} else {
//若不是第一个读,则用HoldCounter记录
HoldCounter rh = cachedHoldCounter;
//第一次读锁获取失败,再次尝试
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
//第一次读锁获取失败,再次尝试(fullTryAcquireShared)
}
上面代码的逻辑就实现了:
读锁的释放
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}