理解AQS的原理并学习源码
文章目录
- 什么是AQS?
- 核心功能
- 状态管理
- 线程等待
- 加锁流程
- 解锁流程
- 共享模式
- 总结
什么是AQS?
AbstractQueuedSynchronizer(抽象队列同步器)是Java并发包java.util.concurrent中的核心基础组件,它提供了一个用于构建锁和其他同步组件的框架,在其内部通过volatile int state变量表示同步状态,并且定义了一个内部类来使用双向链表实现等待队列,同时支持独占和共享模式。
核心功能
状态管理
AQS使用一个32位的整数来表示同步状态:
private volatile int state;
这个状态变量的含义完全由子类来定义。例如:在ReentrantLock中,state表示锁被持有的次数。
线程等待
如果被请求的共享资源空闲,则将当前的请求线程设置为有效的⼯作线程,然后将对应的共享资源设置为锁定状态。如果共享资源被占⽤,则通过CLH同步队列将暂时⽤不到的线程封装成⼀个节点加⼊到队列中,同时在适当的时候对其进⾏阻塞和唤醒。
加锁流程
以ReentrantLock的NonfairSync为例:
- 直接尝试CAS抢占
NonfairSync首先会直接尝试CAS将state从0设置为1:
final void lock() {if (compareAndSetState(0, 1)) // // 直接抢占setExclusiveOwnerThread(Thread.currentThread());elseacquire(1); // // 抢占失败,进入AQS流程}
- 进入AQS的acquire流程
如果CAS失败,调用acquire(1):
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
再次调用tryAcquire()尝试获取锁(非公平特性体现):
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
如果当前线程已持有锁,增加重入次数;获取失败则进入队列等待。
- 加入等待队列
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
获取失败时,将线程包装成Node并通过CAS操作安全地加入队列尾部。
- 队列中自旋和阻塞
在队列中的处理逻辑:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
检查前驱节点是否为head,如果是则尝试获取锁;如果获取成功则将自己设为新的head节点,获取失败则判断是否需要阻塞(shouldParkAfterFailedAcquire), 需要阻塞则调用LockSupport.park()挂起线程。
解锁流程
释放锁的代码:
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
将state减1,判断当前线程是不是独占线程,如果是就抛异常;当state变为0时,设置独占线程为null,并且后续返回true。
共享模式
除了独占模式,AQS还支持共享模式,其特点是多个线程可以同时持有资源,主要方法包括:
- acquireShared(int arg):共享模式获取资源
- releaseShared(int arg):共享模式释放资源
总结
AQS提供了一个用于构建锁和其他同步组件的框架,在其内部通过volatile int state变量表示同步状态,并且定义了一个内部类来使用双向链表实现等待队列,同时支持独占和共享模式。