并发编程艺术--AQS底层源码解析(一)
Lock:
对于并发编程最常用来控制同步的方式就是锁。一般来说,一个锁能防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。而在jdk5之前都是采用的synchronized关键字来实现锁功能的,而在jdk5之后则引入了Lock接口用来实现锁功能,他提供了与synchronized类似的同步功能,区别是在使用时候要显示的获取锁和释放锁,虽然缺少了synchronized隐式获取锁和释放锁的便捷性,但是却提供了synchronized不具备的灵活性的特性。
public interface Lock {void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock();boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;void unlock();Condition newCondition();
}
上述源码可以看出Lock具备非阻塞式获取锁以及超时获取锁,中断获取锁等功能,相比synchronized来说可以更加灵活的控制锁。通过lock这个接口的规范后续退出了一系列的锁如reentrantlock,Semaphore,ReentrantReadWriteLock。这些都是Lock接口的子类,对于这些锁他是面向锁使用者的,但是这些锁的实现底层则是通过一个AbstractQueuedSynchronizer(AQS)抽象同步队列来实现的,也就是说AQS是这些锁的核心,下面我们就来详细展开讲讲AQS
AbstractQueuedSynchronizer:
AbstractQueuedSynchronizer是一个抽象类,这个类的主要作用就是用来构建锁或者其他同步组件的基础框架,他的主要结构是内置一个同步状态(同步资源)int类型的成员变量,通过内置的FIFO队列来完成资源获取线程的排队工作。
而AbstractQueuedSynchroni预留了一些方法给子类交给他们进行自定义实现,而对于底层的一些方法AbstractQueuedSynchronizer则自己进行实现了,同时AbstractQueuedSynchronizer使用的是模板方法设计模式来管理整个同步的流程。
同步器是是实现锁的基石,在锁的实现中聚合同步器,利用同步器实现锁的语义。
锁是面向锁使用者的,同步器是面向锁构建者的。
这样抽象的说明可能会不太理解,附上reentrantlock源码
public class ReentrantLock implements Lock, Serializable {private static final long serialVersionUID = 7373984872572414699L;private final Sync sync;public ReentrantLock() {this.sync = new NonfairSync();}//。。。。。 abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;Sync() {}//。。。。。 }}
上述源码可以看出在reentrantlock当中实际上有一个Sync 的静态内部类来,这个类是继承于AQS的类,reentrantlock的一些方法都是基于Sync类的一些方法进行包装后返回的数据。可以看出将锁的底层与锁的方法剥离开来,使得业务更加清晰,服务的对象也更加明朗--服务于锁使用者还是锁开发者。
讲完了AbstractQueuedSynchroni与锁的关系下面我们来讲一讲AbstractQueuedSynchroni是如何实现同步功能的:
实际上同步器是依赖于内部的同步队列来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列(此时加入同步队列后会设置变量Tail指向当前的节点由于这个操作是并发操作因此采用CAS的方式来进行同步保证),同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。如果获取成功则会将当前的head值置为本节点,而这个操作只会有当前线程执行因此不会采用同步策略。同时当本节点业务完成之后会唤醒后续的节点
源码解析:
首先我们对独占锁的同步获取锁的源码进行分析:
final void lock() {this.acquire(1);}
上述lock是锁的方法,方法内部调用了AQS的acquire方法,同时传递了一个参数1,这个参数1则表示的是AQS中的state也就是资源量,也就是尝试获取一个资源
public final void acquire(int var1) {if (!this.tryAcquire(var1) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), var1)) {selfInterrupt();}}
可以看到acquire内部方法首先调用tryAcquire表示尝试获取锁,如果获取成功则直接进入selfInterrupt方法中去,否则则先通过addWaiter创造一个Node节点进行初始化,之后再通过acquireQueued放入一个"死循环"中去进行自旋等待。
protected boolean tryAcquire(int var1) {throw new UnsupportedOperationException();}
在AQS中的tryAcquire方法则是留给子类去实现,这个方法的作用主要是尝试获取锁看是否成功
this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE)
而这个方法则表示创建Node节点同时EXCLUSIVE表示节点类型是什么。
static final class Node {volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;}
上述代码则是Node节点内部的主要属性,其中waitStatus表示当前节点的状态(唤醒、取消、条件等待等),prev表示前置节点而next表示后置节点,thread则是存储的当前线程,nextWaiter标记节点竞争模式(独占 / 共享)
private Node addWaiter(Node var1) {Node var2 = new Node(Thread.currentThread(), var1);Node var3 = this.tail;if (var3 != null) {var2.prev = var3;if (this.compareAndSetTail(var3, var2)) {var3.next = var2;return var2;}}this.enq(var2);return var2;}
addWaiter方法则开始完全初始化Node节点:
1. 初始化构建Node节点
2.得到尾节点Tail将其设置为前置节点并且通过CAS将其赋值为本节点
3.设置成功则返回本节点未成功则调用enq方法,这个方法会不断循环进行CAS设置同时更新前置节点
acquireQueued方法则是未占有资源后的核心操作
final boolean acquireQueued(Node var1, int var2) {boolean var3 = true;try {boolean var4 = false;while(true) {Node var5 = var1.predecessor();if (var5 == this.head && this.tryAcquire(var2)) {this.setHead(var1);var5.next = null;var3 = false;boolean var6 = var4;return var6;}if (shouldParkAfterFailedAcquire(var5, var1) && this.parkAndCheckInterrupt()) {var4 = true;}}} finally {if (var3) {this.cancelAcquire(var1);}}}
可以看到本方法的核心就是在于这个死循环中,首先拿出本节点的前置节点与head头节点进行比较查看前置节点是否为头节点,如果是则进行抢占锁的操作不是则不会抢占。原因之前的结构图已经说过只有头节点才能占有锁,其他节点都会在队列中等待。
如果前置节点是head节点同时获取锁成功,那么就会将本身设置为head节点同时跳出循环。如果不是头节点或者获取不成功那么就会调用shouldParkAfterFailedAcquire方法和parkAndCheckInterrupt方法
private static boolean shouldParkAfterFailedAcquire(Node var0, Node var1) {int var2 = var0.waitStatus;if (var2 == -1) {return true;} else {if (var2 > 0) {do {var1.prev = var0 = var0.prev;} while(var0.waitStatus > 0);var0.next = var1;} else {compareAndSetWaitStatus(var0, var2, -1);}return false;}}
shouldParkAfterFailedAcquire代码块则是判断前置节点是否合理以及是否本节点需要进入阻塞状态,如果前置节点符合条件则直接返回将本节点进入到阻塞状态,但是如果前置节点已经中断获知其他原因出现异常的情况就会重新遍历选取合适的新的前置节点,同时不进入阻塞状态
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
而LockSupport则是直接将本线程进入阻塞状态以此来减少cpu资源的消耗,并且检查当前线程的中断状态
之后我们继续看acquireQueued方法
final boolean acquireQueued(Node var1, int var2) {boolean var3 = true;try {boolean var4 = false;while(true) {Node var5 = var1.predecessor();if (var5 == this.head && this.tryAcquire(var2)) {this.setHead(var1);var5.next = null;var3 = false;boolean var6 = var4;return var6;}if (shouldParkAfterFailedAcquire(var5, var1) && this.parkAndCheckInterrupt()) {var4 = true;}}} finally {if (var3) {this.cancelAcquire(var1);}}}
var4变量实际上是用来记录当前线程的中断状态的而在独占同步的方法中这个var4虽然记录线程被中断但是并不会进行响应也就是说还是会继续循环,那么var4的作用在哪里会体现呢,后续的可中断锁则会用到var4变量。
至此acquireQueued则是介绍完毕,下面我们继续回到acquire方法查看最后的一个方法
public final void acquire(int var1) {if (!this.tryAcquire(var1) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), var1)) {selfInterrupt();}}
static void selfInterrupt() {Thread.currentThread().interrupt();}
对于selfInterrupt则是继续标志thread为中断状态防止中断状态丢失,为什么要一直记录中断状态呢,虽然AQS没有对中断状态进行处理但是可以交给子类进行处理,或者交给上层锁构建者来对中断进行不同的处理。
至此不可中断独占式阻塞获取锁的AQS源码解读已经讲完了,下一章节将会讲述其他类型的获取锁源码。