当前位置: 首页 > news >正文

深入解析FutureTask:原理与实战

我们来深入解析 FutureTask。下面将从它们的用法开始,逐步深入到底层实现、方法和接口设计,并探讨它们对于我们自己设计实现以及其他编程实践的学习意义。

主要就是放入等待队列(CAS操作一个链表头),完成任务的线程唤醒等待线程继续逻辑。

FutureTask 深入解析

FutureTask 是一个可取消的异步计算任务。它实现了 RunnableFuture 接口,该接口同时继承了 Runnable 和 Future 接口。这意味着 FutureTask 既可以作为一个 Runnable 被执行器(如 ExecutorService)执行,也可以作为一个 Future 来获取异步计算的结果、查询计算状态或取消计算。

FutureTask 通常用于包装一个 Callable 或 Runnable 对象。

基本用法示例:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;public class FutureTaskExample {public static void main(String[] args) throws Exception {// 1. 创建 Callable 任务Callable<String> callableTask = () -> {System.out.println(Thread.currentThread().getName() + " is executing callable task...");TimeUnit.SECONDS.sleep(2); // 模拟耗时操作return "Callable Result";};// 2. 使用 Callable 创建 FutureTaskFutureTask<String> futureTask1 = new FutureTask<>(callableTask);// 也可以使用 Runnable 创建 FutureTask (通常需要提供一个结果,若无则为 null)Runnable runnableTask = () -> {System.out.println(Thread.currentThread().getName() + " is executing runnable task...");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {Thread.currentThread().interrupt();}};FutureTask<String> futureTask2 = new FutureTask<>(runnableTask, "Runnable Result (if provided)");// 3. 提交 FutureTask 到 ExecutorService 执行// FutureTask 本身就是 Runnable,可以直接被线程执行或提交给 ExecutorServiceExecutorService executor = Executors.newFixedThreadPool(2);executor.submit(futureTask1);executor.submit(futureTask2);// 或者直接 new Thread(futureTask1).start();System.out.println("Tasks submitted.");// 4. 获取结果 (get() 方法会阻塞直到任务完成)try {System.out.println("Waiting for futureTask1 result...");String result1 = futureTask1.get(); // 阻塞等待System.out.println("futureTask1 result: " + result1);System.out.println("Waiting for futureTask2 result with timeout...");// get(long timeout, TimeUnit unit) 可以设置超时String result2 = futureTask2.get(2, TimeUnit.SECONDS);System.out.println("futureTask2 result: " + result2);} catch (InterruptedException e) {System.err.println("Task interrupted: " + e.getMessage());} catch (java.util.concurrent.ExecutionException e) {System.err.println("Task execution failed: " + e.getCause());} catch (java.util.concurrent.TimeoutException e) {System.err.println("Task timed out: " + e.getMessage());}// 5. 检查任务状态和取消if (!futureTask1.isDone()) {System.out.println("futureTask1 is not done yet.");}if (futureTask1.isCancelled()) {System.out.println("futureTask1 was cancelled.");}// 尝试取消一个未完成的任务FutureTask<Integer> cancellableTask = new FutureTask<>(() -> {TimeUnit.SECONDS.sleep(5);return 100;});new Thread(cancellableTask).start();Thread.sleep(100); // 给任务一点时间启动boolean cancelled = cancellableTask.cancel(true); // true 表示如果任务正在运行,则中断它System.out.println("CancellableTask cancelled: " + cancelled);System.out.println("CancellableTask isCancelled: " + cancellableTask.isCancelled());System.out.println("CancellableTask isDone: " + cancellableTask.isDone()); // cancel 后 isDone() 也为 trueexecutor.shutdown();}
}

FutureTask 是 Java 并发包中一个非常核心的类,它代表一个可取消的异步计算。它巧妙地结合了 Future 接口(用于获取异步计算的结果)和 Runnable 接口(使得它可以被 Executor 执行)。

1. 状态管理 (State Management)

FutureTask 内部维护一个 volatile int state 字段来表示任务的当前状态。状态包括:

  • NEW: 初始状态,任务尚未开始或正在运行。

  • COMPLETING: 任务已完成,正在设置结果(一个短暂的中间状态)。

  • NORMAL: 任务正常完成,结果已设置。

  • EXCEPTIONAL: 任务因抛出异常而完成,异常已设置。

  • CANCELLED: 任务被取消(在开始运行前)。

  • INTERRUPTING: 任务被取消,并且正在尝试中断运行任务的线程(一个短暂的中间状态)。

  • INTERRUPTED: 任务被取消,并且运行任务的线程已被中断。

状态之间的转换通过 CAS (Compare-And-Set) 操作(使用 VarHandle STATE)来保证原子性。

2. 任务执行 (run() 方法)

FutureTaskrun() 方法被调用时(通常由一个 Executor 的工作线程调用):

  1. 首先会通过 CAS 操作尝试将 runner 字段(volatile Thread runner)从 null 设置为当前线程。这确保了只有一个线程可以实际执行任务。

  2. 如果设置成功并且任务状态是 NEW,则会调用内部的 Callable 对象的 call() 方法。

  3. 如果 call() 方法正常返回,则调用 set(V result) 方法设置结果,并将状态转换为 NORMAL

  4. 如果 call() 方法抛出异常,则调用 setException(Throwable t) 方法设置异常,并将状态转换为 EXCEPTIONAL

  5. finally 块中,runner 字段会被重置为 null。还会检查任务是否在运行期间被取消并需要中断(状态为 INTERRUPTINGINTERRUPTED),如果是,则会调用 handlePossibleCancellationInterrupt() 处理。

3. 获取结果 (get()get(long, TimeUnit) 方法)

  • get() 方法:

    • 首先检查当前状态 s = state

    • 如果任务尚未完成 (s <= COMPLETING),则调用 awaitDone(boolean timed, long nanos) 方法阻塞等待。

    • 一旦任务完成(状态变为 NORMAL, EXCEPTIONAL, CANCELLED, 或 INTERRUPTED),awaitDone 返回,然后 get() 方法调用 report(int s) 来返回结果或抛出相应的异常。

    • NORMAL: 返回结果。

    • EXCEPTIONAL: 抛出 ExecutionException (包装了原始异常)。

    • CANCELLEDINTERRUPTED: 抛出 CancellationException

  • get(long, TimeUnit)** 方法**:类似 get(),但带有超时机制。如果在超时时间内任务未完成,则抛出 TimeoutException

4. 取消任务 (cancel(boolean mayInterruptIfRunning) 方法)

  1. 尝试通过 CAS 将状态从 NEW 转换为 CANCELLED (如果 mayInterruptIfRunningfalse) 或 INTERRUPTING (如果 mayInterruptIfRunningtrue)。

  2. 如果 CAS 成功:

    1. 如果 mayInterruptIfRunningtrue

      • 获取 runner 线程。

      • 如果 runner 不为 null,则调用 runner.interrupt() 来中断执行任务的线程。

      • finally 块中,将状态设置为 INTERRUPTED (使用 STATE.setRelease 保证内存可见性)。

    2. 最后,调用 finishCompletion() 来唤醒所有等待的线程。

  3. 如果 CAS 失败(例如任务已经完成或已被取消),则返回 false

5. 等待队列 (WaitNodewaiters 字段)

  • private volatile WaitNode waiters;:这是一个指向等待线程链表头部的指针。这个链表是一个简单的 Treiber 栈 (LIFO 栈)。

  • WaitNode 是一个静态内部类,代码如下:

// ... 
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } 
} 
// ... 

每个 WaitNode 封装了一个等待结果的线程 (thread = Thread.currentThread()) 和一个指向下一个节点的指针 (next)。

6. 阻塞和唤醒机制 (awaitDone()finishCompletion())

  • awaitDone(boolean timed, long nanos)

当一个线程调用 get() 并且任务未完成时,会进入此方法。它会创建一个新的 WaitNode,然后在一个循环中:

  1. 通过 CAS 将新的 WaitNode 添加到 waiters 链表的头部(实现入栈)。

  2. 再次检查任务状态,如果已完成,则移除刚添加的节点并返回状态。

  3. 如果任务仍未完成,则调用 LockSupport.park(this) (或 LockSupport.parkNanos(this, nanos)) 使当前线程阻塞。

  4. 当线程被唤醒时,如果是因为中断,则从等待队列中移除节点并抛出 InterruptedException。如果是因为超时,则从等待队列中移除节点并返回当前状态。

  • finishCompletion()

当任务完成(通过 set, setException, 或 cancel)时,此方法被调用。

  1. 它会遍历 waiters 链表,并对每个 WaitNode 中的线程调用 LockSupport.unpark(q.thread) 来唤醒它们。

  2. 遍历完成后,调用 done() 方法(这是一个空方法,供子类覆盖以执行完成回调)。

  3. 最后将 callable 设为 null 以帮助 GC。

哪个线程负责管理唤醒 get 等待的线程?

负责唤醒等待线程的是完成任务的那个线程。具体来说:

  • 如果是任务正常执行完成或抛出异常,那么是执行 run() 方法的线程(即 runner 线程)在调用 set()setException() 后,最终会调用 finishCompletion() 来唤醒所有等待者。

  • 如果是任务被取消,那么是调用 cancel() 方法的线程在成功取消任务后,会调用 finishCompletion() 来唤醒所有等待者。

等待 get() 方法结果的线程被封装在 WaitNode 对象中。每个 WaitNode 包含:

  • volatile Thread thread;: 对等待线程本身的引用。

  • volatile WaitNode next;: 指向链表中下一个 WaitNode 的引用。

这些 WaitNode 对象形成一个后进先出 (LIFO) 的栈式链表,其头节点由 FutureTaskvolatile WaitNode waiters; 字段指向。当一个线程需要等待时,它会创建一个新的 WaitNode 并将其 CAS 到 waiters 链表的头部。当任务完成时,finishCompletion() 方法会遍历这个链表并唤醒每个节点中的线程。

设计优势

这种设计避免了使用更重的锁(如 AbstractQueuedSynchronizer,早期版本的 FutureTask 使用过它),转而使用轻量级的 CAS 操作和 LockSupport 进行线程的阻塞和唤醒,这在很多情况下能提供更好的性能。

http://www.xdnf.cn/news/872389.html

相关文章:

  • 【RAG召回优化】rag召回阶段方法探讨
  • 学习STC51单片机27(芯片为STC89C52RCRC)
  • 34.1STM32下的can总线实现知识(区分linux)_csdn
  • 洛谷B2147 求 f(x,n)
  • 解决SQL Server SQL语句性能问题(9)——SQL语句改写(1)
  • 2ETLCloud:重新定义AI驱动的数据集成未来
  • 四、OpenCV图像处理- 视频操作
  • ArcGIS计算多个栅格数据的平均栅格
  • Educational Codeforces Round 179 (Rated for Div. 2)(A-E)
  • 看不见的守护者
  • 【机器人编程基础】循环语句for-while
  • 内存管理【Linux操作系统】
  • IEEE ICBCTIS 2025 会议征稿:探索区块链与信息安全的前沿学术之旅​
  • 操作系统学习(十三)——Linux
  • Elasticsearch 海量数据写入与高效文本检索实践指南
  • 上门服务小程序订单系统框架设计
  • Docker 常用命令详解
  • 洛谷每日1题-------Day40__P1720 月落乌啼算钱(斐波那契数列)
  • 卡西欧模拟器:Windows端功能强大的计算器
  • matlab实现高斯烟羽模型算法
  • AA-CLIP: Enhancing Zero-Shot Anomaly Detection via Anomaly-Aware CLIP
  • Linux操作系统Shell脚本概述与命令实战
  • 英伟达288GB HBM4+50P算力
  • 云数据库选型指南:关系型 vs NoSQL vs NewSQL的企业决策
  • Selenium自动化测试工具安装和使用(PyCharm)
  • Java运行环境配置日志(Log)运行条件,包含鸿蒙HarmonyOS
  • 函数与运算符重载
  • 【freertos-kernel】timer
  • 嵌入式链表操作原理详解
  • 《小明的一站式套餐服务平台:抽象工厂模式》