当前位置: 首页 > news >正文

Java并发:资源共享

目录

一、资源共享的核心机制

1、volatile关键字

2、不可变对象

3、线程封闭-ThreadLocal

4、隐式锁+显式锁+CAS无锁及原子类

二、并发容器与工具

1、并发集合

1.1 ConcurrentHashMap(高并发键值存储)

1.2 CopyOnWriteArrayList(读多写少的列表)

1.3 ConcurrentSkipListMap/Set(有序的并发 Map/Set)

1.4 BlockingQueue(阻塞队列)

1.4.1 ArrayBlockingQueue(有界队列--生产者-消费者模型)

1.4.2 LinkedBlockingQueue(可选有界/无界)

1.4.3 PriorityBlockingQueue(无界优先级队列--优先级任务调度)

1.4.4 SynchronousQueue(直接传递队列)

1.5 ConcurrentLinkedQueue(无锁非阻塞队列)

1.6 ConcurrentLinkedDeque(双端队列)

2、同步工具类

2.1 CountDownLatch(倒计时闩)

2.2 CyclicBarrier(循环屏障)

2.3 Semaphore(信号量)

2.4 Phaser(阶段器)

2.5 Exchanger(交换器)

2.6 Future 与 CompletableFuture(异步任务处理)

2.6.1 Future(阻塞)

2.6.2 CompletableFuture(非阻塞)

2.7 Lock与Condition(多条件变量锁)

2.8 BlockingQueue(阻塞队列)

2.9 Fork/Join 框架(并行任务处理)


一、资源共享的核心机制

1、volatile关键字

基本作用:

  • 可见性保证(强制主内存访问)与禁止指令重排序(插入内存屏障)

  • 不保证原子性(如i++问题),无互斥性,不解决竞态条件

适用场景:

  • 状态标志:简单的boolean状态标志

  • 发布不可变对象:通过 volatile 安全发布不可变对象(如 final 修饰的字段),确保其他线程看到的是构造完成后的对象。避免使用 synchronized 或 AtomicReference,简化代码

  • 单例模式的双重检查锁定(DCL)

class Singleton {// volatile 防止 new Singleton() 的指令重排序(对象分配内存、初始化、引用赋值),避免其他线程获取到未初始化的对象。private volatile static Singleton instance;public static Singleton getInstance() {if (instance == null) {synchronized (Singleton.class) {if (instance == null) {instance = new Singleton();}}}return instance;}
}

注意事项:频繁读写volatile变量可能因内存屏障和缓存同步导致性能损耗,需谨慎使用

2、不可变对象

特性:对象状态在构造后不可修改(如 String)。

优势:天然线程安全,无需同步。

实现方式

  • 所有字段声明为 final

  • 不提供修改状态的方法。

3、线程封闭-ThreadLocal

ThreadLocal是一个用于实现线程封闭(Thread Confinement)的核心工具,每个线程持有变量的独立副本

核心作用:

  • 线程本地存储:为每个线程创建变量的独立副本,实现线程间数据隔离

  • 避免共享:消除多线程竞争,无需同步即可保证线程安全

典型场景:

  • 线程上下文传递(如用户会话、事务ID)

  • 线程不安全的对象复用(如SimpleDateFormat

  • 框架级参数透传(如Spring的事务管理、日志MDC)

基本用法:

// 推荐初始化方式(Java 8+)
private static final ThreadLocal<UserContext> userContextHolder = ThreadLocal.withInitial(UserContext::new);public void processRequest() {try {UserContext context = userContextHolder.get();// 业务逻辑...} finally {userContextHolder.remove();  // 必须清理!}
}

API核心方法:

方法说明
T get()返回当前线程的副本值(首次调用会触发initialValue()
void set(T value)设置当前线程的副本值
void remove()删除当前线程的副本值(避免内存泄漏的关键操作)
initialValue()初始值默认实现(可重写)

实现原理:每个Thread对象内部持有ThreadLocalMap(静态内部类),Key为ThreadLocal实例(弱引用),Value为存储的值(强引用)

内存泄漏问题--风险来源:当 ThreadLocal 实例被垃圾回收(GC)后,Entry 的 Key 变为 null,但 Value 仍然是强引用。若线程长期存活(如线程池中的线程),且未主动清理 Entry,这些 Key 为 null 的 Value 会持续占用内存。

// 内存泄漏示例
// 问题:线程池中的线程会反复执行 processRequest(),导致每个线程的 ThreadLocalMap 中积累大量 Key 为 null 的 Value(1MB 的 byte[]),最终引发内存溢出(OOM)public class LeakDemo {private static final ThreadLocal<byte[]> threadLocal = new ThreadLocal<>();public void processRequest() {threadLocal.set(new byte[1024 * 1024]); // 1MB 数据// 未调用 threadLocal.remove()}public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(4);while (true) {pool.execute(() -> new LeakDemo().processRequest());}}
}

内存泄漏问题--解决方案:

措施说明
及时调用remove()线程使用完ThreadLocal后必须清理(尤其是线程池场景)
使用static修饰ThreadLocal静态ThreadLocal 属于类而非实例,所有线程共享同一个,从而减少ThreadLocal 数量,降低泄漏频率
Java 8优化ThreadLocalMap在set/get时自动清理脏Entry(探测式清理/启发式清理)

4、隐式锁+显式锁+CAS无锁及原子类

具体的锁相关信息,参考:Java并发:锁

二、并发容器与工具

1、并发集合

1.1 ConcurrentHashMap(高并发键值存储)

使用:

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.computeIfAbsent("key", k -> 0 + 1); //复合操作需使用原子方法(如 computeIfAbsent)

注意:避免大规模扩容(初始化时预估容量);迭代器可能不反应最新修改(分段遍历);

          复合操作仍需要同步:例如 if (!map.containsKey(k)) map.put(k, v) 需替换为原子方法 putIfAbsent()

特性:

  • 高并发:JDK 8 后采用 CAS + synchronized 锁单个桶(Node),替代 JDK 7 的分段锁。

  • 无锁读:读操作完全无锁,直接访问 volatile 变量。

  • 不允许 null 键/值:避免歧义(如 get(key) 返回 null 时无法区分是键不存在还是值为 null)。

原理:

  • 桶数组 + 链表/红黑树(冲突时)。

  • 写操作使用 synchronized 锁住单个桶,读操作无锁。

适用场景:

  • 高并发键值存储(如缓存、计数器)。

  • 替代 Hashtable 和 Collections.synchronizedMap

1.2 CopyOnWriteArrayList(读多写少的列表)

使用:

CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item");
list.forEach(System.out::println); // 无锁遍历

注意:写操作性能差(大规模数据时复制成本高);迭代器可能读取到旧数据

特性:

  • 写时复制:写操作(增、删、改)复制整个数组,保证读操作无锁。

  • 弱一致性迭代:迭代器基于创建时的数据快照。

原理:

  • 底层使用 volatile 数组,写操作加锁并复制新数组。

  • 读操作直接访问原数组。

适用场景:读多写极少(如监听器列表、配置项)

CopyOnWriteArraySet:内部封装 CopyOnWriteArrayList,通过遍历检查元素唯一性,元素唯一性由 equals() 保证。用于读多写极少的集合需求。写性能差,不适用于频繁修改的场景

CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("element");

1.3 ConcurrentSkipListMap/Set(有序的并发 Map/Set)

 使用:

ConcurrentSkipListMap<Integer, String> sortedMap = new ConcurrentSkipListMap<>();
sortedMap.put(3, "C");
sortedMap.put(1, "A");

注意:内存开销较高(跳表的多层指针结构);适用于读多写少场景

特性:

  • 有序:基于跳表(Skip List)实现,按自然顺序或自定义比较器排序。

  • 无锁读:读操作完全并发。

原理:

  • 跳表结构:多层链表,通过概率平衡实现高效查找(时间复杂度 O(log n))。

  • 写操作使用 CAS 和少量锁。

适用场景:需要有序且高并发的键值存储(如排行榜、范围查询)

ConcurrentSkipListSet:跳表结构,类似 ConcurrentSkipListMap。基于 ConcurrentSkipListMap 实现,有序集合。适用于需要有序且高并发的集合(如分布式任务调度)

ConcurrentSkipListSet<Integer> sortedSet = new ConcurrentSkipListSet<>();
sortedSet.add(5);

1.4 BlockingQueue(阻塞队列)

原理:使用 ReentrantLock 和 Condition 实现阻塞(如 put() 和 take()

1.4.1 ArrayBlockingQueue(有界队列--生产者-消费者模型)

原理:基于数组实现,固定容量。双锁设计(生产者和消费者使用不同的锁,提高吞吐量)。

使用:

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("task"); // 阻塞直到空间可用
String task = queue.take(); // 阻塞直到元素可用

使用场景:生产者-消费者模型(如线程池任务队列)

1.4.2 LinkedBlockingQueue(可选有界/无界

原理:基于链表实现,默认无界(Integer.MAX_VALUE

注意:无界队列可能导致内存溢出( LinkedBlockingQueue 默认无界)

1.4.3 PriorityBlockingQueue(无界优先级队列--优先级任务调度

元素按自然顺序或比较器排序

1.4.4 SynchronousQueue(直接传递队列

不存储元素,生产者必须等待消费者

1.5 ConcurrentLinkedQueue(无锁非阻塞队列)

原理:链表结构,head 和 tail 指针使用 CAS 更新

使用:

ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("item");
String item = queue.poll();

注意:无阻塞语义,需自行处理队列空/满逻辑

特性:

  • 无锁非阻塞队列:基于 CAS 实现。

  • 高并发:适合高吞吐场景。

适用场景:高并发任务调度(如事件总线)

1.6 ConcurrentLinkedDeque(双端队列)

原理:链表结构,头尾节点独立更新,支持从两端插入和移除;无锁实现,基于 CAS

使用:

ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
deque.addFirst("front");
deque.addLast("end");

注意:无阻塞方法,需结合其他机制实现阻塞语义

适用场景:高并发双端操作(如工作窃取算法)

2、同步工具类

2.1 CountDownLatch(倒计时闩)

作用:等待一组线程完成指定任务后,再触发后续操作(一次性使用

核心方法:

  • await():阻塞等待计数器归零。

  • countDown():计数器减 1。

使用:主线程等待所有子线程初始化完成

CountDownLatch latch = new CountDownLatch(3);for (int i = 0; i < 3; i++) {new Thread(() -> {// 执行初始化任务latch.countDown();}).start();
}latch.await(); // 等待所有子线程完成任务
System.out.println("所有任务完成");

适用场景:

  • 服务启动时等待依赖组件初始化完成。

  • 多线程任务完成后汇总结果。

2.2 CyclicBarrier(循环屏障

作用:让一组线程相互等待,达到同步点(屏障)后一起继续执行(可重复使用

核心方法:

  • await():线程到达屏障并等待其他线程。

使用:多线程分阶段处理数据(支持回调任务)

CyclicBarrier barrier = new CyclicBarrier(3, () -> {System.out.println("所有线程已到达,执行回调任务!");
});for (int i = 0; i < 3; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 准备就绪");barrier.await();} catch (Exception e) {e.printStackTrace();}}, "线程-" + i).start();
}

输出:

线程-0 准备就绪
线程-1 准备就绪
线程-2 准备就绪
所有线程已到达,执行回调任务!

适用场景:

  • 并行计算的分阶段结果合并。

  • 多线程测试中的协同执行。

2.3 Semaphore(信号量)

作用:控制同时访问特定资源的线程数量(限流)

核心方法:

  • acquire():获取许可(若无可选许可则阻塞)。

  • release():释放许可。

使用:限制数据库连接池并发数

Semaphore semaphore = new Semaphore(5); // 最大5个并发semaphore.acquire(); // 获取许可
try {// 使用数据库连接
} finally {semaphore.release(); // 释放许可
}

适用场景:

  • 资源池管理(如线程池、连接池)。

  • 高并发系统的限流保护。

2.4 Phaser(阶段器)

作用:用于协调多个线程分阶段执行任务。它支持动态调整参与者数量,并允许每个阶段(phase)结束后进行自定义操作(更灵活的 CyclicBarrier

核心方法:

方法说明
Phaser()创建 Phaser,初始参与者数为 0。
Phaser(int parties)创建 Phaser,并指定初始参与者数。
register()注册一个参与者(线程)。
arrive()到达当前阶段,但不等待其他线程(非阻塞)。
arriveAndAwaitAdvance()到达当前阶段,并阻塞等待其他线程到达。
arriveAndDeregister()到达当前阶段,并注销一个参与者。
onAdvance(int phase, int parties)可重写的方法,当所有线程到达后触发,用于阶段结束时的自定义逻辑(如数据聚合)。

使用1:分阶段任务同步

import java.util.concurrent.Phaser;public class PhaserDemo {public static void main(String[] args) {Phaser phaser = new Phaser(3); // 初始3个参与者for (int i = 0; i < 3; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 完成阶段1");phaser.arriveAndAwaitAdvance(); // 等待所有线程完成阶段1System.out.println(Thread.currentThread().getName() + " 完成阶段2");phaser.arriveAndAwaitAdvance(); // 等待所有线程完成阶段2System.out.println(Thread.currentThread().getName() + " 退出");phaser.arriveAndDeregister();    // 注销自身}, "线程-" + i).start();}}
}

输出1:

线程-0 完成阶段1
线程-1 完成阶段1
线程-2 完成阶段1
线程-0 完成阶段2
线程-1 完成阶段2
线程-2 完成阶段2
线程-0 退出
线程-1 退出
线程-2 退出

使用2:动态注册与注销

Phaser phaser = new Phaser(1); // 主线程作为初始参与者// 阶段0:主线程准备任务
System.out.println("阶段0:主线程准备任务");// 动态添加3个子任务
for (int i = 0; i < 3; i++) {phaser.register(); // 注册新参与者new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 执行任务");phaser.arriveAndDeregister(); // 执行完任务后注销}).start();
}// 主线程等待所有子任务完成
phaser.arriveAndAwaitAdvance();
System.out.println("所有子任务完成,进入阶段1");

输出2:

阶段0:主线程准备任务
Thread-0 执行任务
Thread-1 执行任务
Thread-2 执行任务
所有子任务完成,进入阶段1

适用场景:

  • 复杂多阶段任务(如游戏 AI 的回合制逻辑)。

  • 动态增减参与线程的任务协同。

2.5 Exchanger(交换器)

作用:两个线程在同步点交换数据(仅限成对线程

核心方法:

exchange(T data):阻塞等待另一线程到达并交换数据

使用:生产者-消费者数据交换

Exchanger<String> exchanger = new Exchanger<>();new Thread(() -> {String data = "生产的数据";String received = exchanger.exchange(data);
}).start();new Thread(() -> {String data = "消费的数据";String received = exchanger.exchange(data);
}).start();

适用场景:

  • 线程间数据交换(如流水线处理)。

  • 双缓冲区的数据传递。

2.6 Future 与 CompletableFuture(异步任务处理)

2.6.1 Future(阻塞)

Future 是 Java 并发编程中用于表示异步任务结果的核心接口,其核心作用是为开发者提供一种非阻塞的异步任务管理机制

作用:

1> 异步提交:允许将耗时任务(如网络请求、复杂计算、I/O操作)提交到线程池中执行,主线程无需等待结果,继续执行其他逻辑。

2> 结果延迟获取:通过 get() 方法在需要时获取任务结果,若任务未完成,线程可选择阻塞等待或超时处理。

3> 任务状态监控:提供 isDone()isCancelled() 等方法,实时检查任务执行状态

4> 任务取消控制:支持通过 cancel() 方法中断正在执行的任务或阻止未启动的任务运行

核心方法:

方法作用
V get()阻塞等待任务完成并返回结果。
V get(long timeout, TimeUnit unit)带超时的阻塞等待,超时后抛出 TimeoutException。注意:超时后,需要手动取消其他任务(cancel(true)),释放资源
boolean isDone()检查任务是否完成(完成、取消或异常均返回 true)。
boolean cancel(boolean mayInterrupt)尝试取消任务。mayInterrupt=true 表示允许中断正在执行的任务。false:仅取消未启动的任务。

使用1:在电商系统中,用户查看订单详情页时,需要同时获取订单基本信息、商品库存状态和物流信息。为提高响应速度,通过多线程并行查询这三个数据源,最后聚合结果返回给用户

import java.util.concurrent.*;public class OrderDetailService {// 创建固定线程池(实际场景建议使用 ThreadPoolExecutor 自定义核心参数,如队列大小、拒绝策略)private static final ExecutorService executor = Executors.newFixedThreadPool(3);public static void main(String[] args) {// 提交三个并行任务Future<String> orderInfoFuture = executor.submit(() -> fetchOrderInfo("1001"));Future<Integer> stockFuture = executor.submit(() -> fetchStock("SKU-123"));Future<String> logisticsFuture = executor.submit(() -> fetchLogistics("LP-456"));try {// 等待所有任务完成(最大等待3秒)String orderInfo = orderInfoFuture.get(3, TimeUnit.SECONDS);Integer stock = stockFuture.get(3, TimeUnit.SECONDS);String logistics = logisticsFuture.get(3, TimeUnit.SECONDS);// 聚合结果String result = String.format("订单信息:%s,库存:%d,物流:%s", orderInfo, stock, logistics);System.out.println(result);} catch (TimeoutException e) {// 处理超时:取消未完成的任务orderInfoFuture.cancel(true);stockFuture.cancel(true);logisticsFuture.cancel(true);System.out.println("部分数据获取超时,请重试");} catch (InterruptedException | ExecutionException e) {// InterruptedException:线程被中断(如服务关闭),中断线程可能导致数据不一致(如数据库事务未提交),需结合业务逻辑处理// ExecutionException:任务执行时抛出异常(如远程调用失败)System.out.println("数据获取异常:" + e.getMessage());} finally {executor.shutdown(); // 关闭线程池,防止资源泄漏}}// 模拟远程调用:获取订单信息private static String fetchOrderInfo(String orderId) throws InterruptedException {Thread.sleep(1000); // 模拟耗时1秒return "订单号:" + orderId;}// 模拟远程调用:获取商品库存private static Integer fetchStock(String sku) throws InterruptedException {Thread.sleep(1500); // 模拟耗时1.5秒return 50;}// 模拟远程调用:获取物流信息private static String fetchLogistics(String logisticsId) throws InterruptedException {Thread.sleep(2000); // 模拟耗时2秒return "物流单号:" + logisticsId;}
}

使用2:超时熔断,调用外部服务时设置超时,避免长时间阻塞导致系统雪崩

Future<String> future = executor.submit(() -> callExternalService());try {String result = future.get(2, TimeUnit.SECONDS); // 最多等待2秒
} catch (TimeoutException e) {future.cancel(true); // 超时后取消任务System.out.println("调用超时,已取消任务");
}

适用场景:

1> 并行远程调用:同时请求多个外部服务(如订单、库存、物流)。

2> 批量数据处理:分片处理大数据集,最后合并结果。

3> 超时熔断:快速失败(Fast Fail)机制,避免系统雪崩。

4> 定时任务监控:检查异步任务是否按时完成。

缺点:

1> 阻塞式获取结果:get() 方法会阻塞线程,若任务未完成,可能导致线程资源浪费

2> 无法链式处理:不支持任务完成后的回调(如结果处理、异常处理),需手动编写逻辑

3> 组合能力弱:难以实现多个任务的依赖组合(如任务A完成后触发任务B)

2.6.2 CompletableFuture(非阻塞)

CompletableFuture 是 Java 8 引入的异步编程工具,是 Future 的增强版,旨在解决 Future 的局限性。其核心作用包括:

1> 异步任务管理:提交任务并异步获取结果,避免阻塞主线程。

2> 链式调用:支持任务流水线处理(如结果转换、异常处理、任务组合)。

3> 任务组合:合并多个异步任务的结果(如并行执行后聚合)。

4> 手动控制:允许手动完成任务或抛出异常。

5> 线程池灵活控制:可指定自定义线程池,优化资源使用。

核心方法:

1> 创建异步任务

  • supplyAsync():异步执行有返回值的任务。

  • runAsync():异步执行无返回值的任务。

  • 指定线程池:默认使用 ForkJoinPool.commonPool(),可自定义 Executor

2> 结果处理

  • thenApply():同步处理结果,返回新值。

  • thenApplyAsync():异步处理结果(使用默认或自定义线程池)。

  • thenAccept():消费结果(无返回值)。

  • thenRun():任务完成后执行操作,不依赖结果。

3> 异常处理

  • exceptionally():捕获异常并返回默认值。

  • handle():无论成功与否,统一处理结果和异常。

  • whenComplete():类似 handle,但不修改结果,仅用于副作用(如日志记录)。

4> 组合多个任务

  • thenCompose():串联两个任务(扁平化)。

  • thenCombine():合并两个任务的结果。

  • allOf():等待所有任务完成。

  • anyOf():等待任意一个任务完成。

5> 手动控制

  • complete():手动完成任务。

  • completeExceptionally():手动抛出异常,标记任务失败。

6> 超时处理(Java 9+)

  • orTimeout(long timeout, TimeUnit unit):设置超时时间,超时后抛出 TimeoutException

  • completeOnTimeout(T value, long timeout, TimeUnit unit):超时后返回默认值。

使用1:电商订单异步处理

        步骤:

                1> 校验库存(远程调用库存服务)

                2> 生成订单(数据库操作)

                3> 发送通知(短信、邮件,可并行)

                4> 记录日志(异步落盘)

// 自定义线程池(I/O 密集型任务使用更大的线程数)
ExecutorService ioExecutor = Executors.newFixedThreadPool(8);// 1. 异步校验库存
CompletableFuture<Boolean> checkStockFuture = CompletableFuture.supplyAsync(() -> {return inventoryService.checkStock(order.getProductId(), order.getQuantity());
}, ioExecutor);// 2. 库存充足后生成订单
CompletableFuture<Order> createOrderFuture = checkStockFuture.thenComposeAsync(isStockSufficient -> {if (!isStockSufficient) {throw new InsufficientStockException("库存不足");}return CompletableFuture.supplyAsync(() -> orderService.create(order), ioExecutor);
}, ioExecutor);// 3. 并行发送通知
CompletableFuture<Void> sendSmsFuture = createOrderFuture.thenAcceptAsync(order -> {smsService.send(order.getUserId(), "订单创建成功");
}, ioExecutor);CompletableFuture<Void> sendEmailFuture = createOrderFuture.thenAcceptAsync(order -> {emailService.send(order.getUserId(), "订单确认邮件");
}, ioExecutor);// 4. 合并通知任务,记录日志(任意通知完成即可记录)
CompletableFuture<Void> allNotifications = CompletableFuture.allOf(sendSmsFuture, sendEmailFuture);
allNotifications.thenRunAsync(() -> logService.log(order), ioExecutor);// 5. 异常统一处理
createOrderFuture.exceptionally(ex -> {if (ex instanceof InsufficientStockException) {notifyUser("库存不足,订单失败");} else {notifyUser("系统错误,请重试");}return null;
});

使用2:订单接口直接返回,异步发送短信

@RestController
public class OrderController {private final OrderService orderService;private final SmsService smsService;private final Executor asyncExecutor;  // 自定义线程池// 初始化服务与线程池public OrderController(OrderService orderService, SmsService smsService) {this.orderService = orderService;this.smsService = smsService;// 创建专用的异步线程池(I/O 密集型任务建议使用较大的线程数)this.asyncExecutor = Executors.newFixedThreadPool(8);}@PostMapping("/order")public ResponseEntity<String> createOrder(@RequestBody OrderRequest request) {// 1. 同步处理订单(主流程)Order order = orderService.createOrder(request);// 2. 异步发送短信(不阻塞主流程)CompletableFuture.runAsync(() -> {try {smsService.sendSms(order.getUserPhone(), "您的订单已创建,订单号:" + order.getId());} catch (Exception e) {// 异常处理:记录日志,但不影响主流程log.error("短信发送失败,订单号:{}", order.getId(), e);}}, asyncExecutor); // 指定自定义线程池// 3. 立即返回响应return ResponseEntity.ok("订单创建成功,订单号:" + order.getId());}
}

注意事项:

1> 线程池选择

  • CPU 密集型:线程数 ≈ CPU 核数。

  • I/O 密集型:线程数 ≈ (任务等待时间 / 任务总时间) * CPU 核数 * 目标利用率(通常 0.8~0.9)。

  • 使用 CompletableFuture 时显式指定线程池,避免污染公共池。

2> 避免阻塞主线程:使用 join() 或 get() 时需谨慎,必要时设置超时;不要在 CompletableFuture 链中执行阻塞代码(如同步 I/O),否则可能拖慢整个线程池

3> 资源泄漏:确保在所有分支中关闭线程池

4> 异常处理

  • 始终处理可能的异常(exceptionally() 或 handle()),防止静默失败。

  • 避免在异步任务中抛出未检查异常,需显式捕获。

5> 循环引用问题:避免在回调中嵌套创建新的 CompletableFuture,导致内存泄漏

6> 调试技巧:使用 thenApplyAsync 时附加日志,追踪任务链路:

.thenApplyAsync(data -> {log.debug("处理数据: {}", data);return process(data);
})

适用场景:

  • 并行调用多个服务:同时请求订单、库存、物流接口,聚合结果后返回

  • 批量任务处理:使用 allOf() 等待所有子任务完成(如批量文件上传)

  • 分阶段数据处理:先查询数据,再处理数据,最后保存结果

  • 异步流水线操作:如订单创建后异步发送通知、更新缓存

  • 超时与熔断控制:结合 orTimeout() 和 completeOnTimeout() 实现超时降级

Future与 CompletableFuture 的对比

特性FutureCompletableFuture
结果获取阻塞式(get()非阻塞式(thenApply()thenAccept()
任务组合不支持支持链式调用、allOf()anyOf()
异常处理需手动捕获 ExecutionException内置方法(exceptionally()handle()
手动完成任务不支持支持(complete()completeExceptionally()

2.7 Lock与Condition(多条件变量锁)

作用:ReentrantLock 可以创建多个 Condition 对象,实现更精细的线程等待与唤醒机制(替代wait()/notify()的更灵活方式)

优势:通过多个 Condition 分离等待条件,避免无效唤醒

使用:生产者-消费者模型

ReentrantLock lock = new ReentrantLock();
Condition notEmpty = lock.newCondition();
Condition notFull = lock.newCondition();// 生产者
lock.lock();
try {while (queue.isFull()) {notFull.await();}queue.add(data);notEmpty.signal();
} finally {lock.unlock();
}

详细见:Java并发:锁    2.1.1.3 多条件变量(Condition)

适用场景

  • 复杂条件同步(如多条件等待队列)。

  • 需要可中断或超时控制的锁操作。

2.8 BlockingQueue(阻塞队列)

作用:线程安全的队列,支持阻塞插入/移除操作

常用实现:

  • ArrayBlockingQueue:有界数组队列。

  • LinkedBlockingQueue:可选有界链表队列。

  • PriorityBlockingQueue:优先级阻塞队列。

使用:任务调度队列

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
queue.put(task); // 阻塞插入
Runnable task = queue.take(); // 阻塞获取

适用场景:

  • 生产者-消费者任务队列。

  • 线程池任务调度。

2.9 Fork/Join 框架(并行任务处理)

核心思想:Fork/Join框架基于​​分而治之(Divide and Conquer)​​,将大任务递归拆分为子任务(Fork)并行执行,最终合并结果(Join)。例如归并排序、数组求和等场景均适用此模型

核心组件

类/接口作用
ForkJoinPool管理线程的专用线程池(默认线程数=CPU核数)
RecursiveTask<V>有返回值的任务(需实现compute()方法)
RecursiveAction无返回值的任务(需实现compute()方法)
ForkJoinWorkerThread框架内部使用的优化线程

使用示例计算1~n的和(RecursiveTask实现)

class SumTask extends RecursiveTask<Long> {private final int start;private final int end;private static final int THRESHOLD = 10000; // 任务拆分阈值SumTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {if (end - start <= THRESHOLD) {// 直接计算小任务long sum = 0;for (int i = start; i <= end; i++) sum += i;return sum;} else {// 拆分任务int mid = (start + end) / 2;SumTask left = new SumTask(start, mid);SumTask right = new SumTask(mid + 1, end);left.fork();        // 异步执行左任务long rightResult = right.compute(); // 同步计算右任务long leftResult = left.join();       // 等待左任务结果return leftResult + rightResult;}}
}// 使用方式
ForkJoinPool pool = ForkJoinPool.commonPool();
long result = pool.invoke(new SumTask(1, 1000000));

工作原理

  • 工作窃取算法:每个线程优先处理自己队列的头部任务,空闲时从其他队列尾部窃取任务,减少线程竞争,提升负载均衡

  • 递归任务拆分:在compute()方法中判断任务是否足够小(通过阈值THRESHOLD),若超过阈值则继续拆分,否则直接计算

适用场景

  • ​计算密集型任务​​:如大规模数组运算、矩阵乘法、归并排序等。
  • ​可递归拆分的任务​​:例如斐波那契数列、图像分块处理等。
  • ​大数据处理​​:分片处理日志或数据集,提高并行效率。
http://www.xdnf.cn/news/70291.html

相关文章:

  • 在CSDN的1095天(创作纪念日)
  • ECMAScript
  • 网络结构及安全科普
  • 【MySQL】表的约束(主键、唯一键、外键等约束类型详解)、表的设计
  • 前端工程化:构建高质量 Web 项目的现代方法论
  • Keil5没有stm32的芯片库
  • 反转字符串
  • 【CUDA 】第5章 共享内存和常量内存——5.2 共享内存的数据分布(2)
  • Nacos 客户端 SDK 的核心功能是什么?是如何与服务端通信的?
  • 【集群IP管理分配技术_DHCP】二、DHCP核心功能与技术实现
  • Openwrt 编译树莓派4B固件
  • 【C++】入门基础知识(下)
  • JAVA实战开源项目:医院资源管理系统 (Vue+SpringBoot) 附源码
  • leetcode day 35 01背包问题 416+1049
  • buildadmin 自定义单元格渲染
  • 【STM32单片机】#10.5 串口数据包
  • 在线打开查看cad免费工具dwg, dxf格式工具网站
  • 14.电容的高频特性在EMC设计中的应用
  • Novartis诺华制药社招入职综合能力测评真题SHL题库考什么?
  • 抱佛脚之学SSM三
  • Anaconda Prompt 切换工作路径的方法
  • RNA Club | CRISPR-Cas 免疫系统的作用原理及其与噬菌体的对抗-王艳丽教授讲座笔记
  • Activity之间交互
  • unity动态骨骼架设+常用参数分享(包含部分穿模解决方案)
  • 22. git show
  • MyBatis-Plus 实战:优雅处理 JSON 字段映射(以 JSONArray 为例)
  • 12个领域近120个典型案例:2024年“数据要素X”大赛典型案例集(附下载)
  • 网络编程4
  • L1-106 偷感好重 - java
  • vision transformer图像分类模型结构介绍