Java并发编程第八篇(CountDownLatch组件分析)
Java并发系列: JUC.CountDownLatch
- 一,JUC.CountDownLatch概况
- 1.1 例子
- 二,源码解析
- 2.1 属性
- 2.2 内部类
- 2.3 方法
- 2.3.1 void await()
- 2.3.2 void countDown()
- 三,总结
一,JUC.CountDownLatch概况
CountDownLatch作为开发中最常用的组件, 也作为面试中被问到的最高频的锁之一, 我们有必要来聊聊它的作用以及内部构造。
首先尝试用一句话对CountDownLatch进行概括: CountDownLatch基于AQS, 它实现了闩锁, 在开发中可以将其用作任务计数器。
若想要较为系统地去理解这些特性, 我觉得最好的方式就是通过源码, 在一览源码之后自己再动手实践一遍, 这样就能够做到知其然并知其所以然。
如果你从来没有接触过CountDownLatch, 那么可能会好奇, 到底什么是闩锁(Latch)? 这个组件又有什么用?
在源码的开头, 可以看到这么一行注释:
// A synchronization aid that allows one or more threads to wait until a set of
// operations being performed in other threads completes.
简单直译过来就是: CountDownLatch这种同步工具允许一条或多条线程等待其他线程中的一组操作完成后, 再继续执行。
1.1 例子
好像还是有点拗口, 举个简单的例子来辅助理解一下。
比如我需要收集七颗龙珠才能召唤神龙, 这七颗龙珠没有相互依赖关系。如果一颗一颗收集太慢了, 那么更好的方式就是派出七个人, 每个人都帮我去收集其中一颗, 这样效率就能够大大提升。从编程的角度来说, 这里需要建立七个工作线程分别同时去寻找指定编号的龙珠, 然后主线程来完成召唤神龙的操作。
但这里就存在一个问题, 因为每条工作线程找到相应龙珠需要花费的时间不一样, 那么主线程需要等待多久? 怎么才能得到“所有龙珠都已经被工作线程找到了”的这个信息呢?
这里就是CountDownLatch这个同步工具的用武之地, 正如它的介绍那样, 它能够帮助主线程在等待其他工作线程里的任务完成之后, 再继续执行。
但是有些情况下, 可能某个子任务耗时过长, 甚至内部已经出现异常, 主线程难道会无休止地等待下去吗? 这可能会导致主线程长时间阻塞, 表现为系统假死。
针对这个问题, 可以对CountDownLatch的await
方法设置超时时间。一旦主线程等待超时, 那么就直接被唤醒, 继续执行后续逻辑。感兴趣的读者可以尝试写一个带有超时情况的例子验证一下。
接下来, 我想从源码角度探寻一下, 为什么CountDownLatch这么简单易用, 它的下层是如何设计的, 能不能从中学习一些思想。
二,源码解析
在看源码之前, 我们可以先简单思考一下, 因为我们已经熟悉了AQS, 在此基础上, 如果让你来设计, 你会尝试怎么做?
我们需要解决的问题: 多个线程等待多个线程中任务的完成。为了便于表述, 这里就简单称为主线程等待子线程。
既然主线程在等待, 那么它就应该进入等待队列, 那么它被唤醒的条件是什么? 当然是子任务都完成的时候, AQS中state这个int值我觉得可以用来表示正在等待执行执行完成的任务数, 每当一个任务完成就, state值就自减1, 当state为0时, 唤醒正在等待的主线程。
这样的大致思路, 看上去没什么大问题, 事实上CountDownLatch确实也是结合AQS这么做的, 关键在于细节, 细节是魔鬼, 因为在并发场景下往往差之毫厘, 谬以千里。
2.1 属性
private final Sync sync;
和ReentrantLock一样, CountDownLatch只有一个类型为内部类Sync的成员属性sync, sync被final
修饰, 意味着一旦初始化, 就不可修改引用了。那么它的初始化时机是什么时候?
在CountDownLatch的唯一构造函数中:
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}
其中count
这个参数代表子任务的个数。
2.2 内部类
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c - 1;if (compareAndSetState(c, nextc))return nextc == 0;}}
}
首先看到Sync继承了AQS, 那么它就拥有了AQS的所有能力, 再来类中的几个方法。
-
Sync(int count)
构造函数, 参数count即AQS的state值, 代表子任务的个数。 -
int getCount()
获取当前需要等待的任务数。 -
protected int tryAcquireShared(int acquires)
这个方法是对AQS内部方法的重写, 方法名被shared
修饰, 说明用到了共享模式。这里简单介绍一下AQS的共享模式, 远离上和独占模式差别不大。主要从两个方面去理解:
- state: 独占模式下, 当state为1, 代表锁正在被占用, 其他想要获取锁的线程必须等待。共享模式下, state值可以被多个线程同时修改, 增加1代表当前线程获取共享锁, 减去1代表当前线程释放共享锁。
- FIFO队列: 独占模式下, 只有即将出队的Node中的线程被唤醒。共享模式下, 除了即将出队的Node中的线程被唤醒, 也会唤醒后续处于共享模式下Node中的线程。
回到方法本身,
tryAcquireShared
内部逻辑就是, 当state为0, 代表子任务全部完成, 那么返回1, 否则返回-1。如果返回值是负数, 则代表尝试获取锁失败, 有需要的话可以进入队列等待; 如果返回值是0, 则代表尝试获取共享锁成功, 即使后续节点也在等待共享锁, 不需要唤醒后续节点; 如果返回值是1, 则代表尝试获取锁成功, 并唤醒后续节点, 前提是它也在等待共享锁。
-
protected boolean tryReleaseShared(int releases)
内部是一个自旋操作, 当state为0, 代表子任务已经全部完成, 不需要释放锁, 则返回false。否则, 使用CAS将state值自减1, 直至state为0, 说明锁已被完全释放, 才返回true。
看到这边呢, 有的读者可能依然一头雾水, 内部类Sync中方法的逻辑, 目前来看似乎和我们最初的需求没什么太直接的关联。
我们先带着这个问题再来看看公有方法的实现。
2.3 方法
2.3.1 void await()
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
当主线程调用await
时, 实际上内部调用的是AQS的acquireSharedInterruptibly
方法。
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
而acquireSharedInterruptibly
内部首先判断tryAcquireShared
的返回值, 如果为负数, 那么再执行doAcquireSharedInterruptibly
的逻辑。
我们可以这么理解: 若子任务已经全部全部完成, 那么await
直接返回。若还存在子任务没有完成, 则调用doAcquireSharedInterruptibly
方法, 而doAcquireSharedInterruptibly
内部主要逻辑则是初始化一个封装主线程的Node节点, 该节点进入AQS的FIFO队列, 并等待子任务的全部完成。
逻辑细节来看doAcquireSharedInterruptibly
方法的源码:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
在方法的第2行将当前线程封装为Node插入FIFO队列队尾。后续逻辑和AQS独占模式类似, 在这里可以看到第9-10行, 当tryAcquireShared
返回值为正数, 说明子任务已经全部完成, 此时方法将会return, 调用await
的主线程将会返回。
这样, 从最上层的await
一层层往下分析, 就能理解其中的调用逻辑了。我觉得AQS是一个抽象程度比较高的框架, CountDownLatch是利用这种抽象实现了一种具体的功能。
2.3.2 void countDown()
public void countDown() {sync.releaseShared(1);
}
当工作线程调用countDown
方法时, 内部调用的是AQS的releaseShared
方法。
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}
大致逻辑是, 如果tryReleaseShared
返回值为true, 则调用doReleaseShared
方法, doReleaseShared
内部的大致逻辑就是唤醒后续等待的Node。
所以我们可以这么理解: 当工作线程调用countDown
时, 若任务还没有全部完成, 则直接返回; 若当前任务全部完成, 那么就唤醒FIFO队列中正在等待的Node, 其实也就是主线程节点。
因为从CountDownLatch
到AQS的调用链比较长, 我这里画了一张时序图来辅助大家理解。
图中浅黄色部分为CountDownLatch的方法调用, 浅绿色为AQS, 浅蓝色为内部类Sync。这里模拟了在主线程中提交两个子任务。
三,总结
至此为止, 我们就从头到尾将CountDownLatch从应用到源码都讲解了一遍。在应用演示部分, 只是举了一个很简单的例子, 但足够具有代表性。在对源码的解读部分, 主要还是基于对AQS中共享模式的理解。我个人认为, AQS是一个抽象程度比较高的框架, CountDownLatch是利用这种抽象实现了一种具体的功能。所以, 如果业务中出现了某种特殊的应用场景, 又没有通用的组件可以直接使用, 那么从什么角度去利用AQS的抽象, 将是我们需要思考的问题。