您好,登錄后才能下訂單哦!
并行流
并行流是一個把元素分成多個塊的流,每個塊用不同的線程處理。可以自動分區,讓所有的處理器都忙起來。
假設要寫一個方法,接受一個數量n做參數,計算1-n的和。可以這樣實現:
public long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .reduce(0L, Long::sum); }
也許可以使用parallel方法,簡單地使用并行計算,提高程序性能:
public long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() .reduce(0L, Long::sum); }
這樣,流可能在內部被分成多個塊,導致reduction操作可以在不同的塊上互不依賴地并行地各自工作。最后,reduction操作組合每個子流的并行reductions的返回值,返回的結果就是整個流的結果。見下面的示意圖
實際上,調用parallel方法,流自身不會有任何變化。在內部,設置一個布爾類型的標記,標明你想在并行模式執行操作,接下來的操作都是并行的。
類似地,你也可以使用sequential方法,把并行流轉成串行的。你也許認為可以組合這兩個方法:
stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce();
但是,最后一次調用parallel或者sequential才會全局地影響管道。上面的例子,管道將被并行地執行。
配置并行流使用的線程池
并行流內部使用ForkJoinPool。默認地,線程數量等于處理器數量(Runtime.getRuntime().availableProcessors())。但是,可以修改系統屬性java.util.concurrent.ForkJoinPool.common.parallelism,配置線程數量。
這是全局配置,所以,除非你認為對性能有幫助,否則不要修改。
測量流的性能
我們聲稱并行加法應該比串行的或者自己的迭代方法快。我們可以使用JMH測量一下。這是一個工具,使用基于注解的方法,可以為JVM程序增加
可靠的microbenchmarks。如果使用maven,可以這樣引入:
<dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-core</artifactId> <version>1.21</version> </dependency> <dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-generator-annprocess</artifactId> <version>1.21</version> </dependency>
第一個庫是核心實現,第二個包含一個注解處理器,幫助生成JAR文件,通過它可以方便地運行你的benchmark。maven配置里還應該有下面的plugin:
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <finalName>benchmarks</finalName> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>org.openjdk.jmh.Main</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin>
程序代碼如下
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; //測量平均時間 @BenchmarkMode(Mode.AverageTime) //以毫秒為單位,打印benchmark結果 @OutputTimeUnit(TimeUnit.MILLISECONDS) //執行兩次,增加可靠性。堆空間是4Gb @Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"}) @State(Scope.Benchmark) public class ParallelStreamBenchmark { private static final long N = 10_000_000L; @Benchmark public long sequentialSum() { return Stream.iterate(1L, i -> i + 1).limit(N) .reduce(0L, Long::sum); } //每次執行benchmark后,執行GC @TearDown(Level.Invocation) public void tearDown() { System.gc(); } }
使用大內存,和每次迭代以后試著GC都是為了盡量減少GC的影響。盡管如此,結果應該再加一些鹽。很多因素會影響執行時間,比如你的機器有多少核。
默認地,JMH一般先執行5次熱身迭代,這樣可以讓HotSpot優化代碼,然后再執行5次迭代用來計算最終的結果。你可以使用-w和-i命令行參數修改這些配置。
在我的機器上,使用JDK 1.8.0_121, Java HotSpot™ 64-Bit Server VM,執行結果是
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.sequentialSum avgt 10 83.565 ± 1.841 ms/op
你應該期望,使用經典的for循環的迭代版本運行得更快,因為它在更低層(level)工作,而且,更重要的是,它不需要執行原始類型的裝箱和拆箱操作。我們測試一下這個方法:
@Benchmark public long iterativeSum() { long result = 0; for (long i = 1L; i <= N; i++) { result += i; } return result; }
執行結果是
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.iterativeSum avgt 10 6.877 ± 0.068 ms/op
證實了我們的期望:迭代版本比串行流快了10倍。讓我們使用并行流試一試:
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.parallelSum avgt 10 110.157 ± 1.882 ms/op
非常令人失望:并行版本的求和一點都沒有發揮多核的優勢,比串行版還要慢。為什么會這樣?有兩個問題混在一起:
迭代生成了裝箱對象,它們在做加法前,必須拆箱成數字
迭代很難劃分獨立的塊來并行地執行
第二點是特別有趣的,不是所有的流都是適合并行處理的。特別是,迭代的流就很難,這是因為,函數的輸入依賴上一個函數的結果。見下圖:
這意味著,reduction過程并沒有像第一張圖里所表示的那樣執行。reduction開始的時候,還沒有整個數字列表,所以沒法分塊。把流標記為并行的,反而增加了在不同線程上執行的求和要被串行處理的負擔。
使用更專業的方法
LongStream.rangeClosed方法使用的是原始long類型,所以不用裝箱和拆箱。而且,它生產的數的范圍,可以很容易地分成不依賴的塊。比如,范圍1-20可以被分成1-5、6-10、11-15和16-20。
@Benchmark public long rangedSum() { return LongStream.rangeClosed(1, N) .reduce(0L, Long::sum); }
輸出是
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.rangedSum avgt 10 7.660 ± 1.643 ms/op
可以看出來,比并行流快了很多,僅比經典的for循環慢了一點。LongStream支持并行:
@Benchmark public long parallelRangedSum() { return LongStream.rangeClosed(1, N) .parallel() .reduce(0L, Long::sum); }
輸出是
Benchmark Mode Cnt Score Error Units
ParallelStreamBenchmark.parallelRangedSum avgt 10 4.790 ± 5.142 ms/op
可以發現,并行生效了。甚至比for循環還快了1/3。
正確使用并行流
濫用并行流產生錯誤的主要原因是使用了改變共享狀態的算法。下面是一個通過改變共享的累加器來實現前n個自然數求和的例子:
public long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).forEach(accumulator::add); return accumulator.total; } public class Accumulator { public long total = 0; public void add(long value) { total += value; } }
這種代碼很常見,特別對熟悉命令式編程范式的開發者而言。當你迭代數字列表時,經常這樣做:初始化一個累加器,遍歷元素,使用累加器相加。
這代碼有什么錯?它是串行的,失去了并行性。讓我們試著使用并行流:
public long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; }
多執行幾次,你會發現,每次返回的結果都不一樣,而且都不是正確的50000005000000。這是因為多線程累加的時候,total += value并不是原子操作。那么怎樣才能寫出并行情況下,正確的代碼呢?
如果有懷疑,就做測試
注意裝箱問題。Java提供的原始類型流(IntStream、LongStream和DoubleStream)可以避免類似的問題,盡量使用他們
有些操作使用并行流性能更差。尤其是像limit和findFirst這種依賴元素順序的操作,使用并行是非常昂貴的。比如,findAny就比findFirst性能好,因為它跟順序無關。調用unordered方法,可以把一個有順序的流變成無順序的流。比如,如果你需要流的N個元素,而你對前M個感興趣,在一個無順序的流上調用limit比有順序的高效
如果數據量不大,不要選擇并行流
要考慮流的底層數據結構的可分解程度。比如,ArrayList比LinkedList分解起來更高效,因為不遍歷就可以分割。使用range工廠增加的原始類型流也很容易分割。可以通過實現自己的Spliterator分割流
流的特征,以及中間操作如何修改流的元素,會改變分解過程的性能。比如,一個SIZED流可以被分解成兩個相等的部分,并且每個部分可以高效得并行處理,但是,filter會過濾掉任何不滿足條件的元素,導致流的size成了未知的
考慮結束操作是廉價的還是昂貴的merge步驟(比如,Collector的combiner方法)。如果是昂貴的,組合并行結果的代價會比并行流帶來的好處還要高
下面的表格,總結一些流在可分解性方面的并行友好性
源 | 可分解性 |
---|---|
ArrayList | 優秀 |
LinkedList | 差 |
IntStream.range | 優秀 |
Stream.iterate | 差 |
HashSet | 好 |
TreeSet | 好 |
fork/join框架
fork/join框架用來遞歸地把可并行的任務分解成小任務,然后組合每個子任務的結果,以生成總的結果。它實現了ExecutorService接口,這樣所有的子任務都在一個線程池(ForkJoinPool)內工作。
RecursiveTask
要向ForkJoinPool提交任務,你不得不增加RecursiveTask的子類-R是并行任務(以及每個子任務)的返回類型,或者
增加RecursiveAction的子類-當沒有返回值的時候。要定義RecursiveTask,需要實現它唯一的抽象方法:
protected abstract R compute();
該方法定義分割任務和不能繼續被分割時處理一個子任務的算法的邏輯。該方法的實現,經常像下面的偽代碼:
if (任務足夠小,不再被分) { 順序執行任務 } else { 把任務分成兩個子任務 遞歸地調用本方法,盡量分割每個子任務 等待所有子任務的完成 組合每個子任務的結果 }
可以發現,這是分治算法的并行實現。我們繼續求和的例子,演示怎么使用fork/join框架。首先需要擴展RecursiveTask類:
import java.util.concurrent.RecursiveTask; /** * Created by leishu on 18-12-11. */ public class ForkJoinSumCalculator extends RecursiveTask<Long> { //分割任務的閾值 public static final long THRESHOLD = 10_000; //要被求和的數組 private final long[] numbers; private final int start; private final int end; public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } //生成子任務的私有構造器 private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { //子任務的大小 int length = end - start; if (length <= THRESHOLD) { return computeSequentially();//小于閾值,不分割 } //增加第一個子任務 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); //異步執行,新的子任務使用ForkJoinPool的另一個線程 leftTask.fork(); ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); //同步執行第二個子任務,允許遞歸 Long rightResult = rightTask.compute(); //讀取第一個子任務的結果,如果沒完成就等待 Long leftResult = leftTask.join(); //組合 return leftResult + rightResult; } //順序執行 private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } }
使用fork/join的最佳實踐
調用任務的join方法,會阻塞調用者,直到返回結果。所以,要在兩個子任務都啟動以后在調用它
不要在RecursiveTask內使用ForkJoinPool的invoke方法
子任務的fork方法是用來做調度的。在兩個子任務上直接調用它似乎是很自然的,但是,在其中一個上調用compute效率更高,因為這樣能重用相同的線程
偷工作
任務被分給ForkJoinPool里的線程。每個線程有一個保存任務的雙端鏈表,順序地執行鏈表中的任務。如果由于某種原因(比如I/O),一個線程完成了分配給他的全部任務,它會隨機地從其他線程選擇一個隊列,從隊列的尾部偷一個任務。這個過程會持續,直到所有的隊列都空了為止。所以,要有大量的小任務,而不是幾個大任務,這樣可以更好地平衡線程的負荷。
Spliterator
Spliterator是Java 8 提供的新接口,意思是“splitable iterator”,用來并行地迭代源中的元素。也許你不用開發自己的Spliterator,但是,理解了它,也就明白了并行流是如何工作的。Java 8已經在Collections框架內提供了Spliterator的默認實現。Collection接口有一個default方法spliterator(),它就返回一個Spliterator對象。我們先看看Spliterator接口的定義:
public interface Spliterator<T> { //用來按順序消費Spliterator的元素,如果還有元素就返回true boolean tryAdvance(Consumer<? super T> action); //把一些元素分到一個新的Spliterator,以允許他們并行處理 Spliterator<T> trySplit(); //剩余的可被遍歷的元素數量估值 long estimateSize(); int characteristics(); }
tryAdvance方法的行為類似于迭代器,用來按順序消費Spliterator的元素,如果還有元素就返回true。trySplit方法
用來把一些元素分到一個新的Spliterator,以允許他們并行處理。
分割過程
把一個流分割成多個部分是一個遞歸過程,如下圖所示。首先,在第一個Spliterator上調用trySplit生成一個新的。然后,在這兩個Spliterator上調用trySplit,這樣產生四個。一直進行下去,直到該方法返回null,標志著不能再被分割。最后,當所有的trySplit都返回null時,遞歸過程結束。
分割過程也會受到Spliterator的特征(由characteristics方法聲明)的影響。
Spliterator特征
characteristics方法返回一個整數,用來更好地控制和優化Spliterator的用法。
Characteristic | 描述 |
---|---|
ORDERED | 元素是有順序的(比如List),所以Spliterator使用該順序做遍歷和分區 |
DISTINCT | 對于每對遍歷的元素x和y,x.equals(y)返回false |
SORTED | 遍歷的元素遵循預定義的排序順序 |
SIZED | 源的size是已知的(比如set),所以estimatedSize()返回的值是精確的 |
NON-NULL | 元素不會為空 |
IMMUTABLE | 源是不可變的,說明遍歷的時候,元素不會被增加、修改和刪除 |
CONCURRENT | 源是并發安全的,并發修改的時候,不用任何同步 |
SUBSIZED | Spliterator和接下來產生的Spliterator都是SIZED |
實現自己的Spliterator
我們開發一個簡單的方法,用來計算字符串中的單詞數。
public int countWordsIteratively(String s) { int counter = 0; boolean lastSpace = true; for (char c : s.toCharArray()) { if (Character.isWhitespace(c)) { lastSpace = true; } else { if (lastSpace) counter++; lastSpace = false; } } return counter; }
要計算的字符串是但丁的“地域”的第一句
public static final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " che la dritta via era smarrita "; System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
注意,兩個單詞間的空格數是隨機的。執行結果
Found 19 words
使用函數式實現
首先需要把字符串轉換成一個流。原始類型int、long和double才有原始的的流,所以,我們使用Stream:
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
可以使用reduction計算單詞數量。當reduce的時候,你不得不攜帶由兩個變量組成的狀態:整數型的總數和布爾型的字符是否是空格。因為Java沒有tuples,你得增加一個新類-WordCounter-封裝狀態:
class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } //遍歷,累加 public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { //如果上一個字符是空格,而當前的不是,就加1 return lastSpace ? new WordCounter(counter + 1, false) : this; } } //組合,求和 public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } }
下面是遍歷一個新字符時,WordCounter的狀態圖
然后,我們就可以使用流的reduce方法了
private int countWords(Stream<Character> stream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); }
我們做一下測試
Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt); System.out.println("Found " + countWords(stream) + " words");
執行結果是正確的。
并行的實現
我們修改一下代碼
System.out.println("Found " + countWords(stream.parallel()) + " words");
執行結果不是找到19個單詞了。因為源字符串在隨意的位置被分割,一個字符被多次分割。要解決這個問題,就需要實現自己的Spliterator。
class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; private WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<? super Character> action) { //消費當前字符 action.accept(string.charAt(currentChar++)); //如果還有字符可被消費,返回true return currentChar < string.length(); } @Override public Spliterator<Character> trySplit() { int currentSize = string.length() - currentChar; //小于閾值,不再分割 if (currentSize < 10) { return null; } //候選的分割位置是字符串的一半長度 for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) { //如果是空格,才分割 if (Character.isWhitespace(string.charAt(splitPos))) { Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos)); //當前位置修改為分割位置 currentChar = splitPos; return spliterator; } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } }
然后,我們做測試
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true); System.out.println("Found " + countWords(stream) + " words");
這回沒問題了。
以上這篇Java 并行數據處理和性能分析就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。