Java 并行流的优点

Java 并行流的优点,用户希望了解并行流的优势所在,在合适的条件下应用并行流。Stream API 能方便地将顺序流转换为并行流,不过性能提升与否需要视情况而定。请记住,切换到并行流是一种优化操作,但首先应保证代码可以正常工作,再决定是否有必要使用并行流。

Java 并行流的优点 问题描述

用户希望了解并行流的优势所在。

Java 并行流的优点 解决方案

在合适的条件下应用并行流。

Java 并行流的优点 具体实例

Stream API 能方便地将顺序流转换为并行流,不过性能提升与否需要视情况而定。请记住,切换到并行流是一种优化操作,但首先应保证代码可以正常工作,再决定是否有必要使用并行流。建议根据实际情况进行决策。
在 Java 8 中,并行流默认使用通用 fork/join 线程池来分发任务。线程池大小等于 JVM 可用的处理器数量,它由 Runtime.getRuntime().availableProcessors() 确定。6 无论是将任务分解为多个子任务,还是将所有子任务的结果合并为最终输出,都会为 fork/join 线程池的管理引入开销。
6严格来说,线程池大小应为处理器数量减 1。但如果将主线程包括在内,则线程池大小与处理器数量相等。

为了使这些额外的开销物有所值,应在满足以下要求时再使用并行流:

  • 数据量较大
  • 每个元素的处理较为耗时
  • 数据源易于分解
  • 操作是无状态且关联的

前两项要求经常被合二为一。如果 N 为数据元素的数量,Q 为每个元素所需的计算时间,则 NQ 的乘积通常需要超过某个阈值,使用并行流才可能获得性能提升 7。根据第三项要求,应采用易于分解的数据结构(如数组)。最后,在并行处理时,如果操作是有状态或有序的,那么显然会出现问题。
7一个约定俗成的公式是 N * Q > 10,000,不过似乎没有人会使用很大的 Q 值,因此很难解释为何将 10 000 作为阈值。

为说明并行流对性能提升的效果,我们来看一个最简单的例子。如例 9-6 所示,程序为顺序流添加了为数不多的几个整数。

例 9-6 为顺序流添加整数

public static int doubleIt(int n) {
    try {
        Thread.sleep(100);              ➊
    } catch (InterruptedException ignore) {
    }
    return n * 2;
}

// 主线程
Instant before = Instant.now();         ➋
total = IntStream.of(3, 1, 4, 1, 5, 9)
        .map(ParallelDemo::doubleIt)
        .sum();
Instant after = Instant.now();          ➋
Duration duration = Duration.between(start, end);
System.out.println("Total of doubles = " + total);
System.out.println("time = " + duration.toMillis() + " ms");

❶ 人为引入延迟
❷ 获取倍增前后的时间
由于添加数字的过程极快,除非人为引入延迟,否则难以看出并行操作对性能提升的效果。在本例中,N 是如此之小(N = 6),因此通过引入 100 毫秒的延迟(sleep(100))来扩展 Q
默认情况下,Java 创建的流都是顺序流。由于每个元素的倍增操作都延迟 100 毫秒,处理 6 个元素所需的总时间约为 600 毫秒。程序的实际执行结果如下:

Total of doubles = 46
time = 621 ms

接下来,我们对代码进行微调,改为使用并行流。如例 9-7 所示,在 map 操作前插入 parallel 方法,其他代码保持不变。

例 9-7 使用并行流

total = IntStream.of(3, 1, 4, 1, 5, 9)
    .parallel()                   ➊
    .map(ParallelDemo::doubleIt)
    .sum();

➊ 使用并行流
在一台 8 核计算机上,实例化的 fork/join 线程池大小为 88。换言之,流中的每个元素都可以被分配一个 CPU 核心(假设当前没有其他任务,稍后将讨论),因此所有倍增操作基本会同时进行。
8线程池的实际大小为 7,但如果将主线程包括在内,则有 8 个独立的线程。

再次执行程序,结果如下:

Total of doubles = 46
time = 112 ms

每个倍增操作延迟 100 毫秒,且有足够的线程对每个数字单独处理,因此整个计算仅需 100 多毫秒就能完成。
1.利用JHM计时
众所周知,性能度量绝非易事,它与缓存、JVM 启动时间等多种因素有关。前面的示例只是粗略估算了顺序流和并行流的处理时间,如果希望获得更精确的测试结果,可以采用微基准测试框架(micro-benchmarking framework)。
JMH(Java Micro-benchmark Harness)是一种常用的 Java 微基准测试框架,它通过注解(annotation)来设置计时模式、范围、JVM 参数等。采用 JMH 重构前面的示例,结果如例 9-8 所示。

例 9-8 利用 JMH 对倍增操作计时

import org.openjdk.jmh.annotations.*;
 
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
 
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"})
public class DoublingDemo {
    public int doubleIt(int n) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException ignored) {
        }
        return n * 2;
    }
 
    @Benchmark
    public int doubleAndSumSequential() {
        return IntStream.of(3, 1, 4, 1, 5, 9)
                .map(this::doubleIt)
                .sum();
    }
 
    @Benchmark
    public int doubleAndSumParallel() {
        return IntStream.of(3, 1, 4, 1, 5, 9)
                .parallel()
                .map(this::doubleIt)
                .sum();
    }
}

根据默认设置,在一系列预热迭代(warmup iteration)9 后,JHM 将在两个独立的线程中执行 20 次迭代。典型的运行结果如下:

Benchmark                            Mode  Cnt    Score    Error  Units
DoublingDemo.doubleAndSumParallel    avgt   40  103.523 ± 0.247  ms/op
DoublingDemo.doubleAndSumSequential  avgt   40  620.242 ± 1.656  ms/op

可以看到,这些值与粗略估算的结果基本相同:顺序处理的平均耗时约为 620 毫秒,而并行处理的平均耗时约为 103 毫秒。换言之,只要处理器的数量足够多,并行操作就能为 6 个数字各自分配一个单独的线程,这比连续执行每个运算的速度要快 6 倍。

2.对基本数据类型求和
在上一节的示例中,由于 N 过小,为体现并行流对性能提升的效果,我们人为引入延迟以扩展 Q。接下来,我们将对泛型流(generic stream)和基本类型流(primitive stream)的迭代操作、并行操作以及顺序操作进行比较,并通过较大的 N 值观察不同操作的性能。

本节的示例并不复杂,它根据经典教程 Mordern Java in Action (Second Edition)10 中一个类似的示例改编而成。

例 9-9 显示了采用迭代方式对循环中的数字求和。

例 9-9 采用迭代方式对循环中的数字求和

public long iterativeSum() {
    long result = 0;
    for (long i = 1L; i <= N; i++) {
        result += i;
    }
    return result;
}

例 9-10 分别采用顺序方式和并行方式对 Stream<Long> 求和。

例 9-10 对泛型流求和

public long sequentialStreamSum() {
    return Stream.iterate(1L, i -> i + 1)
            .limit(N)
            .reduce(0L, Long::sum);
}
 
public long parallelStreamSum() {
    return Stream.iterate(1L, i -> i + 1)
            .limit(N)
            .parallel()
            .reduce(0L, Long::sum);
}

parallelStreamSum 方法遇到的可能是最糟糕的情况,因为它返回 Stream<Long> 而非 LongStream,且需要处理由 iterate 方法产生的数据集合,而它不易分解。
而例 9-11 使用 LongStream 接口(包括一个 sum 方法)定义的 rangeClosed 方法,有助于 Java 分区。

例 9-11 LongStream 的应用

public long sequentialLongStreamSum() {
    return LongStream.rangeClosed(1, N)
            .sum();
}
 
public long parallelLongStreamSum() {
    return LongStream.rangeClosed(1, N)
            .parallel()
            .sum();
}

利用 JHM 对 1000 万个元素(N = 10,000,000)进行测试,几种操作的测试结果如下:

Benchmark                Mode  Cnt   Score    Error  Units
iterativeSum             avgt   40   6.441 ± 0.019  ms/op
sequentialStreamSum      avgt   40  90.468 ± 0.613  ms/op
parallelStreamSum        avgt   40  99.148 ± 3.065  ms/op
sequentialLongStreamSum  avgt   40   6.191 ± 0.248  ms/op
parallelLongStreamSum    avgt   40   6.571 ± 2.756  ms/op

可以看到,装箱和拆箱操作引入了不少开销。使用 Stream<Long>(而非 LongStream)的 parallelStreamSumsequentialStreamSum 操作非常慢,加之由 iterate 方法产生的数据集合不易分解,速度问题更加突出。LongStream.rangeClosed 方法则快得多,sequentialLongStreamSumparallelLongStreamSum 操作在性能上几乎没有差别。

9旨在优化 JIT,让系统进入稳定状态,以便测试结果更接近实际情况。——译者注

10作者为 Urma、Fusco 与 Mycroft,由 Manning Publications 于 2018 年 5 月出版。(该书第 1 版中文版书名为《Java 8 实战》,已由人民邮电出版社出版,此版中文版也即将推出。——译者注)

赞(0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

Java 实例