Fork/Join 是一种在多线程领域中常用的算法或技术,它的核心思想是将大任务分割成若干个小任务,然后将这些小任务分配给多个线程并行处理,最终将结果合并起来。这种思想可以应用于多种场景,例如图像处理、批处理、并行排序等。
在 Java 中,Fork/Join 这种思想被封装在了 java.util.concurrent 包中的 ForkJoinPool 类和 RecursiveTask 类中。ForkJoinPool 类是一个线程池,用于管理多个线程的执行,而 RecursiveTask 类则是一个抽象类,用于定义可分解的任务。通过使用这些类,开发者可以非常方便地实现 Fork/Join 的并行计算功能,从而提高应用程序的性能和效率。
总之,Fork/Join 并不是一个框架,而是一种并发编程技术,它可以帮助开发者实现高效的并行计算,并发挥多核 CPU 的计算能力。
Fork/Join 框架提供了一些核心的方法来支持任务的分解和合并,下面我会对这些方法进行理论讲解:
这些方法和概念是 Fork/Join 框架中非常重要的部分,它们通过任务的分解、合并和工作窃取机制,实现了高效的并行计算。理解并熟练使用这些方法可以帮助开发者更好地利用 Fork/Join 框架来处理并行计算任务。
ForkjoinDemo.java
package org.Test6;
import java.util.concurrent.RecursiveTask;
public class ForkjoinDemo extends RecursiveTask<Long> {
private Long start; // 1
private Long end; // 1990900000
// 临界值
private Long temp = 10000L;
public ForkjoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
// 计算方法
@Override
protected Long compute() {
if ((end - start) < temp) {
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else { // forkjoin 递归
long middle = (start + end) / 2; // 中间值
ForkjoinDemo task1 = new ForkjoinDemo(start, middle);
task1.fork(); // 拆分任务,把任务压入线程队列
ForkjoinDemo task2 = new ForkjoinDemo(middle + 1, end);
task2.fork(); // 拆分任务,把任务压入线程队列
return task1.join() + task2.join();
}
}
}
Test.java
package org.Test6;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
public class Test {
public static void test1() {
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + " 时间:" + (end - start));
}
// 会使用ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkjoinDemo(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任务
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + " 时间:" + (end - start));
}
public static void test3() {
long start = System.currentTimeMillis();
// Stream并行流
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + " 时间:" + (end - start));
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
test2();
test3();
}
}
在技术的道路上,我们不断探索、不断前行,不断面对挑战、不断突破自我。科技的发展改变着世界,而我们作为技术人员,也在这个过程中书写着自己的篇章。让我们携手并进,共同努力,开创美好的未来!愿我们在科技的征途上不断奋进,创造出更加美好、更加智能的明天!