JUC之CompletionService
文章目录
- 一、为什么需要 CompletionService?
- 1.1 概述
- 1.2 CompletionService要解决的痛点
- 二、CompletionService 的核心机制
- 2.1 接口定义
- 2.2 接口实现
- 2.3 工作原理及流程
- 2.4 实现原理
- 三、使用示例
- 3.1 基本用法(对比 ExecutorService)
- 3.2 处理超时与异常
- 3.3 竞态场景 - 同时处理结果并取消剩余任务
- 3.4 网页内容抓取(谁快谁先处理)
- 3.5 批量任务处理与结果合并
- 四、总结与最佳实践
- 4.1 优点
- 4.2 缺点与注意事项
- 4.3 适用场景
- 4.4 选择合适队列
- 4.5 异常处理
- 4.6 资源管理
- 4.7 替代方案【重要】
- 4.8 CompletionService 与其他相关类的对比
- 4.9 总结
一、为什么需要 CompletionService?
1.1 概述
在并发编程中,我们经常提交一组任务到线程池,然后等待所有任务完成并处理它们的结果。使用标准的 ExecutorService
,我们可能会这样做:
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<Result>> futures = new ArrayList<>();// 提交一批任务
for (Callable<Result> task : tasks) {futures.add(executor.submit(task));
}// 获取结果 - 这种方式有问题!
for (Future<Result> future : futures) {Result result = future.get(); // 问题所在:会阻塞,必须按提交顺序获取// 处理 result
}
使用 ExecutorService
提交多个 Callable
任务时,通常有两种等待方式:
invokeAll()
:等待所有任务完成,按提交顺序返回结果。- 手动轮询
Future.get()
:需要自己管理Future
集合,效率低。
但如果我们希望 “哪个任务先执行完,就先处理哪个结果”(即“完成优先”),传统方式难以高效实现。
此种方法的核心问题:
- 阻塞性:
future.get()
是阻塞调用。如果列表中的第一个任务耗时很长,即使后面的任务早就完成了,你也无法处理它们的结果,必须等待第一个任务完成。 - 顺序性: 你只能按照任务提交的顺序(即
Future
在列表中的顺序)来获取结果,无法优先处理那些先完成的任务的结果。
1.2 CompletionService要解决的痛点
它解耦了“任务提交”与“结果完成”的顺序。你可以在任务完成的时刻就立刻处理其结果,而不必等待之前提交的耗时任务。
它就像一个消息队列,生产者是已完成的任务,消费者是任何调用 take()
或 poll()
方法的线程。 completed tasks are placed on a queue.
二、CompletionService 的核心机制
2.1 接口定义
java.util.concurrent.CompletionService<V>
接口非常简单,只定义了四个核心方法:
public interface CompletionService<V> {// 1. 提交一个任务以供执行Future<V> submit(Callable<V> task);Future<V> submit(Runnable task, V result); // Runnable任务可通过result预设返回值// 2. 获取并移除下一个已完成任务对应的Future。// 如果没有任何任务完成,则阻塞等待。Future<V> take() throws InterruptedException;// 3. 获取并移除下一个已完成任务对应的Future。// 如果没有任何任务完成,则立即返回null。不会阻塞。Future<V> poll();// 4. 带超时时间的poll操作。// 在指定的时间内等待任务完成,超时后仍无结果则返回null。Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
方法 | 作用 |
---|---|
submit(Callable<V> task) | 提交一个带返回值的任务 |
submit(Runnable task, V result) | 提交一个 Runnable 任务,并指定返回结果 |
take() | 获取并移除下一个已完成任务的 Future,若没有则阻塞 |
poll() | 获取并移除下一个已完成任务的 Future,若没有则返回 null |
poll(long timeout, TimeUnit unit) | 在指定时间内获取并移除下一个已完成任务的 Future |
✅ 核心思想:
- 生产者:提交任务(
submit
) - 消费者:获取已完成任务的结果(
take
/poll
) - 中间队列:内部维护一个
BlockingQueue<Future<V>>
,存放已完成的Future
2.2 接口实现
唯一的标准实现:ExecutorCompletionService
ExecutorCompletionService
是 JUC 提供的唯一实现。它的构造器需要两个东西:
Executor executor
: 真正负责执行任务的底层执行器(如线程池)。BlockingQueue<Future<V>> completionQueue
(可选): 用于存储已完成任务结果的队列。如果未指定,默认使用LinkedBlockingQueue
。
public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {if (executor == null || completionQueue == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = completionQueue;
}
2.3 工作原理及流程
- 流程处理
- 工作原理
✅ 关键点:任务的执行线程在完成任务后,会自动将 Future
放入完成队列,消费者线程通过 take()
按完成顺序获取。
- 组成部分
2.4 实现原理
ExecutorCompletionService
并没有自己执行任务的能力,它只是对 Executor
的一层包装。其魔力在于它使用了一个自定义的 QueueingFuture
,它继承了 FutureTask
并重写了 done()
方法。
public class ExecutorCompletionService<V> implements CompletionService<V> {private final Executor executor;private final BlockingQueue<Future<V>> completionQueue;// 核心内部类private class QueueingFuture extends FutureTask<Void> {private final Future<V> task;QueueingFuture(Future<V> task) {// ... 构造器}// 关键方法:当任务完成(无论是正常结束还是异常)时,JVM会自动调用此方法protected void done() {completionQueue.add(task); // 将已完成的任务加入到结果队列中}}// 提交任务时的包装过程public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task); // 创建一个FutureTaskexecutor.execute(new QueueingFuture(f)); // 用QueueingFuture包装后提交给Executorreturn f;}private RunnableFuture<V> newTaskFor(Callable<V> task) {if (aes == null)return new FutureTask<V>(task);elsereturn aes.newTaskFor(task);}private RunnableFuture<V> newTaskFor(Runnable task, V result) {if (aes == null)return new FutureTask<V>(task, result);elsereturn aes.newTaskFor(task, result);}// ... take, poll 等方法直接委托给 completionQueue
}
- 提交任务: 将用户提交的
Callable
包装成一个标准的FutureTask
。 - 二次包装: 再将这个
FutureTask
包装到一个自定义的QueueingFuture
中。 - 执行: 将
QueueingFuture
提交给底层的Executor
。 - 钩子触发: 当
FutureTask
执行完毕(run()
方法结束),JVM 会自动调用其done()
方法。 - 入队:
QueueingFuture
重写的done()
方法会将已完成的任务(即原始的FutureTask
)放入构造时传入的completionQueue
中。 - 消费: 用户调用
take()
或poll()
时,实际上是从这个completionQueue
中获取已完成的任务。
✅ 这就是“自动完成通知”的核心
简化后的核心源码
// QueueingFuture 是 FutureTask 的子类
private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}// 任务完成时会调用此方法protected void done() {// 将任务结果放入阻塞队列completionQueue.add(task);}private final Future<V> task;
}// 提交任务的实现
public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);// 包装成 QueueingFuture 提交给执行器executor.execute(new QueueingFuture(f));return f;
}// 获取结果的实现
public Future<V> take() throws InterruptedException {return completionQueue.take();
}
三、使用示例
3.1 基本用法(对比 ExecutorService)
假设我们有一批模拟的网络请求任务,每个任务耗时随机。
package cn.tcmeta.usecompletionservice;import java.util.concurrent.*;
import java.util.Random;public class CompletionServiceDemo {// 模拟一个耗时的网络请求任务static class NetworkRequestTask implements Callable<String> {private final int taskId;private final int latency; // 模拟网络延迟(毫秒)public NetworkRequestTask(int taskId, int latency) {this.taskId = taskId;this.latency = latency;}@Overridepublic String call() throws Exception {// 模拟工作耗时Thread.sleep(latency);String result = "Result from Task-" + taskId + " (took " + latency + "ms)";System.out.println(Thread.currentThread().getName() + " completed: " + result);return result;}}public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService threadPool = Executors.newFixedThreadPool(5);CompletionService<String> completionService = new ExecutorCompletionService<>(threadPool);Random random = new Random();// 提交10个任务,每个任务的延迟是随机的(0-4秒)System.out.println("Submitting 10 tasks with random latency...");for (int i = 1; i <= 10; i++) {int latency = random.nextInt(4000);completionService.submit(new NetworkRequestTask(i, latency));}System.out.println("\n--- Retrieving results in completion order ---");// 获取10个结果for (int i = 0; i < 10; i++) {// take() 会阻塞直到有完成的任务Future<String> completedFuture = completionService.take();String result = completedFuture.get(); // 此时get()会立即返回,因为任务已完成System.out.println("Retrieved: " + result);}threadPool.shutdown();threadPool.awaitTermination(5, TimeUnit.SECONDS);}
}
运行输出(结果顺序每次不同,但总是快的先出来):
3.2 处理超时与异常
在实际生产中,我们不仅要处理正常结果,还要优雅地处理超时和任务执行异常。
package cn.tcmeta.usecompletionservice;import java.util.concurrent.*;public class RobustCompletionServiceDemo {static class MixedTask implements Callable<String> {private final String name;private final boolean shouldFail;MixedTask(String name, boolean shouldFail) {this.name = name;this.shouldFail = shouldFail;}@Overridepublic String call() throws Exception {Thread.sleep(1000); // 模拟工作if (shouldFail) {throw new RuntimeException("Simulated exception in " + name);}return "Success: " + name;}}public static void main(String[] args) {ExecutorService pool = Executors.newCachedThreadPool( r -> new Thread(r, "tc-thread-"));CompletionService<String> cs = new ExecutorCompletionService<>(pool);// 提交一组混合任务(有的成功,有的失败)cs.submit(new MixedTask("Task-A", false));cs.submit(new MixedTask("Task-B", true)); // 这个任务会抛出异常cs.submit(new MixedTask("Task-C", false));cs.submit(new MixedTask("Task-D", true)); // 这个任务也会抛出异常int completedCount = 0;int taskCount = 4;// 使用 poll 进行超时控制,避免无限等待while (completedCount < taskCount) {try {// 等待最多2秒获取一个完成的任务Future<String> future = cs.poll(2, TimeUnit.SECONDS);if (future == null) {System.out.println("Poll timeout, no task completed in last 2 seconds. Checking again...");// 在实际场景中,这里可以添加中断或结束循环的逻辑continue;}completedCount++;try {// get() 现在不会阻塞,因为future已经完成// 但如果任务异常,get() 会抛出 ExecutionExceptionString result = future.get();System.out.println("Result: " + result);} catch (ExecutionException e) {// 捕获任务执行过程中抛出的原始异常Throwable cause = e.getCause();System.err.println("Task failed with cause: " + cause.getMessage());// 根据异常类型进行精细化的错误处理}} catch (InterruptedException e) {System.err.println("Interrupted while waiting for result.");Thread.currentThread().interrupt(); // 恢复中断状态break; // 优雅退出}}pool.shutdownNow();}
}
这个示例展示了如何:
- 使用
poll(long timeout, TimeUnit unit)
避免无限期阻塞。 - 使用
try-catch
捕获ExecutionException
来处理任务自身的异常。 - 正确处理线程中断。
3.3 竞态场景 - 同时处理结果并取消剩余任务
一个常见的场景是:只要取得第一个有效结果,就立即取消所有其他任务以节省资源。
package cn.tcmeta.usecompletionservice;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;public class RaceConditionDemo {static class SearchTask implements Callable<String> {private final String engine;private final String query;SearchTask(String engine, String query) {this.engine = engine;this.query = query;}@Overridepublic String call() throws Exception {// 模拟不同搜索引擎的响应时间int latency = switch (engine) {case "Google" -> 800;case "Bing" -> 500;case "DuckDuckGo" -> 1200;default -> 1000;};Thread.sleep(latency);// 模拟其中一个引擎没有结果if ("Bing".equals(engine)) {return null; // Bing 没找到}return engine + " found: [" + query + "] in " + latency + "ms";}}public static void main(String[] args) throws InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletionService<String> cs = new ExecutorCompletionService<>(pool);String query = "Java CompletionService";List<Future<String>> futures = new ArrayList<>();// 向多个“搜索引擎”提交搜索任务futures.add(cs.submit(new SearchTask("Google", query)));futures.add(cs.submit(new SearchTask("Bing", query)));futures.add(cs.submit(new SearchTask("DuckDuckGo", query)));String firstValidResult = null;try {for (int i = 0; i < 3; i++) {Future<String> future = cs.take(); // 等待下一个任务完成String result = future.get();if (result != null) {firstValidResult = result;break; // 找到第一个有效结果,跳出循环}// 如果结果为null(比如Bing),继续等待下一个结果}} catch (ExecutionException e) {e.printStackTrace();} finally {// 无论是否找到结果,取消所有尚未完成的任务for (Future<String> f : futures) {// 参数 true 表示中断正在执行的任务// 对于能处理中断的任务,这能立即停止它们;否则只是取消未开始的任务f.cancel(true);}}if (firstValidResult != null) {System.out.println("First valid result: " + firstValidResult);} else {System.out.println("No valid result found from any engine.");}pool.shutdown();}
}
这个示例演示了经典的“竞态”模式:利用 CompletionService
快速获取第一个有效结果,并通过保留 Future
列表来取消所有后台任务,极大提升系统响应速度和资源利用率。
3.4 网页内容抓取(谁快谁先处理)
假设我们要从多个 URL 并行抓取网页内容,只要有一个返回,就立即处理(如搜索引擎的“最快响应”策略)
3.5 批量任务处理与结果合并
在需要处理大量任务并合并结果的场景中,CompletionService
非常有用:
package cn.tcmeta.usecompletionservice;import java.util.*;
import java.util.concurrent.*;public class CompletionServiceBatchDemo {public static void main(String[] args) throws InterruptedException, ExecutionException {// 模拟一批需要处理的数据List<String> dataList = Arrays.asList("数据1", "数据2", "数据3", "数据4", "数据5");ExecutorService executor = Executors.newFixedThreadPool(2, new ThreadFactory() {private int threadNumber = 1;@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("tc-custom-pool-worker-" + threadNumber++); // 自定义名称thread.setDaemon(false); // 非守护线程return thread;}});CompletionService<String> completionService = new ExecutorCompletionService<>(executor);// 提交所有任务for (String data : dataList) {completionService.submit(() -> {// 模拟数据处理Thread.sleep(new Random().nextInt(2000));System.out.printf("线程名称: 【%s】 , msg: %s \n", Thread.currentThread().getName(), " 开始处理任务了 ~~~");return "处理结果: " + data;});}// 收集结果List<String> results = new ArrayList<>();for (int i = 0; i < dataList.size(); i++) {Future<String> future = completionService.take();results.add(future.get());}// 输出所有结果System.out.println("所有处理结果:");for (String result : results) {System.out.println(result);}executor.shutdown();}
}
四、总结与最佳实践
4.1 优点
- 结果响应性: 极大地提高了结果处理的响应速度,先完成的任务先被处理。
- 资源效率: 如上例所示,可以快速取得有效结果并释放资源。
- 简化逻辑: 将“生产者”(任务执行)和“消费者”(结果处理)清晰地分离开。
4.2 缺点与注意事项
- 失去顺序: 你无法知道结果对应的是哪个提交的任务,除非结果本身包含任务ID(如示例中的
taskId
)。 - 异常处理: 必须小心处理
ExecutionException
,否则异常容易被吞没。 - 资源管理: 务必确保最终能调用
take()
或poll()
足够次数来清空完成队列,否则可能导致持有已完成任务的引用,无法被GC回收,造成内存泄漏。
4.3 适用场景
- 聚合操作: 从多个微服务或数据源获取数据,谁先返回就用谁。
- 超时控制: 同时发起多个相同功能的请求(如查询多个镜像源),取第一个成功的。
- 批量任务处理: 处理大量独立任务,并希望尽快看到进展和结果。
场景 | 说明 |
---|---|
并行搜索 | 多个数据源查询,返回第一个可用结果。 |
冗余调用(Redundant Calls) | 同时调用多个副本,取最快响应,提升系统可用性。 |
批量任务监控 | 实时监控任务进度,无需等待全部完成。 |
流水线处理 | 上游任务完成后立即触发下游处理。 |
4.4 选择合适队列
- 默认
LinkedBlockingQueue
:无界,可能 OOM。 - 生产环境建议使用有界队列:
BlockingQueue<Future<String>> queue = new ArrayBlockingQueue<>(10);
CompletionService<String> cs = new ExecutorCompletionService<>(executor, queue);
4.5 异常处理
try {Future<String> future = completionService.take();String result = future.get(); // ExecutionException 可能被抛出
} catch (ExecutionException e) {Throwable cause = e.getCause();if (cause instanceof IOException) {System.err.println("网络错误: " + cause.getMessage());}
}
4.6 资源管理
- 务必在 finally 块中
shutdown()
线程池。 - 如果提前退出循环,考虑取消剩余任务:
// 取消所有未完成的任务
while ((future = completionService.poll()) != null) {future.cancel(true);
}
4.7 替代方案【重要】
在 Java 8+ 中,CompletableFuture
提供了更强大和灵活的异步编程能力(如组合、链式调用),可以实现类似 CompletionService
“先完成先处理”的模式(例如使用 anyOf()
),但 CompletionService
在简单的批量任务结果收集场景中依然非常直观和高效。
4.8 CompletionService 与其他相关类的对比
特性 | CompletionService | ExecutorService + Future | ForkJoinPool |
---|---|---|---|
结果获取方式 | 按完成顺序 | 按提交顺序 | 工作窃取算法 |
阻塞行为 | take () 阻塞直到有结果 | get () 阻塞直到特定任务完成 | join () 阻塞直到任务完成 |
适用场景 | 多任务异步执行,需要按完成顺序处理结果 | 简单的异步任务执行 | 可分解的大型任务 |
复杂度 | 中等 | 简单到中等 | 较高 |
4.9 总结
特性 | 说明 |
---|---|
核心价值 | 实现“完成优先”的任务结果处理模式 |
关键方法 | submit() 提交任务,take() 获取最先完成的 Future |
底层机制 | 通过 FutureTask.done() 钩子 + 阻塞队列实现自动通知 |
性能优势 | 减少等待时间,提高系统响应速度 |
使用建议 | 优先于 invokeAll() 处理可独立完成的任务 |
💡 一句话总结:
CompletionService
是 ExecutorService
的“结果增强版”**,它让你不再关心任务的提交顺序,只关注“**谁先跑完,就先处理谁”,是实现高响应性并发程序的利器。
✅ 推荐使用场景:当你需要处理多个异步任务,且希望尽快处理已完成的任务结果时,CompletionService
应该是你的首选方案。