Java 多个CompletableFuture之间的协调,用户希望在一个 Future
完成之后触发另一个动作(action),使用 CompletableFuture
类中用于协调动作的各种实例方法,如 thenApply
、thenCompose
、thenRun
等。
Java 多个CompletableFuture之间的协调 问题描述
用户希望在一个 Future
完成之后触发另一个动作(action)。
Java 多个CompletableFuture之间的协调 解决方案
使用 CompletableFuture
类中用于协调动作的各种实例方法,如 thenApply
、thenCompose
、thenRun
等。
Java 多个CompletableFuture之间的协调 具体实例
CompletableFuture
类的最大优势在于能很容易地将多个 Future
链接起来。我们可以创建多个 Future
以表示需要执行的各种任务,然后通过一个 Future
的完成来触发另一个 Future
的执行,达到协调动作的目的。
我们考虑如何实现下面这个简单的任务:
- 向
Supplier
请求一个包含数字的字符串 - 将数字解析为整数
- 将整数倍增
- 打印倍增后的结果
相关实现如例 9-23 所示,程序并不复杂。
例 9-23 利用
CompletableFuture
协调多个任务
private String sleepThenReturnString() {
try {
Thread.sleep(100); ➊
} catch (InterruptedException ignored) {
}
return "42";
}
CompletableFuture.supplyAsync(this::sleepThenReturnString)
.thenApply(Integer::parseInt) ➋
.thenApply(x -> 2 * x) ➋
.thenAccept(System.out::println) ➌
.join(); ➍
System.out.println("Running...");
❶ 人为引入延迟
❷ 在前一阶段完成后应用 Function
❸ 在前一阶段完成后应用 Consumer
❹ 检索完成的结果
由于对 join
的调用属于阻塞调用,执行上述程序将输出 84 并后跟“Running…”。supplyAsync
方法传入 Supplier
(本例为 String
类型)。thenApply
方法传入 Function
,其输入参数为前一个 CompletionStage
的结果。其中第一个 thenApply
方法中的函数将字符串转换为一个整数,第二个 thenApply
方法中的函数将整数倍增。最后,thenAccept
方法传入 Consumer
,在前一个阶段完成后执行。
CompletableFuture
类定义了多种不同的协调方法,完整列表(不包括重载形式,将在之后讨论)如表 9-1 所示。
表9-1:CompletableFuture
类定义的协调方法
修饰符 | 返回类型 | 方法名 | 参数 |
---|---|---|---|
CompletableFuture<Void> |
acceptEither |
CompletionStage<? extends T> other, Consumer<? super T> action |
|
static |
CompletableFuture<Void> |
allOf |
CompletableFuture<?>... cfs |
static |
CompletableFuture<Object> |
anyOf |
CompletableFuture<?>... cfs |
<U> |
CompletableFuture<U> |
applyToEither |
CompletionStage<? extends T> other, Function<? super T, U> fn |
CompletableFuture<Void> |
runAfterBoth |
CompletionStage<?> other, Runnable action |
|
CompletableFuture<Void> |
thenAccept |
Consumer<? super T> action |
|
<U> |
CompletableFuture<U> |
thenApply |
Function<? super T> action, ? extends U> fn |
<U,V> |
CompletableFuture<V> |
thenCombine |
CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn |
<U> |
CompletableFuture<U> |
thenCompose |
Function<? super T, ? extends CompletionStage<U>> fn |
CompletableFuture<Void> |
thenRun |
Runnable action |
|
CompletableFuture<T> |
whenComplete |
BiConsumer<? super T, ? super Throwable> action |
上表列出的所有方法均使用工作者线程(worker thread)的通用 ForkJoinPool
,其大小与处理器数量相等。前面已经讨论过工厂方法 runAsync
和 supplyAsync
,二者指定 Runnable
或 Supplier
,并返回 CompletableFuture
。如上表所示,可以通过链接其他方法(如 thenApply
或 thenCompose
)来添加将在前一个任务完成后开始执行的任务。
表 9-1 并未列出每种方法包含的其他两种模式,二者以 Async
结尾,一种传入 Executor
,而另一种不传入。以 thenAccept
方法为例,其完整形式如下:
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(
Consumer<? super T> action, Executor executor)
第一种形式在与原有任务相同的线程中执行其 Consumer
参数;第二种形式将 Consumer
再次提交给线程池;第三种形式提供一个 Executor
,用于运行任务而非通用 fork/join 线程池。
是否采用这些方法的
Async
形式应视情况而定。尽管异步操作能加快单个任务的执行速度,但引入的开销可能无助于整体速度的提升。
由于 ExecutorService
接口实现了 Executor
接口,我们也可以使用自定义 Executor
而不是通用线程池。例 9-24 显示了利用单独的线程池执行 CompletableFuture
任务。
例 9-24 在单独的线程池中运行
CompletableFuture
任务
ExecutorService service = Executors.newFixedThreadPool(4);
CompletableFuture.supplyAsync(this::sleepThenReturnString, service) ➊
.thenApply(Integer::parseInt)
.thenApply(x -> 2 * x)
.thenAccept(System.out::println)
.join();
System.out.println("Running...");
➊ 提供单独的线程池作为参数
在本例中,thenApply
和 thenAccept
方法使用的线程与 supplyAsync
方法相同。不过,如果使用 thenApplyAsync
方法,那么除非添加另一个线程池作为附加参数,否则任务将被提交到线程池。
在通用 ForkJoinPool 中等待完成
默认情况下,CompletableFuture 使用所谓的“通用” fork/join 线程池,后者是一种经过优化并执行工作窃取(work stealing)的线程池。根据 Javadoc 的描述,这种线程池中的所有线程“尝试查找并执行提交到线程池或由其他活动任务创建的任务”。需要注意的是,所有工作者线程均为守护线程(daemon thread):如果在线程结束前退出程序,线程将终止。
换言之,在执行例 9-23 所示的代码时,如果没有调用join
方法,程序将只打印“Running…”,而不会输出Future
的结果,系统在任务完成前即告终止。
可以采用两种方案解决这个问题。一种方案是调用get
或join
方法,二者将阻塞调用进程,直至检索到结果。另一种方案是为通用线程池设置一个超时时间(time-out period),告诉程序等待直至所有线程执行完毕:ForkJoinPool.commonPool().awaitQuiescence(long timeout, TimeUnit unit)
如果将线程池的等待周期设置得足够长,
Future
就会完成。awaitQuiescence
方法用于通知系统等待,直至所有工作者线程空闲,或所设置的超时时间结束(以先到者为准)。
对于返回值的 CompletableFuture
实例,可以通过 get
或 join
方法检索该值。两种方法属于阻塞调用,直至 Future
完成或抛出异常。不同之处在于,get
方法抛出的是受检异常 ExecutionException
,而 join
方法抛出的是非受检异常 CompletionException
,因此在 lambda 表达式中更容易使用 join
方法。
cancel
方法用于取消 CompletableFuture
,该方法传入一个布尔值作为参数:
boolean cancel(boolean mayInterruptIfRunning)
如果 CompletableFuture
尚未完成,cancel
方法将利用 CancellationException
使其完成,所有依赖的 CompletableFuture
也会由于 CancellationException
引发的 CompletionException
而异常完成。此时,布尔参数不执行任何操作。14
14有趣的是,根据 Javadoc 的描述,参数 mayInterruptIfRunning
“不起作用,因为中断不用于控制处理”。
之前的示例介绍了 thenApply
和 thenAccept
方法的应用。而 thenCompose
是一种实例方法,可以将某个 Future
链接到原有 Future
,使得第一个 Future
的结果也能用于第二个 Future
。thenCompose
方法的应用如例 9-25 所示,这可能是实现两个数相加的最复杂的程序了。
例 9-25 两个
Future
的复合
@Test
public void compose() throws Exception {
int x = 2;
int y = 3;
CompletableFuture<Integer> completableFuture =
CompletableFuture.supplyAsync(() -> x)
.thenCompose(n -> CompletableFuture.supplyAsync(() -> n + y));
assertTrue(5 == completableFuture.get());
}
thenCompose
方法的参数是一个函数,它传入第一个 Future
的结果,并将其转换为第二个 Future
的输出。不过,如果希望两个 Future
彼此独立,可以改用 thenCombine
方法,如例 9-26 所示。15
15好吧,这同样可能是实现两个数字相加的最复杂的程序。
例 9-26 两个
Future
的合并
@Test
public void combine() throws Exception {
int x = 2;
int y = 3;
CompletableFuture<Integer> completableFuture =
CompletableFuture.supplyAsync(() -> x)
.thenCombine(CompletableFuture.supplyAsync(() -> y),
(n1, n2) -> n1 + n2);
assertTrue(5 == completableFuture.get());
}
thenCombine
方法传入 Future
和 BiFunction
作为参数。计算结果时,两个 Future
的结果都能在函数中使用。
CompletableFuture
类还定义了一种名为 handle
的方法,其签名如下:
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
BiFunction
的两个输入参数为 Future
的结果(正常完成)和抛出的异常(异常结束),返回的参数由程序决定。handle
方法也有两种 Async
模式,一种传入 BiFunction
,另一种传入 BiFunction
和 Executor
。handle
方法的应用如例 9-27 所示。
例 9-27
handle
方法的应用
private CompletableFuture<Integer> getIntegerCompletableFuture(String num) {
return CompletableFuture.supplyAsync(() -> Integer.parseInt(num))
.handle((val, exc) -> val != null ? val : 0);
}
@Test
public void handleWithException() throws Exception {
String num = "abc";
CompletableFuture<Integer> value = getIntegerCompletableFuture(num);
assertTrue(value.get() == 0);
}
@Test
public void handleWithoutException() throws Exception {
String num = "42";
CompletableFuture<Integer> value = getIntegerCompletableFuture(num);
assertTrue(value.get() == 42);
}
本例解析字符串以查找相应的整数。解析成功则返回整数,解析失败则抛出 ParseException
,handle
方法返回 0。上述两个测试表明,handle
方法能正确处理这两种情况。
从上面这些示例不难看到,可以通过多种方式在通用线程池或自定义执行器中同步或异步地合并多个任务。范例 9.7 将给出一个更复杂的示例,讨论如何协调多个 CompletableFuture
。