Java 流的拼接

Java 流的拼接,用户希望将两个或多个流合并为一个流,Stream.concat 方法用于合并两个流。如果需要合并多个流,请使用 Stream.flatMap 方法。假设我们从多个信源获取到数据,且希望使用流来处理其中的每个元素。一种方案是采用 Stream 接口定义的 concat 方法。

Java 流的拼接 问题描述

用户希望将两个或多个流合并为一个流。

Java 流的拼接 解决方案

Stream.concat 方法用于合并两个流。如果需要合并多个流,请使用 Stream.flatMap 方法。

Java 流的拼接 具体实例

假设我们从多个信源获取到数据,且希望使用流来处理其中的每个元素。一种方案是采用 Stream 接口定义的 concat 方法,其签名如下:

static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)

concat 方法将创建一个惰性的拼接流(lazily concatenated stream),其元素是第一个流的所有元素,后跟第二个流的所有元素。根据 Javadoc 的描述,如果两个输入流均为有序流,则生成的流也是有序流;如果某个输入流为并行流,则生成的流也是并行流。关闭生成的流也会关闭两个输入流。

两个输入流所包含的元素类型必须相同。

例 3-59 显示了拼接两个流的简单示例。

例 3-59 拼接两个流

@Test
public void concat() throws Exception {
    Stream<String> first = Stream.of("a", "b", "c").parallel();
    Stream<String> second = Stream.of("X", "Y", "Z");
    List<String> strings = Stream.concat(first, second) ➊
            .collect(Collectors.toList());
    List<String> stringList = Arrays.asList("a", "b", "c", "X", "Y", "Z");
    assertEquals(stringList, strings);
}

➊ 将第二个流的元素附加到第一个流的元素之后
如果需要添加第三个流,可以嵌套使用拼接操作,如例 3-60 所示。

例 3-60 拼接多个流(concat 方法)

@Test
public void concatThree() throws Exception {
    Stream<String> first = Stream.of("a", "b", "c").parallel();
    Stream<String> second = Stream.of("X", "Y", "Z");
    Stream<String> third = Stream.of("alpha", "beta", "gamma");

    List<String> strings = Stream.concat(Stream.concat(first, second), third)
            .collect(Collectors.toList());
    List<String> stringList = Arrays.asList("a", "b", "c",
        "X", "Y", "Z", "alpha", "beta", "gamma");
    assertEquals(stringList, strings);
}

尽管嵌套拼接的方案可行,但请注意 Javadoc 所做的注释:

通过重复拼接操作构建流时应谨慎行事。访问一个深度拼接流中的元素可能导致深层调用链(deep call chain)甚至抛出 StackOverflowException

换言之,concat 方法实际上构建了一个流的二叉树(binary tree),使用过多就会变得难以处理。
另一种方案是采用 reduce 方法执行多次拼接操作,如例 3-61 所示。

例 3-61 拼接多个流(reduce 方法)

@Test
public void reduce() throws Exception {
    Stream<String> first = Stream.of("a", "b", "c").parallel();
    Stream<String> second = Stream.of("X", "Y", "Z");
    Stream<String> third = Stream.of("alpha", "beta", "gamma");
    Stream<String> fourth = Stream.empty();

    List<String> strings = Stream.of(first, second, third, fourth)
            .reduce(Stream.empty(), Stream::concat)   ➊
            .collect(Collectors.toList());

    List<String> stringList = Arrays.asList("a", "b", "c",
        "X", "Y", "Z", "alpha", "beta", "gamma");
    assertEquals(stringList, strings);
}

➊ 对空流使用 reduce 方法和二元运算符
由于用作方法引用的 concat 方法属于二元运算符,上述程序同样有效。不过请注意,虽然代码更简洁,但并不能解决潜在的栈溢出(stack overflow)问题。
有鉴于此,在合并多个流时,使用 flatMap 方法成为一种自然而然的解决方案,如例 3-62 所示。

例 3-62 拼接多个流(flatMap 方法)

@Test
public void flatMap() throws Exception {
    Stream<String> first = Stream.of("a", "b", "c").parallel();
    Stream<String> second = Stream.of("X", "Y", "Z");
    Stream<String> third = Stream.of("alpha", "beta", "gamma");
    Stream<String> fourth = Stream.empty();

    List<String> strings = Stream.of(first, second, third, fourth)
            .flatMap(Function.identity())
            .collect(Collectors.toList());
    List<String> stringList = Arrays.asList("a", "b", "c",
        "X", "Y", "Z", "alpha", "beta", "gamma");
    assertEquals(stringList, strings);
}

上述代码可以运行,但仍然有其不足。如果任何一个输入流为并行流,那么通过 concat 方法创建的流也是并行流,但 flatMap 方法返回的则不是并行流(例 3-63)。

例 3-63 并行流还是非并行流

@Test
public void concatParallel() throws Exception {
    Stream<String> first = Stream.of("a", "b", "c").parallel();
    Stream<String> second = Stream.of("X", "Y", "Z");
    Stream<String> third = Stream.of("alpha", "beta", "gamma");

    Stream<String> total = Stream.concat(Stream.concat(first, second), third);

    assertTrue(total.isParallel());
}

@Test
public void flatMapNotParallel() throws Exception {
    Stream<String> first = Stream.of("a", "b", "c").parallel();
    Stream<String> second = Stream.of("X", "Y", "Z");
    Stream<String> third = Stream.of("alpha", "beta", "gamma");
    Stream<String> fourth = Stream.empty();

    Stream<String> total = Stream.of(first, second, third, fourth)
            .flatMap(Function.identity());
    assertFalse(total.isParallel());
}

尽管如此,只要尚未开始处理数据,总是可以通过调用 parallel 方法来实现并行流(例 3-64)。

例 3-64 将 flatMap 方法返回的流转换为并行流

@Test
public void flatMapParallel() throws Exception {
    Stream<String> first = Stream.of("a", "b", "c").parallel();
    Stream<String> second = Stream.of("X", "Y", "Z");
    Stream<String> third = Stream.of("alpha", "beta", "gamma");
    Stream<String> fourth = Stream.empty();

    Stream<String> total = Stream.of(first, second, third, fourth)
            .flatMap(Function.identity());
    assertFalse(total.isParallel());

    total = total.parallel();
    assertTrue(total.isParallel());
}

如上所示,由于 flatMap 属于中间操作,可以通过 parallel 方法对流进行修改。
简而言之,concat 方法适用于两个流的拼接,可以作为一种一般性归约操作使用,flatMap 方法则更具普遍意义。

赞(2)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

Java 实例