JDK8新特性(四):并行 Stream 流的使用

发布时间:2024年01月23日

我们在之前文章: 集合之 Stream 流式操作 和?Stream流 collect() 方法的详细使用介绍 中使用到的示例,使用的都是串行的流,就是说在一个线程上执行的流。

?????? JDK8 还为我们提供了并行的 Stream 流,即多线程执行的流。这显然在效率方面就会有很大的提升了。接下来我们来个示例验证一下串行流的执行,是不是单线程运行。
?

    /**
     * 串行 Stream 流测试
     */
    public static void main(String[] args) {
        Stream<Integer> stream = Stream.of(1, 5, 6, 3, 21, 56);
        long count = stream.filter(num -> {
            System.out.println(Thread.currentThread().getName() + "--" + num);
            return num > 3;
        }).count();
    }

测试结果:(我们发现都是在 main 主线程上执行的)

? 接下来,了解了串行 Stream? 之后,我们来了解一下并行 Stream 流。

1.并行 Stream 流

?1.并行 Stream 流的获取方式(两种)

    //我们可以通过isParallel()方法来判断
    public final boolean isParallel() {
        return sourceStage.parallel;
    }
/**
 * 判断当前流是串行流、并行流
 */
public class isParallelDemo{
    public static void main(String[] args) {
        Stream<Integer> stream01 = Stream.of(4, 5, 6, 3, 21, 56);
        System.out.println(stream01.isParallel()); //false
        Stream<Integer> stream02 = Stream.of(4, 5, 6, 3, 21, 56).parallel();
        System.out.println(stream02.isParallel()); //true
    }
}

?3.串行流、并行流、for循环 求和效率对比

? ? ? ? 现在我们通过 ①for循环? ②串行流? ③并行流?的方式,来完成对 5亿条数据的求和操作,通过它们的耗时情况来了解这三种方式下的效率。

public class EffectiveDemo {
    private int times = 500000000;
    long start;
    long end;
    @Before
    public void init(){
        start = System.currentTimeMillis();
    }

    @After
    public void destory(){
        end = System.currentTimeMillis();
        System.out.println("消耗时间:"+(end-start));
    }

    //for循环(求和5亿)
    @Test
    public void testFor(){
        int sum = 0;
        for (int i = 0; i < times; i++) {
            sum += i;
        }
        System.out.println(sum);
    }

    //串行Stream流(求和5亿)
    @Test
    public void testStream(){
        //5亿数字太大,所以此处使用 LongStream
        //range():需要传入开始节点和结束节点两个参数,返回的是一个有序的LongStream.包含开始节点和结束节点两个参数之间所有的参数,间隔为1.
        //rangeClosed():功能和range()类似.差别就是rangeClosed包含最后的结束节点,range()不包含。
        LongStream.rangeClosed(0, times).reduce(0, Long::sum);
    }

    //并行Stream流(求和5亿)
    @Test
    public void testParallelStream() {
        LongStream.rangeClosed(0, times).parallel().reduce(0, Long::sum);
    }
}

?测试结果:(效率对比发现:并行Stream流<for循环<串行Stream流)

(for循环)消耗时间:130
(串行Stream流)消耗时间:649
(并行Stream流)消耗时间:85

? ?我们可以看到 并行Stream 的效率是最高的。Stream 流并行处理的过程会分而治之,也就是将一个大任务分成多个小任务,这表示每个任务都是一个操作。

? ? ? ? 使用多线程 Stream 流,接下来我们就需要考虑 并行Stream 流的线程安全问题了。

? 4.并行 Stream 流?线程不安全问题

? ? ? ? 我们通过如下实例,来了解一下 parallel() 并行 Stream 流的 线程不安全问题。如下 案例,遍历 1000 个值,并依次将遍历后的结果存入 List 集合中。

    /**
     * 并行Stream流 线程不安全 案例
     */
    @Test
    public void parallelStreamNotice() throws InterruptedException {
        ArrayList<Integer> list = new ArrayList<>();
        IntStream.rangeClosed(1, 1000).parallel().forEach(i -> list.add(i));
        System.out.println("list="+list.size());
    }

? 测试结果:

list=865

? ? ? 我们返回的 list.size() 结果,根据实际情况,来说应该是 1000,然而返回的值却小于 1000。并且每次执行返回的结果还都是不一样的。这里就存在这线程安全性问题了

?5.并行Stream流线程安全问题解决

????? 针对如上情况,为什么会出现这种情况,我们先来分析一下。共总结出如下三种解决方案:

????? 1.①我们可以使用万能的 synchronized 同步代码块来解决线程安全性问题;②也可以使用 Collections 集合工具类,使用 synchronizedList() 方法,当我们传入一个线程不安全的 list 后,会给我们返回一个线程安全的 list ,然后我们便可以对线程安全的 list 进行操作,类似于 synchronized 同步代码块;

????? 2.我们分析发现 ArrayList 集合本身就是线程不安全的,所以我们可以使用线程安全的集合,比如:使用Vector来替换 ArrayList

????? 3.我们也可以调用 Stream 流的 collect()/toArray() 收集方法,它也会将集合变成线程安全的(此处会使用.boxed() 方法)

/**
 * 解决 parallelStream 线程安全问题
 */
public class StreamSafeResolve {
 
    @Test
    public void parallelStreamNotice() throws InterruptedException {
        
        //方案一:1.使用同步代码块
        Object obj = new Object();
        ArrayList<Integer> list = new ArrayList<>();
        IntStream.rangeClosed(1, 1000).parallel().forEach((str)->{
                synchronized (obj){
                    list.add(str);
                }
            }
        );
        System.out.println("list="+list.size());
 
        //2.使用 Collections 集合工具类,有一个 synchronizedList() 方法,传入一个不安全的list,会返回一个线程安全的list。然后对线程安全的list进行操作
        ArrayList<Integer> lists = new ArrayList<>();
        List<Integer> list = Collections.synchronizedList(lists);
        IntStream.rangeClosed(1, 1000).parallel().forEach((str)->{ list.add(str); });
        System.out.println("list="+list.size());
 
        //方案二:使用线程安全的集合类
        //Vector是线程安全的集合
        Vector<Integer> vector = new Vector<>();
        IntStream.rangeClosed(1, 1000).parallel().forEach((str)->{ vector.add(str); });
        System.out.println("vector="+vector.size());
 
       //方案三:调用 Stream 流的 collect()/toArray()收集方法。它也会变成线程安全的
        List<Integer> list = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
        System.out.println("list="+list.size());
    }
}

2.并行 Stream 流底层原理分析

?????? 使用之所以执行效率高,因为它底层使用的是 Fork/Join 框架。Fork/Join 框架 是在 JDK 7 中引入的,Fork/Join 框架可以将一个大任务拆分为很多小任务来异步执行。

3.并行 Stream 流总结

? 1, parallel 并行 Stream 流是线程不安全的;
? 2, parallel 并行 Stream 流使用的场景是 CPU 密集型的,只是做到别浪费 CPU,假如本身电脑的 CPU 的负载很大,那还到处用并行流,那并不能起到作用;
? 3,I/O 密集型、磁盘I/O、网络I/O 都属于 I/O 操作,这部分操作时较少消耗 CPU 资源,一般并行流中不适用于 I/O密集型的操作,就比如使用并行流进行大批量的消息推送,涉及到了大量 I/O,使用并行流反而慢了很多;
?4, 在使用并行流的时候,是无法保证元素的顺序的,也就是即使你使用了同步集合也只能保证元素都正确,但无法保证其中的顺序。

文章来源:https://blog.csdn.net/weixin_58216062/article/details/135752830
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。