当前位置: 首页 > news >正文

【CompletableFuture】基础Future (一)

【CompletableFuture】基础Future

  • 1. Future理论知识
    • 1.1 基本理解
    • 1.2 常用方法
    • 1.3 使用案例
    • 1.4 注意事项
    • 1.5 Future vs CompletableFuture
  • 2. Future接口常用实现类FutureTask异步任务
  • 2.1 类定义
    • 2.2 核心属性
    • 2.3 状态流转(state)
    • 2.4 代码示例
    • 2.5 FutureTask的优缺点
      • 优点
      • 缺点
    • 2.6 书中介绍
  • 附录

1. Future理论知识

/** ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.**//**** Written by Doug Lea with assistance from members of JCP JSR-166* Expert Group and released to the public domain, as explained at* http://creativecommons.org/publicdomain/zero/1.0/*/package java.util.concurrent;/*** A {@code Future} represents the result of an asynchronous* computation.  Methods are provided to check if the computation is* complete, to wait for its completion, and to retrieve the result of* the computation.  The result can only be retrieved using method* {@code get} when the computation has completed, blocking if* necessary until it is ready.  Cancellation is performed by the* {@code cancel} method.  Additional methods are provided to* determine if the task completed normally or was cancelled. Once a* computation has completed, the computation cannot be cancelled.* If you would like to use a {@code Future} for the sake* of cancellability but not provide a usable result, you can* declare types of the form {@code Future<?>} and* return {@code null} as a result of the underlying task.** <p>* <b>Sample Usage</b> (Note that the following classes are all* made-up.)* <pre> {@code* interface ArchiveSearcher { String search(String target); }* class App {*   ExecutorService executor = ...*   ArchiveSearcher searcher = ...*   void showSearch(final String target)*       throws InterruptedException {*     Future<String> future*       = executor.submit(new Callable<String>() {*         public String call() {*             return searcher.search(target);*         }});*     displayOtherThings(); // do other things while searching*     try {*       displayText(future.get()); // use future*     } catch (ExecutionException ex) { cleanup(); return; }*   }* }}</pre>** The {@link FutureTask} class is an implementation of {@code Future} that* implements {@code Runnable}, and so may be executed by an {@code Executor}.* For example, the above construction with {@code submit} could be replaced by:*  <pre> {@code* FutureTask<String> future =*   new FutureTask<String>(new Callable<String>() {*     public String call() {*       return searcher.search(target);*   }});* executor.execute(future);}</pre>** <p>Memory consistency effects: Actions taken by the asynchronous computation* <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a>* actions following the corresponding {@code Future.get()} in another thread.** @see FutureTask* @see Executor* @since 1.5* @author Doug Lea* @param <V> The result type returned by this Future's {@code get} method*/
public interface Future<V> {/*** Attempts to cancel execution of this task.  This attempt will* fail if the task has already completed, has already been cancelled,* or could not be cancelled for some other reason. If successful,* and this task has not started when {@code cancel} is called,* this task should never run.  If the task has already started,* then the {@code mayInterruptIfRunning} parameter determines* whether the thread executing this task should be interrupted in* an attempt to stop the task.** <p>After this method returns, subsequent calls to {@link #isDone} will* always return {@code true}.  Subsequent calls to {@link #isCancelled}* will always return {@code true} if this method returned {@code true}.** @param mayInterruptIfRunning {@code true} if the thread executing this* task should be interrupted; otherwise, in-progress tasks are allowed* to complete* @return {@code false} if the task could not be cancelled,* typically because it has already completed normally;* {@code true} otherwise*/boolean cancel(boolean mayInterruptIfRunning);/*** Returns {@code true} if this task was cancelled before it completed* normally.** @return {@code true} if this task was cancelled before it completed*/boolean isCancelled();/*** Returns {@code true} if this task completed.** Completion may be due to normal termination, an exception, or* cancellation -- in all of these cases, this method will return* {@code true}.** @return {@code true} if this task completed*/boolean isDone();/*** Waits if necessary for the computation to complete, and then* retrieves its result.** @return the computed result* @throws CancellationException if the computation was cancelled* @throws ExecutionException if the computation threw an* exception* @throws InterruptedException if the current thread was interrupted* while waiting*/V get() throws InterruptedException, ExecutionException;/*** Waits if necessary for at most the given time for the computation* to complete, and then retrieves its result, if available.** @param timeout the maximum time to wait* @param unit the time unit of the timeout argument* @return the computed result* @throws CancellationException if the computation was cancelled* @throws ExecutionException if the computation threw an* exception* @throws InterruptedException if the current thread was interrupted* while waiting* @throws TimeoutException if the wait timed out*/V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

翻译后结果

/** ORACLE 专有/机密。使用受许可条款约束。*//** 在 JCP JSR-166 专家组成员的协助下,由 Doug Lea 编写,* 并已发布到公共领域,如以下网址所述:* http://creativecommons.org/publicdomain/zero/1.0/*/package java.util.concurrent;/*** {@code Future} 表示一个异步计算的结果。* 提供了用于检查计算是否完成、等待其完成以及检索其结果的方法。* 只有在计算完成之后,才能使用 {@code get} 方法来获取结果;如有必要,该方法会阻塞直到计算完成。* 通过 {@code cancel} 方法可以取消计算。* 还提供了额外的方法来判断任务是否是正常完成或已被取消。* 一旦计算完成,就不能再取消该计算。* * 如果你希望使用 {@code Future} 实现可取消性但不需要返回结果,* 可以声明类型为 {@code Future<?>} 并让底层任务返回 {@code null} 作为结果。** <p>* <b>示例用法</b>(注意,以下类均为虚构):* <pre> {@code* interface ArchiveSearcher { String search(String target); }* class App {*   ExecutorService executor = ...*   ArchiveSearcher searcher = ...*   void showSearch(final String target)*       throws InterruptedException {*     Future<String> future*       = executor.submit(new Callable<String>() {*         public String call() {*             return searcher.search(target);*         }});*     displayOtherThings(); // 在搜索期间执行其他操作*     try {*       displayText(future.get()); // 使用 future 获取结果*     } catch (ExecutionException ex) { cleanup(); return; }*   }* }}</pre>** {@link FutureTask} 类是 {@code Future} 的一个实现类,它还实现了 {@code Runnable},* 因此可以由 {@code Executor} 执行。* 例如,上述使用 {@code submit} 的方式可以替换为如下形式:*  <pre> {@code* FutureTask<String> future =*   new FutureTask<String>(new Callable<String>() {*     public String call() {*       return searcher.search(target);*   }});* executor.execute(future);}</pre>** <p>内存一致性效应:由异步计算所执行的操作,* 会 <a href="package-summary.html#MemoryVisibility"><i>先于(happen-before)</i></a>* 另一个线程中随后调用 {@code Future.get()} 方法之后的操作。** @see FutureTask* @see Executor* @since 1.5* @author Doug Lea* @param <V> 此 Future 的 {@code get} 方法返回的结果类型*/
public interface Future<V> {/*** 尝试取消此任务的执行。* 如果任务已经完成、已被取消,或因其他原因无法取消,则该尝试将失败。* 如果任务尚未启动并成功取消,那么该任务将永远不会被执行。* 如果任务已经启动,则参数 {@code mayInterruptIfRunning} 决定是否应中断执行此任务的线程,* 以尝试停止任务的执行。** <p>此方法返回后,对 {@link #isDone} 的调用将始终返回 {@code true}。* 如果此方法返回 {@code true},则对 {@link #isCancelled} 的调用也将返回 {@code true}。** @param mayInterruptIfRunning 如果应中断正在执行此任务的线程,则为 {@code true};*                              否则,允许正在进行的任务继续完成。* @return 如果任务无法被取消(通常是因为它已经正常完成),则返回 {@code false};*         否则返回 {@code true}*/boolean cancel(boolean mayInterruptIfRunning);/*** 如果此任务在正常完成之前已被取消,则返回 {@code true}。** @return 如果任务在完成之前被取消,则返回 {@code true}*/boolean isCancelled();/*** 如果此任务已完成,则返回 {@code true}。** 完成可能是由于正常结束、抛出异常或被取消——* 在所有这些情况下,此方法都将返回 {@code true}。** @return 如果此任务已完成,则返回 {@code true}*/boolean isDone();/*** 如有必要,等待计算完成,然后检索其结果。** @return 计算结果* @throws CancellationException 如果计算被取消* @throws ExecutionException 如果计算抛出异常* @throws InterruptedException 如果当前线程在等待期间被中断*/V get() throws InterruptedException, ExecutionException;/*** 如有必要,最多等待指定的时间以完成计算,然后(如果可用)检索其结果。** @param timeout 最长等待时间* @param unit 超时时间的时间单位* @return 计算结果* @throws CancellationException 如果计算被取消* @throws ExecutionException 如果计算抛出异常* @throws InterruptedException 如果当前线程在等待期间被中断* @throws TimeoutException 如果等待超时*/V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

上面代码是Java 并发包(java.util.concurrent)中的一个核心接口,由并发编程专家 Doug Lea 编写,并从 JDK 1.5 开始引入。它用于表示异步计算的结果。
即:一个任务可能还在执行,但你可以提前获取它的引用,并在之后查询、等待或取消它的执行。

1.1 基本理解

Future<T> 接口表示一个异步任务的结果,T 是结果的类型。它的主要用途是:

  • 查询任务是否完成
  • 获取任务结果
  • 取消任务

1.2 常用方法

public interface Future<V> {// 取消任务boolean cancel(boolean mayInterruptIfRunning); // 判断任务是否被取消boolean isCancelled(); // 判断任务是否已完成boolean isDone(); // 获取任务结果(阻塞)V get() throws InterruptedException, ExecutionException; // 限时获取任务结果V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; 
} 

1.3 使用案例

配合 ExecutorService 使用:

import java.util.concurrent.*;public class FutureExample {public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newSingleThreadExecutor();// 提交一个 Callable 任务,返回 FutureFuture<Integer> future = executor.submit(() -> {Thread.sleep(2000); // 模拟耗时操作return 42;});System.out.println("任务已提交,等待结果...");// 阻塞等待结果Integer result = future.get(); System.out.println("任务结果:" + result);executor.shutdown();}
}

1.4 注意事项

  1. get() 方法是阻塞的:如果任务没执行完,get() 会一直阻塞等待。
  2. 任务可以取消:通过 cancel(true) 取消正在执行的任务(如果支持中断)。
  3. 配合 Callable 使用:与 Runnable 不同,Callable 可以有返回值。
  4. 推荐使用 CompletableFuture(Java 8 及以后):功能更强大,支持链式调用、异步处理等。

1.5 Future vs CompletableFuture

在这里插入图片描述

2. Future接口常用实现类FutureTask异步任务

/** ORACLE 专有/机密。使用需遵守许可证条款。*//** 由 Doug Lea 编写,并在 JCP JSR-166 专家组成员协助下完成。* 根据 http://creativecommons.org/publicdomain/zero/1.0/ 中的说明,* 此代码发布到公有领域。*/package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;/*** 一个可取消的异步计算任务。此类提供了 {@link Future} 的基础实现,* 包括启动和取消计算、查询是否完成,以及获取计算结果的方法。* 只有在计算完成后才能获取结果;如果尚未完成,调用 {@code get} 方法会阻塞。* 一旦计算完成,就不能重新开始或取消(除非使用 {@link #runAndReset} 方法重新执行)。** <p>{@code FutureTask} 可以用来包装一个 {@link Callable} 或 {@link Runnable} 对象。* 因为它实现了 {@code Runnable} 接口,所以可以提交到 {@link Executor} 中执行。** <p>除了作为一个独立的类使用外,此类还提供了一些受保护的功能,* 可供创建自定义任务类时使用。** @since 1.5* @author Doug Lea* @param <V> 此 FutureTask 的 {@code get} 方法返回的结果类型*/
public class FutureTask<V> implements RunnableFuture<V> {/** 版本说明:与之前基于 AbstractQueuedSynchronizer 的版本不同,* 当前设计通过使用 CAS 更新一个 “state” 字段来控制同步,* 避免了在取消竞争期间保留中断状态的问题。* 等待线程采用简单的 Treiber 栈结构。** 风格说明:与往常一样,我们绕过 AtomicXFieldUpdater 带来的开销,* 而直接使用 Unsafe 内在方法。*//*** 当前任务的运行状态,初始为 NEW。* 状态只会在 set、setException 和 cancel 方法中转换为终态。* 在完成过程中,状态可能暂时处于 COMPLETING 或 INTERRUPTING 状态。* 从中间状态转换为最终状态使用的是懒加载写入(比普通写入性能更好)。** 状态转换图:* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/private volatile int state;private static final int NEW          = 0;private static final int COMPLETING   = 1;private static final int NORMAL       = 2;private static final int EXCEPTIONAL  = 3;private static final int CANCELLED    = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED  = 6;/** 待执行的 Callable;执行完成后会置为 null */private Callable<V> callable;/** get() 返回的结果或抛出的异常 */private Object outcome; // 非 volatile,由 state 的读写保证可见性/** 正在执行任务的线程;在 run() 中通过 CAS 设置 */private volatile Thread runner;/** Treiber 栈结构,存储等待中的线程 */private volatile WaitNode waiters;/*** 根据任务状态返回结果或抛出异常。** @param s 完成状态码*/@SuppressWarnings("unchecked")private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}/*** 创建一个 FutureTask,在运行时会执行给定的 Callable。** @param callable 可调用任务* @throws NullPointerException 如果 callable 为 null*/public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;}/*** 创建一个 FutureTask,在运行时会执行给定的 Runnable,并返回指定结果。** @param runnable 可运行任务* @param result 成功执行后返回的结果* @throws NullPointerException 如果 runnable 为 null*/public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;}public boolean isCancelled() {return state >= CANCELLED;}public boolean isDone() {return state != NEW;}public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally {UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion();}return true;}public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}/*** 当任务完成(包括取消)时调用。* 默认实现什么也不做。子类可以覆盖此方法来添加回调或记录。*/protected void done() { }/*** 设置成功结果(如果未设置或未取消)。** @param v 计算结果*/protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL);finishCompletion();}}/*** 设置异常结果(如果未设置或未取消)。** @param t 异常原因*/protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);finishCompletion();}}public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}/*** 执行任务但不设置结果,然后重置状态。* 如果任务执行过程中抛出异常或被取消则返回 false。*/protected boolean runAndReset() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // 不设置结果ran = true;} catch (Throwable ex) {setException(ex);}}} finally {runner = null;s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;}/*** 确保 cancel(true) 发出的中断只在 run/runAndReset 中传递。*/private void handlePossibleCancellationInterrupt(int s) {if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield();// Thread.interrupted(); // 清除中断状态(留空以允许用户处理)}/*** Treiber 栈中用于记录等待线程的节点。*/static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}/*** 移除并唤醒所有等待线程,调用 done() 并清理 callable。*/private void finishCompletion() {for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null;q = next;}break;}}done();callable = null;}/*** 等待任务完成,或因中断/超时而终止。*/private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING)Thread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}/*** 从等待链表中移除超时或中断的节点。*/private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null;retry:for (;;) {for (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;if (q.thread != null)pred = q;else if (pred != null) {pred.next = s;if (pred.thread == null)continue retry;}else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}}// Unsafe 机制private static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}}}

FutureTask<V> 是 Java 并发包中的一个类,它同时实现了:

  • Runnable:可被线程执行
  • Future<V>:可获取结果、支持取消

🔧 作用简述:
用于封装一个可以在未来执行并获取结果的任务,支持:

  • 提交异步任务执行
  • 等待任务完成并获取结果
  • 支持任务取消、中断
  • 支持异常处理

在这里插入图片描述

2.1 类定义

public class FutureTask<V> implements RunnableFuture<V> {// 实现了 Runnable 和 Future 接口的组合接口
}public interface RunnableFuture<V> extends Runnable, Future<V> {void run();
}

2.2 核心属性

在这里插入图片描述

2.3 状态流转(state)

在这里插入图片描述

2.4 代码示例

在这里插入图片描述

2.5 FutureTask的优缺点

优点

在这里插入图片描述
FutureTask结合线程池可以提高效率

缺点

在这里插入图片描述

Timeout在这里插入图片描述

在这里插入图片描述

  1. get()方法是阻塞方法,一旦调用get方法求结果,如果没有计算完容易导致程序阻塞。
  2. Future获取的结果不是很好,只能通过阻塞或者轮训的方式获取到任务的结果。

2.6 书中介绍

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

附录

  1. 课程视频 https://www.bilibili.com/video/BV1ar4y1x727?spm_id_from=333.788.player.switch&vd_source=240d9002f7c7e3da63cd9a975639409a&p=14
http://www.xdnf.cn/news/1045333.html

相关文章:

  • 大模型笔记5:Agent
  • 大模型笔记2:提示词工程
  • 鸿蒙运动开发实战:打造专属运动视频播放器
  • SpringBoot新闻项目学习day2-前后端搭建以及数据查询(分页查询)
  • 「Linux文件及目录管理」文件内容的显示和处理类命令
  • 深入探究其内存开销与JVM布局——Java Record
  • RabbitMQ全面学习指南
  • ArcGIS安装出现1606错误解决办法
  • Linux-多线程安全
  • NY271NY274美光科技固态NY278NY284
  • 【SpringBoot+SpringCloud】nacos配置管理问题解决
  • 38-Oracle 23 ai Accelerate Securefiles LOB Performance
  • 使用x64dbg破解密钥exe程序
  • React学习001-创建 React 应用
  • Spark简介脑图
  • 分割函数(Split Function)
  • 电阻篇---下拉电阻的取值
  • 【运维系列】【ubuntu22.04】Docker安装mysql 8.0.36 教程
  • Java安全管理器-(Security Manager)
  • 《江西南昌棒垒球》一级运动员 vs 二级运动员·棒球1号位
  • Python打卡训练营Day54
  • 【AI学习】【Ubuntu 22.04】【安装Ollama】两种方式
  • 【图片识别改名】如何批量识别大量图片的文字并重命名图片,基于WPF和京东OCR识别接口的实现方案
  • 电脑上的.ssh目录只做什么的
  • 微调技术:Prefix-tuning vs Prompt-tuning vs P-tuning
  • Java设计模式完整学习指南(23+4种模式)
  • 跨域问题之前后端解决办法
  • Photoshop矢量蒙版全教程
  • 我的JavaWeb软件开发作品学生信息管理系统项目/JavaWeb软件开发 课程考察标准
  • 【Bluedroid】蓝牙启动之核心模块(startProfiles )初始化与功能源码解析