Java 将顺序流转换为并行流

Java 将顺序流转换为并行流,无论默认情况如何,用户希望创建顺序流(sequential stream)或并行流(parallel stream)。既可以使用 Collection 接口定义的 streamparallelStream 方法,也可以使用 BaseStream 接口定义的 sequentialparallel 方法。

Java 将顺序流转换为并行流 问题描述

无论默认情况如何,用户希望创建顺序流(sequential stream)或并行流(parallel stream)。

Java 将顺序流转换为并行流 解决方案

既可以使用 Collection 接口定义的 streamparallelStream 方法,也可以使用 BaseStream 接口定义的 sequentialparallel 方法。

Java 将顺序流转换为并行流 具体实例

默认情况下,Java 创建的流都是顺序流。在 BaseStream 接口(Stream 的父接口)中,可以通过 isParallel 方法判断流是否采用并行方式执行。
从例 9-1 可以看到,所有采用标准机制创建的流都是顺序流。

例 9-1 创建顺序流(JUnit 测试的一部分)

@Test
public void sequentialStreamOf() throws Exception {
    assertFalse(Stream.of(3, 1, 4, 1, 5, 9).isParallel());
}

@Test
public void sequentialIterateStream() throws Exception {
    assertFalse(Stream.iterate(1, n -> n + 1).isParallel());
}

@Test
public void sequentialGenerateStream() throws Exception {
    assertFalse(Stream.generate(Math::random).isParallel());
}

@Test
public void sequentialCollectionStream() throws Exception {
    List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
    assertFalse(numbers.stream().isParallel());
}

如果数据源为集合,可以通过 parallelStream 方法返回一个可能的并行流,如例 9-2 所示。

例 9-2 parallelStream 方法的应用

@Test
public void parallelStreamMethodOnCollection() throws Exception {
    List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
    assertTrue(numbers.parallelStream().isParallel());
}

之所以强调“可能的”,是因为 parallelStream 方法在某些情况下也会返回顺序流,但默认返回的是并行流。Javadoc 指出,仅当创建自定义 spliterator 时才会返回顺序流,不过这种情况相当罕见。4
4毋庸置疑,这是一个有趣的话题。不过受篇幅所限,本书不对此做讨论。

如例 9-3 所示,也可以通过在现有流上使用 parallel 方法来创建并行流。

例 9-3 在流上使用 parallel 方法

@Test
public void parallelMethodOnStream() throws Exception {
    assertTrue(Stream.of(3, 1, 4, 1, 5, 9)
            .parallel()
            .isParallel());
}

parallel 方法相对的是 sequential 方法,它将返回顺序流,如例 9-4 所示。

例 9-4 将并行流转换为顺序流

@Test
public void parallelStreamThenSequential() throws Exception {
    List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
    assertFalse(numbers.parallelStream()
            .sequential()
            .isParallel());
}

不过转换时需谨慎行事,否则可能落入陷阱。假设我们计划创建一个流水线(pipeline),其中一部分处理可以并行地完成,而其他处理仍然按顺序执行。那么,我们很容易写出如例 9-5 所示的代码。

例 9-5 并行流到顺序流的切换(与预期结果不同)

List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
List<Integer> nums = numbers.parallelStream()  ➊
      .map(n -> n * 2)
      .peek(n -> System.out.printf("%s processing %d%n",
              Thread.currentThread().getName(), n))
      .sequential()                              ➋
      .sorted()
      .collect(Collectors.toList());

❶ 请求并行流
❷ 先切换为顺序流,再排序
上述代码的含义不难理解:先将所有数字倍增,再排序。由于倍增函数是无状态(stateless)且关联的(associative),采用并行操作可谓顺理成章。然而,排序本质上属于顺序操作 5
5可以这样理解:使用并行流排序意味着将区间划分为若干相等的部分,然后分别对每部分排序,再尝试将所有经过排序的子区间合并在一起。那么从整体来看,最终的输出其实并没有实现排序。

在本例中,peek 方法用于显示进行处理的线程的名称,程序在调用 parallelStream 方法之后、sequential 方法之前调用 peek。输出如下:

main processing 6
main processing 2
main processing 8
main processing 2
main processing 10
main processing 18

可以看到,main 线程完成了所有的处理。换言之,尽管调用了 parallelStream 方法,但返回的仍然是顺序流。这是什么原因呢?读者或许还记得,流在达到终止表达式(terminal expression)前不会进行任何操作,即在达到终止表达式时才会评估流的状态。在本例中,由于 collect 方法之前最后调用的是 sequential 方法,程序将返回顺序流,并对元素做相应处理。

流在执行时既可以是并行的,也可以是顺序执行的。parallelsequential 方法能有效地设置或撤销设置一个布尔值,并在达到终止表达式时进行检查。

如果确有必要以并行方式处理部分流,而以顺序方式处理流的其他部分,建议使用两个单独的流。虽然这样处理也存在不少问题,但目前尚无更好的解决方案。

赞(0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

Java 实例