Java 完成CompletableFuture

Java 完成CompletableFuture,用户希望显式地完成 CompletableFuture,要么设置它的值,要么在调用 get 方法时使其抛出异常。可以使用 CompletableFuture 类定义的 completedFuturecompletecompleteExceptionally 方法。

Java 完成CompletableFuture 问题描述

用户希望显式地完成 CompletableFuture,要么设置它的值,要么在调用 get 方法时使其抛出异常。

Java 完成CompletableFuture 解决方案

使用 CompletableFuture 类定义的 completedFuturecompletecompleteExceptionally 方法。

Java 完成CompletableFuture 具体实例

CompletableFuture 类不仅能实现 Future 接口,也能实现 CompletionStage 接口,它定义的数十种方法可以满足各种需要。
CompletableFuture 类的最大优势在于无须编写嵌套回调(nested callback)就能协调操作,相关讨论请参见范例 9.6 和范例 9.7。本范例将探讨如何在已有返回值时完成 CompletableFuture
假设我们的应用程序需要根据产品 ID 检索相关产品。由于涉及某种形式的远程访问,检索过程可能会引入大量开销。这些开销包括对 RESTful Web 服务的网络调用、数据库调用以及其他相对耗时的操作。
为此,我们采用映射的形式在本地创建产品缓存:请求某个产品时,系统首先在映射中检索。如果返回 null,再进行开销较大的操作。例 9-19 显示了通过本地和远程两种方式来检索产品。

例 9-19 产品检索

private Map<Integer, Product> cache = new HashMap<>();
private Logger logger = Logger.getLogger(this.getClass().getName());

private Product getLocal(int id) {
    return cache.get(id);                                ➊
}

private Product getRemote(int id) {
    try {
        Thread.sleep(100);                               ➋
        if (id == 666) {
            throw new RuntimeException("Evil request");  ➌
        }
    } catch (InterruptedException ignored) {
    }
    return new Product(id, "name");
}

❶ 立即返回,但可能为 null
❷ 人为引入延迟,然后检索
❸ 模拟网络、数据库或其他形式的错误
接下来,我们创建一个名为 getProduct 的方法,传入产品 ID 作为参数,并返回相应的产品。不过,如果将返回类型设置为 CompletableFuture<Product>getProduct 方法将立即返回;而在实际的检索过程中,程序还可以处理其他任务。
为此,需要通过某种方式来完成 CompletableFutureCompletableFuture 类定义了三种相关的方法:

boolean              complete(T value)
static <U> CompletableFuture<U> completedFuture(U value)
           boolean              completeExceptionally(Throwable ex)

如果已有 CompletableFuture 且希望将其设置为给定的值,可以采用 complete 方法; CompletableFuture 方法是一种工厂方法,它使用给定的值创建一个新的 CompletableFuturecompleteExceptionally 方法使用给定的异常来结束 Future
例 9-20 显示了上述三种方法的应用。程序假定存在一种从远程系统返回产品的遗留机制,希望使用这种机制完成 Future

例 9-20 完成 CompletableFuture

public CompletableFuture<Product> getProduct(int id) {
    try {
        Product product = getLocal(id);
        if (product != null) {
            return CompletableFuture.completedFuture(product);  ➊
        } else {
            CompletableFuture<Product> future = new CompletableFuture<>();
            Product p = getRemote(id);                          ➋
            cache.put(id, p);
            future.complete(p);                                 ➌
            return future;
        }
    } catch (Exception e) {
        CompletableFuture<Product> future = new CompletableFuture<>();
        future.completeExceptionally(e);                        ➍
        return future;
    }
}

❶ 在缓存中检索到产品(如果有的话)后完成
❷ 遗留检索方式
❸ 在执行遗留检索方式后完成(异步操作的情况参见例 9-22)
❹ 如果出现问题,在抛出异常后完成
在本例中,getProduct 方法首先尝试在缓存中检索产品。如果映射返回非空值,则使用工厂方法 completedFuture 返回该值。
如果缓存返回 null,说明需要进行远程访问。程序模拟了一种可能是遗留代码的同步方案,稍后将详细讨论。CompletableFuture 类被实例化,complete 方法采用生成的值进行填充。
最后,如果出现严重问题(将 ID 设置为 666 以便模拟),程序将抛出 RuntimeExceptioncompleteExceptionally 方法传入 RuntimeException 作为参数,并使用该异常结束 Future
例 9-21 的测试用例显示了异常处理的工作方式。

例 9-21 completeExceptionally 方法的应用

@Test(expected = ExecutionException.class)
public void testException() throws Exception {
    demo.getProduct(666).get();
}

@Test
public void testExceptionWithCause() throws Exception {
    try {
        demo.getProduct(666).get();
        fail("Houston, we have a problem...");
    } catch (ExecutionException e) {
        assertEquals(ExecutionException.class, e.getClass());
        assertEquals(RuntimeException.class, e.getCause().getClass());
    }
}

上述两项测试均能通过。在 CompletableFuture 上调用 completeExceptionally 方法时,get 方法将抛出 ExecutionException,这是由首先导致问题的异常(本例为 RuntimeException)所引起的。

get 方法抛出的 ExecutionException 是一个受检异常(checked exception)。join 方法与 get 方法相同,不同之处在于,如果异常完成,join 方法将抛出由底层异常所引发的 CompletionException,它是一个非受检异常(unchecked exception)。

在例 9-20 中,最有可能替换的是执行产品同步检索的那部分代码。为此,可以使用 CompletableFuture 类定义的另一种静态工厂方法 supplyAsync,它包括以下形式:

static     CompletableFuture<Void> runAsync(Runnable runnable)
static     CompletableFuture<Void> runAsync(Runnable runnable,
                                            Executor executor)

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                            Executor executor)

supplyAsync 方法返回一个使用给定 Supplier 的对象;如果不需要返回对象,则使用 runAsync 方法更方便。两种方法的单参数形式使用默认的通用 fork/join 线程池,而双参数形式使用给定的执行器(executor)作为第二个参数。
例 9-22 显示了采用异步模式检索产品。

例 9-22 利用 supplyAsync 方法检索产品

public CompletableFuture<Product> getProductAsync(int id) {
    try {
        Product product = getLocal(id);
        if (product != null) {
            logger.info("getLocal with id=" + id);
            return CompletableFuture.completedFuture(product);
        } else {
            logger.info("getRemote with id=" + id);

            return CompletableFuture.supplyAsync(() -> {    ➊
                Product p = getRemote(id);
                cache.put(id, p);
                return p;
            });
        }
    } catch (Exception e) {
        logger.info("exception thrown");
        CompletableFuture<Product> future = new CompletableFuture<>();
        future.completeExceptionally(e);
        return future;
    }
}

➊ 与之前的操作相同,但采用异步模式返回检索到的产品
可以看到,程序在实现 Supplier<Product> 的 lambda 表达式中检索产品。我们总是可以将其作为独立的方法提取出来,并将代码简化为一个方法引用。
不过,如何在 CompletableFuture 完成之后调用其他操作颇费周章。有关多个 CompletableFuture 实例之间的协调请参见范例多个CompletableFuture之间的协调

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程