Java 多个CompletableFuture之间的协调(第2部分)

Java 多个CompletableFuture之间的协调,用户希望通过更复杂的示例了解如何协调多个 CompletableFuture 实例,在美国职棒大联盟(MLB)赛季的每个比赛日访问官方网站,其中包括指向当日比赛的链接。下载每场比赛的技术统计信息(box score),并将其转换为一个 Java 类。采用异步方式保存数据后计算每场比赛的结果,找出总分最高的比赛,然后打印最高分以及出现最高分的那场比赛。

Java 多个CompletableFuture之间的协调 问题描述

用户希望通过更复杂的示例了解如何协调多个 CompletableFuture 实例。

Java 多个CompletableFuture之间的协调 解决方案

在美国职棒大联盟(MLB)赛季的每个比赛日访问官方网站,其中包括指向当日比赛的链接。下载每场比赛的技术统计信息(box score),并将其转换为一个 Java 类。采用异步方式保存数据后计算每场比赛的结果,找出总分最高的比赛,然后打印最高分以及出现最高分的那场比赛。

Java 多个CompletableFuture之间的协调 具体实例

较之其他简单的示例,本范例所讨论的应用程序更为复杂。希望读者能从中受到启发,理解如何通过合并多个 CompletableFuture 任务来完成工作。
MLB 官方网站保存了指定比赛日中每场比赛的得分,我们的应用程序即以此为基础。16 以 2017 年 6 月 14 日为例,包括当日所有比赛信息的页面如图 9-1 所示。
16只要了解以下内容,理解本例就不会存在障碍:在棒球比赛中,两队轮流攻守,得分较高的队胜出,比赛的统计数据称为技术统计。

图 9-1:2017 年 6 月 14 日进行的比赛
在上述页面中,每个链接指向一场比赛。链接以字母 gid 开头,后跟年、月、日以及主队和客队代码。点击某个链接,跳转后的页面包含一个文件列表,其中有一个名为 boxscore.json 的文件。
我们的应用程序将完成以下任务。
(1) 访问提供指定日期范围内各场比赛信息的网站;
(2) 确定每个页面的比赛链接;
(3)下载每场比赛的 boxscore.json 文件;
(4)将 boxscore.json 文件转换为相应的 Java 对象;
(5) 将下载结果保存到本地文件;
(6) 计算每场比赛的得分;
(7) 检索总分最高的比赛;
(8) 打印所有比赛的得分,以及出现最高得分的比赛及其得分。
可以将大部分任务安排为并发执行,不少任务都能以并行方式运行。
受篇幅所限,无法将完整的程序代码复制到书中,不过读者可以从GitHub 下载 。本范例将重点讨论并行流和 CompletableFuture 的应用。

第一个难点在于如何找出给定范围内每个比赛日的比赛链接。如例 9-28 所示,GamePageLinksSupplier 类实现了 Supplier 接口,其作用是生成一个表示比赛链接的字符串列表。

例 9-28 获取某个日期范围内的比赛链接

public class GamePageLinksSupplier implements Supplier<List<String>> {
    private static final String BASE =
        "http://gd2.mlb.com/components/game/mlb/";
    private LocalDate startDate;
    private int days;

    public GamePageLinksSupplier(LocalDate startDate, int days) {
        this.startDate = startDate;
        this.days = days;
    }

    public List<String> getGamePageLinks(LocalDate localDate) {
        // 使用Jsoup库解析HTML网页,并提取以"gid"开头的链接
    }

    @Override
    public List<String> get() {        ➊
        return Stream.iterate(startDate, d -> d.plusDays(1))
            .limit(days)
            .map(this::getGamePageLinks)
            .flatMap(list -> list.isEmpty() ? Stream.empty() : list.stream())
            .collect(Collectors.toList());
    }
}

Supplier<List<String>> 所需的方法
get 方法使用 Stream.iterate 方法对某个范围内的日期进行迭代:从给定日期开始,逐天递增直至上限。

Java 9 为 LocalDate 类引入了 datesUntil 方法,它将生成 Stream<LocalDate>。相关讨论请参见范例日期范围

每个 LocalDate 都成为 getGamePageLinks 方法的参数,它使用 Jsoup 库解析 HTML 页面,并查找所有以 gid 开头的链接,然后以字符串的形式返回这些链接。
接下来,程序通过实现 Function 接口的 BoxscoreRetriever 类来访问每个比赛链接中的 boxscore.json 文件,如例 9-29 所示。

例 9-29 在比赛链接列表中检索技术统计列表

public class BoxscoreRetriever implements Function<List<String>, List<Result>> {
    private static final String BASE =
        "http://gd2.mlb.com/components/game/mlb/";

    private OkHttpClient client = new OkHttpClient();
    private Gson gson = new Gson();
    @SuppressWarnings("ConstantConditions")
    public Optional<Result> gamePattern2Result(String pattern) {
        // 省略代码
        String boxscoreUrl = BASE + dateUrl + pattern + "boxscore.json";

        // 设置OkHttp库以创建网络调用
        try {
            // 获取响应
            if (!response.isSuccessful()) {
                System.out.println("Box score not found for " + boxscoreUrl);
                return Optional.empty();      ➊
            }

            return Optional.ofNullable(
                gson.fromJson(response.body().charStream(), Result.class)); ➋
        } catch (IOException e) {
            e.printStackTrace();
            return Optional.empty();          ➊
        }
    }

    @Override
    public List<Result> apply(List<String> strings) {
        return strings.parallelStream()
                .map(this::gamePattern2Result)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }
}

❶ 如果由于降雨或其他因素而未能找到技术统计信息,则返回空 Optional
❷ 利用 Gson 库将 JSON 转换为 Result
BoxscoreRetriever 类需要使用 OkHttp 库和 Gson 库以下载 JSON 格式的技术统计信息,并将其转换为 Result 类型的对象。由于 BoxscoreRetriever 类实现了 Function 接口,可以实现 apply 方法,从而将字符串列表转换为结果列表。如果给定的比赛由于降雨而取消,或因为某些原因导致网络连接中断,则可能找不到该场比赛的技术统计。这种情况下,gamePattern2Result 方法将返回一个为空的 Optional<Result>
apply 方法读取各场比赛的链接,将它们转换为相应的 Optional<Result>。接下来,apply 方法对流进行筛选,仅传递非空的 Optional 实例,然后在这些 Optional 实例上调用 get 方法,最后将它们收集到结果列表。

Java 9 为 Optional 类引入了 stream 方法,它可以将 filter(Optional::isPresent)map(Optional::get) 简化为 flatMap(Optional::stream)。相关讨论请参见范例新增的Optional方法

检索到技术统计信息后可以将其保存为本地文件,如例 9-30 所示。

例 9-30 将每场比赛的技术统计信息保存为文件

private void saveResultList(List<Result> results) {
    results.parallelStream().forEach(this::saveResultToFile);
}

public void saveResultToFile(Result result) {
    // 根据比赛日期和队名确定文件名
    try {
        File file = new File(dir + "/" + fileName);
        Files.write(file.toPath().toAbsolutePath(),     ➊
                    gson.toJson(result).getBytes());
    } catch (IOException e) {
        e.printStackTrace();
    }
}

➊ 创建或覆盖文件,然后将其关闭
如果文件不存在,Files.write 方法(使用默认参数)将创建一个新文件,否则覆盖原有文件。创建或覆盖文件后将其关闭。
程序还使用其他两种后期处理方法:getMaxScore 用于确定某场给定比赛的最高总分,而 getMaxGame 将返回出现最高分的那场比赛。两种方法的应用如例 9-31 所示。

例 9-31 获取最高总分以及出现最高分的比赛

private int getTotalScore(Result result) {
    // 两队得分之和
}

public OptionalInt getMaxScore(List<Result> results) {
    return results.stream()
            .mapToInt(this::getTotalScore)
            .max();
}

public Optional<Result> getMaxGame(List<Result> results) {
    return results.stream()
            .max(Comparator.comparingInt(this::getTotalScore));
}

最后,通过 CompletableFuture 将前面讨论的所有方法与类合并在一起。主程序代码如例 9-32 所示。

例 9-32 主程序代码

public void printGames(LocalDate startDate, int days) {
    CompletableFuture<List<Result>> future =
        CompletableFuture.supplyAsync(
            new GamePageLinksSupplier(startDate, days))
                .thenApply(new BoxscoreRetriever());        ➊

    CompletableFuture<Void> futureWrite =
        future.thenAcceptAsync(this::saveResultList)        ➋
            .exceptionally(ex -> {
                System.err.println(ex.getMessage());
                return null;
            });

    CompletableFuture<OptionalInt> futureMaxScore =
        future.thenApplyAsync(this::getMaxScore);
    CompletableFuture<Optional<Result>> futureMaxGame =
        future.thenApplyAsync(this::getMaxGame);
    CompletableFuture<String> futureMax =
        futureMaxScore.thenCombineAsync(futureMaxGame,      ➌
            (score, result) ->
                String.format("Highest score: %d, Max Game: %s",
                              score.orElse(0), result.orElse(null)));

    CompletableFuture.allOf(futureWrite, futureMax).join(); ➍

    future.join().forEach(System.out::println);
    System.out.println(futureMax.join());
}

❶ 检索技术统计信息的协调任务
❷ 保存为文件,如果出现问题则异常完成
❸ 合并最高总分与出现最高分的比赛这两个任务
❹ 完成所有任务
可以看到,程序创建了多个 CompletableFuture 实例。第一个 CompletableFuture 实例使用 GamePageLinksSupplier 类检索指定日期内所有比赛的页面链接,然后通过 BoxscoreRetriever 类将这些链接转换为结果。第二个 CompletableFuture 实例设置将结果写入磁盘,如果出现问题则异常完成。两个后期处理方法 getMaxScoregetMaxGame 分别用于查找最高总分以及出现最高分的那场比赛,18allOf 方法用于完成所有任务。最后,程序打印相应的结果。
18这两项操作显然可以一起完成,程序将二者分开是为了展示 thenCombine 的应用。

注意 thenApplyAsync 方法的应用。该方法并非必需,但能使任务以异步方式运行。
如果需要检索 2017 年 5 月 5 日到 5 月 7 日三天的技术统计信息,请使用以下语句:

GamePageParser parser = new GamePageParser();
parser.printGames(LocalDate.of(2017, Month.MAY, 5), 3);

输出结果如下:

Box score not found for Los Angeles at San Diego on May 5, 2017
May 5, 2017: Arizona Diamondbacks 6, Colorado Rockies 3
May 5, 2017: Boston Red Sox 3, Minnesota Twins 4
May 5, 2017: Chicago White Sox 2, Baltimore Orioles 4
// 更多数据
May 7, 2017: Toronto Blue Jays 2, Tampa Bay Rays 1
May 7, 2017: Washington Nationals 5, Philadelphia Phillies 6
Highest score: 23, Max Game: May 7, 2017: Boston Red Sox 17, Minnesota Twins 6

希望本范例能对读者有所启发,通过综合运用本书介绍的各种知识,包括 Future 任务(使用 CompletableFuture)、函数式接口(如 SupplierFunction)、类(如 OptionalStreamLocalDate)以及方法(如 mapfilterflatMap),掌握如何解决复杂而有趣的问题。

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程