java并发编程-ForkJoinPool
ForkJoinPool
- Fork/Join
- ForkJoinPool
- ForkJoinTask
- 工作原理
- ForkJoinPool实现归并排序
- 注意事项
Fork/Join
Fork/Join是一个是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的Fork对应的是分治任务模型里的任务分解,Join对应的是结果合并。Fork/Join框架的主要组成部分是ForkJoinPool、ForkJoinTask。ForkJoinPool是一个线程池,它用于管理ForkJoin任务的执行。ForkJoinTask是一个抽象类,用于表示可以被分割成更小部分的任务。
ForkJoinPool
构造器核心参数:
int parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定工作线程的数量。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()来设置并行级别;
ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过factory来创建。如果你不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作;
UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理;
boolean asyncMode:设置队列的工作模式。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式。
提交任务:
返回值 | 方法 | |
---|---|---|
提交异步执行 | void | execute(ForkJoinTask<?> task) / execute(Runnable task) |
等待并获取结果 | T | invoke(ForkJoinTask task) |
提交执行获取Future结果 | ForkJoinTask | submit(ForkJoinTask task) / submit(Callable task) / submit(Runnable task, T result) / submit(Runnable task) |
ForkJoinTask
RecursiveAction:用于递归执行但不需要返回结果的任务。
RecursiveTask:用于递归执行需要返回结果的任务。
ForkJoinTask最核心的是fork()方法和join()方法: fork()方法用于向当前任务所运行的线程池中提交任务。join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。
工作原理
- ForkJoinPool内部有多个任务队列,当我们通过ForkJoinPool的invoke()或者submit()方法提交任务时,ForkJoinPool会把任务放入一个任务队列中(偶数位置,owner=null),然后启动工作线程(绑定到奇数位置,每个线程有自己的任务队列)拿取任务去执行。
- ForkJoinWorkerThread工作线程启动后就会扫描偷取任务执行,另外当其在ForkJoinTask#join()等待返回结果时如果被ForkJoinPool线程池发现其任务队列为空或者已经将当前任务执行完毕,也会通过工作窃取算法从其他任务队列中获取任务分配到其任务队列中并执行。
ForkJoinPool实现归并排序
public class ForkJoinPoolMergeSort {public static void main(String[] args) {long start = System.currentTimeMillis();int[] array = generateIntArray(20000000);MergeSortTask mergeSortTask = new MergeSortTask(array, 2000);mergeSortTask.invoke();System.out.println(Arrays.toString(mergeSortTask.getArray()));long end = System.currentTimeMillis();System.out.println("花费时间:" + (end - start));}private static int[] generateIntArray(int size) {int[] array = new int[size];Random random = new Random();for(int i = 0; i < size; i++) {array[i] = random.nextInt(100000000);}return array;}
}public class MergeSortTask extends RecursiveAction {private int[] array;private int threshold;public MergeSortTask(int[] array, int threshold) {this.array = array;this.threshold = threshold;}@Overrideprotected void compute() {if(array.length <= threshold) {Arrays.sort(array);return;}int mid = array.length / 2;int[] leftArray = Arrays.copyOfRange(array, 0, mid);int[] rightArray = Arrays.copyOfRange(array, mid, array.length);MergeSortTask leftTask = new MergeSortTask(leftArray, threshold);MergeSortTask rightTask = new MergeSortTask(rightArray, threshold);leftTask.fork();rightTask.fork();leftTask.join();rightTask.join();array = MergeSort.merge(leftTask.getArray(), rightTask.getArray());}public int[] getArray() {return array;}
}public class MergeSort {public static int[] merge(int[] a, int[] b) {int alen = a.length;int blen = b.length;int [] sortedArray = new int[alen+blen];int aIndex = 0;int bIndex = 0;int i = 0;while(aIndex < alen && bIndex < blen) {if(a[aIndex] < b[bIndex]) {sortedArray[i] = a[aIndex];aIndex++;} else {sortedArray[i] = b[bIndex];bIndex++;}i++;}while (aIndex < alen) {sortedArray[i] = a[aIndex];aIndex++;i++;}while (bIndex < blen) {sortedArray[i] = b[bIndex];bIndex++;i++;}return sortedArray;}
}
注意事项
- 处理递归任务时,需要根据实际情况来评估递归深度和任务粒度,避免栈溢出。
- 使用ForkJoinPool来处理非阻塞型任务。阻塞任务使用普通线程池处理。