Java 流的拼接,用户希望将两个或多个流合并为一个流,Stream.concat
方法用于合并两个流。如果需要合并多个流,请使用 Stream.flatMap
方法。假设我们从多个信源获取到数据,且希望使用流来处理其中的每个元素。一种方案是采用 Stream
接口定义的 concat
方法。
Java 流的拼接 问题描述
用户希望将两个或多个流合并为一个流。
Java 流的拼接 解决方案
Stream.concat
方法用于合并两个流。如果需要合并多个流,请使用 Stream.flatMap
方法。
Java 流的拼接 具体实例
假设我们从多个信源获取到数据,且希望使用流来处理其中的每个元素。一种方案是采用 Stream
接口定义的 concat
方法,其签名如下:
static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)
concat
方法将创建一个惰性的拼接流(lazily concatenated stream),其元素是第一个流的所有元素,后跟第二个流的所有元素。根据 Javadoc 的描述,如果两个输入流均为有序流,则生成的流也是有序流;如果某个输入流为并行流,则生成的流也是并行流。关闭生成的流也会关闭两个输入流。
两个输入流所包含的元素类型必须相同。
例 3-59 显示了拼接两个流的简单示例。
例 3-59 拼接两个流
@Test
public void concat() throws Exception {
Stream<String> first = Stream.of("a", "b", "c").parallel();
Stream<String> second = Stream.of("X", "Y", "Z");
List<String> strings = Stream.concat(first, second) ➊
.collect(Collectors.toList());
List<String> stringList = Arrays.asList("a", "b", "c", "X", "Y", "Z");
assertEquals(stringList, strings);
}
➊ 将第二个流的元素附加到第一个流的元素之后
如果需要添加第三个流,可以嵌套使用拼接操作,如例 3-60 所示。
例 3-60 拼接多个流(
concat
方法)
@Test
public void concatThree() throws Exception {
Stream<String> first = Stream.of("a", "b", "c").parallel();
Stream<String> second = Stream.of("X", "Y", "Z");
Stream<String> third = Stream.of("alpha", "beta", "gamma");
List<String> strings = Stream.concat(Stream.concat(first, second), third)
.collect(Collectors.toList());
List<String> stringList = Arrays.asList("a", "b", "c",
"X", "Y", "Z", "alpha", "beta", "gamma");
assertEquals(stringList, strings);
}
尽管嵌套拼接的方案可行,但请注意 Javadoc 所做的注释:
通过重复拼接操作构建流时应谨慎行事。访问一个深度拼接流中的元素可能导致深层调用链(deep call chain)甚至抛出
StackOverflowException
。
换言之,concat
方法实际上构建了一个流的二叉树(binary tree),使用过多就会变得难以处理。
另一种方案是采用 reduce
方法执行多次拼接操作,如例 3-61 所示。
例 3-61 拼接多个流(
reduce
方法)
@Test
public void reduce() throws Exception {
Stream<String> first = Stream.of("a", "b", "c").parallel();
Stream<String> second = Stream.of("X", "Y", "Z");
Stream<String> third = Stream.of("alpha", "beta", "gamma");
Stream<String> fourth = Stream.empty();
List<String> strings = Stream.of(first, second, third, fourth)
.reduce(Stream.empty(), Stream::concat) ➊
.collect(Collectors.toList());
List<String> stringList = Arrays.asList("a", "b", "c",
"X", "Y", "Z", "alpha", "beta", "gamma");
assertEquals(stringList, strings);
}
➊ 对空流使用 reduce
方法和二元运算符
由于用作方法引用的 concat
方法属于二元运算符,上述程序同样有效。不过请注意,虽然代码更简洁,但并不能解决潜在的栈溢出(stack overflow)问题。
有鉴于此,在合并多个流时,使用 flatMap
方法成为一种自然而然的解决方案,如例 3-62 所示。
例 3-62 拼接多个流(
flatMap
方法)
@Test
public void flatMap() throws Exception {
Stream<String> first = Stream.of("a", "b", "c").parallel();
Stream<String> second = Stream.of("X", "Y", "Z");
Stream<String> third = Stream.of("alpha", "beta", "gamma");
Stream<String> fourth = Stream.empty();
List<String> strings = Stream.of(first, second, third, fourth)
.flatMap(Function.identity())
.collect(Collectors.toList());
List<String> stringList = Arrays.asList("a", "b", "c",
"X", "Y", "Z", "alpha", "beta", "gamma");
assertEquals(stringList, strings);
}
上述代码可以运行,但仍然有其不足。如果任何一个输入流为并行流,那么通过 concat
方法创建的流也是并行流,但 flatMap
方法返回的则不是并行流(例 3-63)。
例 3-63 并行流还是非并行流
@Test
public void concatParallel() throws Exception {
Stream<String> first = Stream.of("a", "b", "c").parallel();
Stream<String> second = Stream.of("X", "Y", "Z");
Stream<String> third = Stream.of("alpha", "beta", "gamma");
Stream<String> total = Stream.concat(Stream.concat(first, second), third);
assertTrue(total.isParallel());
}
@Test
public void flatMapNotParallel() throws Exception {
Stream<String> first = Stream.of("a", "b", "c").parallel();
Stream<String> second = Stream.of("X", "Y", "Z");
Stream<String> third = Stream.of("alpha", "beta", "gamma");
Stream<String> fourth = Stream.empty();
Stream<String> total = Stream.of(first, second, third, fourth)
.flatMap(Function.identity());
assertFalse(total.isParallel());
}
尽管如此,只要尚未开始处理数据,总是可以通过调用 parallel
方法来实现并行流(例 3-64)。
例 3-64 将
flatMap
方法返回的流转换为并行流
@Test
public void flatMapParallel() throws Exception {
Stream<String> first = Stream.of("a", "b", "c").parallel();
Stream<String> second = Stream.of("X", "Y", "Z");
Stream<String> third = Stream.of("alpha", "beta", "gamma");
Stream<String> fourth = Stream.empty();
Stream<String> total = Stream.of(first, second, third, fourth)
.flatMap(Function.identity());
assertFalse(total.isParallel());
total = total.parallel();
assertTrue(total.isParallel());
}
如上所示,由于 flatMap
属于中间操作,可以通过 parallel
方法对流进行修改。
简而言之,concat
方法适用于两个流的拼接,可以作为一种一般性归约操作使用,flatMap
方法则更具普遍意义。