实际项目中,多线程都是结合线程池使用的。
多线程实际应用有两种情况,一种是异步任务执行,不需要返回值,另一种是异步任务的查询,需要返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {
Integer count = 10;
// 批量创建时采用多线程
ExecutorService executorService = new ThreadPoolExecutor(count, count,
10L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder()
.setNameFormat("excute-pool-%d")
.build(),
new ThreadPoolExecutor.AbortPolicy());
CountDownLatch countDownLatch = new CountDownLatch(count);
for (int i = 1; i <= count; i++) {
Runnable runnable = () -> {
try {
Thread.sleep(5000);
//执行任务逻辑代码
System.out.println("任务:" + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
};
executorService.submit(runnable);
}
try {
//任务执行完,才继续往下走,如果不用管任务是否执行完,把该代码注释掉即可
//具体要结合业务场景使用
countDownLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
}
executorService.shutdown();
System.out.println(".......");
}
CompletableFuture是java8推出的一个非常简便的多线程写法,在上面 "1、批量执行异步任务"中也可以用CompletableFuture来写,而且更简便。
1、runAsync()是没返回结果的,supplyAsync()可以指定返回结果。
2、使用没有指定Executor(线程池)的方法时,内部使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
3、如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runnable runnable = () -> System.out.println("无返回结果异步任务");
CompletableFuture.runAsync(runnable);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
System.out.println("有返回值的异步任务1");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello World";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
System.out.println("有返回值的异步任务2");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello World";
});
//get方法会阻塞主线程,直到线程池中的线程执行完
future1.get();
future2.get();
//项目中一般会采用这个写法,作用实际是和get方法一样的
CompletableFuture.allOf(future1,future2).join();
System.out.println("......");
}
当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
1、Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
2、方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
3、这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runnable runnable = () -> System.out.println("无返回结果异步任务");
CompletableFuture.runAsync(runnable);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
System.out.println("有返回值的异步任务1");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello World";
});
//等待任务执行完成
future1.join();
// 任务完成或异常方法完成时执行该方法
// 如果出现了异常,任务结果为null
future1.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(String t, Throwable action) {
System.out.println(t + " 执行完成!");
}
});
System.out.println("......");
}
将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。
thenApply接收一个函数作为参数,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处理结果的Future对象。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Hello World";
}).thenApply(str-> str+"-haha");
System.out.println(future1);
thenCompose的参数为一个返回CompletableFuture实例的函数,该函数的参数是先前计算步骤的结果。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int number = new Random().nextInt(30);
System.out.println("第一次运算:" + number);
return number;
}
})
.thenCompose(new Function<Integer, CompletionStage<Integer>>() {
@Override
public CompletionStage<Integer> apply(Integer param) {
return CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int number = param * 2;
System.out.println("第二次运算:" + number);
return number;
}
});
}
});
thenApply 和 thenCompose的区别:
1、thenApply转换的是泛型中的类型,返回的是同一个CompletableFuture;
2、thenCompose将内部的CompletableFuture调用展开来并使用上一个CompletableFutre调用的结果在下一步的CompletableFuture调用中进行运算,是生成一个新的CompletableFuture。
https://blog.csdn.net/sermonlizhi/article/details/123356877