我们在之前文章: 集合之 Stream 流式操作 和?Stream流 collect() 方法的详细使用介绍 中使用到的示例,使用的都是串行的流,就是说在一个线程上执行的流。
?????? JDK8 还为我们提供了并行的 Stream 流,即多线程执行的流。这显然在效率方面就会有很大的提升了。接下来我们来个示例验证一下串行流的执行,是不是单线程运行。
?
/**
* 串行 Stream 流测试
*/
public static void main(String[] args) {
Stream<Integer> stream = Stream.of(1, 5, 6, 3, 21, 56);
long count = stream.filter(num -> {
System.out.println(Thread.currentThread().getName() + "--" + num);
return num > 3;
}).count();
}
测试结果:(我们发现都是在 main 主线程上执行的)
? 接下来,了解了串行 Stream? 之后,我们来了解一下并行 Stream 流。
//我们可以通过isParallel()方法来判断
public final boolean isParallel() {
return sourceStage.parallel;
}
/**
* 判断当前流是串行流、并行流
*/
public class isParallelDemo{
public static void main(String[] args) {
Stream<Integer> stream01 = Stream.of(4, 5, 6, 3, 21, 56);
System.out.println(stream01.isParallel()); //false
Stream<Integer> stream02 = Stream.of(4, 5, 6, 3, 21, 56).parallel();
System.out.println(stream02.isParallel()); //true
}
}
? ? ? ? 现在我们通过 ①for循环? ②串行流? ③并行流?的方式,来完成对 5亿条数据的求和操作,通过它们的耗时情况来了解这三种方式下的效率。
public class EffectiveDemo {
private int times = 500000000;
long start;
long end;
@Before
public void init(){
start = System.currentTimeMillis();
}
@After
public void destory(){
end = System.currentTimeMillis();
System.out.println("消耗时间:"+(end-start));
}
//for循环(求和5亿)
@Test
public void testFor(){
int sum = 0;
for (int i = 0; i < times; i++) {
sum += i;
}
System.out.println(sum);
}
//串行Stream流(求和5亿)
@Test
public void testStream(){
//5亿数字太大,所以此处使用 LongStream
//range():需要传入开始节点和结束节点两个参数,返回的是一个有序的LongStream.包含开始节点和结束节点两个参数之间所有的参数,间隔为1.
//rangeClosed():功能和range()类似.差别就是rangeClosed包含最后的结束节点,range()不包含。
LongStream.rangeClosed(0, times).reduce(0, Long::sum);
}
//并行Stream流(求和5亿)
@Test
public void testParallelStream() {
LongStream.rangeClosed(0, times).parallel().reduce(0, Long::sum);
}
}
?测试结果:(效率对比发现:并行Stream流<for循环<串行Stream流)
(for循环)消耗时间:130
(串行Stream流)消耗时间:649
(并行Stream流)消耗时间:85
? ?我们可以看到 并行Stream 的效率是最高的。Stream 流并行处理的过程会分而治之,也就是将一个大任务分成多个小任务,这表示每个任务都是一个操作。
? ? ? ? 使用多线程 Stream 流,接下来我们就需要考虑 并行Stream 流的线程安全问题了。
? ? ? ? 我们通过如下实例,来了解一下 parallel() 并行 Stream 流的 线程不安全问题。如下 案例,遍历 1000 个值,并依次将遍历后的结果存入 List 集合中。
/**
* 并行Stream流 线程不安全 案例
*/
@Test
public void parallelStreamNotice() throws InterruptedException {
ArrayList<Integer> list = new ArrayList<>();
IntStream.rangeClosed(1, 1000).parallel().forEach(i -> list.add(i));
System.out.println("list="+list.size());
}
? 测试结果:
list=865
? ? ? 我们返回的 list.size() 结果,根据实际情况,来说应该是 1000,然而返回的值却小于 1000。并且每次执行返回的结果还都是不一样的。这里就存在这线程安全性问题了
????? 针对如上情况,为什么会出现这种情况,我们先来分析一下。共总结出如下三种解决方案:
????? 1.①我们可以使用万能的 synchronized 同步代码块来解决线程安全性问题;②也可以使用 Collections 集合工具类,使用 synchronizedList() 方法,当我们传入一个线程不安全的 list 后,会给我们返回一个线程安全的 list ,然后我们便可以对线程安全的 list 进行操作,类似于 synchronized 同步代码块;
????? 2.我们分析发现 ArrayList 集合本身就是线程不安全的,所以我们可以使用线程安全的集合,比如:使用Vector来替换 ArrayList
????? 3.我们也可以调用 Stream 流的 collect()/toArray() 收集方法,它也会将集合变成线程安全的(此处会使用.boxed() 方法)
/**
* 解决 parallelStream 线程安全问题
*/
public class StreamSafeResolve {
@Test
public void parallelStreamNotice() throws InterruptedException {
//方案一:1.使用同步代码块
Object obj = new Object();
ArrayList<Integer> list = new ArrayList<>();
IntStream.rangeClosed(1, 1000).parallel().forEach((str)->{
synchronized (obj){
list.add(str);
}
}
);
System.out.println("list="+list.size());
//2.使用 Collections 集合工具类,有一个 synchronizedList() 方法,传入一个不安全的list,会返回一个线程安全的list。然后对线程安全的list进行操作
ArrayList<Integer> lists = new ArrayList<>();
List<Integer> list = Collections.synchronizedList(lists);
IntStream.rangeClosed(1, 1000).parallel().forEach((str)->{ list.add(str); });
System.out.println("list="+list.size());
//方案二:使用线程安全的集合类
//Vector是线程安全的集合
Vector<Integer> vector = new Vector<>();
IntStream.rangeClosed(1, 1000).parallel().forEach((str)->{ vector.add(str); });
System.out.println("vector="+vector.size());
//方案三:调用 Stream 流的 collect()/toArray()收集方法。它也会变成线程安全的
List<Integer> list = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
System.out.println("list="+list.size());
}
}
?????? 使用之所以执行效率高,因为它底层使用的是 Fork/Join 框架。Fork/Join 框架 是在 JDK 7 中引入的,Fork/Join 框架可以将一个大任务拆分为很多小任务来异步执行。
? 1, parallel 并行 Stream 流是线程不安全的;
? 2, parallel 并行 Stream 流使用的场景是 CPU 密集型的,只是做到别浪费 CPU,假如本身电脑的 CPU 的负载很大,那还到处用并行流,那并不能起到作用;
? 3,I/O 密集型、磁盘I/O、网络I/O 都属于 I/O 操作,这部分操作时较少消耗 CPU 资源,一般并行流中不适用于 I/O密集型的操作,就比如使用并行流进行大批量的消息推送,涉及到了大量 I/O,使用并行流反而慢了很多;
?4, 在使用并行流的时候,是无法保证元素的顺序的,也就是即使你使用了同步集合也只能保证元素都正确,但无法保证其中的顺序。