Java 多线程之并行流(parallelStream)

发布时间:2023年12月22日

一、概述

  • 并行流是Java中Stream API的一部分,用于在多核处理器上并行执行流操作。在Java 8及更高版本中引入的Stream API提供了一种新的处理集合的方式,使得开发者能够更轻松地进行数据处理和操作。

  • 在使用Stream API时,可以将集合转换为流,然后进行各种操作,例如过滤、映射、排序等。在这个过程中,流可以是串行流(Sequential Stream)或并行流(Parallel Stream)。

  • 并行流通过使用多线程并行处理数据,充分利用多核处理器的优势,从而在某些情况下提高处理速度。使用并行流非常简单,只需在流上调用 parallel() 方法即可将其转换为并行流。其实本质上是使用线程池 FrorkJoinPool

  • 本文后面将通过测试来说明实际应用场景应该如何选择并行流。但是可能我的测试有局限性,如果有任何错误欢迎大家指正。

二、使用方法

2.1 parallelStream 使用

  • 使用方法就是调用集合的 parallelStream() 方法转为并行流,然后再使用 stream 执行后续计算就是并行计算(使用线程池)。

    List<String> list = new ArrayList<>();
    Stream<String> stream =  List.parallelStream()
    

2.2 Stream 方法介绍

  • 使用 parallelStream 对集合进行并行操作,类似于普通的串行流操作,主要方法如下:

  • forEach 方法,对流中的每个元素执行指定的操作,可以并行执行。如下调用 System.out.println 输出显示 list 每个元素。

    List<String> list = Arrays.asList("apple", "banana", "orange");
    list.parallelStream().forEach(System.out::println); // System.out::println 是 System.out.println 的 Lambda 写法
    
  • filter 方法,使用给定的条件过滤流中的元素。如下筛选出偶数并输出显示。

    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    numbers.parallelStream().filter(n -> n % 2 == 0).forEach(System.out::println);// n -> n % 2 == 0 是 Lambda 写法,相当于一个函数 bool f(int n){ return n % 2 == 0; }
    
  • map 方法,将流中的每个元素映射到另一个值。如下把每个元素转成大写。

    List<String> words = Arrays.asList("apple", "banana", "orange");
    words.parallelStream().map(String::toUpperCase).forEach(System.out::println);
    
  • reduce 方法,使用给定的累积函数进行归约操作。如下,进行求和操作

    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    int sum = numbers.parallelStream().reduce(0, Integer::sum);
    System.out.println("求合结果: " + sum);
    
  • collect 方法,将流中的元素收集到一个集合中。如下将并行计算结果存储到 uppercaseWords 变量中。

    List<String> words = Arrays.asList("apple", "banana", "orange");
    List<String> uppercaseWords = words.parallelStream().map(String::toUpperCase).collect(Collectors.toList());
    
  • anyMatch 和 allMatch 方法

    • anyMatch 用于判断流中是否有任意一个元素匹配给定条件
    • allMatch 用于判断流中的所有元素是否都匹配给定条件。
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
    boolean anyEven = numbers.parallelStream().anyMatch(n -> n % 2 == 0); // 只要有一个是偶数就是为真
    boolean allEven = numbers.parallelStream().allMatch(n -> n % 2 == 0); // 必须全部是偶数才为真
    

    这些方法在串行流中可以使用用,在并行流中可以充分发挥多核处理器的优势。

    请注意,使用并行流时需要确保操作是无状态的,以避免并发问题。在实际应用中,可以根据任务的性质和数据规模选择适当的流类型。

2.3 简单示例

  • 下面是一个简单的示例,演示集合并行流的使用

        private static void  test1()  {
            List<String> list = Arrays.asList("apple", "banana", "orange", "grape", "peach");
    
            // 串行流操作(单线程,效率低)
            list.stream()
                    .map(String::toUpperCase)
                    .forEach(System.out::println);
    
            System.out.println("---------");
    
            // 并行流操作(多线程,效率高)
            list.parallelStream()
                    .map(String::toUpperCase)
                    .forEach(System.out::println);
        }
    
  • 在这个示例中,关键是看 list.parallelStream(),使用他创建了一个并行流。然后使用 map 方法将元素转换为大写,然后通过 forEach 方法打印出来。

    • String::toUpperCase 是 Lambda 写法(同 “xxxx”.toUpperCase ),结合 map ,相当于对集合中的每一个元素进行一次 toUpperCase (转大写)
    • System.out::println 也是 Lambda 写法(同 System.out.println(“xxxx”) ),结合 forEach,相当于对集合中的每个元素进行打印显示。
  • 并不是所有情况下都应该使用并行流,因为在某些场景下,多线程的开销可能会超过并行执行所带来的性能提升。选择使用串行流还是并行流应该根据实际情况和性能测试来进行权衡。在某些情况下,使用并行流可以有效提高处理速度,但在其他情况下,串行流可能更为合适。

三、应用示例

3.1 示例介绍

  • 在接下的示例中我,准备随机生成 1 千万个随机数,然后第使用 for 循环、串行流、并行流这三种方法找出数字为 250 的数。以此来检查运行效率。

  • 生成数据代码

    private static long DATA_COUNT  = 1000_0000;
    
    List<Double> list = new ArrayList<>();
    for (int i = 0; i < DATA_COUNT; i++) {
        list.add(Math.random() * 1000 % 500);
    }
    
  • for 循环查找代码

    List<Double> result = new ArrayList<>();
    for (int i = 0; i < list.size(); i++) {
        //sleep(1);
        Double val = list.get(i);
        if(val > 250 && val < 251){
            result.add(val);
        }
    }
    
  • 串行流筛选代码

    Stream<Double> result = list.stream().filter((val) -> {
        //sleep(1);
        return val > 250 && val < 251;
    });
    
  • 并行流筛选代码

    Stream<Double> result = list.parallelStream().filter((val) -> {
        //sleep(1);
        return val > 250 && val < 251;
    });
    

3.2 简单任务测试结果

  • 数据量1万时测试结果(DATA_COUNT = 1_0000)

    for 循环单线程查找,共用时(ms):3
    串行流单线程查找,共用时(ms):4
    并行流多线程查找,共用时(ms):4

  • 数据量100万时测试结果(DATA_COUNT = 100_0000)

    for 循环单线程查找,共用时(ms):40
    串行流单线程查找,共用时(ms):28
    并行流多线程查找,共用时(ms):45

  • 数据量1000万时测试结果(DATA_COUNT = 1000_0000)

    for 循环单线程查找,共用时(ms):181
    串行流单线程查找,共用时(ms):145
    并行流多线程查找,共用时(ms):161

  • 通过以上测试发现,在处理简单任务计算时,这三种方式并与多大差别。但是简单任务更推荐使用串行流

  • 通过修改以下数字修改数据量

    private static long DATA_COUNT  = 1000_0000;
    

3.3 复杂任务测试结果

  • 注意上面的代码,每个计算中都有一个 sleep(1)方法,现在我们使用这个方法来模拟复杂任务处理(如访问数据库、读写文件、访问网络等)进行测试。

  • 数据量1千时测试结果(DATA_COUNT = 1000)

    for 循环单线程查找,共用时(ms):1608
    串行流单线程查找,共用时(ms):1626
    并行流多线程查找,共用时(ms):165

  • 数据量1万时测试结果(DATA_COUNT = 1_0000)

    for 循环单线程查找,共用时(ms):16302
    串行流单线程查找,共用时(ms):16965
    并行流多线程查找,共用时(ms):2126

  • 通过以上测试发现,在处理复杂任务计算时,并行流表现十分优越。所以复杂任务推荐使用并行流

3.4 结论

  • 简单任务推荐使用串行流
  • 复杂任务推荐使用并行流
  • 简单任务你可以理解为你要执行的任务占用CPU时间小于1ms,反之复杂任务就是大于1ms。

四、完整示例

  • 以下完整测试代码

    package top.yiqifu.study.p004_thread;
    
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.*;
    import java.util.stream.Stream;
    
    // 包装类
    public class Test131_ParallelStream {
    
        private static long DATA_COUNT  = 10000;
    
        public static void main(String[] args) {
            test1();
    
            List<Double> list = new ArrayList<>();
            for (int i = 0; i < DATA_COUNT; i++) {
                list.add(Math.random() * 1000 % 500);
            }
    
            long ctime = test2(list);
            long stime = test3(list);
            long mtime = test4(list);
    
            System.out.println("for 循环单线程查找,共用时(ms):"+(ctime));
            System.out.println("串行流单线程查找,共用时(ms):"+(stime));
            System.out.println("并行流多线程查找,共用时(ms):"+(mtime));
        }
    
        private static void  test1()  {
            List<String> list = Arrays.asList("apple", "banana", "orange", "grape", "peach");
    
            // 串行流操作(单线程)
            list.stream()
                    .map(String::toUpperCase)
                    .forEach(System.out::println);
    
            System.out.println("---------");
    
            // 并行流操作(多线程)
            list.parallelStream()
                    .map(String::toUpperCase)
                    .forEach(System.out::println);
        }
    
        private static long  test2(List<Double> list)  {
    
            long startTime = System.currentTimeMillis();
    
            List<Double> result = new ArrayList<>();
            for (int i = 0; i < list.size(); i++) {
                sleep(1);
                Double val = list.get(i);
                if(val > 250 && val < 251){
                    result.add(val);
                }
            }
    
            for(Double val : result){
                System.out.println(val);
            };
            long endTime = System.currentTimeMillis();
    
            return endTime - startTime;
        }
    
    
        private static long  test3(List<Double> list)  {
    
            long startTime = System.currentTimeMillis();
    
            Stream<Double> result = list.stream().filter((val) -> {
                sleep(1);
                return val > 250 && val < 251;
            });
    
    
            result.forEach(val->{
                System.out.println(val);
            });
            long endTime = System.currentTimeMillis();
    
            return endTime - startTime;
        }
    
        private static long  test4(List<Double> list)  {
    
            long startTime = System.currentTimeMillis();
    
            Stream<Double> result = list.parallelStream().filter((val) -> {
                sleep(1);
                return val > 250 && val < 251;
            });
    
    
            result.forEach(val->{
                System.out.println(val);
            });
            long endTime = System.currentTimeMillis();
    
            return endTime - startTime;
        }
    
    
        private static void sleep(long ms){
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    
文章来源:https://blog.csdn.net/qifu123/article/details/135158602
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。