当前位置: 首页 > news >正文

并发编程——09 CountDownLatch源码分析

1 概述

  • CountDownLatch 是 Java 并发包中用于线程同步的工具类,核心逻辑是:

    • 初始化时指定一个计数count

    • 线程执行完任务后调用 countDown(),使计数count减1;

    • 其他线程调用 await() 会被阻塞,直到count减到0时,阻塞的线程才会被唤醒并继续执行;

  • 流程步骤:

    在这里插入图片描述

    • 初始化:创建 CountDownLatch 时指定count=4(即图中的state=4),代表需要等待4个线程完成任务;

    • 主线程阻塞main线程调用 await(),进入阻塞状态(图中粉色“main 阻塞”块),直到count减为0;

    • 子线程执行并计数递减

      • Thread1 执行完任务,调用 countDown()count从4→3(图中state=3);
      • Thread2 调用 countDown()count从3→2(图中state=2);
      • Thread3 调用 countDown()count从2→1(图中state=1);
      • Thread4 调用 countDown()count从1→0(图中state=0);
  • 主线程唤醒并继续:当count=0时,main线程的await()不再阻塞,继续执行后续逻辑(图中绿色“main Done”块);

  • 源码方法总览:

    在这里插入图片描述

2 构造函数

public CountDownLatch(int count) {// 确保传入的计数 count 不能为负数,否则直接抛出异常,保证了 CountDownLatch 初始化的合理性if (count < 0) throw new IllegalArgumentException("count < 0");// 初始化sync属性this.sync = new Sync(count);
}
  • SyncCountDownLatch 内部基于 AQS(抽象队列同步器) 实现的同步组件,它将传入的 count 作为 AQS 的同步状态(state),为后续的 countDown() 计数递减和 await() 阻塞唤醒逻辑提供了底层支持。

3 Sync-队列同步器

// Sync 是 AQS 的子类,用于实现 CountDownLatch 的同步逻辑
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;// 构造函数通过 setState(count) 将 CountDownLatch 的初始化计数赋值给 AQS 的 state 属性(state 是 AQS 用于维护同步状态的核心变量)Sync(int count) {setState(count);}// 获取当前 AQS 中 state 的值,即 CountDownLatch 剩余的未完成线程数int getCount() {return getState();}// 判断是否可以“获取共享资源”(即 CountDownLatch 的计数是否已减到 0)// 若 state == 0,返回 1(表示可以获取,await() 方法不会阻塞)// 若 state != 0,返回 -1(表示无法获取,await() 方法会阻塞线程)protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}// 释放锁protected boolean tryReleaseShared(int releases) {// 自旋for (;;) {int c = getState(); // 获取 AQS 的 stateif (c == 0) // 计数已为0,无需再释放return false;int nextc = c-1; // 计数减1if (compareAndSetState(c, nextc)) // CAS原子更新statereturn nextc == 0; // 若减到0,返回true(触发唤醒所有阻塞线程)}}
}

4 await()-阻塞等待

  • CountDownLatch#await():入口方法

    public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
    }
    
    • 作用:使当前线程阻塞等待,直到 CountDownLatch 的计数减到 0;

    • 实现:委托给 sync(基于 AQS 的同步组件)的 acquireSharedInterruptibly(1) 方法执行,1 是 AQS 共享式获取的参数(此处无实际数值意义,仅为兼容方法签名);

  • AQS#acquireSharedInterruptibly(int arg):共享式可中断获取逻辑

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException(); // 检查线程中断,若已中断则抛异常if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); // 若获取失败,进入阻塞队列逻辑
    }
    
    • 中断检查:先判断线程是否被中断,若已中断则立即抛出 InterruptedException

    • 尝试获取资源:调用 tryAcquireShared(arg)(由 CountDownLatchSync 实现),若返回值 < 0(表示计数未到 0),则进入 doAcquireSharedInterruptibly(arg) 处理阻塞逻辑;

  • CountDownLatch#Sync#tryAcquireShared(int acquires):共享式获取的状态判断

    protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
    }
    
    • 作用:判断 CountDownLatch 的计数是否已减到 0
      • state == 0,返回 1(表示可以获取,await() 不阻塞);
      • state != 0,返回 -1(表示无法获取,await() 会阻塞)。
  • AQS#doAcquireSharedInterruptibly(int arg):共享式阻塞获取(可中断)

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 将当前线程包装为“共享模式”节点,加入等待队列boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); // 获取前驱节点if (p == head) { // 若前驱是头节点,说明当前节点有资格尝试获取资源int r = tryAcquireShared(arg);if (r >= 0) { // 计数已到0,获取成功setHeadAndPropagate(node, r); // 设置头节点并传播唤醒(共享模式特有)p.next = null; // 断开原头节点引用,帮助GCfailed = false;return;}}// 若获取失败,判断是否需要挂起线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) // 挂起线程并检查中断throw new InterruptedException();}} finally {if (failed)cancelAcquire(node); // 若获取过程中失败,取消节点的获取请求}
    }
    
    • 加入等待队列addWaiter(Node.SHARED) 将当前线程包装为“共享模式”的节点,加入 AQS 的等待队列尾部;

    • 自旋尝试获取:循环中先判断“前驱是否为头节点”(若为头节点,说明当前节点是队列中最有资格获取资源的线程),然后再次尝试 tryAcquireShared(arg)

    • 线程挂起与中断:若获取失败,通过 shouldParkAfterFailedAcquire 判断是否需要挂起线程;若线程在挂起期间被中断,parkAndCheckInterrupt() 会返回 true,进而抛出 InterruptedException

5 countDown()-释放锁资源

  • CountDownLatch#countDown():入口方法

    public void countDown() {sync.releaseShared(1);
    }
    
    • 作用:将 CountDownLatch 的计数减 1,若计数减到 0,则唤醒所有等待的线程;

    • 实现:委托给sync(基于 AQS 的同步组件)的releaseShared(1)方法执行,1表示要递减的计数(此处固定为 1,因为 countDown() 每次只减 1);

  • AQS#releaseShared(int arg):共享式释放逻辑

    public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // 尝试递减计数doReleaseShared(); // 唤醒等待队列中的线程return true;}return false;
    }
    
    • 尝试释放资源:调用 tryReleaseShared(arg)(由 CountDownLatchSync 实现),若递减成功则进入下一步;

    • 唤醒等待线程:调用 doReleaseShared() 唤醒 AQS 等待队列中阻塞的线程(即调用 await() 的线程);

  • CountDownLatch#Sync#tryReleaseShared(int releases):计数递减的核心逻辑

    protected boolean tryReleaseShared(int releases) {for (;;) { // 自旋保证原子性int c = getState(); // 获取当前计数(AQS的state)if (c == 0) return false; // 计数已为0,无需再递减int nextc = c - 1; // 计数减1if (compareAndSetState(c, nextc)) { // CAS原子更新计数return nextc == 0; // 若减到0,返回true(触发唤醒所有阻塞线程)}}
    }
    
    • 自旋 + CAS 保证原子性:通过循环尝试 CAS 操作,保证多线程同时调用 countDown() 时计数递减的原子性;

    • 唤醒触发条件:当 nextc == 0 时返回 true,AQS 会感知到这个状态变化,进而调用 doReleaseShared() 唤醒所有等待的线程;

  • AQS#doReleaseShared():唤醒等待队列的共享线程

    private void doReleaseShared() {for (;;) { // 自旋保证唤醒可靠性Node h = head; // 获取等待队列的头节点if (h != null && h != tail) { // 队列非空且有等待线程int ws = h.waitStatus; // 获取AQS等待队列中头节点的等待状态if (ws == Node.SIGNAL) { // 头节点状态为SIGNAL(表示后续节点需要唤醒)if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // CAS更新状态失败,继续自旋unparkSuccessor(h); // 唤醒头节点的后继线程} // 处理共享模式下的状态传播(保证多个线程依次被唤醒)else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head) // 头节点未变化,说明唤醒操作完成break;}
    }
    
    • 自旋循环:不断尝试唤醒操作,确保所有需要唤醒的线程都能被处理

    • 状态判断与唤醒:若头节点状态为 SIGNAL,则通过 unparkSuccessor(h) 唤醒其后继线程;同时设置状态为 PROPAGATE,保证共享模式下唤醒的“传播性”(多个等待线程依次被唤醒)。

6 总结

  • CountDownLatch 基于 AQS(抽象队列同步器)CAS(比较并交换) 实现:

    • AQS 提供了同步状态管理(state 属性)和等待队列机制,是 CountDownLatch 实现“线程阻塞/唤醒”的基础;

    • CAS 保证了“计数递减”操作的原子性(如 countDown() 中通过 CAS 原子更新 state);

  • CountDownLatch 的构造函数必须指定 count(需等待的线程数),并通过内部类 Sync(继承自 AQS)将 count 赋值给 AQS 的 state 属性。这一步为后续的“计数递减”和“阻塞等待”逻辑奠定了状态基础;

  • 调用 countDown() 时,本质是将 AQS 的 state 减 1(通过 CAS 保证原子性);

  • 当所有线程执行完毕,state 会被减到 0,此时 countDown() 会触发 AQS 唤醒等待队列中所有挂起的线程(即调用 await() 的线程);

  • 调用 await() 时,本质是判断 AQS 的 state 是否为 0:

    • state > 0,说明还有线程未执行完毕,await() 会阻塞当前线程,将其加入 AQS 等待队列;
    • state == 0(最后一个线程执行 countDown() 后),await() 会停止阻塞,当前线程继续执行。
http://www.xdnf.cn/news/1401445.html

相关文章:

  • Spring Boot 后端接收多个文件的方法
  • 项目管理常用的方法有哪些
  • 三菱 PLC的中断指令/中断指针
  • 构建现代化的“历史上的今天“网站:从API到精美UI的全栈实践
  • 北方苍鹰优化算法优化的最小二乘支持向量机NGO-LSSVM多输入多输出回归预测【MATLAB】
  • 2025年06月 Scratch 图形化(二级)真题解析#中国电子学会#全国青少年软件编程等级考试
  • Robolectric如何启动一个Activity
  • 倾斜摄影是选择RGB图像还是多光谱影响进行操作?
  • Transformer:从入门到精通
  • 嵌入式Linux驱动开发:蜂鸣器驱动
  • stack queue的实现 deque的底层结构 priority_queue的实现
  • 【Java实战⑦】从入门到精通:Java异常处理实战指南
  • 漫谈《数字图像处理》之分水岭分割
  • AUTOSAR进阶图解==>AUTOSAR_TR_ClassicPlatformReleaseOverview
  • 计算机毕设项目 基于Python与机器学习的B站视频热度分析与预测系统 基于随机森林算法的B站视频内容热度预测系统
  • observer pattern 最简上手笔记
  • 如何调整Linux系统下单个文件的最大大小?
  • hadoop安欣医院挂号看诊管理系统(代码+数据库+LW)
  • 2025年高性能计算年会
  • centos7.9的openssh漏洞修复脚本
  • w嵌入式分享合集125
  • 【Day 33】Linux-MySQL 备份与恢复详解
  • 【机器学习入门】3.3 FP树算法——高效挖掘频繁项集的“树状神器”
  • SNMPv3开发--简单使用
  • bevformer模型训练过程
  • 嵌入式Linux输入子系统驱动开发
  • Python实现点云AABB和OBB包围盒
  • 后台技术方案设计经验之谈
  • FPGA增量式方差与均值计算
  • 银河麒麟V10(Phytium,D2000/8 E8C, aarch64)开发Qt