当前位置: 首页 > web >正文

java并发编程-ForkJoinPool

ForkJoinPool

  • Fork/Join
  • ForkJoinPool
  • ForkJoinTask
  • 工作原理
  • ForkJoinPool实现归并排序
  • 注意事项

Fork/Join

Fork/Join是一个是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的Fork对应的是分治任务模型里的任务分解,Join对应的是结果合并。Fork/Join框架的主要组成部分是ForkJoinPool、ForkJoinTask。ForkJoinPool是一个线程池,它用于管理ForkJoin任务的执行。ForkJoinTask是一个抽象类,用于表示可以被分割成更小部分的任务。

ForkJoinPool

构造器核心参数:
int parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定工作线程的数量。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()来设置并行级别;
ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过factory来创建。如果你不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作;
UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理;
boolean asyncMode:设置队列的工作模式。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式。

提交任务:

返回值方法
提交异步执行voidexecute(ForkJoinTask<?> task) / execute(Runnable task)
等待并获取结果Tinvoke(ForkJoinTask task)
提交执行获取Future结果ForkJoinTasksubmit(ForkJoinTask task) / submit(Callable task) / submit(Runnable task, T result) / submit(Runnable task)

ForkJoinTask

RecursiveAction:用于递归执行但不需要返回结果的任务。
RecursiveTask:用于递归执行需要返回结果的任务。

ForkJoinTask最核心的是fork()方法和join()方法: fork()方法用于向当前任务所运行的线程池中提交任务。join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。

工作原理

  1. ForkJoinPool内部有多个任务队列,当我们通过ForkJoinPool的invoke()或者submit()方法提交任务时,ForkJoinPool会把任务放入一个任务队列中(偶数位置,owner=null),然后启动工作线程(绑定到奇数位置,每个线程有自己的任务队列)拿取任务去执行。
  2. ForkJoinWorkerThread工作线程启动后就会扫描偷取任务执行,另外当其在ForkJoinTask#join()等待返回结果时如果被ForkJoinPool线程池发现其任务队列为空或者已经将当前任务执行完毕,也会通过工作窃取算法从其他任务队列中获取任务分配到其任务队列中并执行。

在这里插入图片描述

ForkJoinPool实现归并排序

public class ForkJoinPoolMergeSort {public static void main(String[] args) {long start = System.currentTimeMillis();int[] array = generateIntArray(20000000);MergeSortTask mergeSortTask = new MergeSortTask(array, 2000);mergeSortTask.invoke();System.out.println(Arrays.toString(mergeSortTask.getArray()));long end = System.currentTimeMillis();System.out.println("花费时间:" + (end - start));}private static int[] generateIntArray(int size) {int[] array = new int[size];Random random = new Random();for(int i = 0; i < size; i++) {array[i] = random.nextInt(100000000);}return array;}
}public class MergeSortTask extends RecursiveAction {private int[] array;private int threshold;public MergeSortTask(int[] array, int threshold) {this.array = array;this.threshold = threshold;}@Overrideprotected void compute() {if(array.length <= threshold) {Arrays.sort(array);return;}int mid = array.length / 2;int[] leftArray = Arrays.copyOfRange(array, 0, mid);int[] rightArray = Arrays.copyOfRange(array, mid, array.length);MergeSortTask leftTask = new MergeSortTask(leftArray, threshold);MergeSortTask rightTask = new MergeSortTask(rightArray, threshold);leftTask.fork();rightTask.fork();leftTask.join();rightTask.join();array = MergeSort.merge(leftTask.getArray(), rightTask.getArray());}public int[] getArray() {return array;}
}public class MergeSort {public static int[] merge(int[] a, int[] b) {int alen = a.length;int blen = b.length;int [] sortedArray = new int[alen+blen];int aIndex = 0;int bIndex = 0;int i = 0;while(aIndex < alen && bIndex < blen) {if(a[aIndex] < b[bIndex]) {sortedArray[i] = a[aIndex];aIndex++;} else {sortedArray[i] = b[bIndex];bIndex++;}i++;}while (aIndex < alen) {sortedArray[i] = a[aIndex];aIndex++;i++;}while (bIndex < blen) {sortedArray[i] = b[bIndex];bIndex++;i++;}return sortedArray;}
}

注意事项

  1. 处理递归任务时,需要根据实际情况来评估递归深度和任务粒度,避免栈溢出。
  2. 使用ForkJoinPool来处理非阻塞型任务。阻塞任务使用普通线程池处理。
http://www.xdnf.cn/news/308.html

相关文章:

  • fastdds:传输层SHM和DATA-SHARING的区别
  • I2C嵌入式开发实战指南:从入门到精通
  • 一级指针的介绍
  • python进阶: 深入了解调试利器 Pdb
  • 第R3周:RNN-心脏病预测
  • namesapce、cgroup
  • kubeadm极速部署Kubernetes 1.26.X 版本集群
  • AI语音助手 React 组件使用js-audio-recorder实现,将获取到的语音转成base64发送给后端,后端接口返回文本内容
  • 【学习笔记】文件上传漏洞--黑白盒审计
  • 数字友好战略视域下数字安全核心要素的理论解构与实践路径
  • 2022年世界青年科学家峰会-高端装备系统动力学与智能诊断维护学术研讨会
  • Java之this关键字
  • CTF--MD5
  • 慢速率拉伸热变形工艺试验机
  • 关于模拟噪声分析的11个误区
  • Dify快速入门之基于知识库构建聊天机器人
  • 汽车免拆诊断案例 | 2019款大众途观L车鼓风机偶尔不工作
  • 在浏览器中输入 URL 到页面加载完成都做了什么
  • 【含文档+PPT+源码】基于python爬虫的豆瓣电影、音乐、图书数据分析系统
  • nginx-基础知识(二)
  • 为什么计算「网络响应时间」或「定位响应时间」时,CACurrentMediaTime() 比 Date() 更优?
  • MCP系列之架构篇:深入理解MCP的设计架构
  • DeepSeek 操作 MySQL 数据库:使用 MCP 实现数据库查询
  • 【HDFS入门】联邦机制(Federation)与扩展性:HDFS NameNode水平扩展深度解析
  • 【AI提示词】儿童看护员
  • 实验五 内存管理实验
  • 如何在PDF.js中改造viewer.html以实现PDF的动态加载
  • STM32单片机入门学习——第41节: [12-1] Unix时间戳
  • MyBatis如何配置数据库连接并实现交互?
  • YOLOv5、YOLOv6、YOLOv7、YOLOv8、YOLOv9、YOLOv10、YOLOv11、YOLOv12的网络结构图