JAVA同步器CyclicBarrier
CyclicBarrier
(循环屏障)是 Java 并发包中用于 多线程协同 的工具类,允许多个线程相互等待,直到所有线程都到达某个屏障点(Barrier)后,再继续执行后续任务。其核心特点是 可重复使用,与 CountDownLatch
的一次性特性形成对比。
一、核心概念与使用场景
-
核心功能:
-
屏障点(Barrier):线程调用
await()
后会被阻塞,直到所有线程都到达屏障点。 -
可重复触发:当所有线程到达屏障点后,屏障自动重置,可继续复用。
-
回调任务(可选):当屏障被触发时,可执行预定义的
Runnable
任务(由最后一个到达屏障的线程执行)。
-
-
典型场景:
-
多阶段任务:如并行计算分片,每个阶段完成后同步数据。
-
模拟并发测试:多个线程同时到达屏障后触发压力测试。
-
迭代计算:多次循环中复用同一个屏障。
-
二、源码结构分析
1. 核心成员变量
public class CyclicBarrier {// 内部锁和条件变量private final ReentrantLock lock = new ReentrantLock();private final Condition trip = lock.newCondition();// 屏障的参与者数量(固定值)private final int parties;// 屏障触发时执行的回调任务(可为 null)private final Runnable barrierCommand;// 当前屏障的“代”(Generation),用于标识屏障是否被重置或破坏private Generation generation = new Generation();// 当前剩余等待的线程数(初始值为 parties)private int count;
}
2. 内部类 Generation
记录屏障的当前“代”,用于检测屏障是否被重置或破坏:
private static class Generation {boolean broken = false; // 标识屏障是否被破坏
}
3. 核心方法实现
await()
:线程到达屏障并等待其他线程
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // 不会发生(无超时逻辑)}
}// 核心等待逻辑
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException {final ReentrantLock lock = this.lock;lock.lock(); // 加锁try {final Generation g = generation;// 检查屏障是否被破坏if (g.broken) throw new BrokenBarrierException();// 如果线程被中断,则破坏屏障并抛出异常if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}int index = --count; // 剩余等待线程数减 1if (index == 0) { // 最后一个线程到达屏障Runnable command = barrierCommand;if (command != null) {try {command.run(); // 执行回调任务(无新线程,由当前线程执行)} catch (Throwable ex) {breakBarrier(); // 执行异常则破坏屏障throw ex;}}reset(); // 重置屏障(生成新 Generation,count=parties)return 0;}// 非最后一个线程:循环等待直到屏障触发或超时for (;;) {try {if (!timed) {trip.await(); // 阻塞等待} else {if (nanos > 0L) {nanos = trip.awaitNanos(nanos); // 超时等待}}} catch (InterruptedException ie) {if (g == generation && !g.broken) {breakBarrier(); // 中断时破坏屏障throw ie;} else {Thread.currentThread().interrupt(); // 保留中断状态}}// 检查屏障是否被破坏if (g.broken) throw new BrokenBarrierException();// 如果 Generation 已更新,说明屏障已重置,返回剩余索引if (g != generation) return index;// 超时处理if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock(); // 释放锁}
}
reset()
:手动重置屏障
public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {breakBarrier(); // 破坏当前屏障resetInternal(); // 生成新 Generation} finally {lock.unlock();}
}// 重置内部状态
private void resetInternal() {generation = new Generation(); // 创建新 Generationcount = parties; // 重置计数器
}
breakBarrier()
:破坏屏障并唤醒所有线程
private void breakBarrier() {generation.broken = true; // 标记屏障为破坏状态count = parties; // 重置计数器(为了后续可能的重置)trip.signalAll(); // 唤醒所有等待线程
}
三、代码示例与执行流程
示例代码:3 个线程在屏障处同步
public class CyclicBarrierDemo {public static void main(String[] args) {CyclicBarrier barrier = new CyclicBarrier(3, () -> {System.out.println("所有线程到达屏障,执行回调任务");});for (int i = 0; i < 3; i++) {new Thread(() -> {try {System.out.println("线程到达屏障前");barrier.await(); // 等待其他线程System.out.println("线程通过屏障后");} catch (Exception e) {e.printStackTrace();}}).start();}}
}
执行流程:
-
初始化:屏障的
parties=3
,count=3
。 -
线程调用
await()
:-
每个线程进入
dowait()
,count
依次递减为 2 → 1 → 0。 -
当最后一个线程(
count=0
)到达时:-
执行回调任务(若有)。
-
调用
reset()
:生成新的Generation
,重置count=parties
。 -
唤醒所有等待线程。
-
-
-
唤醒后:所有线程继续执行后续逻辑。
四、关键设计细节
-
可重用性:
-
通过
reset()
方法生成新的Generation
,重置计数器count=parties
。 -
与
CountDownLatch
不同,无需重新创建对象即可多次使用。
-
-
中断与异常处理:
-
若等待线程被中断,会调用 breakBarrier() 破坏屏障,并抛出 InterruptedException。
-
回调任务执行异常会导致屏障被破坏。
-
-
超时机制:
-
支持
await(timeout, unit)
方法,超时后破坏屏障并抛出TimeoutException
。
-
-
线程安全:
-
通过
ReentrantLock
和Condition
实现同步,确保状态修改的原子性。
-
五、注意事项
-
避免死锁:
-
确保参与线程数等于
parties
,否则线程会永久阻塞。 -
若某个线程未调用
await()
,其他线程将无法继续。
-
-
屏障破坏处理:
-
当屏障被破坏(
BrokenBarrierException
)后,必须调用reset()
才能再次使用。
-
-
回调任务执行:
-
回调任务由最后一个到达屏障的线程执行,需避免耗时操作阻塞其他线程。
-
-
性能开销:
-
频繁创建
CyclicBarrier
可能影响性能,建议合理设计parties
数量。
-
六、与 CountDownLatch 对比
特性 | CyclicBarrier | CountDownLatch |
---|---|---|
重置性 | 可重复使用 | 一次性 |
等待方向 | 子等子(全员到齐) | 主等子 |
底层实现 | ReentrantLock + Condition | AQS 共享锁 |
异常处理 | 自动破坏屏障 | 需手动确保计数 |
回调任务 | 支持 | 不支持 |
总结
-
核心原理:基于
ReentrantLock
和Condition
实现线程阻塞与唤醒,通过Generation
管理屏障状态。 -
适用场景:多线程分阶段协同、可重复触发的同步需求。
-
源码关键:
dowait()
方法控制线程等待与唤醒,reset()
实现屏障重置,Generation
标识屏障生命周期。