当前位置: 首页 > web >正文

并发编程——10 CyclicBarrier的源码分析

1 概述

  • CyclicBarrierCountDownLatchSemaphore不同:

    • CountDownLatchSemaphore直接基于 AQS(AbstractQueuedSynchronizer) 实现;

    • CyclicBarrier基于ReentrantLock + ConditionObject实现,间接依赖 AQS(因为 ReentrantLock 底层基于 AQS);

  • 内部结构

    • Generation 静态内部类。它是 CyclicBarrier 实现“循环复用”和“中断/异常处理”的核心:

      • 持有布尔属性 broken,默认 false

      • 当调用 reset() 方法、执行出现异常中断时调用 breakBarrier()broken 会被设为 true

    • 核心属性

      • 计数器:用于记录等待“凑齐一组线程”的进度;

      • generation 属性:关联当前的 Generation 实例,控制屏障的“代”切换;

    • breakBarrier() 方法。当任务执行中断异常或调用 reset() 时触发:

      • 将当前 Generationbroken 设为 true

      • ConditionObjectWaiter 队列中等待的线程,转移到 AQS 队列;

      • 执行 unlock 后,唤醒 AQS 队列中挂起的线程,让它们感知“屏障已破”;

    • await() 方法。这是 CyclicBarrier核心方法

      • 调用时会对“计数器”进行递减处理,直到计数器归 0,所有等待的线程才会一起继续执行;

      • 过程中会处理线程的等待、唤醒,以及 Generation 的切换逻辑;

    在这里插入图片描述

2 构造函数

  • CyclicBarrier 有两个构造函数,最终都调用 CyclicBarrier(int parties, Runnable barrierAction) 这个核心构造函数:

    public CyclicBarrier(int parties) {this(parties, null);
    }public CyclicBarrier(int parties, Runnable barrierAction) {// 参数合法性校验if (parties <= 0) throw new IllegalArgumentException();// final修饰,所有线程执行完成归为或重置时使用this.parties = parties;// 在await方法中计数值,表示还有多少线程待执行awaitthis.count = parties;// 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程this.barrierCommand = barrierAction;
    }
    
    • parties:表示需要“凑齐”的线程数量,只有当这么多线程都调用 await() 后,屏障才会放行。若 parties ≤ 0,会直接抛出 IllegalArgumentException 异常;

    • count:是一个计数器,初始值等于 parties。每次有线程调用 await()count 就减 1,直到 count = 0 时触发屏障放行逻辑;

    • parties(实例属性):用 final 修饰,是 CyclicBarrier 复用(重置)时的基准线程数;

    • barrierAction:是一个 Runnable 任务,当所有线程凑齐(count 减到 0)时,会优先执行这个任务,然后再唤醒所有等待的线程。若不需要回调任务,可传 null

  • 它的核心作用是初始化“线程凑齐的门槛”和“凑齐后的回调逻辑”

    • 先校验 parties 的合法性,保证必须有正整数个线程参与;

    • 初始化 countparties,用于后续 await() 方法中跟踪线程凑齐的进度;

    • 初始化 barrierCommand(即 barrierAction),指定所有线程凑齐后要执行的回调任务。

3 await()

  • await() 有两个重载版本,用于支持“无限等待”和“超时等待”两种场景:

    • await():线程会一直等待,直到指定数量的线程都调用 await() 才继续执行;若等待过程中屏障被中断、异常破坏,会抛出 InterruptedExceptionBrokenBarrierException

      // 执行没有超时时间的await
      public int await() throws InterruptedException, BrokenBarrierException {try {// 执行dowait()return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);}
      }
      
    • await(long timeout, TimeUnit unit):线程最多等待 timeout 时间,若指定时间内线程未凑齐,会抛出 TimeoutException;同时也会处理中断、屏障破坏的情况;

      // 执行有超时时间的await
      public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));
      }
      
  • await() 最终都调用 dowait() 方法,其流程可分为以下关键步骤:

    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {// 获取锁final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取generation对象(屏障的“代”)final Generation g = generation;// 若 Generation 的 broken 为 true(屏障已破坏),直接抛出 BrokenBarrierException// 屏障已破坏,即线程中在执行过程中是否异常、超时、中断、重置if (g.broken)throw new BrokenBarrierException();// 若当前线程被中断if (Thread.interrupted()) {breakBarrier(); // 调用breakBarrier()标记屏障为“破坏”状态,将等待线程转移到 AQS 队列throw new InterruptedException(); // 并抛出 InterruptedException}// 执行 --count,得到当前线程在“凑齐序列”中的索引 indexint index = --count;// 若所有线程已凑齐if (index == 0) {// 执行结果标识boolean ranAction = false;try {// 执行构造函数中传入的 barrierAction(若不为 null),barrierCommand 即 barrierActionfinal Runnable command = barrierCommand;if (command != null)command.run();// 执行完成,将执行结果设置为trueranAction = true;nextGeneration(); // 调用nextGeneration()重置计数器和Generation,实现屏障的“循环复用”return 0; // 返回 0,表示当前线程是最后一个凑齐的线程} finally {// 执行过程中出现问题if (!ranAction)// 重置标识与计数值,将Waiter队列中的线程转移到AQS队列breakBarrier();}}// 若线程未凑齐(index != 0),进入循环挂起逻辑(自旋):for (;;) {try {// 无超时场景(timed = false)if (!timed)trip.await(); // 调用 Condition.await(),将线程挂起在 Condition 队列// 有超时场景(timed = true)else if (nanos > 0L)nanos = trip.awaitNanos(nanos); // 调用Condition.awaitNanos(nanos),线程最多挂起nanos纳秒,返回剩余等待时间} catch (InterruptedException ie) { // 过程中若被中断、屏障破坏或超时,会触发对应异常if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}// 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常if (g.broken)throw new BrokenBarrierException();// g 是线程进入 dowait() 时获取的 “当前代”(Generation 实例)// generation 是 CyclicBarrier 的全局属性,表示 “最新代”// 若“当前代”与“最新代”不一致,说明在该线程等待期间,屏障已经通过nextGeneration()切换到了新的 “代”(可能是因为之前的线程已凑齐并重置了屏障)// 此时当前线程的等待已经“过期”,无需继续处理,直接返回 index 即可if (g != generation)return index;// 超时,抛出异常TimeoutExceptionif (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 无论等待是否成功,最终都会释放 ReentrantLock,保证锁资源的回收lock.unlock();}
    }
    

4 breakBarrier()

  • breakBarrier()CyclicBarrier 中用于主动“破坏屏障”的核心方法。当线程执行出现中断异常或调用 reset() 时,会触发该方法,让所有等待的线程退出等待状态;

    // 结束CyclicBarrier的执行
    private void breakBarrier() {// 将当前 Generation 的 broken 属性设为 true,标识该屏障已被破坏。后续线程检查到这个标记时,会抛出 BrokenBarrierExceptiongeneration.broken = true;// 把计数器 count 重置为构造函数中指定的 parties(即需要凑齐的线程总数),为下一次屏障复用做准备count = parties;// trip 是 ConditionObject 实例,调用 signalAll() 会将 Condition 队列中所有等待的线程转移到 AQS 队列// 当锁释放(unlock())后,这些线程会被唤醒,从而感知到 “屏障已破坏” 并退出等待trip.signalAll();
    }
    

5 reset()

  • reset()CyclicBarrier 用于主动重置屏障状态的方法,让 CyclicBarrier 可以“循环复用”,回到初始状态以支持新的一轮线程凑齐逻辑;

    // 重置CyclicBarrier
    public void reset() {// 获取 ReentrantLock 并加锁,确保 reset() 操作的原子性,避免多线程并发修改导致状态混乱final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 调用 breakBarrier() 方法,标记当前 Generation 为“破坏”状态,重置计数器,并唤醒所有等待的线程,让它们退出当前屏障的等待逻辑breakBarrier();// 生成新的 Generation 实例,重置计数器 count 为 parties(构造函数中指定的线程总数),让 CyclicBarrier 进入“新的一轮”,可以接收新的线程凑齐请求nextGeneration();} finally {// 无论重置过程是否出现异常,最终都会释放锁,保证锁资源的正确回收lock.unlock();}
    }
    

6 nextGeneration()

  • CyclicBarrier 实现**“循环复用”**的核心方法,用于在一组线程凑齐后,将屏障状态“归位”,以便支持下一轮线程的凑齐逻辑;
private void nextGeneration() {// trip 是 ConditionObject 实例,调用 signalAll() 会将 Condition 队列中所有等待的线程转移到 AQS 队列。当锁释放后,这些线程会被唤醒,继续执行后续逻辑trip.signalAll();// 将计数器 count 重置为构造函数中指定的 parties(即需要凑齐的线程总数),为下一轮凑齐逻辑准备count = parties;// 创建新的 Generation 实例,标记为 “新的一代” 屏障,与上一轮的屏障状态隔离generation = new Generation();
}

7 总结

  • CyclicBarrier 基于 ReentrantLock + ConditionObject 实现。构造函数必须指定 parties(初始待执行线程数),内部通过 generation 这个带有布尔属性的结构,标记当前屏障执行过程中是否出现超时、异常、中断等情况;

  • 构造函数会把 parties 赋值给计数值 count,每当有一个线程执行 await() 方法,count 就会减 1;

  • 线程凑齐后的执行流程

    • count 减到 0 时,代表所有线程都准备就绪。此时会先判断是否初始化了 barrierCommand(构造函数中传入的 Runnable 任务),如果有则优先执行该任务
    • barrierCommand 执行完成后,会把 Condition 队列(Waiter 队列)中的线程转移到 AQS 队列,执行 unlock 操作后唤醒这些线程;同时将计数值 countgeneration 归位,为下一轮屏障复用做准备。
http://www.xdnf.cn/news/19594.html

相关文章:

  • 大模型参数到底是什么?
  • synchronized的锁对象 和 wait,notify的调用者之间的关系
  • EKS上部署gpu服务利用karpenter实现自动扩缩(s3作为共享存储)
  • 一、计算机系统知识
  • C++ 枚举算法详细利用与数字分解教学教案
  • Spring Security 6.x 功能概览与代码示例
  • 程序员独立开发直播卖产品 SOP 教程
  • arm容器启动spring-boot端口报错
  • 基于开源AI大模型、AI智能名片与S2B2C商城小程序的“教育用户”模式探究
  • 谈谈对BFC的理解
  • 当代科学(范畴大辩论) 的学科分科(论据)的要素论(论点)及方法论(论证):边缘处理
  • 浅谈 SQL 窗口函数:ROW_NUMBER() 与聚合函数的妙用
  • 机器视觉opencv教程(三):形态学变换(腐蚀与膨胀)
  • 利用爬虫获取淘宝商品信息,参数解析
  • 基于单片机停车场管理系统/车位管理/智慧停车系统
  • 小迪自用web笔记22
  • Java线程池使用入门
  • uvm验证环境中struct(结构体)和class的区别与联系
  • 基于单片机老人防丢失防摔倒系统/老人健康状态检测系统
  • CMake⼯程指南-3
  • [光学原理与应用-361]:ZEMAX - 分析 - 像差分析
  • KingbaseES V009版本发布:国产数据库的新飞跃
  • 基于全参考图的质量评价均方误差MSE、峰值信噪比PSNR
  • [特殊字符] Rust概述:系统编程的革命者
  • 力扣(LeetCode) ——101. 对称二叉树(C语言)
  • Vue Router 嵌套路由与布局系统详解:理解 component = router-view 的核心概念
  • 接口测试总结-含接口测试和前端测试的区别与比较
  • Matlab自学笔记六十六:求解带参数的不等式
  • 国庆福建霞浦游
  • Linux 启动传参