上一个例子是当任意一个阶段完成后接着处理,接下来的两个例子演示当所有的阶段完成后才继续处理, 同步地方式和异步地方式两种。
static void allOfExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List<CompletableFuture> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
assertTrue("Result was empty", result.length() > 0);
}
使用thenApplyAsync()
替换那些单个的CompletableFutures的方法,allOf()
会在通用池中的线程中异步地执行。所以我们需要调用join
方法等待它完成。
static void allOfAsyncExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List<CompletableFuture> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
allOf.join();
assertTrue("Result was empty", result.length() > 0);
}
现在你已经了解了CompletionStage 和 CompletableFuture 的一些函数的功能,下面的例子是一个实践场景:
首先异步调用cars方法获得Car的列表,它返回CompletionStage场景。cars消费一个远程的REST API。
然后我们复合一个CompletionStage填写每个汽车的评分,通过rating(manufacturerId)返回一个CompletionStage, 它会异步地获取汽车的评分(可能又是一个REST API调用)
当所有的汽车填好评分后,我们结束这个列表,所以我们调用allOf得到最终的阶段, 它在前面阶段所有阶段完成后才完成。
在最终的阶段调用whenComplete(),我们打印出每个汽车和它的评分。
cars().thenCompose(cars -> {
List<CompletionStage> updatedCars = cars.stream()
.map(car -> rating(car.manufacturerId).thenApply(r -> {
car.setRating(r);
return car;
})).collect(Collectors.toList());
CompletableFuture done = CompletableFuture
.allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
if (th == null) {
cars.forEach(System.out::println);
} else {
throw new RuntimeException(th);
}
}).toCompletableFuture().join();
因为每个汽车的实例都是独立的,得到每个汽车的评分都可以异步地执行,这会提高系统的性能(延迟),而且,等待所有的汽车评分被处理使用的是allOf
方法,而不是手工的线程等待(Thread#join() 或 a CountDownLatch)。