目录
? ? 在 JDK 中,提供了这样一种功能:它能够将复杂的逻辑拆分成一个个简单的逻辑来并行执行,待每个并行执行的逻辑执行完成后,再将各个结果进行汇总,得出最终的结果数据。有点像Hadoop 中的 MapReduce。
? ? ForkJoin 是由 JDK1.7 之后提供的多线程并发处理框架。ForkJoin 框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin 将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。
并发和并行在本质上还是有所区别的。
? ? 并发指的是在同一时刻,只有一个线程能够获取到 CPU 执行任务,而多个线程被快速的轮换执行,这就使得在宏观上具有多个线程同时执行的效果,并发不是真正的同时执行,并发可以使用下图表示:
? ? 并行指的是无论何时,多个线程都是在多个 CPU 核心上同时执行的,是真正的同时执行。
? ? 把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解。在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。
? ? 步骤可分为:1.?分割原问题;2.?求解子问题;3.?合并子问题的解为原问题的解。我们可以使用如下伪代码来表示这个步骤:
if (任务很小){
直接计算得到结果
} else {
分拆成N个子任务
调用子任务的fork()进行计算
调用子任务的join()合并计算结果
}
典型应用有:二分搜索、大整数乘法、Strassen矩阵乘法、棋盘覆盖、合并排序、快速排序、线性时间选择、汉诺塔。
? ? Java 1.7 引入了一种新的并发框架 —— Fork/Join Framework,主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数。
? ? ForkJoin 框架的本质是一个用于并行执行任务的框架,能够把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务的计算结果。在 Java 中,ForkJoin 框架与 ThreadPool 共存,并不是要替换 ThreadPool。
? ? 其实,在 Java 8 中引入的并行流计算,内部就是采用的 ForkJoinPool 来实现的。例如,下面使用并行流实现打印数组元组的程序:
public class SumArray {
public static void main(String[] args) {
List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
numberList.parallelStream().forEach(System.out::println);
}
}
说到这里,可能有读者会问:可以使用线程池的 ThreadPoolExecutor 来实现啊?为什么要使用ForkJoinPool??接下来,我们就来回答这个问题。
? ? ForkJoin 框架是从 JDK1.7 中引入的新特性,它同 ThreadPoolExecutor 一样,也实现了Executor 和 ExecutorService 接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入指定的线程数量,那么当前计算机可用的 CPU 数量会被设置为线程数量作为默认值。
? ? ForkJoinPool 主要使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool 能够使用相对较少的线程来处理大量的任务。
? ? 比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。
? ? 比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概200万+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
? ? 所以当使用 ThreadPoolExecutor 时,使用分治法会存在问题,因为 ThreadPoolExecutor 中的线程无法向任务队列中再添加一个任务并在等待该任务完成之后再继续执行。而使用 ForkJoinPool就能够解决这个问题,它就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
那么使用 ThreadPoolExecutor 或者 ForkJoinPool,性能上会有什么差异呢?
? ? 首先,使用 ForkJoinPool 能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用 ThreadPoolExecutor 时,是不可能完成的,因为 ThreadPoolExecutor 中的 Thread 无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,很显然这是不可行的,也是很不合理的!
? ? 假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如 A 线程负责处理 A 队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点:
充分利用线程进行并行计算,并减少了线程间的竞争。
工作窃取算法的缺点:
在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。
ForkJoin 框架中一些重要的类如下所示:
? ? 既然任务是被逐渐的细化的,那就需要把这些任务存在一个池子里面,这个池子就是ForkJoinPool,它与其它的 ExecutorService 区别主要在于它使用“工作窃取”。由类图可以看出,ForkJoinPool 类实现了线程池的 Executor接口。我们还可以使用 Executors.newWorkStealPool() 方法来创建 ForkJoinPool。
ForkJoinPool 中提供了如下提交任务的方法:
public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)
实现 ForkJoin 框架中的线程。
? ? ForkJoinTask 封装了数据及其相应的计算,并且支持细粒度的数据并行。ForkJoinTask 比线程要轻量,ForkJoinPool 中少量工作线程能够运行大量的 ForkJoinTask。ForkJoinTask 类中主要包括两个方法 fork() 和 join(),分别实现任务的分拆与合并。
? ? fork()?方法类似于 Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中。跟Thread.join() 方法不同,ForkJoinTask 的 join() 方法并不简单的阻塞线程,而是利用工作线程运行其他任务,当一个工作线程中调用 join(),它将处理其他任务,直到注意到目标子任务已经完成。
我们可以使用下图来表示这个过程:
ForkJoinTask有3个子类:
package com.lm.concurrency.example;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
// 生成一个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
// 执行一个任务
Future<Integer> result = forkjoinPool.submit(task);
try {
System.out.println("result: " + result.get());
} catch (Exception e) {
System.out.println(e);
}
}
}
Fork/Join 框架局限性:
对于 Fork/Join 框架而言,当一个任务正在等待它使用 Join 操作创建的子任务结束时,执行这个任务的工作线程查找其他未被执行的任务,并开始执行这些未被执行的任务,通过这种方式,线程充分利用它们的运行时间来提高应用程序的性能。为了实现这个目标,Fork/Join 框架执行的任务有一些局限性。