Java 将顺序流转换为并行流,无论默认情况如何,用户希望创建顺序流(sequential stream)或并行流(parallel stream)。既可以使用 Collection
接口定义的 stream
或 parallelStream
方法,也可以使用 BaseStream
接口定义的 sequential
或 parallel
方法。
Java 将顺序流转换为并行流 问题描述
无论默认情况如何,用户希望创建顺序流(sequential stream)或并行流(parallel stream)。
Java 将顺序流转换为并行流 解决方案
既可以使用 Collection
接口定义的 stream
或 parallelStream
方法,也可以使用 BaseStream
接口定义的 sequential
或 parallel
方法。
Java 将顺序流转换为并行流 具体实例
默认情况下,Java 创建的流都是顺序流。在 BaseStream
接口(Stream
的父接口)中,可以通过 isParallel
方法判断流是否采用并行方式执行。
从例 9-1 可以看到,所有采用标准机制创建的流都是顺序流。
例 9-1 创建顺序流(JUnit 测试的一部分)
@Test
public void sequentialStreamOf() throws Exception {
assertFalse(Stream.of(3, 1, 4, 1, 5, 9).isParallel());
}
@Test
public void sequentialIterateStream() throws Exception {
assertFalse(Stream.iterate(1, n -> n + 1).isParallel());
}
@Test
public void sequentialGenerateStream() throws Exception {
assertFalse(Stream.generate(Math::random).isParallel());
}
@Test
public void sequentialCollectionStream() throws Exception {
List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
assertFalse(numbers.stream().isParallel());
}
如果数据源为集合,可以通过 parallelStream
方法返回一个可能的并行流,如例 9-2 所示。
例 9-2
parallelStream
方法的应用
@Test
public void parallelStreamMethodOnCollection() throws Exception {
List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
assertTrue(numbers.parallelStream().isParallel());
}
之所以强调“可能的”,是因为 parallelStream
方法在某些情况下也会返回顺序流,但默认返回的是并行流。Javadoc 指出,仅当创建自定义 spliterator
时才会返回顺序流,不过这种情况相当罕见。4
4毋庸置疑,这是一个有趣的话题。不过受篇幅所限,本书不对此做讨论。
如例 9-3 所示,也可以通过在现有流上使用 parallel
方法来创建并行流。
例 9-3 在流上使用
parallel
方法
@Test
public void parallelMethodOnStream() throws Exception {
assertTrue(Stream.of(3, 1, 4, 1, 5, 9)
.parallel()
.isParallel());
}
与 parallel
方法相对的是 sequential
方法,它将返回顺序流,如例 9-4 所示。
例 9-4 将并行流转换为顺序流
@Test
public void parallelStreamThenSequential() throws Exception {
List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
assertFalse(numbers.parallelStream()
.sequential()
.isParallel());
}
不过转换时需谨慎行事,否则可能落入陷阱。假设我们计划创建一个流水线(pipeline),其中一部分处理可以并行地完成,而其他处理仍然按顺序执行。那么,我们很容易写出如例 9-5 所示的代码。
例 9-5 并行流到顺序流的切换(与预期结果不同)
List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
List<Integer> nums = numbers.parallelStream() ➊
.map(n -> n * 2)
.peek(n -> System.out.printf("%s processing %d%n",
Thread.currentThread().getName(), n))
.sequential() ➋
.sorted()
.collect(Collectors.toList());
❶ 请求并行流
❷ 先切换为顺序流,再排序
上述代码的含义不难理解:先将所有数字倍增,再排序。由于倍增函数是无状态(stateless)且关联的(associative),采用并行操作可谓顺理成章。然而,排序本质上属于顺序操作 5。
5可以这样理解:使用并行流排序意味着将区间划分为若干相等的部分,然后分别对每部分排序,再尝试将所有经过排序的子区间合并在一起。那么从整体来看,最终的输出其实并没有实现排序。
在本例中,peek
方法用于显示进行处理的线程的名称,程序在调用 parallelStream
方法之后、sequential
方法之前调用 peek
。输出如下:
main processing 6
main processing 2
main processing 8
main processing 2
main processing 10
main processing 18
可以看到,main
线程完成了所有的处理。换言之,尽管调用了 parallelStream
方法,但返回的仍然是顺序流。这是什么原因呢?读者或许还记得,流在达到终止表达式(terminal expression)前不会进行任何操作,即在达到终止表达式时才会评估流的状态。在本例中,由于 collect
方法之前最后调用的是 sequential
方法,程序将返回顺序流,并对元素做相应处理。
流在执行时既可以是并行的,也可以是顺序执行的。
parallel
或sequential
方法能有效地设置或撤销设置一个布尔值,并在达到终止表达式时进行检查。
如果确有必要以并行方式处理部分流,而以顺序方式处理流的其他部分,建议使用两个单独的流。虽然这样处理也存在不少问题,但目前尚无更好的解决方案。