Java并行流parallelStream()下InheritableThreadLocal引起的问题
引起问题的代码。
List orgs00 = Arrays.asList(
new Org("aaa"),
new Org("bbb"),
new Org("aa0")
);
List orgs = orgs00.parallelStream()
.map(org -> {
// 模拟从数据库中获取 Org 对象的操作
// 这里可以是实际的数据库查询操作
ConditionImpl cmd = new ConditionImpl();
cmd.op("orgName", Op.eq, org.getOrgName());
// cmd.orderBy("name", OrderType.ASC);
System.out.println("-----------------111111111111111111111111-");
List select1 = getSuidRich().select(new Org(), cmd); //语句1:这句会调用数据库
return new Org(org.getOrgName());
})
.collect(Collectors.toList());
问题:语句1调用的方法有条词句:
private static ThreadLocal>> sqlPreValueLocal;
经过orgs00.parallelStream().map()) 执行调用语句1,sqlPreValueLocal是同一个本地变量吗?
并行流场景下使用 ThreadLocal, sqlPreValueLocal = new InheritableThreadLocal<>(); //语句1
或者
sqlPreValueLocal = new ThreadLocal<>();//语句2
用语句1有时上下文丢失,但用语句2不会, 可以解释为什么吗?
在并行流场景下,使用InheritableThreadLocal
和ThreadLocal
可能会出现不同的行为,导致上下文丢失的问题。下面我会解释一下为什么语句1有时会出现上下文丢失,而语句2不会。
InheritableThreadLocal
(语句1):InheritableThreadLocal
会使子线程继承父线程中的ThreadLocal
变量的值。在并行流中,由于线程池的调度机制,父线程有可能会参与到并行流线程池的调度。如果父线程的上下文在某个时刻被清理,那么后续拷贝到子线程的上下文可能为空,导致上下文丢失的问题。这种情况可能会导致语句1有时会出现上下文丢失的情况。
ThreadLocal
(语句2):ThreadLocal
不具备继承性,子线程不会继承父线程中的ThreadLocal
变量的值。因此,在并行流中使用ThreadLocal
时,不会出现父线程上下文被清理导致的上下文丢失问题。每个线程都有自己独立的ThreadLocal
副本,不会受到父线程的影响。
综上所述,使用InheritableThreadLocal
在并行流场景下可能会出现上下文丢失的问题,而使用ThreadLocal
则不会受到这种影响。因此,如果在并行流中需要使用ThreadLocal
来传递上下文,可以考虑使用ThreadLocal
而不是InheritableThreadLocal
,以避免上下文丢失的情况。
原因在于并行流的设计比较特殊,父线程也有可能参与到并行流线程池的调度,那如果?方法被父线程执行,那么父线程的上下文会被清理。导致后续拷贝到子线程的上下文都为 null,同样产生丢失上下文的问题。
以上结论不保证十分准确。
但有下面的测试实验结论:
// 1. parallelStream().map + sqlPreValueLocal = new InheritableThreadLocal<>();
// 很容易出现: No value specified for parameter 1
// 2. parallelStream().map + sqlPreValueLocal = new ThreadLocal<>();
// 不会出现: No value specified for parameter 1
// 2.5 stream().map + sqlPreValueLocal = new ThreadLocal<>();
// 不会出现: No value specified for parameter 1
// 1.5 stream().map + sqlPreValueLocal = new InheritableThreadLocal<>();
// 不会出现: No value specified for parameter 1
// 结论:只有1. parallelStream().map + InheritableThreadLocal 才会现现 No value specified for parameter 1
测试代码:
import java.util.List;
import java.util.stream.Collectors;
import org.teasoft.bee.osql.Op;
import org.teasoft.bee.osql.api.SuidRich;
import org.teasoft.honey.osql.core.ConditionImpl;
import org.teasoft.honey.osql.shortcut.BF;
//parallelStream().map并行流测ORM
public class StreamSelectTest3 {
public static void main(String[] args) {
List<Org> orgs00 = getSuidRich().select(new Org()); //约50条记录
// 1. parallelStream().map + sqlPreValueLocal = new InheritableThreadLocal<>();
// 很容易出现: No value specified for parameter 1
// 2. parallelStream().map + sqlPreValueLocal = new ThreadLocal<>();
// 不会出现: No value specified for parameter 1
// 2.5 stream().map + sqlPreValueLocal = new ThreadLocal<>();
// 不会出现: No value specified for parameter 1
// 1.5 stream().map + sqlPreValueLocal = new InheritableThreadLocal<>();
// 不会出现: No value specified for parameter 1
// 结论:只有1. parallelStream().map + InheritableThreadLocal 才会现现 No value specified for parameter 1
// 直接在map中进行数据库查询
// List<Org> orgs = orgs00.stream()
List<Org> orgs = orgs00.parallelStream() // 这个才会
.map(org -> {
// 从数据库中获取 Org 对象的操作
// 这里可以是实际的数据库查询操作
ConditionImpl cmd = new ConditionImpl();
cmd.op("orgName", Op.eq, org.getOrgName());
System.out.println("-----------------111111111111111111111111-");
List<Org> select1 = getSuidRich().select(new Org(), cmd);
return new Org(org.getOrgName());
}).collect(Collectors.toList());
orgs.forEach(org -> System.out.println("Org name: " + org.getOrgName()));
}
static SuidRich getSuidRich() {
return BF.getSuidRich();
}
}
多线程测试测没有发现相应问题。?
import java.util.List;
import org.teasoft.bee.osql.api.SuidRich;
import org.teasoft.honey.osql.shortcut.BF;
public class ThreadSelectTest2 extends Thread{
// static SuidRich suidRich=BF.getSuidRich();
public static void main(String[] args) throws Exception{
ThreadSelectTest2 test[]=new ThreadSelectTest2[5];
for (int i = 0; i < test.length; i++) {
test[i]=new ThreadSelectTest2();
test[i].start();
}
System.out.println("finished!");
}
// sqlPreValueLocal = new InheritableThreadLocal<>(); //没问题 , 没那么容易连不上
// sqlPreValueLocal = new ThreadLocal<>(); // 很容易连不上; 但没碰到 No value specified for parameter 1
// org.teasoft.bee.osql.BeeSQLException: The driver was unable to create a connection due to an inability to establish the client portion of a socket.
// This is usually caused by a limit on the number of sockets imposed by the operating system. This limit is usually configurable.
// For Unix-based platforms, see the manual page for the 'ulimit' command. Kernel or system reconfiguration may also be required.
public void run() {
SuidRich suidRich=BF.getSuidRich();
List<Org> listOrg=suidRich.select(new Org()); //约50条
for (int i = 0; i < 2; i++) {
for (int j = 0; j < listOrg.size(); j++) {
Org org = new Org(listOrg.get(i).getOrgName()); //拿外层每一条的值 又去查一遍;只是测试性能,不考虑业务正确与否
suidRich.select(org);
}
}
}
}