parallelStream踩坑指南,出现null元素

发布时间:2023年12月23日

项目问题代码:
在这里插入图片描述在这里插入图片描述
问题:使用多线程给list添加元素,导致空指针问题

1. 使用parallelStream()出现的一些奇怪情形

有时候,为了使用多线程加快代码运行速度,我们会使用parallelStream()来代替stream(),我们先来看一段示例代码:

List<Integer> integerList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
		integerList.add(i);
}
List<Integer> list = new ArrayList<>();
integerList.parallelStream().forEach(list::add);
System.out.println(list);


    1
    2
    3
    4
    5
    6
    7 。。。

我们的预期是,输出的list能够是0-99的一共99个数字,顺序不限;

然而,人生就是这样,就连我们如此简单的预期,也往往无法得到满足。。。

经过多次运行代码,会发现一些很奇怪的现象:

  1. 输出的list的size()不符合预期,有时候是100,有时候是99,甚至是97等;
  2. 输出的list中有时含有null元素,数量不定,有时甚至达到十几个之多;
  3. 有时会出现IndexOutOfBounds异常;
  4. 由于以上问题的出现,可能会导致业务代码中出现NPE;

2. 原因探究

为什么会出现以上问题呢?我们来逐个分析一下各个问题出现的原因。

2.1. 输出的list的size()不符合预期

【现象】输出的list的size()不符合预期,有时候会比预想的要少,也就是出现了元素丢失的现象;

【原因】

    我们来看一下ArrayList的add方法:

       /**
        * Appends the specified element to the end of this list.
        *
        * @param e element to be appended to this list
        * @return <tt>true</tt> (as specified by {@link Collection#add})
        */
       public boolean add(E e) {
           ensureCapacityInternal(size + 1);  // Increments modCount!!
           elementData[size++] = e;
           return true;
       }
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11

    add方法是分两步进行的,第一步是通过ensureCapacityInternal(size + 1); 进行扩容,第二步是通过elementData[size++] = e;添加新元素。在添加新元素时,先读取size的值,然后执行elementData[size] = e;,将e添加到size的位置,在执行size++;,有三个步骤,并不是原子操作。

    因此存在内存可见性问题。当线程 A 从内存读取 size 后,可能这是还没来得及继续执行,线程B就迅速地从内存中读取了size,并且将5写入到了size处,然后size++,然后线程A才将6写入到了size处,将 size 加 1,然后写入内存。在这种情况下,线程B的更新就丢失了,出现了元素丢失的现象。

2.2. 输出的list中有时含有null元素

    【现象】输出的list中有时含有null元素,数量不定,有时甚至达到十几个之多;

    【原因】
        null 元素产生跟元素数据丢失类似,也是由于elementData[size++] = e;这一步并不是原子操作导致的。
        假设存在三个线程,线程A、线程B、线程C。三个线程同时开始执行,初始 size 值为 1。
        线程A首先读取size值为1,然后线程B读取size值为1,然后线程C读取size为1,然后线程B将数据添加到size位置,然后线程A将数据也添加到了size位置,覆盖了B的更新,然后线程A将size更新为2;然后线程B将size更新为3;然后线程C将数据更新到size也就是3的位置,然后将size更新为4;这样2的位置就是null了。

2.3. 有时会出现IndexOutOfBounds异常;

    【现象】有时会出现IndexOutOfBounds异常;

    【原因】
        由于ArrayList的add方法,第一步是通过ensureCapacityInternal(size + 1); 进行扩容,第二步是通过elementData[size++] = e;添加新元素。
        如果线程A已经进行了扩容,但还没添加新元素,此时线程B也进行了扩容(注意此时扩容是无效的,因为在线程B看来,目前的size还是原来的size),然后线程A读取size,将数据更新到size的位置,size++后结束;线程B读取size,发现已经超出了数组的界限,抛出IndexOutOfBounds异常;

3. 解决方法

当使用List.parallelStream()方法进行多线程处理时,可能会涉及到数据安全问题。下面是一些常见的方法来处理parallelStream()的多线程数据安全问题:

  1. 使用线程安全的集合:Java中提供了线程安全的集合类,如CopyOnWriteArrayList和synchronizedList等。可以将原始的List转换为线程安全的集合,以确保并发访问时的数据安全性。
    可以使用线程安全的List:
List<Integer> list = new CopyOnWriteArrayList<>();

List<Integer> list = new ArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(list);

synchronizedList.parallelStream().forEach(item -> {
    // 并发操作
});
  1. 使用同步块或锁:通过在并发访问的代码块周围使用synchronized关键字或显式锁(如ReentrantLock)来对共享数据进行同步,确保一次只有一个线程可以访问共享数据。
List<Integer> list = new ArrayList<>();
Object lock = new Object();

list.parallelStream().forEach(item -> {
    synchronized (lock) {
        // 同步操作
    }
});

// 或者使用显式锁
Lock lock = new ReentrantLock();
list.parallelStream().forEach(item -> {
    lock.lock();
    try {
        // 同步操作
    } finally {
        lock.unlock();
    }
});
  1. 避免修改原始数据:如果需要在并行处理期间修改数据,可以创建一个新的集合或使用线程安全的操作来确保原始数据不被意外更改。例如,可以使用collect()方法来收集结果到一个新的集合中。
List<Integer> list = new ArrayList<>();
List<Integer> result = list.parallelStream()
                          .map(item -> item * 2)
                          .collect(Collectors.toList());

注意:虽然上述方法有助于处理多线程数据安全问题,但并不意味着一定能解决所有情况下的并发问题。在并行处理数据时,仍需特别关注可能引发竞态条件、死锁、饥饿等并发问题,并根据具体情况进行合适的设计和调整。

另外,除了考虑数据安全性外,还应该权衡多线程带来的性能开销。在某些情况下,单线程处理可能比多线程处理更高效,尤其是在数据量较小或任务本身存在较多的I/O等待时间时。因此,在决定使用多线程处理时,应先进行性能评估和测试,以确保获得预期的结果。

原文链接:https://blog.csdn.net/itigoitie/article/details/128429395

文章来源:https://blog.csdn.net/dumuzhouguohe/article/details/135150147
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。