当前位置: 首页 > ai >正文

并发编程——06 JUC并发同步工具类的应用实战

0 常用并发同步工具类的真实应用场景

  • JDK 提供了比synchronized更加高级的各种同步工具,包括ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等,可以实现更加丰富的多线程操作;

    在这里插入图片描述

1 ReentrantLock(可重入的占用锁)

1.1 简介

  • ReentrantLock可重入的独占锁

    • “可重入”是指同一线程能多次获取同一把锁,不会自己阻塞自己;
    • “独占”是说同一时间,最多只有一个线程能成功拿到锁,其他线程得等待;
    • synchronized作用类似,都是解决多线程并发访问共享资源时的线程安全问题;
  • 相比 synchronizedReentrantLock 多了这些灵活特性:

    • 可中断:获取锁的过程中,线程能响应中断(比如其他地方调用了 interrupt()),不用死等锁释放,更灵活控制执行流程;

    • 可设置超时时间:调用 tryLock(long timeout, TimeUnit unit) 时,线程在指定时间内没拿到锁,就会放弃尝试,避免无限阻塞;

    • 可设置为公平锁:默认 ReentrantLock 是 “非公平锁”(新线程和等待队列里的线程抢锁,可能插队),但它支持通过构造方法 ReentrantLock(true) 设为“公平锁”,严格按线程等待顺序分配锁,减少线程饥饿(某些线程一直拿不到锁);

  • synchronized 一样,都支持可重入:

    • synchronizedwait/notify 实现线程通信,只能关联一个等待队列;
    • ReentrantLock 可通过 newCondition() 创建多个 Condition,精准控制不同线程的等待 / 唤醒,比如生产者 - 消费者模型里,能区分 “生产条件”“消费条件” 分别处理;
  • 应用场景:多线程抢共享资源时,需要独占访问保证数据安全,比如卖票系统(如下两图)、银行账户转账;

    在这里插入图片描述

    • 线程 A、B 抢锁:线程 A、B 同时尝试获取锁,假设线程 A 先拿到(锁的独占性,同一时间只有 A 能持有),此时 A 可以操作共享资源(比如修改车票库存 ),B 因为没抢到,进入 “等待” 状态;

    在这里插入图片描述

    • 线程 A 释放锁:A 操作完共享资源后,会释放锁;接着 B 再次尝试获取锁,这次就能成功拿到,然后 B 开始操作共享资源(修改车票库存)。

1.2 常用API

  • ReentrantLock 实现了Lock接口规范,常见API如下:

    方法方法声明功能说明
    lockvoid lock()获取锁,调用该方法当前线程会获取锁,当锁获得后,该方法返回
    lockInterruptiblyvoid lockInterruptibly() throws InterruptedException可中断的获取锁,和 lock() 方法不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程
    tryLockboolean tryLock()尝试非阻塞的获取锁,调用该方法后立即返回。如果能够获取到锁返回 true,否则返回 false
    tryLock(带超时)boolean tryLock(long time, TimeUnit unit) throws InterruptedException超时获取锁,当前线程在以下三种情况下会返回:当前线程在超时时间内获取了锁;当前线程在超时时间内被中断;超时时间结束,返回 false
    unlockvoid unlock()释放锁
    newConditionCondition newCondition()获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的 await() 方法,而调用后,当前线程将释放锁
  • 基本用法:

    private final Lock lock = new ReentrantLock();public void foo()
    {// 获取锁lock.lock();try {// 程序执行逻辑} finally {// finally语句块可以确保lock被正确释放lock.unlock();}
    }// 尝试获取锁,最多等待 100 毫秒  
    if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {  try {  // 成功获取到锁,执行需要同步的代码块  // ... 执行一些操作 ...  } finally {  // 释放锁  lock.unlock();  }  
    } else {  // 超时后仍未获取到锁,执行备选逻辑  // ... 执行一些不需要同步的操作 ...  
    }  
    
  • 在使用时要注意以下 4 个问题:

    • 默认情况下 ReentrantLock 为非公平锁而非公平锁;
    • 加锁次数和释放锁次数一定要保持一致,否则会导致线程阻塞或程序异常;
    • 加锁操作一定要放在try代码之前,这样可以避免未加锁成功又释放锁的异常;
    • 释放锁一定要放在finally中,否则会导致线程阻塞;
  • 工作原理:

    • 当有线程调用lock方法时,会用 CAS(Compare-And-Swap,比较并交换) 操作,尝试把 AQS(AbstractQueuedSynchronizer,抽象队列同步器,Java 并发包的核心基础组件 )内部的 state 变量从 0 改成 1

      • state=0 表示“锁没人用”,CAS 成功 → 线程拿到锁,开始执行临界区代码(操作共享资源);
      • state=1 表示“锁被占用”,CAS 失败 → 线程抢锁失败,进入阻塞队列(CLH 队列,按 FIFO 排队 ) 等待;
    • 抢锁失败的线程,会被包装成节点(Node),加入队列尾部(tail),队列头部是 head 节点(代表 “即将拿到锁的线程”);

      • 队列里的线程,都在等锁释放,避免线程忙等(一直重试抢锁,浪费 CPU 资源);
      • 队列是 FIFO(先进先出) 顺序,理论上保证线程公平性,但实际还受“公平锁 / 非公平锁”策略影响;
    • 当持有锁的线程执行完 unlock(),会把 state 改回 0(释放锁),然后唤醒队列里的线程。这时分两种策略:

      • 公平锁(ReentrantLock(true)):严格按队列顺序唤醒:释放锁后,优先唤醒 head 节点的下一个节点(head.next),让队列里“等最久”的线程拿到锁;

        • 优点:绝对公平,避免线程 饥饿(某些线程一直抢不到锁);
          • 缺点:频繁唤醒 / 切换线程,性能略低(线程上下文切换有开销);
      • 非公平锁(默认策略,ReentrantLock()):释放锁后,不严格按队列顺序,允许新线程和队列里被唤醒的线程重新用 CAS 抢锁:

        • 新线程(没进队列的)可能直接 CAS 抢锁成功(插队),不用进队列等;

        • 队列里的线程也会被唤醒,参与竞争;

        • 优点:减少线程切换,吞吐量更高(适合竞争不激烈的场景);

        • 缺点:可能让队列里的线程等更久,存在小概率线程饥饿;

    在这里插入图片描述

1.3 使用

1.3.1 独占锁

  • 模拟抢票场景。8张票,10个人抢,如果不加锁,会出现什么问题?

    /*** 模拟抢票场景*/
    public class ReentrantLockDemo {// 创建 ReentrantLock 实例,默认使用非公平锁策略private final ReentrantLock lock = new ReentrantLock();//默认非公平// 共享资源:总票数,会有多个线程同时操作这个变量private static int tickets = 8;/*** 购买车票的方法* 核心逻辑:通过加锁保证同一时间只有一个线程能执行购票操作*/public void buyTicket() {// 1. 获取锁:调用 lock() 方法,当前线程会尝试获取锁// 如果锁未被占用,则当前线程获得锁并继续执行// 如果锁已被占用,则当前线程会进入阻塞队列等待lock.lock(); // 获取锁// 2. try-finally 结构保证锁一定会被释放// 即使代码执行过程中发生异常,finally 块也会执行解锁操作try {// 3. 临界区:操作共享资源(tickets 变量)if (tickets > 0) { // 检查是否还有剩余车票try {// 休眠 10ms,放大并发问题的可能性// 如果不加锁,这里会出现多个线程同时进入判断并扣减票数的情况Thread.sleep(10); // 模拟出并发效果} catch (InterruptedException e) {e.printStackTrace();}// 打印购票信息,并将票数减 1(原子操作)System.out.println(Thread.currentThread().getName() + "购买了第" + tickets-- + "张票");} else {// 票已售罄时的提示System.out.println("票已经卖完了," + Thread.currentThread().getName() + "抢票失败");}} finally {// 4. 释放锁:无论是否发生异常,都必须释放锁// 否则会导致其他线程永远无法获取锁,造成死锁lock.unlock(); // 释放锁}}public static void main(String[] args) {// 创建抢票系统实例(共享同一个锁和票数变量)ReentrantLockDemo ticketSystem = new ReentrantLockDemo();// 创建 10 个线程模拟 10 个用户抢票(总票数只有 8 张)for (int i = 1; i <= 10; i++) {Thread thread = new Thread(() -> {// 每个线程执行抢票操作ticketSystem.buyTicket(); // 抢票}, "线程" + i); // 给线程命名,方便观察输出// 启动线程,线程进入就绪状态,等待 CPU 调度thread.start();}try {// 主线程休眠 3000ms,等待所有抢票线程执行完毕// 避免主线程提前打印剩余票数Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 打印最终剩余票数,验证是否正确(应该为 0)System.out.println("剩余票数:" + tickets);}
    }
    
  • 不加锁:出现超卖问题

    在这里插入图片描述

  • 加锁:正常,两人抢票失败

    在这里插入图片描述

1.3.2 公平锁和非公平锁

  • ReentrantLock 支持公平锁和非公平锁两种模式:

    • 公平锁:线程在获取锁时,按照线程等待的先后顺序获取锁;
    • 非公平锁:线程在获取锁时,不按照等待的先后顺序获取锁,而是随机获取锁。ReentrantLock 默认是非公平锁;
    ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁  
    ReentrantLock lock = new ReentrantLock(true); //公平锁  
    
  • 比如买票的时候就有可能出现插队的场景,允许插队就是非公平锁,如下图:

    在这里插入图片描述

1.3.3 可重入锁

  • 可重入锁又名递归锁,是指在同一线程里,只要锁对象相同,内层方法(或代码块)能直接复用已获取的锁,不用重新竞争。比如线程执行 方法A 时拿到锁,方法A 里调用 方法B(也需要同一把锁),线程进 方法B 时不用等自己释放锁,直接继续用;

  • Java 中ReentrantLocksynchronized都是可重入锁

    • synchronized:隐式的(JVM 自动管加锁 / 释放)、可重入的内置锁,只要是同一线程、同一对象锁,内层同步代码直接重入;
    • ReentrantLock:显式的(手动 lock() 加锁、unlock() 释放)可重入锁,功能更灵活(支持公平 / 非公平、可中断、超时获取等),但得手动配对加锁释放,否则容易死锁;
  • 可重入锁的一个优点是可一定程度避免死锁:

    • 要是锁不可重入,同一线程内层方法需要锁时,会因为自己占着锁没放,导致自己等自己(阻塞),最后死锁。可重入锁允许同一线程重复拿锁,从设计上就避免了这种自己堵死自己的情况;
    • 注意:只是一定程度避免,要是代码逻辑乱(比如忘记释放锁、不同锁交叉嵌套不当),还是可能死锁,只是解决了同一线程重入锁这类场景的死锁风险;
  • 应用场景:

    • 递归操作:递归函数里加锁,每次递归调用都是内层“方法”,可重入锁让线程不用反复竞争锁,比如计算阶乘时用 ReentrantLock 保护共享变量,递归调用时直接重入;
    • 调用同一类其他方法:类里多个 synchronized 方法,线程调完一个调另一个,因为是同一对象锁,直接重入,不用额外处理;
    • 锁嵌套:多层代码块都需要同一把锁,外层加锁后,内层嵌套的加锁逻辑直接复用,不用释放外层锁再重新加;
  • 例:

    class Counter {// 创建 ReentrantLock 对象,作为可重入锁的实例// ReentrantLock 是显式锁,支持可重入、可中断、公平/非公平等特性private final ReentrantLock lock = new ReentrantLock(); // 递归调用方法,演示可重入锁的核心场景public void recursiveCall(int num) {// 1. 获取锁:同一线程再次调用时,可直接重入,不会阻塞自己//    可重入的关键体现:锁对象识别当前持有线程,允许重复获取lock.lock(); try {// 递归终止条件:num 减到 0 时停止if (num == 0) {return;}// 打印当前递归层级,证明方法执行(锁已成功获取)System.out.println("执行递归,num = " + num);// 2. 递归调用自身:再次进入方法时,会再次执行 lock.lock()//    由于是【同一线程】操作【同一把锁】,可直接重入,不会阻塞recursiveCall(num - 1); } finally {// 3. 释放锁:递归调用多少次,就要释放多少次//    保证锁的获取与释放次数严格匹配,避免死锁lock.unlock(); }}// 主方法:测试可重入锁的递归场景public static void main(String[] args) throws InterruptedException {// 创建 Counter 实例,所有递归调用共享同一把锁Counter counter = new Counter(); // 启动递归测试:从 num=10 开始调用// 预期行为:线程安全执行递归,不会因锁重入导致阻塞counter.recursiveCall(10); }
    }
    

1.3.4 Condition 详解

  • Condition 是 Java 并发包里的线程协调工具,依赖 Lock(如 ReentrantLock)使用,比 Objectwait/notify 更灵活,解决线程间按条件等待 / 唤醒的问题。可以把它理解成:给 Lock 搭配“专属等待队列”,让线程能按需等待条件、精准唤醒,而不是像 wait/notify 只能用 Object 的单一队列;

  • 核心优势(对比 Object.wait/notify

    • 多条件分离

      • Object 里,一个对象只有 1 个等待队列(所有 wait() 的线程都挤在一起);
      • Condition 让一个 Lock 可以有多个等待队列(比如锁 lock 可以创建 condition1condition2,不同条件的线程进不同队列),唤醒时能精准选队列,避免唤醒无关线程;
    • 更灵活的等待控制

      • 支持超时等待await(long time, TimeUnit unit) ),避免线程无限阻塞;
      • 唤醒时可选单个唤醒(signal())或全部唤醒(signalAll(),比 notify()(随机唤醒一个)、notifyAll()(唤醒全部)更精准;
  • 核心方法解析

    返回值类型方法作用说明
    voidawait()让当前线程进入等待,直到被 signal()/signalAll() 唤醒、被中断,或意外唤醒(如假唤醒) 等待前释放当前持有的 Lock,唤醒后重新竞争获取锁,再继续执行
    booleanawait(long time, TimeUnit unit)限时等待:等待 time 时间后,若没被唤醒就自动返回 false;被唤醒则返回 true 同样会先释放锁,超时 / 唤醒后重新抢锁。
    voidsignal()唤醒 Condition 等待队列中一个线程(选一个唤醒,类似 notify() 但更可控) 唤醒后,线程不会直接执行,要重新竞争锁
    voidsignalAll()唤醒 Condition 等待队列中所有线程(类似 notifyAll()) 线程被唤醒后,重新竞争锁,抢到锁的继续执行
  • 比如生产者 - 消费者模型中,想区分“队列满了让生产者等”和“队列空了让消费者等”:

    • ReentrantLock 加锁,然后创建两个 ConditionnotFull(生产者等)、notEmpty(消费者等);

    • 生产者发现队列满了 → 调用 notFull.await() 等待;消费者取走数据后 → 调用 notFull.signal() 唤醒生产者;

    • 消费者发现队列空了 → 调用 notEmpty.await() 等待;生产者放入数据后 → 调用 notEmpty.signal() 唤醒消费者;

    • 这样就能 精准控制不同条件的线程等待 / 唤醒,比 wait/notify 更清晰。

1.3.5 结合 Condition 实现生产者消费者模式

案例:基于ReentrantLockCondition实现一个简单队列

public class ReentrantLockDemo3 {public static void main(String[] args) {// 1. 创建容量为 5 的队列,作为生产者和消费者共享的资源Queue queue = new Queue(5);// 2. 启动生产者线程:传入队列,线程执行 Producer 的 run 方法new Thread(new Producer(queue)).start();// 3. 启动消费者线程:传入队列,线程执行 Customer 的 run 方法new Thread(new Customer(queue)).start();}
}/*** 队列封装类:* 用 ReentrantLock + Condition 实现线程安全的生产者-消费者队列* 核心逻辑:* - 队列满时,生产者通过 notFull.await() 等待;* - 队列空时,消费者通过 notEmpty.await() 等待;* - 生产/消费后,用 signal() 唤醒对应等待线程*/
class Queue {private Object[] items;      // 存储队列元素的数组int size = 0;                // 当前队列中元素数量int takeIndex = 0;           // 消费者取元素的索引int putIndex = 0;            // 生产者放元素的索引private ReentrantLock lock;  // 控制并发的锁public Condition notEmpty;   // 消费者等待条件:队列空时阻塞,生产后唤醒public Condition notFull;    // 生产者等待条件:队列满时阻塞,消费后唤醒// 初始化队列,指定容量public Queue(int capacity) {this.items = new Object[capacity];lock = new ReentrantLock();// 为同一把锁创建两个 Condition,分别控制“空”和“满”的等待notEmpty = lock.newCondition();notFull = lock.newCondition();}/*** 生产者放入元素的方法* 必须在 lock 保护下调用,保证线程安全*/public void put(Object value) throws Exception {// 加锁:同一时间只有一个线程能操作队列lock.lock();try {// 队列满了(size == 数组长度),让生产者等待// 用 while 而非 if:防止“假唤醒”后直接执行(需再次检查条件)while (size == items.length) notFull.await();  // 释放锁,进入等待队列,直到被唤醒// 队列有空位,放入元素items[putIndex] = value;// 索引循环:如果放到数组末尾,重置为 0if (++putIndex == items.length) putIndex = 0;size++;  // 元素数量+1// 生产完成,唤醒等待的消费者(队列非空了)notEmpty.signal(); } finally {// 测试用:打印生产日志(实际可删除或放业务逻辑里)System.out.println("producer生产:" + value);// 必须释放锁:无论是否异常,保证锁能被其他线程获取lock.unlock();}}/*** 消费者取出元素的方法* 必须在 lock 保护下调用,保证线程安全*/public Object take() throws Exception {// 加锁:同一时间只有一个线程能操作队列lock.lock();try {// 队列空了(size == 0),让消费者等待// 用 while 而非 if:防止“假唤醒”后直接执行(需再次检查条件)while (size == 0) notEmpty.await();  // 释放锁,进入等待队列,直到被唤醒// 取出元素Object value = items[takeIndex];items[takeIndex] = null;  // 清空位置,避免内存泄漏// 索引循环:如果取到数组末尾,重置为 0if (++takeIndex == items.length) takeIndex = 0;size--;  // 元素数量-1// 消费完成,唤醒等待的生产者(队列非满了)notFull.signal(); return value;  // 返回取出的元素} finally {// 释放锁:无论是否异常,保证锁能被其他线程获取lock.unlock();}}
}/*** 生产者线程:* 每隔 1 秒生产一个随机数(0~999),放入队列*/
class Producer implements Runnable {private Queue queue;  // 共享的队列public Producer(Queue queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {  // 无限循环生产Thread.sleep(1000);  // 每隔 1 秒生产一次// 生产随机数,放入队列queue.put(new Random().nextInt(1000));}} catch (Exception e) {e.printStackTrace();  // 捕获并打印异常}}
}/*** 消费者线程:* 每隔 2 秒从队列取出一个元素,打印消费日志*/
class Customer implements Runnable {private Queue queue;  // 共享的队列public Customer(Queue queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {  // 无限循环消费Thread.sleep(2000);  // 每隔 2 秒消费一次// 取出元素并打印消费日志System.out.println("consumer消费:" + queue.take());}} catch (Exception e) {e.printStackTrace();  // 捕获并打印异常}}
}
  • Condition 的作用

    • notEmpty:消费者专属等待条件。队列空时,消费者调用 notEmpty.await() 释放锁并阻塞;生产者放入元素后,用 notEmpty.signal() 唤醒。
    • notFull:生产者专属等待条件。队列满时,生产者调用 notFull.await() 释放锁并阻塞;消费者取出元素后,用 notFull.signal() 唤醒;
    • 注意:await()signal() 都必须被 lock.lock()lock.unlock() 包裹,即都在 lock 保护范围内;
  • 为什么用 while 而非 if 检查条件?防止**“假唤醒”**:线程可能在未被 signal() 的情况下醒来(如系统调度)。用 while 会重新检查条件,确保队列状态符合预期后再继续执行;

  • 锁的配对使用lock.lock()lock.unlock() 必须成对出现,且 unlock() 放在 finally 中,保证无论是否发生异常,锁都会释放,避免死锁;

  • 生产者-消费者的节奏:生产者 1 秒生产一次,消费者 2 秒消费一次 → 队列会逐渐被填满(生产者更快),但通过 Condition 协调,不会出现“队列满了还生产”或“队列空了还消费”的情况。

1.4 应用场景总结

  • ReentrantLock 最基本的作用是多线程环境下,让共享资源只能被一个线程独占访问,保证操作共享资源时数据不会乱(比如多个线程同时改同一个变量,用锁让它们排队改);

  • 应用场景总结:

    • 解决多线程竞争资源的问题

      • 场景描述:多个线程抢同一资源(比如同时写同一个数据库、操作同一个文件、修改同一个内存变量),需要保证同一时间只有一个线程能改,避免数据冲突;

      • 例子:多个线程同时往数据库同一表插入/修改数据,用 ReentrantLock 加锁,让线程排队执行写操作,保证数据最终是正确的,不会因为并发写入导致数据混乱(比如库存扣减、订单状态修改);

    • 实现多线程任务的顺序执行

      • 场景描述:希望线程 A 执行完某段逻辑后,线程 B 再执行;或者多个线程严格按特定顺序跑任务;

      • 例子:比如线程 1 先初始化配置,线程 2 再加载数据,线程 3 最后处理业务。用 ReentrantLock 配合 Condition(条件变量 ),线程 2 等线程 1 释放锁并发信号后再执行,线程 3 等线程 2 发信号后执行,实现顺序控制;

    • 实现多线程等待/通知机制

      • 场景描述:线程 A 完成某个关键步骤后,需要主动通知线程 B、C 可以继续执行了;或者线程需要等待某个条件满足后再干活(类似生产者 - 消费者模型);

      • 例子:生产者线程生产完数据,通过 ReentrantLockCondition 发信号,唤醒等待的消费者线程来处理数据;反之,消费者处理完,也能发信号让生产者继续生产。这比 Objectwait/notify 更灵活,能精准控制哪些线程被唤醒。

2 Semaphore(信号量)

2.1 简介

  • Semaphore是多线程同步工具,核心解决控制同时访问共享资源的线程数量,让有限的资源(比如数据库连接、文件句柄)在同一时间被合理数量的线程使用,避免因资源耗尽导致系统崩溃;

  • 工作原理:使用Semaphore的过程实际上是多个线程获取访问共享资源许可证的过程

    • Semaphore内部维护一个计数器,可以把它理解成剩余许可证数量。比如设置 Semaphore(3),就代表最多允许 3 个线程同时用资源,相当于发 3 张“访问许可证”;

    • 线程获取许可证(acquire()

      • 线程要访问共享资源时,必须先调用 acquire() 拿许可证;
      • 如果计数器 > 0(还有许可证):线程拿到许可证,计数器减 1,然后继续执行(访问资源);
      • 如果计数器 == 0(没许可证了):线程会被阻塞,直到有其他线程释放许可证;
    • 线程释放许可证(release():线程用完资源后,调用release()归还许可证,计数器加 1,这样阻塞的线程里就有机会拿到新的许可证,继续执行;

    在这里插入图片描述

  • Semaphore 专门用来限制资源的并发访问数量,典型场景比如:

    • 数据库连接池:假设连接池只有 10 个连接,用 Semaphore(10) 控制,避免几百个线程同时抢连接,把数据库压垮;

    • 文件访问:如果一个文件同一时间只能被 3 个线程读写,用 Semaphore(3) 限制,防止文件被疯狂读写导致错误;

    • 网络请求:控制同时发起的 HTTP 请求数量,避免把服务器或本地网络打满,保证系统稳定。

2.2 常用API

2.2.1 构造器

  • 构造器是用来创建 Semaphore 对象的,主要决定两件事:

    • 允许同时访问资源的 最大线程数(许可证数量)
    • 线程获取许可证时,是用 公平策略 还是 非公平策略
  • Semaphore有两个构造器:

    public Semaphore(int permits) {sync = new NonfairSync(permits);
    }
    
    • permits:设置最大并发数(比如传 3,就代表最多允许 3 个线程同时拿许可证);

    • NonfairSync:默认用非公平策略。意思是,当许可证释放时,新线程和等待队列里的线程一起抢许可证,新线程可能“插队”,不用严格排队;

    • 等价写法new Semaphore(3) 等价于 new Semaphore(3, false),因为默认是非公平的;

    public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    
    • permits:同上,设置最大并发数;

    • fair:布尔值,决定用公平还是非公平策略:

      • fair = true:用公平策略FairSync),线程严格按“等待队列顺序”拿许可证,先等的线程一定先拿到,不会被插队;
      • fair = false:用非公平策略NonfairSync),新线程和等待线程一起抢,可能插队。

2.2.2 acquire方法

  • acquireSemaphore用于获取许可证的核心方法,特点是获取不到许可证时,线程会一直阻塞等待,直到拿到许可证或者被中断。它有两种重载形式,适配不同的许可证获取需求;

    • void acquire() throws InterruptedException

      • 尝试获取 1 个许可证。如果当前有可用许可证,直接获取(许可证计数减 1)并返回;如果没有可用许可证,当前线程会进入阻塞状态,直到:

        • 其他线程释放许可证(使当前线程有可用许可证),此时当前线程会竞争获取许可证;
        • 当前线程被其他线程中断(触发 InterruptedException);
      • 类比:类似去银行办事,只有 1 个窗口(许可证 = 1),当前窗口有人(没许可证),你就只能排队等着,直到窗口空出来(有人办完业务,释放许可证);

    • void acquire(int permits) throws InterruptedException

      • 功能:尝试获取指定数量(permits )的许可证。如果 Semaphore 中剩余许可证数量 ≥ permits,直接获取(许可证计数减 permits)并返回;否则,线程进入阻塞,直到:

        • 其他线程释放许可证,使剩余许可证 ≥ permits,当前线程竞争获取;
        • 当前线程被中断(触发 InterruptedException);
      • 注意:使用该方法时,要确保最终能释放对应数量的许可证,否则容易导致其他线程长期无法获取足够许可证而阻塞,引发“资源饥饿”问题;

      • 类比:如果银行办理大额业务需要占 2 个窗口(permits = 2),只有当至少空出 2 个窗口时,你才能开始办理,否则就得等;

  • 例:主线程先占许可证,子线程等待,主线程释放后子线程再获取

    // 1. 创建 Semaphore:许可证数量=1,公平策略(true 表示严格按线程等待顺序分配许可证)
    final Semaphore semaphore = new Semaphore(1, true);  // 2. 主线程直接抢许可证:因为初始许可证是 1,主线程能直接拿到(许可证计数变为 0)
    semaphore.acquire();  // 3. 创建子线程,尝试获取许可证
    Thread t = new Thread(() -> {  try {// 子线程执行到这,尝试获取许可证:但主线程已占用(许可证计数 0),所以子线程进入阻塞等待System.out.println("子线程等待获取permit"); semaphore.acquire();  // 4. 子线程被唤醒(主线程释放许可证后),执行到这,打印获取成功System.out.println("子线程获取到permit");  } catch (InterruptedException e) { // 子线程等待中被中断时,会走到这e.printStackTrace();  } finally {// 5. 子线程执行完,释放许可证(许可证计数 +1 )semaphore.release();  }
    });
    t.start(); // 启动子线程try {// 6. 主线程休眠 5 秒:模拟做其他事情,期间子线程一直阻塞等待许可证TimeUnit.SECONDS.sleep(5);  
    } catch (InterruptedException e) {e.printStackTrace();
    }// 7. 主线程释放许可证(许可证计数从 0 变为 1 ),子线程此时会被唤醒,竞争获取许可证
    System.out.println("主线程释放permit");  
    semaphore.release();  
    
    • 初始化Semaphore 许可证数量 1,公平策略;
    • 主线程抢许可证semaphore.acquire() 执行后,许可证计数 0,主线程持有许可证;
    • 子线程启动:执行 semaphore.acquire() 时,因许可证 0,子线程进入阻塞,打印 子线程等待获取permit
    • 主线程休眠:5 秒内,子线程一直阻塞;
    • 主线程释放许可证semaphore.release() 执行,许可证计数 1;因为是公平策略,阻塞的子线程被唤醒,竞争拿到许可证,执行后续逻辑,打印 子线程获取到permit
    • 子线程释放许可证:子线程执行 semaphore.release(),许可证计数 0(子线程获取时减 1,释放时加 1,整体回到初始逻辑)。

2.2.3 tryAcquire方法

  • tryAcquireSemaphore 用于尝试获取许可证的方法,特点是:

    • 非阻塞优先:如果拿不到许可证,不会一直阻塞,而是直接返回 false(表示没拿到);

    • 灵活控制:支持获取 1 个许可证、获取指定数量许可证、带超时等待等场景,比 acquire 更灵活;

  • tryAcquire方法有三种重载形式

    • boolean tryAcquire():尝试获取 1 个许可证。如果当前有可用许可证,直接获取(许可证计数 -1),返回 true;否则,直接返回 false(不阻塞);

    • boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException:尝试获取 1 个许可证,但增加超时等待机制。如果一开始没许可证,线程会阻塞最多 timeout 时间:

      • 期间有许可证释放,线程拿到许可证,返回 true
      • 超时后还没许可证,返回 false
      • 等待中被中断,抛出 InterruptedException
    • boolean tryAcquire(int permits):尝试获取 指定数量(permits)的许可证。如果 Semaphore 剩余许可证 ≥ permits,直接获取(计数 -permits ),返回 true;否则,返回 false(不阻塞);

      • 注意:要确保最终释放对应数量的许可证,否则会导致其他线程无法获取足够许可证;
    • boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException:结合指定数量和超时等待,尝试获取 permits 个许可证,最多等 timeout 时间,逻辑和上面类似;

  • 例:

    // 1. 创建 Semaphore:1 个许可证,公平策略
    final Semaphore semaphore = new Semaphore(1, true);  // 2. 启动线程 t,尝试获取许可证
    new Thread(() -> {  // 2.1 尝试获取 1 个许可证:返回 true/falseboolean gotPermit = semaphore.tryAcquire(); // 2.2 如果拿到许可证if (gotPermit) { try {System.out.println(Thread.currentThread() + " 拿到许可证");TimeUnit.SECONDS.sleep(5); // 模拟占用 5 秒} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release(); // 释放许可证(必须!)}}
    }).start();// 3. 主线程休眠 1 秒:确保线程 t 启动并拿到许可证
    TimeUnit.SECONDS.sleep(1); // 4. 主线程尝试“带超时的获取”:最多等 3 秒
    if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) { System.out.println("主线程拿到许可证");
    } else {System.out.println("主线程 3 秒内没拿到许可证,失败");
    }
    
    1. 线程 t 启动tryAcquire() 拿到许可证(gotPermit = true),打印日志,休眠 5 秒;
    2. 主线程休眠 1 秒:等线程 t 启动并占用许可证;
    3. 主线程尝试获取:调用 tryAcquire(3, ...),但线程 t 会占用许可证 5 秒,所以主线程等 3 秒后超时,进入 else,打印 主线程 3 秒内没拿到许可证,失败
  • 例:

    // 1. 创建 Semaphore:5 个许可证,公平策略
    final Semaphore semaphore = new Semaphore(5, true);  // 2. 尝试获取 5 个许可证:成功(因为初始有 5 个)
    assert semaphore.tryAcquire(5) : "获取 5 个许可证失败"; // 3. 此时许可证已耗尽(5 - 5 = 0 ),尝试获取 1 个许可证:失败
    assert !semaphore.tryAcquire() : "获取 1 个许可证失败"; 
    
    • tryAcquire(5) 一次性拿 5 个许可证,成功后 Semaphore 剩余 0 个;

    • 后续 tryAcquire()(默认拿 1 个)返回 false,因为没许可证了。

2.2.4 正确使用release

  • Semaphorerelease 是用来 归还许可证 的,让其他线程有机会获取。它有两种形式:

    • release():归还 1 个许可证,内部计数器 +1;

    • release(int permits):归还 permits 个许可证,内部计数器 +permits

    • 关键点:许可证数量有限,必须谁拿的谁还,否则会导致计数器混乱,破坏 Semaphore 控制并发的逻辑;

  • 错误用法示例(用 finally 无脑释放)

    // 1. 创建 1 个许可证的 Semaphore(公平策略)
    final Semaphore semaphore = new Semaphore(1, true);  // 2. 线程 t1:拿到许可证后,霸占 1 小时(休眠)
    Thread t1 = new Thread(() -> {  try {semaphore.acquire(); // 拿许可证System.out.println("t1 拿到许可证");TimeUnit.HOURS.sleep(1); // 霸占 1 小时} catch (InterruptedException e) {System.out.println("t1 被中断");} finally {semaphore.release(); // 不管怎样,finally 里释放}
    });
    t1.start(); 
    TimeUnit.SECONDS.sleep(1); // 等 t1 启动// 3. 线程 t2:尝试拿许可证,但若被中断,会在 finally 里错误释放
    Thread t2 = new Thread(() -> {  try {semaphore.acquire(); // 尝试拿许可证(但 t1 没释放,所以会阻塞)System.out.println("t2 拿到许可证");} catch (InterruptedException e) {System.out.println("t2 被中断");} finally {semaphore.release(); // 问题:t2 没拿到许可证,却释放了!}
    });
    t2.start(); 
    TimeUnit.SECONDS.sleep(2); // 4. 主线程逻辑:中断 t2,然后自己拿许可证
    t2.interrupt(); // 中断 t2(此时 t2 还在阻塞等许可证)
    semaphore.acquire(); // 主线程尝试拿许可证
    System.out.println("主线程拿到许可证");  
    
    • t2 根本没拿到许可证(因为 t1 霸占着),但由于 release 写在 finally 里,t2 被中断时,会错误地归还 1 个许可证(相当于无中生有多了 1 个许可证);

    • 原本 Semaphore 只有 1 个许可证,被 t1 占用后,计数器是 0。但 t2 错误释放后,计数器变成 1,导致主线程能直接拿到许可证(而预期中 t1 要 1 小时后才释放,主线程不该拿到);

  • 修改后的 t2 逻辑:

    Thread t2 = new Thread(() -> {  boolean acquired = false; // 标记是否成功拿到许可证try {semaphore.acquire(); // 尝试拿许可证acquired = true; // 拿到了,标记为 trueSystem.out.println("t2 拿到许可证");} catch (InterruptedException e) {System.out.println("t2 被中断");} finally {// 只有成功拿到许可证(acquired=true),才释放if (acquired) { semaphore.release(); }}
    });
    
    • acquired 标记是否真的拿到许可证,只有拿到许可证的线程,才在 finally 里释放,避免没拿到却释放的问题;
  • Semaphore 的设计里,不强制检查释放许可证的线程是否真的拿过,而是靠开发者自己保证。官方文档说明:“没有要求释放许可证的线程必须是通过 acquire 拿到许可证的,正确用法由开发者通过编程规范保证。”

2.3 使用

2.3.1 Semaphore实现商品服务接口限流

  • Semaphore可以用于实现限流功能,即限制某个操作或资源在一定时间内的访问次数;

  • 代码:限制同一时间,最多有 N 个线程能访问接口(比如下面代码中的 N=2),超过的请求要么排队,要么直接拒绝,保证服务稳定;

    @Slf4j
    public class SemaphoreDemo {/*** 同一时刻最多只允许有两个并发* 即许可证数量=2 → 同一时间最多允许 2 个线程访问*/private static Semaphore semaphore = new Semaphore(2);// 创建线程池,最多 10 个线程(模拟大量请求)private static Executor executor = Executors.newFixedThreadPool(10);public static void main(String[] args) {// 循环 10 次,模拟 10 个请求for(int i = 0; i < 10; i++){ // 提交任务到线程池,执行 getProductInfo() 或 getProductInfo2()executor.execute(() -> getProductInfo());}}// 阻塞式限流public static String getProductInfo() {// 1. 尝试获取许可证:拿不到就阻塞,直到有许可证try {semaphore.acquire();log.info("请求服务"); // 拿到许可证,执行逻辑Thread.sleep(2000);  // 模拟接口执行耗时(2 秒)} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 2. 释放许可证:不管是否异常,必须释放,让其他线程能用semaphore.release();}return "返回商品详情信息";}// 非阻塞式限流public static String getProductInfo2() {// 1. 尝试获取许可证:拿不到直接返回 false,不阻塞if(!semaphore.tryAcquire()){log.error("请求被流控了"); // 没拿到许可证,直接拒绝return "请求被流控了";}try {log.info("请求服务"); // 拿到许可证,执行逻辑Thread.sleep(2000); // 模拟接口执行耗时(2 秒)} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 2. 释放许可证:必须释放semaphore.release();}return "返回商品详情信息";}
    }  
    
  • 假设运行 getProductInfo()

    • 前 2 个线程能拿到许可证,执行 log.info("请求服务"),然后 sleep 2 秒。

    • 第 3~10 个线程调用 acquire() 时,因为许可证被占满,会阻塞等待。

    • 2 秒后,前 2 个线程 release() 归还许可证,阻塞的线程开始竞争,每次放 2 个执行,直到所有请求处理完。

  • 如果运行 getProductInfo2():前 2 个线程拿到许可证,执行逻辑;第 3 个线程 tryAcquire() 返回 false,直接走限流逻辑(log.error)。

2.3.2 Semaphore限制同时在线的用户数量

  • 模拟一个登录系统,最多限制给定数量的人员同时在线,如果所能申请的许可证不足,那么将告诉用户无法登录,请稍后重试;

  • 主类SemaphoreDemo7(模拟多用户登录):

    public class SemaphoreDemo7 {public static void main(String[] args) {// 最多允许 10 个用户同时在线final int MAX_PERMIT_LOGIN_ACCOUNT = 10; LoginService loginService = new LoginService(MAX_PERMIT_LOGIN_ACCOUNT);// 启动 20 个线程(模拟 20 个用户登录)IntStream.range(0, 20).forEach(i -> new Thread(() -> {// 执行登录boolean login = loginService.login(); if (!login) {// 登录失败(超过并发数)System.out.println(Thread.currentThread() + " 因超过最大在线数被拒绝");return;}try {simulateWork(); // 模拟登录后的业务操作} finally {loginService.logout(); // 退出时释放许可证}}, "User-" + i).start());}// 模拟登录后的业务操作(随机休眠,模拟用户在线时长)private static void simulateWork() {try {TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) { /* 忽略中断 */ }}
    }
    
    • 创建 LoginService,传入最大在线数 10

    • 启动 20 个线程(用户),调用 loginService.login() 尝试登录;

    • 登录成功 → 执行 simulateWork()(模拟用户在线操作,随机休眠 )→ 退出时 logout() 释放许可证;

    • 登录失败 → 直接提示并返回;

  • LoginService 类(控制登录逻辑)

    private static class LoginService {private final Semaphore semaphore; public LoginService(int maxPermitLoginAccount) {// 创建 Semaphore:许可证数量=maxPermitLoginAccount,公平策略(true)this.semaphore = new Semaphore(maxPermitLoginAccount, true); }public boolean login() {// 尝试获取许可证:非阻塞,拿不到直接返回 falseboolean success = semaphore.tryAcquire(); if (success) {System.out.println(Thread.currentThread() + " 登录成功");}return success;}public void logout() {// 释放许可证:登录成功的用户退出时,归还许可证semaphore.release(); System.out.println(Thread.currentThread() + " 退出成功");}
    }
    
    • Semaphore 许可证数量 = 最大在线用户数(10),保证同一时间最多 10 个线程(用户)能拿到许可证;

      • login()tryAcquire() 尝试拿许可证:
        • 拿到 → 返回 true(登录成功);
        • 拿不到 → 返回 false(登录失败,超过并发);
    • logout()release() 释放许可证,让其他用户可登录;

  • 运行效果:

    • 登录成功:前 10 个线程(用户)能拿到许可证,打印 登录成功,执行 simulateWork() 随机休眠;

    • 登录失败:后 10 个线程调用 tryAcquire() 时,许可证已耗尽,返回 false,打印 因超过最大在线数被拒绝

    • 用户退出:休眠结束后,线程执行 logout() 释放许可证,Semaphore 计数器 +1,后续新线程(或之前阻塞的线程)有机会拿到许可证登录;

  • 如果把 login 里的 tryAcquire() 换成 acquire()(阻塞式获取 ):

    public boolean login() {try {// 阻塞式获取:拿不到就一直等,直到有许可证semaphore.acquire(); System.out.println(Thread.currentThread() + " 登录成功");return true;} catch (InterruptedException e) {// 被中断时返回登录失败return false; }
    }
    
    • 效果:超过并发数的用户不会直接失败,而是阻塞等待,直到有用户退出释放许可证,再继续登录。

2.4 应用场景总结

  • Semaphore(信号量)是高并发工具,核心能力是控制同时访问共享资源的线程数量,让有限的资源(比如连接池、文件句柄)在高并发下被合理使用,避免系统被压垮;

  • 应用场景总结

    • 限流(流量控制):系统的某个资源(比如接口、数据库连接)能承受的并发量有限,需要限制同时访问的线程数,防止资源被打满导致系统崩溃;

      • 接口限流:比如商品详情接口,最多允许 100 个线程同时访问,用 Semaphore(100) 控制,超过的请求排队或拒绝;
      • 数据库连接限流:数据库连接池有 20 个连接,用 Semaphore(20) 控制,避免几千个线程同时抢连接,把数据库压垮;
      • 侧重于控制并发访问量,保护资源不被压垮,比如接口、网关层的流量控制;
    • 资源池(维护有限资源):系统有一组有限资源(比如数据库连接、文件句柄、网络端口),需要让线程按需借用、用完归还,保证资源被合理复用;

      • 数据库连接池:初始化 Semaphore(连接数),线程需要连接时 acquire() 拿许可证(同时从池里取连接),用完后 release() 释放许可证(同时把连接还回池);
      • 文件访问池:如果有 5 个文件句柄,用 Semaphore(5) 控制,线程访问文件时拿许可证,访问完归还,保证同一时间最多 5 个线程操作文件;
      • 侧重于管理有限资源的借用/归还,保证资源复用,比如连接池、句柄池的资源调度;
  • 但本质都是Semaphore 的许可证数量,限制同时使用资源的线程数

3 CountDownLatch(闭锁)

3.1 简介

  • CountDownLatch多线程同步工具,解决的问题是:让一个或多个线程等待其他多个任务全部完成后,再继续执行。比如:

    • 主线程要等 10 个子线程都跑完初始化任务,才开始处理业务;
    • 或者多个线程要等某个“总开关”任务完成,再一起执行;
  • 工作流程:

    • 初始化计数器CountDownLatch latch = new CountDownLatch(N);,这里的 N 是需要等待的任务数量(比如有 3 个子线程要执行,N=3);

    • 等待线程latch.await();,调用 await() 的线程(比如主线程)会阻塞等待,直到 N 减到 0;

    • 任务线程计数减 1:每个子任务线程执行完自己的逻辑后,调用 latch.countDown(); ,让计数器 N-1

    • 计数器归 0,等待线程唤醒:当所有子任务线程都调用 countDown()N 变成 0 ,之前阻塞的线程(await() 的线程)会被唤醒,继续执行;

    在这里插入图片描述

    TA:等待线程、T1/T2/T3:任务线程

    cnt = 3:对应 CountDownLatch latch = new CountDownLatch(3);,表示需要等待 3 个任务完成

    过程:

    • 线程 TA 调用 await()TA执行到latch.await() 时,会检查计数器cnt:此时cnt=3≠0,所以TA进入阻塞状态(awaiting...,暂停执行;

    • 任务线程 T1 完成T1 执行 latch.countDown() → 计数器 cnt3→2。此时 cnt≠0TA 仍阻塞;

    • 任务线程 T2 完成T2 执行 latch.countDown() → 计数器 cnt2→1。此时 cnt≠0TA 仍阻塞;

    • 任务线程 T3 完成T3 执行 latch.countDown() → 计数器 cnt1→0

    • cnt=0 时,CountDownLatch唤醒所有等待的线程(这里是 TA):TA从阻塞状态恢复(resumed),继续执行后续逻辑;

  • 关键特性:

    • 一次性:计数器 N 减到 0 后,就不能再重置或复用,只能用一次;

    • 多线程等待:可以有多个线程调用 await() ,一起等待 N 归 0 后被唤醒;

    • 任务结束的宽泛性:子任务结束包括正常跑完或者抛异常终止,只要调用 countDown() ,就会让计数器减 1;

  • 典型场景:

    • 并行任务汇总:比如计算一个大数组的和,拆成 10 个子数组并行计算,主线程等 10 个子线程都算完,再汇总结果;
    • 系统启动初始化:系统启动时,需要初始化 5 个服务(比如缓存、数据库连接、配置加载),主线程等 5 个服务都初始化完,再对外提供服务;
    • 测试多线程并发:测试时,让 100 个线程等信号,信号发出(countDown())后一起执行,模拟高并发场景。

3.2 常用API

3.2.1 构造器

public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}
  • CountDownLatch 构造时,count 必须 ≥0,否则抛 IllegalArgumentException
  • count 减到 0 后,无法重置,CountDownLatch 只能用一次。

3.2.2 常用方法

  • 总览:

    // 1. await():调用的线程会阻塞,直到 count 减到 0
    public void await() throws InterruptedException {};  // 2. await(long timeout, TimeUnit unit):阻塞等待,但最多等 timeout 时间;
    //    若超时后 count 仍≠0,不再等待,返回 false
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {};  // 3. countDown():让 count 减 1,直到 count=0 时唤醒所有等待的线程
    public void countDown() {};  
    
  • await():调用线程进入阻塞状态,直到 countDownLatchcount 减到 0;

    • 如果等待中被其他线程中断,会抛出 InterruptedException
    • count=0 时,调用 await()立即返回,不阻塞;
  • await(long timeout, TimeUnit unit):阻塞等待,但增加了超时退出机制;

    • 返回值为true → 等待中 count 减到 0(正常唤醒);
    • 返回值为false → 超时后 count 仍≠0(放弃等待);
  • countDown():让 count 减 1。当 count1→0 时,会唤醒所有等待的线程await() 的线程);

    • count 已经是 0 时,调用 countDown() 会被忽略(count 最小为 0);
    • 只有 count1→0 时,才会触发唤醒;count3→2 这类变化,不会唤醒线程;
  • 例:

    // 1. 初始化:count=2 → 需要 2 次 countDown() 才会唤醒 await() 的线程
    CountDownLatch latch = new CountDownLatch(2);  // 2. 第一次 countDown() → count=2→1
    latch.countDown();  // 3. 第二次 countDown() → count=1→0 → 唤醒所有 await() 的线程
    latch.countDown();  // 4. 第三次 countDown() → count 已经是 0,调用被忽略
    latch.countDown();  // 5. count=0,调用 await() 直接返回,不阻塞
    latch.await();  
    

3.3 使用

3.3.1 多任务完成后合并汇总

  • 开发中常见多个任务并行执行,必须等所有任务完成后,再统一处理结果 的需求:

    • 比如“数据详情页”需要同时调用 5 个接口(并行),等所有接口返回数据后,再合并结果展示;

    • 或者“多个数据操作完成后,统一做校验(check)”;

  • 代码:

    public class CountDownLatchDemo2 {public static void main(String[] args) throws Exception {// 1. 初始化 CountDownLatch:需要等待 5 个任务完成CountDownLatch countDownLatch = new CountDownLatch(5); // 2. 启动 5 个线程(模拟 5 个并行任务)for (int i = 0; i < 5; i++) { final int index = i;new Thread(() -> {try {// 模拟任务执行耗时(1~3 秒随机)Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000)); System.out.println("任务 " + index + " 执行完成");// 3. 任务完成,计数器减 1(countDownLatch.countDown())countDownLatch.countDown(); } catch (InterruptedException e) {e.printStackTrace();}}).start();}// 4. 主线程阻塞等待:直到 5 个任务都完成(count=0)countDownLatch.await(); // 5. 所有任务完成后,主线程执行汇总逻辑System.out.println("主线程:在所有任务运行完成后,进行结果汇总"); }
    }
    
    • 初始化 CountDownLatchnew CountDownLatch(5) → 表示需要等待 5 个任务完成(计数器初始值 5);

    • 启动并行任务:循环创建 5 个线程,模拟 5 个并行任务。每个线程:

      • Thread.sleep(...):模拟任务执行耗时(随机 1~3 秒);

      • countDownLatch.countDown():任务完成后,计数器减 1(5→4→3→2→1→0);

    • 主线程等待countDownLatch.await() → 主线程阻塞,直到计数器减到 0(所有任务完成);

    • 汇总结果:计数器归 0 后,主线程被唤醒,执行 System.out.println(...) 做结果汇总;

  • 运行效果

    • 5 个任务线程会随机顺序完成(因为 sleep 时间随机),比如:

      任务 2 执行完成  
      任务 0 执行完成  
      任务 4 执行完成  
      任务 1 执行完成  
      任务 3 执行完成  
      
    • 主线程必须等所有任务打印完,才会输出:

      主线程:在所有任务运行完成后,进行结果汇总  
      
  • 核心价值

    • 并行效率:5 个任务并行执行,不用等待前一个任务完成再执行下一个,节省时间;

    • 同步控制:主线程通过 CountDownLatch 精准等待所有任务完成,保证汇总逻辑在所有数据就绪后执行。

3.3.2 电商场景中的应用——等待所有子任务结束

  • 需求:根据商品品类 ID,获取 10 个商品,并行计算每个商品的最终价格(需调用 ERP、CRM 等系统,计算复杂、耗时),最后汇总所有价格返回;

    ERP系统:ERP是企业资源计划系统,它整合企业内部各部门的核心业务流程,如财务、采购、生产、销售和人力资源等,以实现数据共享和资源优化;

    CRM系统:CRM是客户关系管理系统,它专注于管理公司与当前及潜在客户的交互和业务往来,旨在改善客户服务、提升销售效率并维护客户关系;

    • 串行问题:如果一个一个计算(串行),总耗时 = 获取商品时间 + 10×单个商品计算时间,商品越多越慢;

      在这里插入图片描述

    • 并行优化:用多线程并行计算商品价格,总耗时 = 获取商品时间 + 最长单个商品计算时间,效率更高;

      在这里插入图片描述

  • 代码:工具方法 & 数据类

    // 根据品类 ID 获取商品 ID 列表(模拟返回 1~10 号商品)
    private static int[] getProductsByCategoryId() {return IntStream.rangeClosed(1, 10).toArray();
    }// 商品价格数据类:存储商品 ID 和计算后的价格
    private static class ProductPrice {private final int prodID; // 商品 IDprivate double price;    // 计算后的价格// 构造方法、get/set、toString 略...
    }
    
  • 主逻辑:并行计算商品价格

    public static void main(String[] args) throws InterruptedException {// 1. 获取商品 ID 列表(1~10)final int[] products = getProductsByCategoryId(); // 2. 转换为 ProductPrice 列表(初始价格未计算)List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new) // 每个商品 ID 对应一个 ProductPrice.collect(Collectors.toList()); // 3. 初始化 CountDownLatch:需要等待 10 个商品计算完成(products.length=10)final CountDownLatch latch = new CountDownLatch(products.length); // 4. 为每个商品启动线程,并行计算价格list.forEach(pp -> {new Thread(() -> {try {System.out.println(pp.getProdID() + " -> 开始计算商品价格.");// 模拟耗时操作(调用外部系统):随机休眠 0~9 秒TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); // 5. 计算商品价格(模拟业务逻辑:奇偶商品不同折扣)if (pp.getProdID() % 2 == 0) {pp.setPrice(pp.getProdID() * 0.90); // 偶数商品 9 折} else {pp.setPrice(pp.getProdID() * 0.71); // 奇数商品 7.1 折}System.out.println(pp.getProdID() + " -> 价格计算完成.");} catch (InterruptedException e) {e.printStackTrace();} finally {// 6. 任务完成,计数器减 1latch.countDown(); }}).start(); // 启动线程});// 7. 主线程阻塞等待:直到 10 个商品都计算完成(latch.await())latch.await(); // 8. 所有商品计算完成,汇总结果System.out.println("所有价格计算完成.");list.forEach(System.out::println); 
    }
    
    • 准备商品数据:调用 getProductsByCategoryId() 获取 1~10 号商品 ID,转成 ProductPrice 列表(初始价格未计算);

    • 初始化 CountDownLatchnew CountDownLatch(10) → 表示需要等待10 个商品的计算任务完成;

    • 并行计算价格:为每个商品启动线程:

      • 模拟耗时操作(TimeUnit.SECONDS.sleep(...));
      • 根据商品 ID 奇偶,设置不同折扣价格(模拟业务逻辑);
      • 任务完成后,latch.countDown() 让计数器减 1;
    • 主线程等待 & 汇总结果latch.await() 阻塞主线程,直到 10 个任务都完成(计数器归 0),最后打印所有商品的计算结果。

3.4 应用场景总结

  • 并行任务同步:多个任务并行执行(比如 5 个线程同时下载文件),必须等所有任务都完成后,再执行下一步(比如合并文件);

    • CountDownLatch 让主线程等待所有并行任务完成,保证后续操作在所有任务就绪后执行;
  • 多任务汇总:需要统计多个线程的执行结果(比如 10 个线程分别计算一部分数据,最后汇总总和);

    • 主线程等所有线程计算完,再统一汇总结果,避免部分数据未计算就开始汇总的问题;
  • 资源初始化:系统启动时,需要初始化多个资源(比如缓存、数据库连接、配置加载),必须等所有资源初始化完成,再对外提供服务;

    • 主线程等待所有资源初始化任务完成,保证系统启动后资源可用。

3.5 不足

  • CountDownLatch一次性工具

    • 构造时设置的计数器(比如 new CountDownLatch(5)),一旦减到 0,就无法重置或复用

    • 如果业务需要“重复等待多个任务完成”,CountDownLatch 无法满足,必须重新创建新的实例。

4 CyclicBarrier(回环栅栏/循环屏障)

4.1 简介

  • CyclicBarrier多线程同步工具,解决的问题是:让一组线程互相等待,直到所有线程都到达同一个“屏障点”,然后再一起继续执行

  • 关键特点:可循环使用(屏障可以重置,重复让线程等待、一起执行);

  • 工作流程:

    1. 初始化屏障CyclicBarrier barrier = new CyclicBarrier(N);N 是“需要等待的线程数量”(比如 5 个线程要一起执行后续逻辑);

    2. 线程到达屏障点:每个线程执行到 barrier.await(); 时,会阻塞等待,直到有 N 个线程都调用了 await()

    3. 所有线程到达,一起执行:当第 N 个线程调用 await() 后,所有阻塞的线程会被同时唤醒,继续执行后续逻辑;

    4. 循环使用:唤醒后,屏障可以重置(通过 reset() 方法 ),再次让新的一组线程等待、一起执行;

  • 适合把一个大任务拆成多个子任务并行执行,等所有子任务完成后,再统一做下一步的场景,且需要重复执行该流程。典型场景有:

    • 并行计算 + 合并结果:比如计算一个大数组的和,拆成 10 个子数组并行计算,等所有子数组算完,再合并总和。计算完一次后,还能再拆新的数组,重复使用屏障。

    • 多阶段任务:系统升级时,先让 5 个节点并行执行数据迁移,全部完成后,再一起执行验证数据,验证完还能继续下一阶段(比如启动服务),屏障可循环用;

  • CountDownLatch的核心区别

    特性CyclicBarrierCountDownLatch
    是否可循环可循环(屏障可重置,重复用)一次性(计数器到 0 后无法重置)
    等待的目标等待“一组线程互相到达屏障点”等待“其他线程完成任务(计数减到 0)”
    典型场景多阶段并行任务(可重复)单次多任务同步(不可重复)

4.2 常用API

4.2.1 构造器

  • 有两个构造器:

    public CyclicBarrier(int parties)
    
    • parties:需要等待的线程总数。比如传 4,表示必须有 4 个线程都调用 await(),屏障才会放行;

    • 作用:初始化一个基础的循环屏障,所有线程到达后一起执行后续逻辑;

    public CyclicBarrier(int parties, Runnable barrierAction)
    
    • parties:同上,需要等待的线程总数;

    • barrierAction:一个 Runnable 任务。当所有线程到达屏障后,会优先执行这个任务,再让所有线程继续执行;

    • 作用:适合线程到达屏障后,需要先统一处理一些逻辑(比如汇总数据、初始化资源)的场景;

  • 工作原理:以CyclicBarrier barrier = new CyclicBarrier(4, new Runnable(){...});为例

    在这里插入图片描述

    1. 初始化

      • parties=4 → 需要 4 个线程到达屏障;
      • barrierAction → 所有线程到达后执行的任务;
    2. 线程到达屏障:每个线程执行 barrier.await(); 时:

      • 计数器(count)减 1(初始4 → 线程 1 调用后变成3 → 线程 2 调用后变成2 → 线程 3 调用后变成1 → 线程 4 调用后变成0);
      • 前 3 个线程调用 await() 后,会阻塞等待
    3. 屏障放行(所有线程到达):第 4 个线程调用 await() 后,count=0,执行 barrierAction,然后唤醒所有阻塞的线程,一起继续执行后续逻辑;

    4. 循环复用:屏障放行后,count重置为初始值(4),可以再次让新的一组线程(4 个)等待、触发屏障。

4.2.2 常用方法

  • 总览:

    // 1. await():线程调用后阻塞,直到所有线程都调用 await(),屏障放行
    public int await() throws InterruptedException, BrokenBarrierException {}// 2. await(long timeout, TimeUnit unit):带超时的 await(),超时后屏障视为“被破坏”
    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {}// 3. reset():重置屏障,让计数器回到初始值,可重复使用
    public void reset() {}
    
  • await():线程调用await() 后,会阻塞等待,直到有 parties 个线程都调用 await()parties 是构造器传入的线程数);

    • InterruptedException:等待中的线程被中断;

    • BrokenBarrierException:屏障被破坏(比如其他线程 await() 时被中断、超时 ),导致当前线程无法继续等待;

    • 返回值:返回当前线程在到达屏障的线程组中的索引(比如 4 个线程到达,第一个调用 await() 的线程返回 3,最后一个返回 0 ,索引从 0 开始逆序);

  • await(long timeout, TimeUnit unit):和 await() 类似,但增加超时机制。如果在 timeout 时间内,凑不齐 parties 个线程调用 await(),则触发超时,屏障被标记为破坏。防止线程因其他线程异常,无限期阻塞;

    • 除了 InterruptedExceptionBrokenBarrierException,还可能抛出 TimeoutException(超时);
  • reset():重置CyclicBarrier,让计数器回到初始值(parties),屏障状态恢复到未使用;

    • 注意:重置时,若有线程正在 await(),会触发 BrokenBarrierException(因为屏障被强制重置,这些线程的等待被打断)。

4.3 使用

4.3.1 等待所有子任务结束

  • 需求:根据品类 ID 获取 10 个商品,并行计算每个商品的最终价格(模拟调用外部系统,耗时随机),等所有商品价格计算完成后,汇总结果返回;

  • 工具方法 & 数据类

    // 根据品类 ID 获取商品 ID 列表(1~10 号商品)
    private static int[] getProductsByCategoryId() {return IntStream.rangeClosed(1, 10).toArray();
    }// 商品价格数据类:存储商品 ID 和计算后的价格
    private static class ProductPrice {private final int prodID; // 商品 IDprivate double price;     // 计算后的价格// 构造方法、get/set、toString 略...
    }
    
  • 主逻辑:用 CyclicBarrier 同步多线程

    public static void main(String[] args) throws InterruptedException {// 1. 获取商品 ID 列表,转换为 ProductPrice 列表final int[] products = getProductsByCategoryId();List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList()); // 2. 初始化 CyclicBarrier:需要等待 list.size()(10)个线程到达屏障final CyclicBarrier barrier = new CyclicBarrier(list.size()); // 3. 存储线程的列表(用于后续 join 等待)final List<Thread> threadList = new ArrayList<>(); // 4. 为每个商品启动线程,并行计算价格list.forEach(pp -> {Thread thread = new Thread(() -> {try {System.out.println(pp.getProdID() + " 开始计算商品价格.");// 模拟耗时操作(调用外部系统):随机休眠 0~9 秒TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); // 5. 计算商品价格(奇偶商品不同折扣)if (pp.getProdID() % 2 == 0) {pp.setPrice(pp.getProdID() * 0.90); // 偶数商品 9 折} else {pp.setPrice(pp.getProdID() * 0.71); // 奇数商品 7.1 折}System.out.println(pp.getProdID() + " -> 价格计算完成.");// 6. 等待其他线程:调用 await(),直到所有线程都到达屏障barrier.await(); } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});threadList.add(thread); // 记录线程thread.start();         // 启动线程});// 7. 等待所有线程执行完成(通过 join 保证主线程等所有子线程跑完)threadList.forEach(t -> {try {t.join(); } catch (InterruptedException e) {e.printStackTrace();}});// 8. 所有商品价格计算完成,汇总结果System.out.println("所有价格计算完成.");list.forEach(System.out::println); 
    }
    
    • 准备商品数据:调用 getProductsByCategoryId() 获取 1~10 号商品 ID,转成 ProductPrice 列表(初始价格未计算);

    • 初始化 CyclicBarriernew CyclicBarrier(list.size()) → 表示需要等待 10 个线程 到达屏障(每个商品对应一个线程);

    • 并行计算价格:为每个商品启动线程:

      • 模拟耗时操作(TimeUnit.SECONDS.sleep(...));
      • 根据商品 ID 奇偶,设置不同折扣价格;
      • 调用 barrier.await():线程到达屏障,阻塞等待其他线程;
    • 屏障放行:当第 10 个线程调用 await() 后,所有阻塞的线程会被同时唤醒,继续执行后续逻辑;

    • 主线程等待 & 汇总结果:通过 threadList.forEach(t -> t.join()) 让主线程等待所有子线程执行完成,最后打印所有商品的计算结果。

4.3.2 CyclicBarrier的循环特性——模拟跟团旅游

  • 需求:跟团旅游

    • 第一阶段(上车屏障)

      • 导游要求:所有游客上车后,大巴才出发(对应 CyclicBarrier 的第一次 await());
      • 类比:10 个游客 + 1 个导游(主线程)= 11 个线程,凑齐后屏障放行;
    • 第二阶段(下车屏障)

      • 导游要求:所有游客下车后,大巴才去下一个景点(对应 CyclicBarrier 的第二次 await());
      • 类比:同一组线程(游客 + 导游)再次凑齐,屏障放行,实现“循环复用”;

    在这里插入图片描述

  • 游客线程逻辑(Tourist 类)

    private static class Tourist implements Runnable {private final int touristID; // 游客编号private final CyclicBarrier barrier; // 循环屏障public Tourist(int touristID, CyclicBarrier barrier) {this.touristID = touristID;this.barrier = barrier;}@Overridepublic void run() {// 1. 模拟上车(第一阶段:上车同步)System.out.printf("游客:%d 乘坐旅游大巴\n", touristID);spendSeveralSeconds(); // 模拟上车耗时waitAndPrint("游客:%d 上车,等别人上车.\n"); // 调用 await(),等待凑齐 11 个线程// 2. 模拟下车(第二阶段:下车同步)System.out.printf("游客:%d 到达目的地\n", touristID);spendSeveralSeconds(); // 模拟下车耗时waitAndPrint("游客:%d 下车,等别人下车.\n"); // 再次调用 await(),等待凑齐 11 个线程}// 调用 barrier.await(),并打印日志private void waitAndPrint(String message) {System.out.printf(message, touristID);try {barrier.await(); // 线程到达屏障,阻塞等待} catch (InterruptedException | BrokenBarrierException e) {// 忽略异常}}// 模拟随机耗时(上车/下车的时间)private void spendSeveralSeconds() {try {TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) {// 忽略异常}}
    }
    
  • 主线程逻辑(导游视角)

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {// parties=11 → 需要 11 个线程到达屏障(10 个游客线程 + 1 个主线程)final CyclicBarrier barrier = new CyclicBarrier(11);// 启动 10 个游客线程for (int i = 0; i < 10; i++) {new Thread(new Tourist(i, barrier)).start();}// 4. 主线程(导游)参与第一阶段屏障:等待所有游客上车barrier.await(); System.out.println("导游:所有的游客都上了车.");// 5. 主线程(导游)参与第二阶段屏障:等待所有游客下车barrier.await(); System.out.println("导游:所有的游客都下车了.");
    }
    
  • 上车同步

    • 10 个游客线程 + 1 个主线程(导游),共 11 个线程;

    • 每个游客线程执行到 waitAndPrint("游客:%d 上车,等别人上车.\n") → 调用 barrier.await(),阻塞等待;

    • 当 11 个线程都调用 await() 后,屏障放行:

      • 打印所有“游客上车等待”的日志;

      • 主线程继续执行,打印 导游:所有的游客都上了车.

  • 下车同步

    • 同一组 11 个线程(10 个游客 + 1 个主线程 ),再次执行到 waitAndPrint("游客:%d 下车,等别人下车.\n") → 调用 barrier.await(),阻塞等待;

    • 当 11 个线程都调用 await() 后,屏障放行:

      • 打印所有“游客下车等待”的日志;

      • 主线程继续执行,打印 导游:所有的游客都下车了.

  • CyclicBarrier 的循环特性

    • 可重复触发:同一 CyclicBarrier 实例,可通过多次 await() 实现“多阶段同步”(上车→下车);

    • 线程组复用:同一组线程(游客 + 导游 )参与多个阶段的屏障,无需重新创建实例。

4.4 应用场景总结

  • CyclicBarrier多线程同步工具,核心解决让一组线程互相等待,全部到达同一屏障点后,再一起继续执行的问题,且可循环使用(屏障可重置,重复同步多阶段任务);

  • 应用场景:

    • 多线程任务拆分与合并:一个复杂任务(比如计算大数据集的总和)拆成多个子任务(比如 10 个线程各算一部分),必须等所有子任务完成后,再合并结果;

    • 多线程数据处理同步:多个线程并行处理不同的数据分片(比如处理 5 个文件),必须等所有线程处理完自己的数据,再统一汇总、校验或持久化。

4.5 CyclicBarrier VS CountDownLatch

  • 可复用性

    • CountDownLatch一次性工具。构造时设置的计数器(比如 new CountDownLatch(5)),一旦减到 0,无法重置或复用;

    • CyclicBarrier可循环复用。计数器(parties)可以通过 reset() 重置,重复让新的线程组等待、触发屏障;

  • 等待目标

    • CountDownLatchawait() 的线程等待其他线程调用 countDown() 把计数器减到 0(主线程等子线程完成);

    • CyclicBarrierawait() 的线程等待其他线程也到达屏障点(调用 await()(线程组互相等待);

  • 计数器特性

    • CountDownLatch:计数器只能递减(从 N→0),且无法重置;

    • CyclicBarrier:计数器可以重置(通过 reset() 回到初始值 parties),支持多轮同步。

5 Exchange(数据交换机)

5.1 简介

  • Exchanger 专门解决两个线程需要互相交换数据的场景,让两个线程在“交换点”(调用 exchange 方法时)同步,安全交换数据;

  • 工作流程

    • 线程 1 调用 exchange(object1):线程 1 会阻塞等待,直到线程 2 也调用 exchange 方法;
    • 线程 2 调用 exchange(object2):此时两个线程都到达“交换点”,Exchanger 会将 object1 传递给线程 2,将 object2 传递给线程 1;
    • 交换后继续执行:线程 1 拿到 object2,线程 2 拿到 object1,继续执行后续逻辑;

    在这里插入图片描述

5.2 常用API

  • public V exchange(V x) throws InterruptedException

    • 功能

      • 当前线程携带数据 x,阻塞等待另一个线程到达交换点;
      • 对方线程到达后,交换数据:当前线程接收对方数据,返回给调用方;
    • 异常:等待中若线程被中断,抛出 InterruptedException

  • public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

    • 同上,但增加超时机制。如果在 timeout 时间内,对方线程未到达交换点,抛出 TimeoutException
    • 适用场景:防止线程因对方异常,无限期阻塞。

5.3 使用

5.3.1 模拟交易场景

  • 模拟买卖双方交易

    • 卖家带“商品”(goods = "电脑"),买家带“钱”(money = "$4000");

    • 双方必须都到达“交易点”(调用 exchanger.exchange(...)),才能交换数据(一手交钱,一手交货);

  • 代码:

    public class ExchangerDemo {private static Exchanger exchanger = new Exchanger(); static String goods = "电脑";static String money = "$4000";public static void main(String[] args) throws InterruptedException {System.out.println("准备交易,一手交钱一手交货...");// 卖家线程:携带 goods,等待买家new Thread(() -> {try {System.out.println("卖家到了,已经准备好货:" + goods);// 交换数据:卖家发送 goods,接收 moneyString receivedMoney = (String) exchanger.exchange(goods); System.out.println("卖家收到钱:" + receivedMoney);} catch (Exception e) { /* 忽略异常 */ }}).start();// 主线程休眠 3 秒,模拟买家延迟到达Thread.sleep(3000); // 买家线程:携带 money,等待卖家new Thread(() -> {try {System.out.println("买家到了,已经准备好钱:" + money);// 交换数据:买家发送 money,接收 goodsString receivedGoods = (String) exchanger.exchange(money); System.out.println("买家收到货:" + receivedGoods);} catch (Exception e) { /* 忽略异常 */ }}).start();}
    }
    
    • 同步交换:卖家先调用 exchange(goods) 会阻塞,直到买家调用 exchange(money),双方交换数据;

    • 数据流向:卖家发送 goods → 接收 money;买家发送 money → 接收 goods

5.3.2 模拟对账场景

  • 模拟数据对账

    • 线程 1 生成数据 A,线程 2 生成数据 B
    • 双方交换数据后,线程 2 校验 AB 是否一致;
  • 代码:

    public class ExchangerDemo2 {private static final Exchanger<String> exchanger = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) {// 线程 1:发送数据 AthreadPool.execute(() -> {try {String A = "12379871924sfkhfksdhfks";exchanger.exchange(A); // 发送 A,等待线程 2} catch (InterruptedException e) { /* 忽略 */ }});// 线程 2:发送数据 B,接收数据 A,校验一致性threadPool.execute(() -> {try {String B = "32423423jkmjkfsbfj";String A = exchanger.exchange(B); // 发送 B,接收 ASystem.out.println("A和B数据是否一致:" + A.equals(B));System.out.println("A= " + A);System.out.println("B= " + B);} catch (InterruptedException e) { /* 忽略 */ }});threadPool.shutdown();}
    }
    
    • 数据校验:线程 2 接收线程 1 的数据 A 后,对比自己的 B,判断是否一致;

    • 线程池简化:用线程池管理两个线程,避免手动创建 Thread

5.3.3 模拟队列中交换数据

  • 模拟生产者 - 消费者模式,但通过 Exchanger 动态交换“满队列”和“空队列”:

    • 生产者往 emptyQueue 放数据,满了就和消费者交换队列(拿空队列继续生产);

    • 消费者从 fullQueue 取数据,空了就和生产者交换队列(拿满队列继续消费);

  • 代码:

    public class ExchangerDemo3 {// 满队列(消费者初始用)、空队列(生产者初始用)private static ArrayBlockingQueue<String> fullQueue = new ArrayBlockingQueue<>(5); private static ArrayBlockingQueue<String> emptyQueue = new ArrayBlockingQueue<>(5); private static Exchanger<ArrayBlockingQueue<String>> exchanger = new Exchanger<>(); public static void main(String[] args) {new Thread(new Producer()).start(); // 启动生产者new Thread(new Consumer()).start(); // 启动消费者}// 生产者:往队列放数据,满了就交换队列static class Producer implements Runnable {@Overridepublic void run() {ArrayBlockingQueue<String> current = emptyQueue; try {while (current != null) {String str = UUID.randomUUID().toString();try {current.add(str); // 往队列放数据System.out.println("producer:生产了一个序列:" + str + ">>>>>加入到交换区");Thread.sleep(2000);} catch (IllegalStateException e) {// 队列满了,交换队列(拿空队列)System.out.println("producer:队列已满,换一个空的");current = exchanger.exchange(current); }}} catch (Exception e) { /* 忽略 */ }}}// 消费者:从队列取数据,空了就交换队列static class Consumer implements Runnable {@Overridepublic void run() {ArrayBlockingQueue<String> current = fullQueue; try {while (current != null) {if (!current.isEmpty()) {String str = current.poll(); // 从队列取数据System.out.println("consumer:消耗一个序列:" + str);Thread.sleep(1000);} else {// 队列空了,交换队列(拿满队列)System.out.println("consumer:队列空了,换个满的");current = exchanger.exchange(current); System.out.println("consumer:换满的成功~~~~~~~~~~~~~~~~~~~~~~");}}} catch (Exception e) { /* 忽略 */ }}}
    }
    
    • 动态队列交换

      • 生产者队列满 → 用 exchanger.exchange(current) 交换出空队列,继续生产;
      • 消费者队列空 → 用 exchanger.exchange(current) 交换出满队列,继续消费;
    • 解耦生产和消费:通过交换队列,避免生产者/消费者因队列满/空阻塞,灵活控制数据流转。

5.4 应用场景总结

  • 数据交换:两个线程需要安全交换数据(如交易场景的“钱 - 货”交换);
    • 保证“交换原子性”,避免数据不一致;
  • 数据采集:采集线程(生产者)和处理线程(消费者)交换数据(如日志采集→日志处理);
    • 解耦数据生产和消费,通过交换数据缓冲,提升系统吞吐量。

6 Phaser(阶段协同器)

6.1 简介

  • Phaser 用于协调多个线程的多阶段执行,支持:

    • 动态调整参与线程的数量(可增、可减);

    • 分阶段同步(线程完成当前阶段,再一起进入下一阶段);

    • CyclicBarrier 更灵活(支持动态线程数、多阶段),比 CountDownLatch 更强大(可循环、可动态调整);

  • 核心特性

    • 多阶段同步:线程可以分多个阶段执行(如 phase-0 → phase-1 → phase-2),每个阶段都需要线程同步后再继续;

    • 动态线程管理

      • 可通过 register() 动态增加参与线程;

      • 可通过 arriveAndDeregister() 动态减少参与线程;

    • 灵活的阶段控制:每个阶段完成后,可自定义逻辑(重写 onAdvance 方法),决定是否继续下一阶段;

  • 工作流程

    1. 阶段 0(phase-0
      • 多个线程执行“阶段 0”的任务;
      • 线程调用 arriveAndAwaitAdvance() 表示“阶段 0 完成”,等待其他线程也完成“阶段 0”;
    2. 进入阶段 1(phase-1
      • 所有线程都完成“阶段 0”后,一起进入“阶段 1”;
      • 重复“执行任务 → 同步等待”的流程;
    3. 多阶段循环:支持多个阶段(phase-0 → phase-1 → phase-2 → ...),直到手动终止或所有线程退出;

    在这里插入图片描述

6.2 常用 API

  • 构造方法

    构造方法作用
    Phaser()初始化一个“参与任务数为 0”的 Phaser,后续用 register() 动态添加线程
    Phaser(int parties)指定初始参与线程数(类似 CyclicBarrierparties
    Phaser(Phaser parent)作为子阶段协同器,依附于父 Phaser,适合复杂多阶段场景
    Phaser(Phaser parent, int parties)结合父 Phaser 和初始线程数,更灵活的初始化
  • 增减参与线程

    方法作用
    int register()动态增加一个参与线程,返回当前阶段号
    int bulkRegister(int parties)动态增加多个参与线程(批量注册),返回当前阶段号
    int arriveAndDeregister()线程完成任务后,退出参与(减少一个线程),返回当前阶段号
  • 到达、等待方法

    方法作用
    int arrive()标记“当前线程完成阶段任务”,但不等待其他线程,继续执行
    int arriveAndAwaitAdvance()标记“当前线程完成阶段任务”,等待其他线程也完成,再进入下一阶段
    int awaitAdvance(int phase)等待进入指定阶段(需当前阶段匹配)
    int awaitAdvanceInterruptibly(int phase)同上,但等待中可被中断
    int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)带超时的等待,超时后抛出异常
  • 阶段自定义逻辑

    protected boolean onAdvance(int phase, int registeredParties) 
    
    • 作用:每个阶段完成后,自动调用此方法,决定是否继续下一阶段;

    • 返回值

      • true:阶段结束,Phaser 不再继续(可用于终止多阶段流程);
      • false:继续下一阶段。

6.3 使用

  • 需求:模拟了公司团建的多阶段活动,团建分4个阶段,参与人数动态变化:

    • 阶段0:所有人到公司集合 → 出发去公园

    • 阶段1:所有人到公园门口 → 出发去餐厅

    • 阶段2:部分人到餐厅(有人提前离开,有人新增加入)→ 开始用餐

    • 阶段3:用餐结束 → 活动终止

    • 参与人数不固定(有人早退、有人中途加入),每个阶段必须等人齐了再继续

  • 代码:

    public class PhaserDemo {public static void main(String[] args) {final Phaser phaser = new Phaser() {// 每个阶段完成后自动调用下面的 onAdvance 方法,打印阶段总结,并判断是否终止(只剩主线程时终止)@Overrideprotected boolean onAdvance(int phase, int registeredParties) {// registeredParties 是当前注册的线程数(包括主线程),减去 1 得到实际员工数// 主线程:作为协调者,全程参与并动态添加中途加入者int staffs = registeredParties - 1;// 每个阶段完成后的提示信息switch (phase) {case 0:System.out.println("大家都到公司了,出发去公园,人数:" + staffs);break;case 1:System.out.println("大家都到公园门口了,出发去餐厅,人数:" + staffs);break;case 2:System.out.println("大家都到餐厅了,开始用餐,人数:" + staffs);break;}// 终止条件:只剩主线程(registeredParties == 1)return registeredParties == 1;}};// 注册主线程————让主线程全程参与phaser.register();final StaffTask staffTask = new StaffTask();// 全程参与者:3 人(参与所有 4 个阶段)for (int i = 0; i < 3; i++) {// 添加任务数phaser.register();new Thread(() -> {try {staffTask.step1Task();//到达后等待其他任务到达phaser.arriveAndAwaitAdvance();staffTask.step2Task();phaser.arriveAndAwaitAdvance();staffTask.step3Task();phaser.arriveAndAwaitAdvance();staffTask.step4Task();// 完成了,注销离开phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 早退者:2 人(只参与前 2 个阶段,到公园后离开)for (int i = 0; i < 2; i++) {phaser.register();new Thread(() -> {try {staffTask.step1Task();phaser.arriveAndAwaitAdvance();staffTask.step2Task();System.out.println("员工【" + Thread.currentThread().getName() + "】回家了");// 完成了,注销离开phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 中途加入者:4 人(从阶段 2 开始参与,直接到餐厅聚餐)while (!phaser.isTerminated()) {int phase = phaser.arriveAndAwaitAdvance();if (phase == 2) {for (int i = 0; i < 4; i++) {phaser.register();new Thread(() -> {try {staffTask.step3Task();phaser.arriveAndAwaitAdvance();staffTask.step4Task();// 完成了,注销离开phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}}}}static final Random random = new Random();// 封装了 4 个阶段的具体动作(从家出发→到公司→去公园→去餐厅→用餐),每个阶段用 Thread.sleep 模拟耗时static class StaffTask {public void step1Task() throws InterruptedException {// 第一阶段:来公司集合String staff = "员工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "从家出发了……");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到达公司");}public void step2Task() throws InterruptedException {// 第二阶段:出发去公园String staff = "员工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "出发去公园玩");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到达公园门口集合");}public void step3Task() throws InterruptedException {// 第三阶段:去餐厅String staff = "员工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "出发去餐厅");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到达餐厅");}public void step4Task() throws InterruptedException {// 第四阶段:就餐String staff = "员工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "开始用餐");Thread.sleep(random.nextInt(5000));System.out.println(staff + "用餐结束,回家");}}
    }
    
    • 阶段0:到公司集合
      • 主线程 + 3个全程者 + 2个早退者 → 共6个线程,调用 step1Task()(从家到公司)
      • 完成后调用 phaser.arriveAndAwaitAdvance() → 等待所有人到公司
      • 阶段结束:onAdvance 触发,打印“出发去公园,人数:5”(6-1=5)
    • 阶段1:到公园门口
      • 所有线程调用 step2Task()(从公司到公园),完成后调用 phaser.arriveAndAwaitAdvance() → 等待所有人到公园
      • 2个早退者完成后调用 phaser.arriveAndDeregister() → 退出(注册线程数变为6-2=4)
      • 阶段结束:onAdvance 触发,打印“出发去餐厅,人数:3”(4-1=3,只剩3个全程者+主线程)
    • 阶段2:到餐厅集合
      • 主线程动态添加:检测到阶段2时,新增4个中途加入者 → 注册线程数变为4+4=8
        • 3个全程者调用 step3Task()(从公园到餐厅)
        • 4个新加入者直接调用 step3Task()(到餐厅)
      • 所有人调用 phaser.arriveAndAwaitAdvance() → 等待到餐厅
      • 阶段结束:onAdvance 触发,打印“开始用餐,人数:7”(8-1=7,3+4=7个员工+主线程)
    • 阶段3:用餐结束
      • 所有7人调用 step4Task()(用餐),完成后调用 phaser.arriveAndDeregister() → 所有人退出(注册线程数逐渐减少至1,只剩主线程)
      • 终止条件:onAdvance 检测到 registeredParties == 1 → 返回 truePhaser 终止
  • Phaser 核心特性体现

    • 多阶段同步:通过 arriveAndAwaitAdvance() 实现每个阶段的等待,确保人齐后再进入下一阶段;

    • 动态线程管理

      • phaser.register():新增参与者(如中途加入的4人);

      • phaser.arriveAndDeregister():参与者退出(如早退者和用餐结束的人);

    • 阶段自定义逻辑onAdvance 方法实现每个阶段的总结,并控制流程终止条件;

    • 灵活的协同:相比 CyclicBarrier(固定线程数),Phaser 能应对“有人早退、有人中途加入”的动态场景。

6.4 应用场景总结

  • 多线程任务分配:把一个复杂任务拆成多个子任务,分配给不同线程并行执行,且需要协调子任务的进度(比如所有子任务完成后,再合并结果);

    • Phaser 分阶段管理:

      • 阶段 0:子任务分配,线程开始执行
      • 阶段 1:所有子任务完成,合并结果
    • 支持动态调整线程数(比如某个子任务需要更多线程,用 register() 新增);

  • 多级任务流程:任务需要分多个层级/阶段执行,必须等当前级所有任务完成,才能触发下一级任务(比如“数据采集→数据清洗→数据汇总→结果输出”);

    • 每个层级对应 Phaser 的一个阶段(phase-0 采集→phase-1 清洗→phase-2 汇总);

    • 通过 arriveAndAwaitAdvance() 确保“当前级完成后,再进入下一级”,流程更清晰;

  • 模拟并行计算:模拟分布式并行计算(比如科学计算、大数据处理 ),需要协调多个线程的“计算阶段”(比如矩阵计算分块执行,所有分块完成后再合并);

    • Phaser 同步“分块计算阶段”和“合并阶段”,确保:
      • 所有分块计算完成(阶段 0 同步);
      • 合并结果后,再进入下一阶段(阶段 1 同步);
  • 阶段性任务:任务天然是阶段性的,每个阶段需要所有线程同步后再继续(比如“团队项目”:需求评审→开发→测试→上线,每个阶段必须全员完成);

    • 每个阶段对应 Phaserphase,通过 arriveAndAwaitAdvance() 实现“阶段同步”;

    • 支持动态调整参与线程(比如测试阶段需要新增测试人员,用 register() 加入);

  • 上面所有场景都需要多阶段同步 + 动态线程协作,每个阶段必须等所有线程完成,再进入下一阶段。Phaser 优势:

    • CyclicBarrier 更灵活:支持动态增减线程多阶段自定义逻辑onAdvance);

    • CountDownLatch 更强大:可循环分阶段,而非一次性同步。

http://www.xdnf.cn/news/18937.html

相关文章:

  • sr04模块总结
  • Scala面试题及详细答案100道(41-50)-- 模式匹配
  • MySQL底层数据结构与算法浅析
  • 捡捡java——2、基础05
  • 部署2.516.2版本的jenkins,同时适配jdk8
  • 【Windows】netstat命令解析及端口状态解释
  • React过渡更新:优化渲染性能的秘密
  • Vue3组件加载顺序
  • MySQL 索引
  • THM Whats Your Name WP
  • SDK、JDK、JRE、JVM的区别
  • python使用sqlcipher4对sqlite数据库加密
  • Mip-splatting
  • GCC版本和C语言标准版本的对应关系
  • java去图片水印的方法
  • 生产环境Vue组件报错:Cannot access before initialization
  • 使用qianjkun uniapp 主应用 集成 vue微应用
  • 8.28作业
  • 可改善能源利用水平、削减碳排放总量,并为可再生能源规模化发展提供有力支撑的智慧能源开源了
  • Python Imaging Library (PIL) 全面指南:Python Imaging Library (PIL)基础图像处理入门
  • 【图像处理基石】DCT在图像处理中的应用及实现
  • 从零开始学习JavaWeb-20
  • 第二十节:3D文本渲染 - 字体几何体生成与特效
  • Node.js终极文本转图指南
  • 使用 Action 自动部署 VuePress 到 GitHub Pages
  • Webdriver-Manager 4.0.1启动错误解决
  • Komo Searc-AI驱动的搜索引擎
  • 区块链+隐私计算护航“东数西算”数据安全报告
  • 20.22 QLoRA微调实战:中文语音识别数据准备全流程解密
  • hintcon2025No Man‘s Echo