并行流是Java中Stream API的一部分,用于在多核处理器上并行执行流操作。在Java 8及更高版本中引入的Stream API提供了一种新的处理集合的方式,使得开发者能够更轻松地进行数据处理和操作。
在使用Stream API时,可以将集合转换为流,然后进行各种操作,例如过滤、映射、排序等。在这个过程中,流可以是串行流(Sequential Stream)或并行流(Parallel Stream)。
并行流通过使用多线程并行处理数据,充分利用多核处理器的优势,从而在某些情况下提高处理速度。使用并行流非常简单,只需在流上调用 parallel() 方法即可将其转换为并行流。其实本质上是使用线程池 FrorkJoinPool。
本文后面将通过测试来说明实际应用场景应该如何选择并行流。但是可能我的测试有局限性,如果有任何错误欢迎大家指正。
使用方法就是调用集合的 parallelStream() 方法转为并行流,然后再使用 stream 执行后续计算就是并行计算(使用线程池)。
List<String> list = new ArrayList<>();
Stream<String> stream = List.parallelStream()
使用 parallelStream 对集合进行并行操作,类似于普通的串行流操作,主要方法如下:
forEach 方法,对流中的每个元素执行指定的操作,可以并行执行。如下调用 System.out.println 输出显示 list 每个元素。
List<String> list = Arrays.asList("apple", "banana", "orange");
list.parallelStream().forEach(System.out::println); // System.out::println 是 System.out.println 的 Lambda 写法
filter 方法,使用给定的条件过滤流中的元素。如下筛选出偶数并输出显示。
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers.parallelStream().filter(n -> n % 2 == 0).forEach(System.out::println);// n -> n % 2 == 0 是 Lambda 写法,相当于一个函数 bool f(int n){ return n % 2 == 0; }
map 方法,将流中的每个元素映射到另一个值。如下把每个元素转成大写。
List<String> words = Arrays.asList("apple", "banana", "orange");
words.parallelStream().map(String::toUpperCase).forEach(System.out::println);
reduce 方法,使用给定的累积函数进行归约操作。如下,进行求和操作
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = numbers.parallelStream().reduce(0, Integer::sum);
System.out.println("求合结果: " + sum);
collect 方法,将流中的元素收集到一个集合中。如下将并行计算结果存储到 uppercaseWords 变量中。
List<String> words = Arrays.asList("apple", "banana", "orange");
List<String> uppercaseWords = words.parallelStream().map(String::toUpperCase).collect(Collectors.toList());
anyMatch 和 allMatch 方法
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
boolean anyEven = numbers.parallelStream().anyMatch(n -> n % 2 == 0); // 只要有一个是偶数就是为真
boolean allEven = numbers.parallelStream().allMatch(n -> n % 2 == 0); // 必须全部是偶数才为真
这些方法在串行流中可以使用用,在并行流中可以充分发挥多核处理器的优势。
请注意,使用并行流时需要确保操作是无状态的,以避免并发问题。在实际应用中,可以根据任务的性质和数据规模选择适当的流类型。
下面是一个简单的示例,演示集合并行流的使用
private static void test1() {
List<String> list = Arrays.asList("apple", "banana", "orange", "grape", "peach");
// 串行流操作(单线程,效率低)
list.stream()
.map(String::toUpperCase)
.forEach(System.out::println);
System.out.println("---------");
// 并行流操作(多线程,效率高)
list.parallelStream()
.map(String::toUpperCase)
.forEach(System.out::println);
}
在这个示例中,关键是看 list.parallelStream(),使用他创建了一个并行流。然后使用 map
方法将元素转换为大写,然后通过 forEach 方法打印出来。
- String::toUpperCase 是 Lambda 写法(同 “xxxx”.toUpperCase ),结合 map ,相当于对集合中的每一个元素进行一次 toUpperCase (转大写)
- System.out::println 也是 Lambda 写法(同 System.out.println(“xxxx”) ),结合 forEach,相当于对集合中的每个元素进行打印显示。
并不是所有情况下都应该使用并行流,因为在某些场景下,多线程的开销可能会超过并行执行所带来的性能提升。选择使用串行流还是并行流应该根据实际情况和性能测试来进行权衡。在某些情况下,使用并行流可以有效提高处理速度,但在其他情况下,串行流可能更为合适。
在接下的示例中我,准备随机生成 1 千万个随机数,然后第使用 for 循环、串行流、并行流这三种方法找出数字为 250 的数。以此来检查运行效率。
生成数据代码
private static long DATA_COUNT = 1000_0000;
List<Double> list = new ArrayList<>();
for (int i = 0; i < DATA_COUNT; i++) {
list.add(Math.random() * 1000 % 500);
}
for 循环查找代码
List<Double> result = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
//sleep(1);
Double val = list.get(i);
if(val > 250 && val < 251){
result.add(val);
}
}
串行流筛选代码
Stream<Double> result = list.stream().filter((val) -> {
//sleep(1);
return val > 250 && val < 251;
});
并行流筛选代码
Stream<Double> result = list.parallelStream().filter((val) -> {
//sleep(1);
return val > 250 && val < 251;
});
数据量1万时测试结果(DATA_COUNT = 1_0000)
for 循环单线程查找,共用时(ms):3
串行流单线程查找,共用时(ms):4
并行流多线程查找,共用时(ms):4
数据量100万时测试结果(DATA_COUNT = 100_0000)
for 循环单线程查找,共用时(ms):40
串行流单线程查找,共用时(ms):28
并行流多线程查找,共用时(ms):45
数据量1000万时测试结果(DATA_COUNT = 1000_0000)
for 循环单线程查找,共用时(ms):181
串行流单线程查找,共用时(ms):145
并行流多线程查找,共用时(ms):161
通过以上测试发现,在处理简单任务计算时,这三种方式并与多大差别。但是简单任务更推荐使用串行流。
通过修改以下数字修改数据量
private static long DATA_COUNT = 1000_0000;
注意上面的代码,每个计算中都有一个 sleep(1)
方法,现在我们使用这个方法来模拟复杂任务处理(如访问数据库、读写文件、访问网络等)进行测试。
数据量1千时测试结果(DATA_COUNT = 1000)
for 循环单线程查找,共用时(ms):1608
串行流单线程查找,共用时(ms):1626
并行流多线程查找,共用时(ms):165
数据量1万时测试结果(DATA_COUNT = 1_0000)
for 循环单线程查找,共用时(ms):16302
串行流单线程查找,共用时(ms):16965
并行流多线程查找,共用时(ms):2126
通过以上测试发现,在处理复杂任务计算时,并行流表现十分优越。所以复杂任务推荐使用并行流。
以下完整测试代码
package top.yiqifu.study.p004_thread;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Stream;
// 包装类
public class Test131_ParallelStream {
private static long DATA_COUNT = 10000;
public static void main(String[] args) {
test1();
List<Double> list = new ArrayList<>();
for (int i = 0; i < DATA_COUNT; i++) {
list.add(Math.random() * 1000 % 500);
}
long ctime = test2(list);
long stime = test3(list);
long mtime = test4(list);
System.out.println("for 循环单线程查找,共用时(ms):"+(ctime));
System.out.println("串行流单线程查找,共用时(ms):"+(stime));
System.out.println("并行流多线程查找,共用时(ms):"+(mtime));
}
private static void test1() {
List<String> list = Arrays.asList("apple", "banana", "orange", "grape", "peach");
// 串行流操作(单线程)
list.stream()
.map(String::toUpperCase)
.forEach(System.out::println);
System.out.println("---------");
// 并行流操作(多线程)
list.parallelStream()
.map(String::toUpperCase)
.forEach(System.out::println);
}
private static long test2(List<Double> list) {
long startTime = System.currentTimeMillis();
List<Double> result = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
sleep(1);
Double val = list.get(i);
if(val > 250 && val < 251){
result.add(val);
}
}
for(Double val : result){
System.out.println(val);
};
long endTime = System.currentTimeMillis();
return endTime - startTime;
}
private static long test3(List<Double> list) {
long startTime = System.currentTimeMillis();
Stream<Double> result = list.stream().filter((val) -> {
sleep(1);
return val > 250 && val < 251;
});
result.forEach(val->{
System.out.println(val);
});
long endTime = System.currentTimeMillis();
return endTime - startTime;
}
private static long test4(List<Double> list) {
long startTime = System.currentTimeMillis();
Stream<Double> result = list.parallelStream().filter((val) -> {
sleep(1);
return val > 250 && val < 251;
});
result.forEach(val->{
System.out.println(val);
});
long endTime = System.currentTimeMillis();
return endTime - startTime;
}
private static void sleep(long ms){
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}