Java并发:资源共享
目录
一、资源共享的核心机制
1、volatile关键字
2、不可变对象
3、线程封闭-ThreadLocal
4、隐式锁+显式锁+CAS无锁及原子类
二、并发容器与工具
1、并发集合
1.1 ConcurrentHashMap(高并发键值存储)
1.2 CopyOnWriteArrayList(读多写少的列表)
1.3 ConcurrentSkipListMap/Set(有序的并发 Map/Set)
1.4 BlockingQueue(阻塞队列)
1.4.1 ArrayBlockingQueue(有界队列--生产者-消费者模型)
1.4.2 LinkedBlockingQueue(可选有界/无界)
1.4.3 PriorityBlockingQueue(无界优先级队列--优先级任务调度)
1.4.4 SynchronousQueue(直接传递队列)
1.5 ConcurrentLinkedQueue(无锁非阻塞队列)
1.6 ConcurrentLinkedDeque(双端队列)
2、同步工具类
2.1 CountDownLatch(倒计时闩)
2.2 CyclicBarrier(循环屏障)
2.3 Semaphore(信号量)
2.4 Phaser(阶段器)
2.5 Exchanger(交换器)
2.6 Future 与 CompletableFuture(异步任务处理)
2.6.1 Future(阻塞)
2.6.2 CompletableFuture(非阻塞)
2.7 Lock与Condition(多条件变量锁)
2.8 BlockingQueue(阻塞队列)
2.9 Fork/Join 框架(并行任务处理)
一、资源共享的核心机制
1、volatile关键字
基本作用:
-
可见性保证(强制主内存访问)与禁止指令重排序(插入内存屏障)
-
不保证原子性(如
i++
问题),无互斥性,不解决竞态条件
适用场景:
-
状态标志:简单的boolean状态标志
-
发布不可变对象:通过
volatile
安全发布不可变对象(如final
修饰的字段),确保其他线程看到的是构造完成后的对象。避免使用synchronized
或AtomicReference
,简化代码 -
单例模式的双重检查锁定(DCL)
class Singleton {// volatile 防止 new Singleton() 的指令重排序(对象分配内存、初始化、引用赋值),避免其他线程获取到未初始化的对象。private volatile static Singleton instance;public static Singleton getInstance() {if (instance == null) {synchronized (Singleton.class) {if (instance == null) {instance = new Singleton();}}}return instance;}
}
注意事项:频繁读写volatile
变量可能因内存屏障和缓存同步导致性能损耗,需谨慎使用
2、不可变对象
特性:对象状态在构造后不可修改(如 String
)。
优势:天然线程安全,无需同步。
实现方式:
-
所有字段声明为
final
。 -
不提供修改状态的方法。
3、线程封闭-ThreadLocal
ThreadLocal
是一个用于实现线程封闭(Thread Confinement)的核心工具,每个线程持有变量的独立副本
核心作用:
-
线程本地存储:为每个线程创建变量的独立副本,实现线程间数据隔离
-
避免共享:消除多线程竞争,无需同步即可保证线程安全
典型场景:
-
线程上下文传递(如用户会话、事务ID)
-
线程不安全的对象复用(如
SimpleDateFormat
) -
框架级参数透传(如Spring的事务管理、日志MDC)
基本用法:
// 推荐初始化方式(Java 8+)
private static final ThreadLocal<UserContext> userContextHolder = ThreadLocal.withInitial(UserContext::new);public void processRequest() {try {UserContext context = userContextHolder.get();// 业务逻辑...} finally {userContextHolder.remove(); // 必须清理!}
}
API核心方法:
方法 | 说明 |
---|---|
T get() | 返回当前线程的副本值(首次调用会触发initialValue() ) |
void set(T value) | 设置当前线程的副本值 |
void remove() | 删除当前线程的副本值(避免内存泄漏的关键操作) |
initialValue() | 初始值默认实现(可重写) |
实现原理:每个Thread
对象内部持有ThreadLocalMap(静态内部类),Key为ThreadLocal
实例(弱引用),Value为存储的值(强引用)
内存泄漏问题--风险来源:当 ThreadLocal
实例被垃圾回收(GC)后,Entry
的 Key 变为 null
,但 Value 仍然是强引用。若线程长期存活(如线程池中的线程),且未主动清理 Entry
,这些 Key 为 null
的 Value 会持续占用内存。
// 内存泄漏示例
// 问题:线程池中的线程会反复执行 processRequest(),导致每个线程的 ThreadLocalMap 中积累大量 Key 为 null 的 Value(1MB 的 byte[]),最终引发内存溢出(OOM)public class LeakDemo {private static final ThreadLocal<byte[]> threadLocal = new ThreadLocal<>();public void processRequest() {threadLocal.set(new byte[1024 * 1024]); // 1MB 数据// 未调用 threadLocal.remove()}public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(4);while (true) {pool.execute(() -> new LeakDemo().processRequest());}}
}
内存泄漏问题--解决方案:
措施 | 说明 |
---|---|
及时调用remove() | 线程使用完ThreadLocal后必须清理(尤其是线程池场景) |
使用static修饰ThreadLocal | 静态ThreadLocal 属于类而非实例,所有线程共享同一个,从而减少ThreadLocal 数量,降低泄漏频率 |
Java 8优化 | ThreadLocalMap在set/get时自动清理脏Entry(探测式清理/启发式清理) |
4、隐式锁+显式锁+CAS无锁及原子类
具体的锁相关信息,参考:Java并发:锁
二、并发容器与工具
1、并发集合
1.1 ConcurrentHashMap
(高并发键值存储)
使用:
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.computeIfAbsent("key", k -> 0 + 1); //复合操作需使用原子方法(如 computeIfAbsent)
注意:避免大规模扩容(初始化时预估容量);迭代器可能不反应最新修改(分段遍历);
复合操作仍需要同步:例如 if (!map.containsKey(k)) map.put(k, v)
需替换为原子方法 putIfAbsent()
特性:
-
高并发:JDK 8 后采用
CAS + synchronized
锁单个桶(Node),替代 JDK 7 的分段锁。 -
无锁读:读操作完全无锁,直接访问
volatile
变量。 -
不允许
null
键/值:避免歧义(如get(key)
返回null
时无法区分是键不存在还是值为null
)。
原理:
-
桶数组 + 链表/红黑树(冲突时)。
-
写操作使用
synchronized
锁住单个桶,读操作无锁。
适用场景:
-
高并发键值存储(如缓存、计数器)。
-
替代
Hashtable
和Collections.synchronizedMap
。
1.2 CopyOnWriteArrayList
(读多写少的列表)
使用:
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item");
list.forEach(System.out::println); // 无锁遍历
注意:写操作性能差(大规模数据时复制成本高);迭代器可能读取到旧数据
特性:
-
写时复制:写操作(增、删、改)复制整个数组,保证读操作无锁。
-
弱一致性迭代:迭代器基于创建时的数据快照。
原理:
-
底层使用
volatile
数组,写操作加锁并复制新数组。 -
读操作直接访问原数组。
适用场景:读多写极少(如监听器列表、配置项)
CopyOnWriteArraySet:
内部封装 CopyOnWriteArrayList
,通过遍历检查元素唯一性,元素唯一性由 equals()
保证。用于读多写极少的集合需求。写性能差,不适用于频繁修改的场景
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("element");
1.3 ConcurrentSkipListMap/Set(有序的并发 Map/Set)
使用:
ConcurrentSkipListMap<Integer, String> sortedMap = new ConcurrentSkipListMap<>();
sortedMap.put(3, "C");
sortedMap.put(1, "A");
注意:内存开销较高(跳表的多层指针结构);适用于读多写少场景
特性:
-
有序:基于跳表(Skip List)实现,按自然顺序或自定义比较器排序。
-
无锁读:读操作完全并发。
原理:
-
跳表结构:多层链表,通过概率平衡实现高效查找(时间复杂度 O(log n))。
-
写操作使用 CAS 和少量锁。
适用场景:需要有序且高并发的键值存储(如排行榜、范围查询)
ConcurrentSkipListSet:
跳表结构,类似 ConcurrentSkipListMap
。基于 ConcurrentSkipListMap
实现,有序集合。适用于需要有序且高并发的集合(如分布式任务调度)
ConcurrentSkipListSet<Integer> sortedSet = new ConcurrentSkipListSet<>();
sortedSet.add(5);
1.4 BlockingQueue
(阻塞队列)
原理:使用 ReentrantLock
和 Condition
实现阻塞(如 put()
和 take()
)
1.4.1 ArrayBlockingQueue(有界队列--生产者-消费者模型)
原理:基于数组实现,固定容量。双锁设计(生产者和消费者使用不同的锁,提高吞吐量)。
使用:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("task"); // 阻塞直到空间可用
String task = queue.take(); // 阻塞直到元素可用
使用场景:生产者-消费者模型(如线程池任务队列)
1.4.2 LinkedBlockingQueue(
可选有界/无界)
原理:基于链表实现,默认无界(Integer.MAX_VALUE
)
注意:无界队列可能导致内存溢出( LinkedBlockingQueue
默认无界)
1.4.3 PriorityBlockingQueue(
无界优先级队列--优先级任务调度)
元素按自然顺序或比较器排序
1.4.4 SynchronousQueue(
直接传递队列)
不存储元素,生产者必须等待消费者
1.5 ConcurrentLinkedQueue(无锁非阻塞队列)
原理:链表结构,head
和 tail
指针使用 CAS 更新
使用:
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("item");
String item = queue.poll();
注意:无阻塞语义,需自行处理队列空/满逻辑
特性:
-
无锁非阻塞队列:基于 CAS 实现。
-
高并发:适合高吞吐场景。
适用场景:高并发任务调度(如事件总线)
1.6 ConcurrentLinkedDeque(双端队列)
原理:链表结构,头尾节点独立更新,支持从两端插入和移除;无锁实现,基于 CAS
使用:
ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
deque.addFirst("front");
deque.addLast("end");
注意:无阻塞方法,需结合其他机制实现阻塞语义
适用场景:高并发双端操作(如工作窃取算法)
2、同步工具类
2.1 CountDownLatch
(倒计时闩)
作用:等待一组线程完成指定任务后,再触发后续操作(一次性使用)
核心方法:
-
await()
:阻塞等待计数器归零。 -
countDown()
:计数器减 1。
使用:主线程等待所有子线程初始化完成
CountDownLatch latch = new CountDownLatch(3);for (int i = 0; i < 3; i++) {new Thread(() -> {// 执行初始化任务latch.countDown();}).start();
}latch.await(); // 等待所有子线程完成任务
System.out.println("所有任务完成");
适用场景:
-
服务启动时等待依赖组件初始化完成。
-
多线程任务完成后汇总结果。
2.2 CyclicBarrier
(循环屏障)
作用:让一组线程相互等待,达到同步点(屏障)后一起继续执行(可重复使用)
核心方法:
-
await()
:线程到达屏障并等待其他线程。
使用:多线程分阶段处理数据(支持回调任务)
CyclicBarrier barrier = new CyclicBarrier(3, () -> {System.out.println("所有线程已到达,执行回调任务!");
});for (int i = 0; i < 3; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 准备就绪");barrier.await();} catch (Exception e) {e.printStackTrace();}}, "线程-" + i).start();
}
输出:
线程-0 准备就绪
线程-1 准备就绪
线程-2 准备就绪
所有线程已到达,执行回调任务!
适用场景:
-
并行计算的分阶段结果合并。
-
多线程测试中的协同执行。
2.3 Semaphore
(信号量)
作用:控制同时访问特定资源的线程数量(限流)
核心方法:
-
acquire()
:获取许可(若无可选许可则阻塞)。 -
release()
:释放许可。
使用:限制数据库连接池并发数
Semaphore semaphore = new Semaphore(5); // 最大5个并发semaphore.acquire(); // 获取许可
try {// 使用数据库连接
} finally {semaphore.release(); // 释放许可
}
适用场景:
-
资源池管理(如线程池、连接池)。
-
高并发系统的限流保护。
2.4 Phaser(阶段器)
作用:用于协调多个线程分阶段执行任务。它支持动态调整参与者数量,并允许每个阶段(phase)结束后进行自定义操作(更灵活的 CyclicBarrier)
核心方法:
方法 | 说明 |
---|---|
Phaser() | 创建 Phaser,初始参与者数为 0。 |
Phaser(int parties) | 创建 Phaser,并指定初始参与者数。 |
register() | 注册一个参与者(线程)。 |
arrive() | 到达当前阶段,但不等待其他线程(非阻塞)。 |
arriveAndAwaitAdvance() | 到达当前阶段,并阻塞等待其他线程到达。 |
arriveAndDeregister() | 到达当前阶段,并注销一个参与者。 |
onAdvance(int phase, int parties) | 可重写的方法,当所有线程到达后触发,用于阶段结束时的自定义逻辑(如数据聚合)。 |
使用1:分阶段任务同步
import java.util.concurrent.Phaser;public class PhaserDemo {public static void main(String[] args) {Phaser phaser = new Phaser(3); // 初始3个参与者for (int i = 0; i < 3; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 完成阶段1");phaser.arriveAndAwaitAdvance(); // 等待所有线程完成阶段1System.out.println(Thread.currentThread().getName() + " 完成阶段2");phaser.arriveAndAwaitAdvance(); // 等待所有线程完成阶段2System.out.println(Thread.currentThread().getName() + " 退出");phaser.arriveAndDeregister(); // 注销自身}, "线程-" + i).start();}}
}
输出1:
线程-0 完成阶段1
线程-1 完成阶段1
线程-2 完成阶段1
线程-0 完成阶段2
线程-1 完成阶段2
线程-2 完成阶段2
线程-0 退出
线程-1 退出
线程-2 退出
使用2:动态注册与注销
Phaser phaser = new Phaser(1); // 主线程作为初始参与者// 阶段0:主线程准备任务
System.out.println("阶段0:主线程准备任务");// 动态添加3个子任务
for (int i = 0; i < 3; i++) {phaser.register(); // 注册新参与者new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 执行任务");phaser.arriveAndDeregister(); // 执行完任务后注销}).start();
}// 主线程等待所有子任务完成
phaser.arriveAndAwaitAdvance();
System.out.println("所有子任务完成,进入阶段1");
输出2:
阶段0:主线程准备任务
Thread-0 执行任务
Thread-1 执行任务
Thread-2 执行任务
所有子任务完成,进入阶段1
适用场景:
-
复杂多阶段任务(如游戏 AI 的回合制逻辑)。
-
动态增减参与线程的任务协同。
2.5 Exchanger(交换器)
作用:两个线程在同步点交换数据(仅限成对线程)
核心方法:
exchange(T data)
:阻塞等待另一线程到达并交换数据
使用:生产者-消费者数据交换
Exchanger<String> exchanger = new Exchanger<>();new Thread(() -> {String data = "生产的数据";String received = exchanger.exchange(data);
}).start();new Thread(() -> {String data = "消费的数据";String received = exchanger.exchange(data);
}).start();
适用场景:
-
线程间数据交换(如流水线处理)。
-
双缓冲区的数据传递。
2.6 Future 与 CompletableFuture(异步任务处理)
2.6.1 Future(阻塞)
Future 是 Java 并发编程中用于表示异步任务结果的核心接口,其核心作用是为开发者提供一种非阻塞的异步任务管理机制。
作用:
1> 异步提交:允许将耗时任务(如网络请求、复杂计算、I/O操作)提交到线程池中执行,主线程无需等待结果,继续执行其他逻辑。
2> 结果延迟获取:通过 get()
方法在需要时获取任务结果,若任务未完成,线程可选择阻塞等待或超时处理。
3> 任务状态监控:提供 isDone()
、isCancelled()
等方法,实时检查任务执行状态
4> 任务取消控制:支持通过 cancel()
方法中断正在执行的任务或阻止未启动的任务运行
核心方法:
方法 | 作用 |
---|---|
V get() | 阻塞等待任务完成并返回结果。 |
V get(long timeout, TimeUnit unit) | 带超时的阻塞等待,超时后抛出 TimeoutException 。注意:超时后,需要手动取消其他任务(cancel(true) ),释放资源 |
boolean isDone() | 检查任务是否完成(完成、取消或异常均返回 true )。 |
boolean cancel(boolean mayInterrupt) | 尝试取消任务。mayInterrupt=true 表示允许中断正在执行的任务。false :仅取消未启动的任务。 |
使用1:在电商系统中,用户查看订单详情页时,需要同时获取订单基本信息、商品库存状态和物流信息。为提高响应速度,通过多线程并行查询这三个数据源,最后聚合结果返回给用户
import java.util.concurrent.*;public class OrderDetailService {// 创建固定线程池(实际场景建议使用 ThreadPoolExecutor 自定义核心参数,如队列大小、拒绝策略)private static final ExecutorService executor = Executors.newFixedThreadPool(3);public static void main(String[] args) {// 提交三个并行任务Future<String> orderInfoFuture = executor.submit(() -> fetchOrderInfo("1001"));Future<Integer> stockFuture = executor.submit(() -> fetchStock("SKU-123"));Future<String> logisticsFuture = executor.submit(() -> fetchLogistics("LP-456"));try {// 等待所有任务完成(最大等待3秒)String orderInfo = orderInfoFuture.get(3, TimeUnit.SECONDS);Integer stock = stockFuture.get(3, TimeUnit.SECONDS);String logistics = logisticsFuture.get(3, TimeUnit.SECONDS);// 聚合结果String result = String.format("订单信息:%s,库存:%d,物流:%s", orderInfo, stock, logistics);System.out.println(result);} catch (TimeoutException e) {// 处理超时:取消未完成的任务orderInfoFuture.cancel(true);stockFuture.cancel(true);logisticsFuture.cancel(true);System.out.println("部分数据获取超时,请重试");} catch (InterruptedException | ExecutionException e) {// InterruptedException:线程被中断(如服务关闭),中断线程可能导致数据不一致(如数据库事务未提交),需结合业务逻辑处理// ExecutionException:任务执行时抛出异常(如远程调用失败)System.out.println("数据获取异常:" + e.getMessage());} finally {executor.shutdown(); // 关闭线程池,防止资源泄漏}}// 模拟远程调用:获取订单信息private static String fetchOrderInfo(String orderId) throws InterruptedException {Thread.sleep(1000); // 模拟耗时1秒return "订单号:" + orderId;}// 模拟远程调用:获取商品库存private static Integer fetchStock(String sku) throws InterruptedException {Thread.sleep(1500); // 模拟耗时1.5秒return 50;}// 模拟远程调用:获取物流信息private static String fetchLogistics(String logisticsId) throws InterruptedException {Thread.sleep(2000); // 模拟耗时2秒return "物流单号:" + logisticsId;}
}
使用2:超时熔断,调用外部服务时设置超时,避免长时间阻塞导致系统雪崩
Future<String> future = executor.submit(() -> callExternalService());try {String result = future.get(2, TimeUnit.SECONDS); // 最多等待2秒
} catch (TimeoutException e) {future.cancel(true); // 超时后取消任务System.out.println("调用超时,已取消任务");
}
适用场景:
1> 并行远程调用:同时请求多个外部服务(如订单、库存、物流)。
2> 批量数据处理:分片处理大数据集,最后合并结果。
3> 超时熔断:快速失败(Fast Fail)机制,避免系统雪崩。
4> 定时任务监控:检查异步任务是否按时完成。
缺点:
1> 阻塞式获取结果:get()
方法会阻塞线程,若任务未完成,可能导致线程资源浪费
2> 无法链式处理:不支持任务完成后的回调(如结果处理、异常处理),需手动编写逻辑
3> 组合能力弱:难以实现多个任务的依赖组合(如任务A完成后触发任务B)
2.6.2 CompletableFuture(非阻塞)
CompletableFuture
是 Java 8 引入的异步编程工具,是 Future
的增强版,旨在解决 Future
的局限性。其核心作用包括:
1> 异步任务管理:提交任务并异步获取结果,避免阻塞主线程。
2> 链式调用:支持任务流水线处理(如结果转换、异常处理、任务组合)。
3> 任务组合:合并多个异步任务的结果(如并行执行后聚合)。
4> 手动控制:允许手动完成任务或抛出异常。
5> 线程池灵活控制:可指定自定义线程池,优化资源使用。
核心方法:
1> 创建异步任务
-
supplyAsync()
:异步执行有返回值的任务。 -
runAsync()
:异步执行无返回值的任务。 -
指定线程池:默认使用
ForkJoinPool.commonPool()
,可自定义Executor
。
2> 结果处理
-
thenApply()
:同步处理结果,返回新值。 -
thenApplyAsync()
:异步处理结果(使用默认或自定义线程池)。 -
thenAccept()
:消费结果(无返回值)。 -
thenRun()
:任务完成后执行操作,不依赖结果。
3> 异常处理
-
exceptionally()
:捕获异常并返回默认值。 -
handle()
:无论成功与否,统一处理结果和异常。 -
whenComplete()
:类似handle
,但不修改结果,仅用于副作用(如日志记录)。
4> 组合多个任务
-
thenCompose()
:串联两个任务(扁平化)。 -
thenCombine()
:合并两个任务的结果。 -
allOf()
:等待所有任务完成。 -
anyOf()
:等待任意一个任务完成。
5> 手动控制
-
complete()
:手动完成任务。 -
completeExceptionally()
:手动抛出异常,标记任务失败。
6> 超时处理(Java 9+)
-
orTimeout(long timeout, TimeUnit unit):
设置超时时间,超时后抛出TimeoutException
-
completeOnTimeout(T value, long timeout, TimeUnit unit)
:超时后返回默认值。
使用1:电商订单异步处理
步骤:
1> 校验库存(远程调用库存服务)
2> 生成订单(数据库操作)
3> 发送通知(短信、邮件,可并行)
4> 记录日志(异步落盘)
// 自定义线程池(I/O 密集型任务使用更大的线程数)
ExecutorService ioExecutor = Executors.newFixedThreadPool(8);// 1. 异步校验库存
CompletableFuture<Boolean> checkStockFuture = CompletableFuture.supplyAsync(() -> {return inventoryService.checkStock(order.getProductId(), order.getQuantity());
}, ioExecutor);// 2. 库存充足后生成订单
CompletableFuture<Order> createOrderFuture = checkStockFuture.thenComposeAsync(isStockSufficient -> {if (!isStockSufficient) {throw new InsufficientStockException("库存不足");}return CompletableFuture.supplyAsync(() -> orderService.create(order), ioExecutor);
}, ioExecutor);// 3. 并行发送通知
CompletableFuture<Void> sendSmsFuture = createOrderFuture.thenAcceptAsync(order -> {smsService.send(order.getUserId(), "订单创建成功");
}, ioExecutor);CompletableFuture<Void> sendEmailFuture = createOrderFuture.thenAcceptAsync(order -> {emailService.send(order.getUserId(), "订单确认邮件");
}, ioExecutor);// 4. 合并通知任务,记录日志(任意通知完成即可记录)
CompletableFuture<Void> allNotifications = CompletableFuture.allOf(sendSmsFuture, sendEmailFuture);
allNotifications.thenRunAsync(() -> logService.log(order), ioExecutor);// 5. 异常统一处理
createOrderFuture.exceptionally(ex -> {if (ex instanceof InsufficientStockException) {notifyUser("库存不足,订单失败");} else {notifyUser("系统错误,请重试");}return null;
});
使用2:订单接口直接返回,异步发送短信
@RestController
public class OrderController {private final OrderService orderService;private final SmsService smsService;private final Executor asyncExecutor; // 自定义线程池// 初始化服务与线程池public OrderController(OrderService orderService, SmsService smsService) {this.orderService = orderService;this.smsService = smsService;// 创建专用的异步线程池(I/O 密集型任务建议使用较大的线程数)this.asyncExecutor = Executors.newFixedThreadPool(8);}@PostMapping("/order")public ResponseEntity<String> createOrder(@RequestBody OrderRequest request) {// 1. 同步处理订单(主流程)Order order = orderService.createOrder(request);// 2. 异步发送短信(不阻塞主流程)CompletableFuture.runAsync(() -> {try {smsService.sendSms(order.getUserPhone(), "您的订单已创建,订单号:" + order.getId());} catch (Exception e) {// 异常处理:记录日志,但不影响主流程log.error("短信发送失败,订单号:{}", order.getId(), e);}}, asyncExecutor); // 指定自定义线程池// 3. 立即返回响应return ResponseEntity.ok("订单创建成功,订单号:" + order.getId());}
}
注意事项:
1> 线程池选择
-
CPU 密集型:线程数 ≈ CPU 核数。
-
I/O 密集型:线程数 ≈ (任务等待时间 / 任务总时间) * CPU 核数 * 目标利用率(通常 0.8~0.9)。
-
使用
CompletableFuture
时显式指定线程池,避免污染公共池。
2> 避免阻塞主线程:使用 join()
或 get()
时需谨慎,必要时设置超时;不要在 CompletableFuture
链中执行阻塞代码(如同步 I/O),否则可能拖慢整个线程池
3> 资源泄漏:确保在所有分支中关闭线程池
4> 异常处理
-
始终处理可能的异常(
exceptionally()
或handle()
),防止静默失败。 -
避免在异步任务中抛出未检查异常,需显式捕获。
5> 循环引用问题:避免在回调中嵌套创建新的 CompletableFuture
,导致内存泄漏
6> 调试技巧:使用 thenApplyAsync
时附加日志,追踪任务链路:
.thenApplyAsync(data -> {log.debug("处理数据: {}", data);return process(data);
})
适用场景:
-
并行调用多个服务:同时请求订单、库存、物流接口,聚合结果后返回
-
批量任务处理:使用
allOf()
等待所有子任务完成(如批量文件上传) -
分阶段数据处理:先查询数据,再处理数据,最后保存结果
-
异步流水线操作:如订单创建后异步发送通知、更新缓存
-
超时与熔断控制:结合
orTimeout()
和completeOnTimeout()
实现超时降级
Future与 CompletableFuture
的对比
特性 | Future | CompletableFuture |
---|---|---|
结果获取 | 阻塞式(get() ) | 非阻塞式(thenApply() 、thenAccept() ) |
任务组合 | 不支持 | 支持链式调用、allOf() 、anyOf() |
异常处理 | 需手动捕获 ExecutionException | 内置方法(exceptionally() 、handle() ) |
手动完成任务 | 不支持 | 支持(complete() 、completeExceptionally() ) |
2.7 Lock与Condition(多条件变量锁)
作用:ReentrantLock
可以创建多个 Condition
对象,实现更精细的线程等待与唤醒机制(替代wait()
/notify()
的更灵活方式)
优势:通过多个 Condition
分离等待条件,避免无效唤醒
使用:生产者-消费者模型
ReentrantLock lock = new ReentrantLock();
Condition notEmpty = lock.newCondition();
Condition notFull = lock.newCondition();// 生产者
lock.lock();
try {while (queue.isFull()) {notFull.await();}queue.add(data);notEmpty.signal();
} finally {lock.unlock();
}
详细见:Java并发:锁 2.1.1.3 多条件变量(Condition)
适用场景
-
复杂条件同步(如多条件等待队列)。
-
需要可中断或超时控制的锁操作。
2.8 BlockingQueue(阻塞队列)
作用:线程安全的队列,支持阻塞插入/移除操作
常用实现:
-
ArrayBlockingQueue
:有界数组队列。 -
LinkedBlockingQueue
:可选有界链表队列。 -
PriorityBlockingQueue
:优先级阻塞队列。
使用:任务调度队列
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
queue.put(task); // 阻塞插入
Runnable task = queue.take(); // 阻塞获取
适用场景:
-
生产者-消费者任务队列。
-
线程池任务调度。
2.9 Fork/Join 框架(并行任务处理)
核心思想:Fork/Join框架基于分而治之(Divide and Conquer),将大任务递归拆分为子任务(Fork)并行执行,最终合并结果(Join)。例如归并排序、数组求和等场景均适用此模型
核心组件:
类/接口 | 作用 |
---|---|
ForkJoinPool | 管理线程的专用线程池(默认线程数=CPU核数) |
RecursiveTask<V> | 有返回值的任务(需实现compute() 方法) |
RecursiveAction | 无返回值的任务(需实现compute() 方法) |
ForkJoinWorkerThread | 框架内部使用的优化线程 |
使用示例:计算1~n的和(RecursiveTask实现)
class SumTask extends RecursiveTask<Long> {private final int start;private final int end;private static final int THRESHOLD = 10000; // 任务拆分阈值SumTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {if (end - start <= THRESHOLD) {// 直接计算小任务long sum = 0;for (int i = start; i <= end; i++) sum += i;return sum;} else {// 拆分任务int mid = (start + end) / 2;SumTask left = new SumTask(start, mid);SumTask right = new SumTask(mid + 1, end);left.fork(); // 异步执行左任务long rightResult = right.compute(); // 同步计算右任务long leftResult = left.join(); // 等待左任务结果return leftResult + rightResult;}}
}// 使用方式
ForkJoinPool pool = ForkJoinPool.commonPool();
long result = pool.invoke(new SumTask(1, 1000000));
工作原理:
-
工作窃取算法:每个线程优先处理自己队列的头部任务,空闲时从其他队列尾部窃取任务,减少线程竞争,提升负载均衡
-
递归任务拆分:在
compute()
方法中判断任务是否足够小(通过阈值THRESHOLD
),若超过阈值则继续拆分,否则直接计算
适用场景:
- 计算密集型任务:如大规模数组运算、矩阵乘法、归并排序等。
- 可递归拆分的任务:例如斐波那契数列、图像分块处理等。
- 大数据处理:分片处理日志或数据集,提高并行效率。