Fork/Join框架及应用讲解
Fork/Join是Java 7引入的用于处理可拆分任务的并行计算框架,属于JUC(java.util.concurrent)的一部分。它特别适合把大任务拆分成小任务并行执行,然后合并结果的场景。
1. 基本概念
- Fork:将一个大任务拆分成多个子任务并行执行。
- Join:等待子任务执行完成,并合并结果。
- ForkJoinPool:专门的线程池,用于执行Fork/Join任务。
- RecursiveTask:有返回值的任务。
- RecursiveAction:无返回值的任务。
Fork/Join采用工作窃取算法,空闲线程会去窃取忙线程的任务,提高CPU利用率。
2. 典型例子:数组求和
假设有一个大数组,求数组元素的总和,如果直接用单线程循环可能比较慢,我们可以用Fork/Join并行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import java.util.concurrent.RecursiveTask; import java.util.concurrent.ForkJoinPool;
public class SumArrayTask extends RecursiveTask<Long> { private static final int THRESHOLD = 1000; private final long[] array; private final int start, end;
public SumArrayTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; }
@Override protected Long compute() { if (end - start <= THRESHOLD) { long sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } else { int mid = (start + end) / 2; SumArrayTask leftTask = new SumArrayTask(array, start, mid); SumArrayTask rightTask = new SumArrayTask(array, mid, end); leftTask.fork(); long rightResult = rightTask.compute(); long leftResult = leftTask.join(); return leftResult + rightResult; } }
public static void main(String[] args) { long[] array = new long[10_000]; for (int i = 0; i < array.length; i++) array[i] = i + 1;
ForkJoinPool pool = new ForkJoinPool(); long total = pool.invoke(new SumArrayTask(array, 0, array.length)); System.out.println("Total sum: " + total); } }
|
3. 解析
- 任务拆分:数组大于阈值
THRESHOLD就拆成左右两半。
- 并行执行:使用
fork()异步执行左半部分,当前线程执行右半部分。
- 结果合并:
join()等待左半任务完成,然后合并结果。
- 线程池优化:ForkJoinPool会利用多核CPU并行计算,提高性能。
4. 使用场景
- 数组/集合批量计算(求和、最大值、平均值)
- 图像处理(分块滤镜、像素处理)
- 大规模矩阵计算
- 递归算法,如快速排序、归并排序
5. 小技巧
- 阈值
THRESHOLD的选取影响性能,太小任务太多,太大可能并行化不明显
- RecursiveTask用于有返回值的任务,RecursiveAction用于无返回值的任务
- Fork/Join适合CPU密集型任务,不适合I/O密集型任务
总结:
Fork/Join框架能让你将大任务拆分成小任务并行执行,非常适合大数据量的CPU密集型计算。通过这个数组求和例子,你可以看到fork和join的实际应用,既有实践价值,又不难理解。