目录
3.1 thenApply / thenAccept / thenRun
CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。由于函数式编程在java中越来越多的被使用到,熟练掌握CompletableFuture,对于更好的使用java 8后的主要新特性很重要。简单起见,本文使用的CompletableFuture版本为java 8(java 11的CompletableFuture新增了一些方法)。
CompletableFuture字面翻译过来,就是“可完成的Future”。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回。
下面的示例,比较简单的说明了,CompletableFuture是如何被主动完成的。在下面这段代码中,由于调用了complete方法,所以最终的打印结果是“manual test”,而不是"test"。
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
try{
Thread.sleep(1000L);
return "test";
} catch (Exception e){
return "failed test";
}
});
future.complete("manual test");
System.out.println(future.join());
最简单的方式就是通过构造函数创建一个CompletableFuture实例。如下代码所示。由于新创建的CompletableFuture还没有任何计算结果,这时调用join,当前线程会一直阻塞在这里。
CompletableFuture<String> future = new CompletableFuture();
String result = future.join();
System.out.println(result);
此时,如果在另外一个线程中,主动设置该CompletableFuture的值,则上面线程中的结果就能返回。
future.complete("test");
这展示了CompletableFuture最简单的创建及使用方法。
CompletableFuture.supplyAsync()也可以用来创建CompletableFuture实例。通过该函数创建的CompletableFuture实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join获取最终计算结果。
supplyAsync有两种签名:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
第一种只需传入一个Supplier实例(一般使用lamda表达式),此时框架会默认使用ForkJoin的线程池来执行被提交的任务。
第二种可以指定自定义的线程池,然后将任务提交给该线程池执行。
下面为使用supplyAsync创建CompletableFuture的示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
System.out.println("compute test");
return "test";
});
String result = future.join();
System.out.println("get result: " + result);
在示例中,异步任务中会打印出“compute test”,并返回"test"作为最终计算结果。所以,最终的打印信息为“get result: test”。
CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。同supplyAsync()类似,runAsync()也有两种签名:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
下面为使用runAsync()的例子:
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
System.out.println("compute test");
});
System.out.println("get result: " + future.join());
在示例中,由于任务没有返回值, 所以最后的打印结果是"get result: null"。
同Future相比,CompletableFuture最大的不同是支持流式(Stream)的计算处理,多个任务之间,可以前后相连,从而形成一个计算流。比如:任务1产生的结果,可以直接作为任务2的入参,参与任务2的计算,以此类推。
CompletableFuture中常用的流式连接函数包括:
thenApply
thenApplyAsync
thenAccept
thenAcceptAsync
thenRun
thenRunAsync
thenCombine
thenCombineAsync
thenCompose
thenComposeAsync
whenComplete
whenCompleteAsync
handle
handleAsync
其中,带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的。除此之外,两者没有其他区别。因此,为了快速理解,在接下来的介绍中,我们主要介绍不带Async的版本。
这里将thenApply / thenAccept / thenRun放在一起讲,因为这几个连接函数之间的唯一区别是提交的任务类型不一样。thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。
因此,简单起见,我们这里主要讲thenApply。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<Integer> future2 = future1.thenApply((p)->{
System.out.println("compute 2");
return p+10;
});
System.out.println("result: " + future2.join());
在上面的示例中,future1通过调用thenApply将后置任务连接起来,并形成future2。该示例的最终打印结果为11,可见程序在运行中,future1的结果计算出来后,会传递给通过thenApply连接的任务,从而产生future2的最终结果为1+10=11。当然,在实际使用中,我们理论上可以无限连接后续计算任务,从而实现链条更长的流式计算。
需要注意的是,通过thenApply / thenAccept / thenRun连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算。因此,这组函数主要用于连接前后有依赖的任务链。
同前面一组连接函数相比,thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture(或者是任意实现了CompletionStage的类型),从而允许前后连接的两个任务可以并行执行(后置任务不需要等待前置任务执行完成),最后当两个任务均完成时,再将其结果同时传递给下游处理任务,从而得到最终结果。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 2");
return 10;
});
CompletableFuture<Integer> future3 = future1.thenCombine(future2, (r1, r2)->r1 + r2);
System.out.println("result: " + future3.join());
上面示例代码中,future1和future2为独立的CompletableFuture任务,他们分别会在各自的线程中并行执行,然后future1通过thenCombine与future2连接,并且以lamda表达式传入处理结果的表达式,该表达式代表的任务会将future1与future2的结果作为入参并计算他们的和。
因此,上面示例代码中,最终的打印结果是11。
一般,在连接任务之间互相不依赖的情况下,可以使用thenCombine来连接任务,从而提升任务之间的并发度。
注意,thenAcceptBoth、thenAcceptBothAsync、runAfterBoth、runAfterBothAsync的作用与thenConbime类似,唯一不同的地方是任务类型不同,分别是BiConumser、Runnable。
前面讲了thenCombine主要用于没有前后依赖关系之间的任务进行连接。那么,如果两个任务之间有前后依赖关系,但是连接任务又是独立的CompletableFuture,该怎么实现呢?
先来看一下直接使用thenApply来实现:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<CompletableFuture<Integer>> future2 =
future1.thenApply((r)->CompletableFuture.supplyAsync(()->r+10));
System.out.println(future2.join().join());
可以发现,上面示例代码中,future2的类型变成了CompletableFuture嵌套,而且在获取结果的时候,也需要嵌套调用join或者get。这样,当连接的任务越多时,代码会变得越来越复杂,嵌套获取层级也越来越深。因此,需要一种方式,能将这种嵌套模式展开,使其没有那么多层级。thenCompose的主要目的就是解决这个问题(这里也可以将thenCompose的作用类比于stream接口中的flatMap,因为他们都可以将类型嵌套展开)。
看一下通过thenCompose如何实现上面的代码:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<Integer> future2 = future1.thenCompose((r)->CompletableFuture.supplyAsync(()->r+10));
System.out.println(future2.join());
通过示例代码可以看出来,很明显,在使用了thenCompose后,future2不再存在CompletableFuture类型嵌套了,从而比较简洁的达到了我们的目的。
whenComplete主要用于注入任务完成时的回调通知逻辑。这个解决了传统future在任务完成时,无法主动发起通知的问题。前置任务会将计算结果或者抛出的异常作为入参传递给回调通知函数。
以下为示例:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture future2 = future1.whenComplete((r, e)->{
if(e != null){
System.out.println("compute failed!");
} else {
System.out.println("received result is " + r);
}
});
System.out.println("result: " + future2.join());
需要注意的是,future2获得的结果是前置任务的结果,whenComplete中的逻辑不会影响计算结果。
handle与whenComplete的作用有些类似,但是handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果。
以下为示例:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<Integer> future2 = future1.handle((r, e)->{
if(e != null){
System.out.println("compute failed!");
return r;
} else {
System.out.println("received result is " + r);
return r + 10;
}
});
System.out.println("result: " + future2.join());
在以上示例中,打印出的最终结果为11。说明经过handle计算后产生了新的结果。