Java 异步编程的完美利器:CompletableFuture 指北

发布时间:2024年01月22日

Future获取异步执行结果

之前我们详细探索了线程池,在上一篇文章中,我们仅仅介绍了 ThreadPoolExecutor 的 void execute(Runnable command) 方法,利用这个方法虽然可以提交任务,但是却没有办法获取任务的执行结果(execute() 方法没有返回值)。而很多场景下,我们又都是需要获取任务的执行结果的。

Future介绍

Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。

图片

我们发现它们的返回值都是 Future 接口。

图片

Future 接口有 5 个方法:

?取消任务的方法 cancel()?判断任务是否已取消的方法 isCancelled()?判断任务是否已结束的方法 isDone()?获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。

下面我们简单看下Future的例子:

图片

FutureTask

FutureTask是一个实现了RunnableFuture接口的类,它既可以作为Runnable对象传递给线程执行,也可以作为Future对象获取任务的结果。因此,我们可以通过FutureTaskCallable任务转化为可执行的异步任务,并在需要时获取任务的结果。

图片

下面是一个FutureTask的例子:

图片

FutureTask也可以直接作为ExecutorService的参数进行提交,以便执行任务,而无需手动创建线程。这样可以更方便地管理线程池和异步任务。

Future与FutureTask的不足

尽管Future与FutureTask在Java中提供了一种基本的异步编程方式,但它也存在一些不足之处:

1.缺乏异步回调机制:FutureTaskFuture接口都没有直接提供异步回调的机制。在某些场景下,我们可能希望在任务完成后立即执行一些操作,而不是阻塞等待结果。需要手动编写额外的代码来实现异步回调逻辑,增加了代码的复杂性。

2.无法手动完成或取消任务:FutureTaskFuture都没有提供主动完成或取消任务的方法。一旦任务提交,就无法在外部控制其执行状态。这可能会导致无法优雅地管理任务的生命周期和资源。

3.阻塞式获取结果:在使用get()方法获取结果时,如果任务还未完成,调用线程会被阻塞,无法进行其他操作。这种阻塞式获取结果的方式可能导致整体性能下降,特别是在多个异步任务同时执行时。

4.缺乏异常处理灵活性:FutureFutureTask在处理任务执行过程中的异常时,比较简单且不够灵活。通过捕获ExecutionException来获取异常信息,可能需要额外的处理逻辑来处理不同类型的异常情况。

为了解决这些问题,Guava提供了ListenableFuture,Java 8引入了CompletableFuture,它们都提供了更丰富的功能和灵活性,如异步回调、异常处理、任务组合等。

CompletableFuture使用

CompletableFutur介绍

CompletableFuture提供下面几种方法创建任务,它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。

图片

默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。

如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以我们要根据不同的业务类型创建不同的线程池,以避免互相干扰。

下面是一个CompletableFuture获取异步结果的例子:

图片

时序依赖关系

通常任务的依赖分为以下几种:

1.串行执行Start->Future1->Future2->Future3->En

2.所有都执行完:

图片

3.任意完成:

图片

CompletableFuture的CompletionStage 接口可以清晰地描述任务之间的这种时序依赖关系。下面我们看下CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。

CompletionStage编排

串行执行

描述串行关系CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四种类型的接口。

?thenApply 系列函数里参数 fn 的类型是接口 Function,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage。

?thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与 CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage。

?thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage。这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。

?thenCompose 方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。

图片

通过下面的示例代码,我们可以看一下 thenApply() 方法是如何使用的。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

图片

AND汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

图片

OR 汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

图片

下面的示例代码展示了如何使用 applyToEither() 方法来描述一个 OR 汇聚关系。

图片

异常处理

异常处理虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常。正常业务代码中,我们可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?

图片

CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

图片

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

图片

获取结果

CompletableFuture提供了下面几种常用的方法获取结果:

图片

1.get()get()方法是最常用的获取CompletableFuture结果的方法之一。它会阻塞当前线程,直到异步操作完成并返回结果,或者抛出异常。如果异步操作抛出异常,get()方法会将异常包装在ExecutionException中抛出。这个方法可以用于同步地获取结果。

2.join()join()方法与get()方法类似,也会阻塞当前线程,直到异步操作完成并返回结果,或者抛出异常。不同之处在于,join()方法不会抛出ExecutionException,而是直接将异常抛出。这个方法可以用于同步地获取结果。

3.whenComplete(BiConsumer<? super T,? super Throwable> action)whenComplete()方法允许注册一个回调函数,在异步操作完成后执行该函数。回调函数接收异步操作的结果(如果成功完成)或异常(如果发生异常),并可以对结果进行进一步处理或执行其他操作。这个方法不会阻塞线程,异步操作完成后立即执行回调函数。

4.handle(BiFunction<? super T, Throwable, ? extends U> fn)handle()方法类似于whenComplete(),也允许注册一个回调函数,在异步操作完成后执行该函数。不同之处在于,回调函数的返回值会被包装在新的CompletableFuture中返回,而不是忽略返回值。这个方法可以用于对结果进行处理或转换,并返回包装后的新CompletableFuture。

5.allOf():就是所有任务都完成时触发。allOf()可以配合get()一起使用。

6.anyOf():等待任意一个完成。anyOf()方法返回一个新的CompletableFuture<Object>对象,该对象在任意一个输入的CompletableFuture完成后完成,并持有该完成的结果。

下面是allOf()的一个例子

图片

文章来源:https://blog.csdn.net/huanghuozhiye/article/details/135615758
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。