Java 并行流的优点,用户希望了解并行流的优势所在,在合适的条件下应用并行流。Stream API 能方便地将顺序流转换为并行流,不过性能提升与否需要视情况而定。请记住,切换到并行流是一种优化操作,但首先应保证代码可以正常工作,再决定是否有必要使用并行流。
Java 并行流的优点 问题描述
用户希望了解并行流的优势所在。
Java 并行流的优点 解决方案
在合适的条件下应用并行流。
Java 并行流的优点 具体实例
Stream API 能方便地将顺序流转换为并行流,不过性能提升与否需要视情况而定。请记住,切换到并行流是一种优化操作,但首先应保证代码可以正常工作,再决定是否有必要使用并行流。建议根据实际情况进行决策。
在 Java 8 中,并行流默认使用通用 fork/join 线程池来分发任务。线程池大小等于 JVM 可用的处理器数量,它由 Runtime.getRuntime().availableProcessors()
确定。6 无论是将任务分解为多个子任务,还是将所有子任务的结果合并为最终输出,都会为 fork/join 线程池的管理引入开销。
6严格来说,线程池大小应为处理器数量减 1。但如果将主线程包括在内,则线程池大小与处理器数量相等。
为了使这些额外的开销物有所值,应在满足以下要求时再使用并行流:
- 数据量较大
- 每个元素的处理较为耗时
- 数据源易于分解
- 操作是无状态且关联的
前两项要求经常被合二为一。如果 N
为数据元素的数量,Q
为每个元素所需的计算时间,则 N
与 Q
的乘积通常需要超过某个阈值,使用并行流才可能获得性能提升 7。根据第三项要求,应采用易于分解的数据结构(如数组)。最后,在并行处理时,如果操作是有状态或有序的,那么显然会出现问题。
7一个约定俗成的公式是 N * Q > 10,000
,不过似乎没有人会使用很大的 Q
值,因此很难解释为何将 10 000 作为阈值。
为说明并行流对性能提升的效果,我们来看一个最简单的例子。如例 9-6 所示,程序为顺序流添加了为数不多的几个整数。
例 9-6 为顺序流添加整数
public static int doubleIt(int n) {
try {
Thread.sleep(100); ➊
} catch (InterruptedException ignore) {
}
return n * 2;
}
// 主线程
Instant before = Instant.now(); ➋
total = IntStream.of(3, 1, 4, 1, 5, 9)
.map(ParallelDemo::doubleIt)
.sum();
Instant after = Instant.now(); ➋
Duration duration = Duration.between(start, end);
System.out.println("Total of doubles = " + total);
System.out.println("time = " + duration.toMillis() + " ms");
❶ 人为引入延迟
❷ 获取倍增前后的时间
由于添加数字的过程极快,除非人为引入延迟,否则难以看出并行操作对性能提升的效果。在本例中,N
是如此之小(N = 6
),因此通过引入 100 毫秒的延迟(sleep(100)
)来扩展 Q
。
默认情况下,Java 创建的流都是顺序流。由于每个元素的倍增操作都延迟 100 毫秒,处理 6 个元素所需的总时间约为 600 毫秒。程序的实际执行结果如下:
Total of doubles = 46
time = 621 ms
接下来,我们对代码进行微调,改为使用并行流。如例 9-7 所示,在 map
操作前插入 parallel
方法,其他代码保持不变。
例 9-7 使用并行流
total = IntStream.of(3, 1, 4, 1, 5, 9)
.parallel() ➊
.map(ParallelDemo::doubleIt)
.sum();
➊ 使用并行流
在一台 8 核计算机上,实例化的 fork/join 线程池大小为 88。换言之,流中的每个元素都可以被分配一个 CPU 核心(假设当前没有其他任务,稍后将讨论),因此所有倍增操作基本会同时进行。
8线程池的实际大小为 7,但如果将主线程包括在内,则有 8 个独立的线程。
再次执行程序,结果如下:
Total of doubles = 46
time = 112 ms
每个倍增操作延迟 100 毫秒,且有足够的线程对每个数字单独处理,因此整个计算仅需 100 多毫秒就能完成。
1.利用JHM计时
众所周知,性能度量绝非易事,它与缓存、JVM 启动时间等多种因素有关。前面的示例只是粗略估算了顺序流和并行流的处理时间,如果希望获得更精确的测试结果,可以采用微基准测试框架(micro-benchmarking framework)。
JMH(Java Micro-benchmark Harness)是一种常用的 Java 微基准测试框架,它通过注解(annotation)来设置计时模式、范围、JVM 参数等。采用 JMH 重构前面的示例,结果如例 9-8 所示。
例 9-8 利用 JMH 对倍增操作计时
import org.openjdk.jmh.annotations.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"})
public class DoublingDemo {
public int doubleIt(int n) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
return n * 2;
}
@Benchmark
public int doubleAndSumSequential() {
return IntStream.of(3, 1, 4, 1, 5, 9)
.map(this::doubleIt)
.sum();
}
@Benchmark
public int doubleAndSumParallel() {
return IntStream.of(3, 1, 4, 1, 5, 9)
.parallel()
.map(this::doubleIt)
.sum();
}
}
根据默认设置,在一系列预热迭代(warmup iteration)9 后,JHM 将在两个独立的线程中执行 20 次迭代。典型的运行结果如下:
Benchmark Mode Cnt Score Error Units
DoublingDemo.doubleAndSumParallel avgt 40 103.523 ± 0.247 ms/op
DoublingDemo.doubleAndSumSequential avgt 40 620.242 ± 1.656 ms/op
可以看到,这些值与粗略估算的结果基本相同:顺序处理的平均耗时约为 620 毫秒,而并行处理的平均耗时约为 103 毫秒。换言之,只要处理器的数量足够多,并行操作就能为 6 个数字各自分配一个单独的线程,这比连续执行每个运算的速度要快 6 倍。
2.对基本数据类型求和
在上一节的示例中,由于 N
过小,为体现并行流对性能提升的效果,我们人为引入延迟以扩展 Q
。接下来,我们将对泛型流(generic stream)和基本类型流(primitive stream)的迭代操作、并行操作以及顺序操作进行比较,并通过较大的 N
值观察不同操作的性能。
本节的示例并不复杂,它根据经典教程 Mordern Java in Action (Second Edition)10 中一个类似的示例改编而成。
例 9-9 显示了采用迭代方式对循环中的数字求和。
例 9-9 采用迭代方式对循环中的数字求和
public long iterativeSum() {
long result = 0;
for (long i = 1L; i <= N; i++) {
result += i;
}
return result;
}
例 9-10 分别采用顺序方式和并行方式对 Stream<Long>
求和。
例 9-10 对泛型流求和
public long sequentialStreamSum() {
return Stream.iterate(1L, i -> i + 1)
.limit(N)
.reduce(0L, Long::sum);
}
public long parallelStreamSum() {
return Stream.iterate(1L, i -> i + 1)
.limit(N)
.parallel()
.reduce(0L, Long::sum);
}
parallelStreamSum
方法遇到的可能是最糟糕的情况,因为它返回 Stream<Long>
而非 LongStream
,且需要处理由 iterate
方法产生的数据集合,而它不易分解。
而例 9-11 使用 LongStream
接口(包括一个 sum
方法)定义的 rangeClosed
方法,有助于 Java 分区。
例 9-11
LongStream
的应用
public long sequentialLongStreamSum() {
return LongStream.rangeClosed(1, N)
.sum();
}
public long parallelLongStreamSum() {
return LongStream.rangeClosed(1, N)
.parallel()
.sum();
}
利用 JHM 对 1000 万个元素(N = 10,000,000
)进行测试,几种操作的测试结果如下:
Benchmark Mode Cnt Score Error Units
iterativeSum avgt 40 6.441 ± 0.019 ms/op
sequentialStreamSum avgt 40 90.468 ± 0.613 ms/op
parallelStreamSum avgt 40 99.148 ± 3.065 ms/op
sequentialLongStreamSum avgt 40 6.191 ± 0.248 ms/op
parallelLongStreamSum avgt 40 6.571 ± 2.756 ms/op
可以看到,装箱和拆箱操作引入了不少开销。使用 Stream<Long>
(而非 LongStream
)的 parallelStreamSum
和 sequentialStreamSum
操作非常慢,加之由 iterate
方法产生的数据集合不易分解,速度问题更加突出。LongStream.rangeClosed
方法则快得多,sequentialLongStreamSum
和 parallelLongStreamSum
操作在性能上几乎没有差别。
9旨在优化 JIT,让系统进入稳定状态,以便测试结果更接近实际情况。——译者注
10作者为 Urma、Fusco 与 Mycroft,由 Manning Publications 于 2018 年 5 月出版。(该书第 1 版中文版书名为《Java 8 实战》,已由人民邮电出版社出版,此版中文版也即将推出。——译者注)