并发编程之ReentrantLock
一、引言
在并发编程的世界里,线程安全是开发者必须面对的挑战。Java 提供了内置的 synchronized 关键字来保证同步,但在高并发、复杂业务场景下,它的局限性逐渐显现,而 ReentrantLock的出现,正是为了解决这些问题。作为 java.util.concurrent
包中的可重入互斥锁,它不仅具备 synchronized
的基础功能,还提供了更加丰富的一些功能,本文将深入剖析 ReentrantLock的实现原理、使用场景,并通过对比 synchronized ,帮助你理解如何在高并发程序中做出更优的锁选择。
二、 ReentrantLock核心特性
首先我们先来看看他的核心特性,然后在依次为大家进行刨析。
-
可重入性:同一个线程可以多次获取同一个锁
-
可中断:支持获取锁时的中断操作
-
公平性选择:可以构造公平锁或非公平锁
-
条件变量支持:可以创建多个 Condition 对象
方法名称 | 方法描述 |
lock() | 阻塞式获取锁,如果锁被其他线程持有,则当前线程进入等待状态,直到获取锁。必须手动释放锁 |
tryLock() | 尝试非阻塞获取锁,适用于避免死锁 |
tryLock(long timeout,TimeUnti unti) | 带超时的尝试获取锁,避免长时间阻塞 |
lockInterruptibly() | 可中断地获取锁,如果锁被其他线程持有,当前线程进入等待状态,但可以被 interrupt() 中断,适用于需要响应中断的场景(如线程池任务取消) |
unlock() | 释放锁,必须在 finally 块中调用,否则可能导致死锁 |
三、ReentrantLock可重入性
可重入锁(Reentrant Lock) 是指同一个线程可以多次获取同一把锁,而不会导致死锁。每次获取锁时,锁的计数器会递增;释放锁时,计数器递减,直到计数器归零时锁才真正释放。
-
synchronized 是可重入的(JVM 内置支持)。
-
ReentrantLock 显式实现了可重入机制
可重入锁示例:
public class ReentrantLockExample {ReentrantLock lock = new ReentrantLock();public void recursiveMethod(int n) {lock.lock();try {if (n <= 0) return;System.out.println("Lock acquired, n=" + n);recursiveMethod(n - 1); // 递归调用,仍然可以获取锁} finally {lock.unlock();}}public static void main(String[] args) {// 调用new ReentrantLockExample().recursiveMethod(3);}
}
输出结果:
Lock acquired, n=3
Lock acquired, n=2
Lock acquired, n=1
我们可以清晰的看到同一个线程可以多次获取锁,锁计数器记录锁的重入次数。
我们在使用ReentrantLock的可重入特性的时候需要注意以下几点:
- 必须成对调用
lock()
和unlock()
,如果
果 lock() 调用次数大于unlock()
,锁不会被释放;反之会抛出IllegalMonitorStateException。
- 避免死锁,即使可重入,仍然可能因跨锁重入导致死锁(如
lockA
→lockB
→lockA
)。
四、ReentrantLock可中断性
ReentrantLock可中断是指线程在等待获取锁的过程中,可以响应Thread.interrupt()中断信号,从而退出等待状态。这是 ReentrantLock相比synchronized 的一个重要优势。
ReentrantLock可中断性主要依靠它里面的方法lockInterruptibly()。
可中断锁的场景适用情况,当任务在等待锁时需要支持外部取消/中断。主要场景包括如下:
-
线程池任务超时取消
-
响应系统关闭信号
-
长时间等待时的优雅退出
ExecutorService executor = Executors.newFixedThreadPool(4);
ReentrantLock lock = new ReentrantLock();Future<?> future = executor.submit(() -> {try {lock.lockInterruptibly(); // 可中断获取锁try {// 执行关键操作} finally {lock.unlock();}} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 恢复中断状态System.out.println("任务被取消");}
});// 需要时取消任务
future.cancel(true); // 中断正在等待锁的任务
五、ReentrantLock公平性选择
ReentrantLock提供了公平性选择机制,允许开发者根据需求选择公平锁或非公平锁。下面我将从多个角度详细解析这一特性。首先我们来看一下公平锁和非公平锁有什么不一样。
特性 | 公平锁 | 非公平锁 |
获取顺序 | 严格按照线程请求锁的顺序分配 | 允许"插队",新请求的线程可能立即获取锁 |
吞吐量 | 较低 | 较高 |
饥饿可能 | 不会发生 | 可能发生 |
构造方式 | new ReentrantLock(true) | new ReentrantLock() 或 new ReentrantLock(false)) |
5.1 公平性实现原理
简单来看一下它的实现原理,后面会专门详细讲解ReentrantLock的实现原理。
ReentrantLock通过 AQS (AbstractQueuedSynchronizer) 的 CLH 队列实现公平性:
// AQS 中的关键代码
public final boolean hasQueuedPredecessors() {Node h, s;return (h = head) != null && ((s = h.next) == null || s.thread != Thread.currentThread());
}
公平锁获取逻辑:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() && // 关键判断:是否有等待更久的线程compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// ... 重入逻辑
}
非公平锁获取逻辑:
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) { // 直接尝试获取,不检查队列setExclusiveOwnerThread(current);return true;}}// ... 重入逻辑
}
5.2 适合使用公平锁的场景
-
严格顺序要求:如订单处理、交易系统
-
避免线程饥饿:当任务执行时间差异很大时
-
调试目的:更容易重现并发问题
// 公平锁使用示例 ReentrantLock fairLock = new ReentrantLock(true);void processOrder(Order order) {fairLock.lock();try {// 严格按订单到达顺序处理} finally {fairLock.unlock();} }
六、ReentrantLock 条件变量 (Condition)
ReentrantLock
的条件变量 (Condition
) 提供了比 Object.wait()/notify()
更灵活的线程通信机制,允许在不同条件下进行精确的线程等待和唤醒。
6.1 Condition 基础概念
-
支持 多个等待队列(每个
Condition
维护一个独立的等待队列) -
替代传统的
Object.wait()
和Object.notify()
-
必须与
ReentrantLock
配合使用
6.2 Condition 基本使用
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition(); // 创建条件变量
典型等待/通知模式:
// 等待线程
lock.lock();
try {while (!conditionSatisfied) { // 必须用 while 防止虚假唤醒condition.await(); // 释放锁并等待}// 条件满足后的处理
} finally {lock.unlock();
}// 通知线程
lock.lock();
try {conditionSatisfied = true;condition.signal(); // 或 signalAll()
} finally {lock.unlock();
}
七、ReentrantLock实现原理
ReentrantLock
是基于 AQS(AbstractQueuedSynchronizer) 实现的 可重入互斥锁,核心结构如下:
public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync; // 核心同步器(继承AQS)abstract static class Sync extends AbstractQueuedSynchronizer { ... }// 非公平锁实现static final class NonfairSync extends Sync { ... }// 公平锁实现static final class FairSync extends Sync { ... }
}
-
Sync:继承
AQS
,提供锁的基本实现(包括tryAcquire
、tryRelease
)。 -
NonfairSync:非公平锁,新线程可以插队尝试获取锁。
-
FairSync:公平锁,严格按照 FIFO 顺序获取锁。
7.1 AQS 核心设计
AQS 是 Java 并发包的核心基础框架,ReentrantLock
、Semaphore
、CountDownLatch
等同步工具均基于它实现。我们接下来看看他的核心设计。
7.1.1 基本结构
AQS 使用 CLH 变种队列(双向链表) 管理等待线程,核心字段:
// 头节点(当前持有锁的线程)
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 同步状态(如 ReentrantLock 的 state 表示重入次数)
private volatile int state;
Node 节点结构:
static final class Node {volatile int waitStatus; // 节点状态(CANCELLED、SIGNAL、CONDITION 等)volatile Node prev; // 前驱节点volatile Node next; // 后继节点volatile Thread thread; // 关联的线程Node nextWaiter; // 条件队列专用
}
waitStatus关键状态:
-
CANCELLED (1):线程已取消(超时或中断)
-
SIGNAL (-1):后继节点需要被唤醒
-
CONDITION (-2):节点在条件队列中
-
PROPAGATE (-3):共享模式下传播唤醒
7.1.2 独占模式
资源同一时刻只能被一个线程获取,如 ReentrantLock。
public final void acquire(int arg) {if (!tryAcquire(arg) && // 尝试获取锁(子类实现)acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 加入队列并阻塞selfInterrupt(); // 恢复中断状态
}
调用 acquire 方法可以获取同步状态,底层就是调用须重写方法中的 tryAcquire。如果获取失败则进入同步队列中,即使后续对线程进行终端操作,线程也不会从同步队列中移除。
addWaiter(Node.EXCLUSIVE)
将当前线程包装为 Node
并加入队列:
private Node addWaiter(Node mode) {//将当前线程封装成对应模式下的Node节点Node node = new Node(Thread.currentThread(), mode);Node pred = tail;//尾节点if (pred != null) {//双端队列需要两个指针指向node.prev = pred;//通过CAS方式if (compareAndSetTail(pred, node)) {//添加到队列尾部pred.next = node;return node;}}//等待队列中没有节点,或者添加队列尾部失败则调用end方法enq(node);return node;
}//Node节点通过CAS自旋的方式被添加到队列尾部,直到添加成功为止。
private Node enq(final Node node) {//死循环,类似 while(1)for (;;) {Node t = tail;if (t == null) { // 须要初始化,代表队列的第一个元素if (compareAndSetHead(new Node()))//头节点就是尾节点tail = head;} else {//双端队列需要两个指针指向node.prev = t;//通过自旋放入队列尾部if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
关键点:
-
队列初始化时会创建一个 哑节点(dummy head)。
-
通过 CAS + 自旋 保证线程安全入队。
acquireQueued(node, arg):
在队列中自旋等待锁:
final boolean acquireQueued(final Node node, int arg) {//获取资源失败标识boolean failed = true;try {//线程是否被中断标识boolean interrupted = false;//死循环,类似 while(1)for (;;) {//获取当前节点的前趋节点final Node p = node.predecessor();//前趋节点是head,即队列的第二个节点,可以尝试获取资源if (p == head && tryAcquire(arg)) {//资源获取成功将当前节点设置为头节点setHead(node);p.next = null; // help GC,表示head节点出队列failed = false;return interrupted;}//判断当前线程是否可以进入waitting状态,详解见下方if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) //阻塞当前线程,详解见下方interrupted = true;}} finally {if (failed)//取消获取同步状态,源码见下方的取消获取同步状态章节cancelAcquire(node);}
}//将当前节点设置为头节点
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;
}//判断当前线程是否可以进入waitting状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//获取前趋节点的等待状态,含义见上方Node结构源码int ws = pred.waitStatus;if (ws == Node.SIGNAL) //表示当前节点的线程需要被唤醒return true;if (ws > 0) { //表示当前节点的线程被取消//则当前节点一直向前移动,直到找到一个waitStatus状态小于或等于0的节点do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);//排在这个节点的后面pred.next = node;} else {//通过CAS设置等待状态compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}//阻塞当前线程
private final boolean parkAndCheckInterrupt() {//底层调用的UnSafe类的方法 park:阻塞当前线程, unpark:使给定的线程停止阻塞LockSupport.park(this);//中断线程return Thread.interrupted();
}
acquireQueued 方法中,只有当前驱节点等于 head 节点时,才能够尝试获取同步状态,这时为什么呢?
因为 head 节点是占有资源的节点,它释放后才会唤醒它的后继节点,所以需要检测。还有一个原因是因为如果遇到了非 head 节点的其他节点出队或因中断而从等待中唤醒,这时种情况则需要判断前趋节点是否为 head 节点,是才允许获取同步状态。
7.1.3 释放锁(release)
public final boolean release(int arg) {if (tryRelease(arg)) { // 子类实现(如 ReentrantLock 的 tryRelease)Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h); // 唤醒后继节点return true;}return false;
}private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0); // 清除状态Node s = node.next;if (s == null || s.waitStatus > 0) { // 后继节点无效(如 CANCELLED)s = null;for (Node t = tail; t != null && t != node; t = t.prev) // 从尾向前找有效节点if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); // 唤醒线程
}
关键点:
-
唤醒策略:从
head
开始向后查找第一个有效节点(跳过CANCELLED
节点)。 -
从后向前遍历:因为
next
指针可能因并发插入失效,但prev
指针一定正确。
八、小结
ReentrantLock
提供了更强大的功能和灵活性,使其在复杂并发编程中成为非常有用的工具。虽然它相比 synchronized
更加复杂,但在高并发、高性能的系统中,ReentrantLock
可以为我们带来更多的控制和优化空间。
使用 ReentrantLock
时,我们需要特别注意锁的释放,避免因忘记释放锁而导致的死锁问题。同时,在选择使用 ReentrantLock
还是 synchronized
时,应根据具体需求做出决策,简单的并发控制可以使用 synchronized
,而复杂的并发需求则可以考虑 ReentrantLock
。