ForkJoin: A Detailed Explanation and Applications

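ForkJoin is a multithreaded parallel-processing framework introduced in JDK 1.7. It is mainly used for parallel computation and, much like MapReduce, works by splitting a large computation into many smaller tasks that are computed in parallel, then combining their results.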

Use Case

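The following example computes the sum of 1 to 10. A range is split in half until it satisfies last - first <= 3 (in this run, every leaf range sums at most 3 numbers), and the partial sums are then added back together. The code is as follows: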

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExample extends RecursiveTask<Integer> {

    private final int threshold = 3;
    private final int first;
    private final int last;

    public ForkJoinExample(int first, int last) {
        this.first = first;
        this.last = last;
    }

    @Override
    protected Integer compute() {
        int result = 0;
        if (last - first <= threshold) {
            // The task is small enough: compute it directly
            for (int i = first; i <= last; i++) {
                result += i;
            }
            System.out.println(Thread.currentThread().getName() + ":∑(" + first + "," + last + ") = " + result);
        } else {
            // Split the range into two smaller subtasks
            int middle = first + (last - first) / 2;
            ForkJoinExample leftTask = new ForkJoinExample(first, middle);
            ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
            leftTask.fork();
            rightTask.fork();
            // Wait for both halves and combine their results
            result = leftTask.join() + rightTask.join();
            System.out.println(Thread.currentThread().getName() + ":"
                    + "∑(" + first + "," + last + ") = "
                    + "∑(" + first + "," + middle + ") + "
                    + "∑(" + (middle + 1) + "," + last + ") = "
                    + result);
        }
        return result;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinExample example = new ForkJoinExample(1, 10);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future<Integer> result = forkJoinPool.submit(example);
        System.out.println("result = " + result.get());
    }
}
```
Sample output (worker thread names and line order vary from run to run):

```text
ForkJoinPool-1-worker-2:∑(1,3) = 6
ForkJoinPool-1-worker-1:∑(4,5) = 9
ForkJoinPool-1-worker-2:∑(1,5) = ∑(1,3) + ∑(4,5) = 15
ForkJoinPool-1-worker-2:∑(9,10) = 19
ForkJoinPool-1-worker-3:∑(6,8) = 21
ForkJoinPool-1-worker-3:∑(6,10) = ∑(6,8) + ∑(9,10) = 40
ForkJoinPool-1-worker-1:∑(1,10) = ∑(1,5) + ∑(6,10) = 55
result = 55
```
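The example above forks both halves and then joins them, which spends an extra fork on work the current thread could simply do itself. A commonly recommended alternative, the one used in the Fibonacci example of the RecursiveTask Javadoc, forks one half and computes the other directly in the current thread. The sketch below applies that pattern to the same sum; the class name SumTask is hypothetical, the printing has been removed, and it assumes JDK 8+ for ForkJoinPool.commonPool().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of ForkJoinExample: sums the integers in [first, last]
class SumTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 3;
    private final int first;
    private final int last;

    SumTask(int first, int last) {
        this.first = first;
        this.last = last;
    }

    @Override
    protected Integer compute() {
        if (last - first <= THRESHOLD) {
            // Small range: sum it sequentially
            int sum = 0;
            for (int i = first; i <= last; i++) {
                sum += i;
            }
            return sum;
        }
        int middle = first + (last - first) / 2;
        SumTask left = new SumTask(first, middle);
        SumTask right = new SumTask(middle + 1, last);
        left.fork();                       // push the left half onto this worker's deque
        int rightResult = right.compute(); // work on the right half in the current thread
        int leftResult = left.join();      // wait for (or run) the forked left half
        return leftResult + rightResult;
    }

    public static void main(String[] args) {
        int result = ForkJoinPool.commonPool().invoke(new SumTask(1, 10));
        System.out.println("result = " + result); // result = 55
    }
}
```

Keeping one half in the current thread means each split creates only one new queued task instead of two.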

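ForkJoin tasks are run by a ForkJoinPool, a special-purpose thread pool. With the no-argument constructor, its parallelism level (the target number of active worker threads) equals the number of available CPU cores, as reported by Runtime.getRuntime().availableProcessors().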
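A quick way to check the pool size on a given machine is to compare getParallelism() with the processor count. The sketch below assumes JDK 8 or later (for ForkJoinPool.commonPool()); PoolParallelismDemo is a hypothetical name.

```java
import java.util.concurrent.ForkJoinPool;

class PoolParallelismDemo {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();

        // The no-arg constructor sizes the pool from the available processors
        ForkJoinPool defaultPool = new ForkJoinPool();
        // An explicit parallelism level overrides that default
        ForkJoinPool fixedPool = new ForkJoinPool(2);

        System.out.println("cores               = " + cores);
        System.out.println("default parallelism = " + defaultPool.getParallelism()); // same as cores
        System.out.println("fixed parallelism   = " + fixedPool.getParallelism());   // 2
        // The shared common pool (JDK 8+) typically defaults to cores - 1 (at least 1),
        // unless overridden with -Djava.util.concurrent.ForkJoinPool.common.parallelism
        System.out.println("common parallelism  = " + ForkJoinPool.commonPool().getParallelism());

        defaultPool.shutdown();
        fixedPool.shutdown();
    }
}
```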

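ForkJoinPool implements a work-stealing algorithm to improve CPU utilization. Each worker thread maintains its own double-ended queue (deque) of tasks waiting to be executed, and a thread that runs out of work may steal a task from another thread's deque. The owning thread pushes and pops tasks at one end of its deque (LIFO order), while a thief takes tasks from the opposite end, i.e. the oldest task. Because the two sides work at different ends, they only compete for the same task when the deque is nearly empty; and since older tasks in a recursive decomposition tend to be the larger ones, a single steal usually gives the thief a substantial chunk of work.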
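The "which end" rule can be illustrated with a deliberately simplified model of a single worker's deque. This is a hypothetical sketch, not ForkJoinPool's real internal work queue, which is a lock-free array updated with CAS operations; here a plain lock keeps the code short, so the model shows only which end each side uses, not the low-contention property itself.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, lock-based model of one worker's deque (not the JDK implementation)
class WorkStealingDeque<T> {
    private final Deque<T> tasks = new ArrayDeque<>();

    // Owner: newly forked tasks are pushed onto the head
    synchronized void push(T task) {
        tasks.addFirst(task);
    }

    // Owner: takes the most recently pushed task (LIFO); null if empty
    synchronized T pop() {
        return tasks.pollFirst();
    }

    // Thief: takes the oldest task from the opposite end; null if empty
    synchronized T steal() {
        return tasks.pollLast();
    }

    public static void main(String[] args) {
        WorkStealingDeque<String> deque = new WorkStealingDeque<>();
        deque.push("task-1");
        deque.push("task-2");
        deque.push("task-3");
        System.out.println("owner pops:   " + deque.pop());   // task-3
        System.out.println("thief steals: " + deque.steal()); // task-1
        System.out.println("owner pops:   " + deque.pop());   // task-2
    }
}
```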

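ForkJoinTask is the parent class of both RecursiveAction and RecursiveTask, and it is designed around the Template Method pattern: ForkJoinTask hides the code that drives execution (scheduling, fork, join and result bookkeeping), while the abstract subclasses RecursiveTask and RecursiveAction expose a single abstract compute() method in which users write their actual business logic.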
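To make the Template Method structure concrete, here is a hypothetical, heavily simplified mirror of the class hierarchy. MiniForkJoinTask, MiniRecursiveTask, SquareTask and TemplateMethodDemo are invented names and all scheduling is omitted; only the call chain invoke() -> exec() -> compute() and the split between framework hooks and user code reflect how the JDK classes divide responsibility.

```java
// Hypothetical stand-in for ForkJoinTask: owns the fixed execution skeleton
abstract class MiniForkJoinTask<V> {
    // Template method: users never override this
    final V invoke() {
        exec();                // hook filled in by the abstract subclass
        return getRawResult(); // hook that exposes the stored result
    }

    protected abstract boolean exec();

    public abstract V getRawResult();
}

// Hypothetical stand-in for RecursiveTask: implements the hooks, exposes compute()
abstract class MiniRecursiveTask<V> extends MiniForkJoinTask<V> {
    private V result;

    // The only method users write
    protected abstract V compute();

    @Override
    protected final boolean exec() {
        result = compute();
        return true;
    }

    @Override
    public final V getRawResult() {
        return result;
    }
}

// User code: only business logic, no execution details
class SquareTask extends MiniRecursiveTask<Integer> {
    private final int n;

    SquareTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        return n * n;
    }
}

class TemplateMethodDemo {
    public static void main(String[] args) {
        System.out.println(new SquareTask(7).invoke()); // 49
    }
}
```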

RecursiveTask

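Reading the source, we can see that RecursiveTask's exec() method calls compute() and stores the returned value in a field named result, whose type is given by the generic type parameter V. In other words, a RecursiveTask produces a return value after it executes, which callers can obtain through join(), invoke() or get().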
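As a sketch of how that stored result is consumed, the hypothetical MaxTask below finds the maximum of an array. It uses the static ForkJoinTask.invokeAll(t1, t2) to run both halves, then reads each subtask's result back with join(); the threshold of 4 is an arbitrary illustrative value, not a tuned one.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: maximum of data[from, to)
class MaxTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 4; // illustrative cut-off
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    MaxTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            int max = Integer.MIN_VALUE;
            for (int i = from; i < to; i++) {
                max = Math.max(max, data[i]);
            }
            return max; // exec() stores this value in the task's result field
        }
        int mid = (from + to) >>> 1;
        MaxTask left = new MaxTask(data, from, mid);
        MaxTask right = new MaxTask(data, mid, to);
        invokeAll(left, right); // run both halves and wait for them to finish
        // join() hands back the value each subtask's compute() returned
        return Math.max(left.join(), right.join());
    }

    public static void main(String[] args) {
        int[] data = {3, 41, 7, 19, 88, 2, 56, 14, 9, 70};
        int max = ForkJoinPool.commonPool().invoke(new MaxTask(data, 0, data.length));
        System.out.println("max = " + max); // max = 88
    }
}
```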

RecursiveAction

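RecursiveAction's exec() simply calls compute(), whose return type is void, so no result is stored (its getRawResult() always returns null). The difference between the two is therefore that RecursiveTask returns a result, whereas RecursiveAction does not.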
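For contrast, a RecursiveAction is typically used for side effects, such as updating an array in place. The hypothetical DoubleTask below doubles every element of an array; ForkJoinPool.invoke still returns a value for it, but because a RecursiveAction is a ForkJoinTask<Void>, that value is always null. The threshold of 2 is an arbitrary illustrative value.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: doubles every element of data[from, to) in place
class DoubleTask extends RecursiveAction {
    private static final int THRESHOLD = 2; // illustrative cut-off
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] *= 2; // the effect is this mutation, not a return value
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // Subtasks write disjoint ranges, so they need no further coordination
        invokeAll(new DoubleTask(data, from, mid), new DoubleTask(data, mid, to));
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7};
        Void nothing = ForkJoinPool.commonPool().invoke(new DoubleTask(data, 0, data.length));
        System.out.println(nothing);               // null
        System.out.println(Arrays.toString(data)); // [2, 4, 6, 8, 10, 12, 14]
    }
}
```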