并发编程 - 读写锁(ReentrantReadWriteLock)的探究
在并发场景中我们几乎会高频率的使用到独占式锁来解决线程安全的问题,java中通常使用synchronized或者JUC包下实现了Lock接口的ReentrantLock。它们都是独占式获取锁,也就是在同一时刻只有一个线程能够获取锁。但是在一些业务场景中, 大部分都是读数据, 写数据很少, 如果仅仅是读数据的话并不影响数据的正确性(出现脏读), 而如果在这种业务场景下, 依然使用独占锁的话, 很显然这就是出现性能瓶颈的地方。针对这种读多写少的情况, java专门提供了另外一个实现Lock接口的ReentrantReadWriteLock锁, 即我们今天的主角"读写锁"。
简介:
针对上面我们提到的这种情况我们来介绍下ReentrantReadWriteLock锁, 读写锁是一种特殊的自旋锁,它把对共享资源对访问者划分成了读者和写者,读者只对共享资源进行访问,写者则是对共享资源进行写操作。读写锁在ReentrantLock上进行了拓展使得该锁更适合读操作远远大于写操作对场景。一个读写锁同时只能存在一个写锁但是可以存在多个读锁,但不能同时存在写锁和读锁。
如果读写锁当前没有读者,也没有写者,那么写者可以立刻获的读写锁,否则必须自旋,直到没有任何的写锁或者读锁存在。如果读写锁没有写锁,那么读锁可以立马获取,否则必须等待写锁释放。(但是有一个例外,就是读写锁中的锁降级操作,当同一个线程获取写锁后,在写锁没有释放的情况下可以获取读锁再释放读锁这就是锁降级的一个过程)。
核心特点:
1、公平性选择:支持非公平性(默认)和公平的锁获取方式,吞吐量还是非公平优于公平;
2、重入性:支持重入,读锁获取后能再次获取,写锁获取之后能够再次获取写锁,同时也能够获取读锁;
3、锁降级:遵循获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁。
要想能够彻底的理解读写锁必须能够理解这样几个问题:
1.读写锁是怎样实现分别记录读写状态的?
2.写锁是怎样获取和释放的?
3.读锁是怎样获取和释放的?
4.锁降级是怎样理解的?
写锁的探究
1、获取
同步组件的实现聚合了同步器(AQS),并通过重写重写同步器(AQS)中的方法实现同步组件的同步语义。因此,写锁的实现依然也是采用这种方式。在同一时刻写锁是不能被多个线程所获取,很显然写锁是独占式锁,而实现写锁的同步语义是通过重写AQS中的tryAcquire方法实现的。源码为:
protected final boolean tryAcquire(int acquires) {/** Walkthrough:* 1. If read count nonzero or write count nonzero* and owner is a different thread, fail.* 2. If count would saturate, fail. (This can only* happen if count is already nonzero.)* 3. Otherwise, this thread is eligible for lock if* it is either a reentrant acquire or* queue policy allows it. If so, update state* and set owner.*/Thread current = Thread.currentThread();// 1. 获取写锁当前的同步状态int c = getState();// 2. 获取写锁获取的次数int w = exclusiveCount(c);if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0)// 3.1 当读锁已被读线程获取或者当前线程不是已经获取写锁的线程的话// 当前线程获取写锁失败if (w == 0 || current != getExclusiveOwnerThread())return false;if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// Reentrant acquire// 3.2 当前线程获取写锁,支持可重复加锁setState(c + acquires);return true;}// 3.3 写锁未被任何线程获取,当前线程可获取写锁if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;
}
这段代码的逻辑请看注释,这里有一个地方需要重点关注,exclusiveCount©方法,该方法源码为:
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
其中EXCLUSIVE_MASK为: static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; EXCLUSIVE _MASK为1左移16位然后减1,即为0x0000FFFF。而exclusiveCount方法是将同步状态(state为int类型)与0x0000FFFF相与,即取同步状态的低16位。那么低16位代表什么呢?根据exclusiveCount方法的注释为独占式获取的次数即写锁被获取的次数,现在就可以得出来一个结论同步状态的低16位用来表示写锁的获取次数。同时还有一个方法值得我们注意:
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
该方法是获取读锁被获取的次数,是将同步状态(int c)右移16次,即取同步状态的高16位,现在我们可以得出另外一个结论同步状态的高16位用来表示读锁被获取的次数。现在还记得我们开篇说的需要弄懂的第一个问题吗?读写锁是怎样实现分别记录读锁和写锁的状态的,现在这个问题的答案就已经被我们弄清楚了,其示意图如下图所示:
现在我们回过头来看写锁获取方法tryAcquire,其主要逻辑为:当读锁已经被读线程获取或者写锁已经被其他写线程获取,则写锁获取失败;否则,获取成功并支持重入,增加写状态。
2、释放
写锁释放通过重写AQS的tryRelease方法,源码为:
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())throw new IllegalMonitorStateException();//1. 同步状态减去写状态int nextc = getState() - releases;//2. 当前写状态是否为0,为0则释放写锁boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);//3. 不为0则更新同步状态setState(nextc);return free;
}
源码的实现逻辑请看注释,不难理解与ReentrantLock基本一致,这里需要注意的是,减少写状态 int nextc = getState() - releases;只需要用当前同步状态直接减去写状态的原因正是我们刚才所说的写状态是由同步状态的低16位表示的。
读锁的探究
1、获取
看完了写锁,现在来看看读锁,读锁不是独占式锁,即同一时刻该锁可以被多个读线程获取也就是一种共享式锁。按照之前对AQS介绍,实现共享式同步组件的同步语义需要通过重写AQS的tryAcquireShared方法和tryReleaseShared方法。读锁的获取实现方法为:
protected final int tryAcquireShared(int unused) {/** Walkthrough:* 1. If write lock held by another thread, fail.* 2. Otherwise, this thread is eligible for* lock wrt state, so ask if it should block* because of queue policy. If not, try* to grant by CASing state and updating count.* Note that step does not check for reentrant* acquires, which is postponed to full version* to avoid having to check hold count in* the more typical non-reentrant case.* 3. If step 2 fails either because thread* apparently not eligible or CAS fails or count* saturated, chain to version with full retry loop.*/Thread current = Thread.currentThread();int c = getState();//1. 如果写锁已经被获取并且获取写锁的线程不是当前线程的话,当前// 线程获取读锁失败返回-1if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;int r = sharedCount(c);if (!readerShouldBlock() &&r < MAX_COUNT &&//2. 当前线程获取读锁compareAndSetState(c, c + SHARED_UNIT)) {//3. 下面的代码主要是新增的一些功能,比如getReadHoldCount()方法//返回当前获取读锁的次数if (r == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}//4. 处理在第二步中CAS操作失败的自旋已经实现重入性return fullTryAcquireShared(current);
}
代码的逻辑请看注释,需要注意的是 当写锁被其他线程获取后,读锁获取失败,否则获取成功利用CAS更新同步状态。另外,当前同步状态需要加上SHARED_UNIT( (1 << SHARED_SHIFT) 即0x00010000)的原因这是我们在上面所说的同步状态的高16位用来表示读锁被获取的次数。如果CAS失败或者已经获取读锁的线程再次获取读锁时,是靠fullTryAcquireShared方法实现的,这段代码就不展开说了,有兴趣可以看看。
2、释放
读锁释放的实现主要通过方法tryReleaseShared,源码如下,主要逻辑请看注释:
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();// 前面还是为了实现getReadHoldCount等新功能if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {int c = getState();// 读锁释放 将同步状态减去读状态即可int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}
}
锁降级
读写锁支持锁降级,遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁,不支持锁升级,关于锁降级下面的示例代码摘自ReentrantWriteReadLock源码中:
/** void processCachedData() {
* rwl.readLock().lock();
* if (!cacheValid) {
* // Must release read lock before acquiring write lock
* rwl.readLock().unlock();
* rwl.writeLock().lock();
* try {
* // Recheck state because another thread might have
* // acquired write lock and changed state before we did.
* if (!cacheValid) {
* data = ...
* cacheValid = true;
* }
* // Downgrade by acquiring read lock before releasing write lock
* rwl.readLock().lock();
* } finally {
* rwl.writeLock().unlock(); // Unlock write, still hold read
* }
* }
*
* try {
* use(data);
* } finally {
* rwl.readLock().unlock();
* }
* }
* }
*/
简单的理解锁降级就是从写锁降级成为读锁。在当前线程拥有写锁的情况下,再次获取到读锁,随后释放写锁的过程就是锁降级。
这里可以举个例子:
public class CacheDemo { private Map<String, Object> cache = new HashMap<String, Object>();private ReadWriteLock rwl = new ReentrantReadWriteLock();public ReadLock rdl = rwl.readLock();public WriteLock wl = rwl.writeLock();public volatile boolean update = false;public void processData(){rdl.lock();//获取读锁if(!update){rdl.unlock();//释放读锁wl.lock();//获取写锁try{if(!update){update =true;}rdl.lock();//获取读锁finally{wl.unlock();//释放写锁}}try{}finally{rdl.unlock();//释放读锁}
}
总结
读写锁是在重入锁ReentrantLock基础上的一大改造,其通过在重入锁上维护一个读锁一个写锁实现的。对于ReentrantLock和ReentrantreadWriteLock的使用需要在开发者自己根据实际项目的情况而定。对于读写锁当读的操作远远大于写操作的时候会增加程序很高的并发量和吞吐量。虽说在高并发的情况下,读写锁的效率很高,但是同时又会存在一些问题,比如当读并发很高时读操作长时间占有锁,导致写锁长时间无法被获取而导致的线程饥饿问题,因此在JDK1.8中又在ReentrantReadWriteLock的基础上新增了一个读写并发锁StampLock。