(給ImportNew加星標,提高Java技能)
編譯:ImportNew/唐尤華
medium.com/javarevisited/java-8-parallel-stream-java2blog-e1254e593763
在這篇文章中,我們將介紹 Java 並行流(Parallel Stream)。
[Java 8][1] 引入了”並行流“概念實現並行處理。隨著硬體成本降低,現在的 CPU 大都擁有多個核心,因此可以使用並行處理加快操作執行。
[1]:https://java2blog.com/java-8-tutorial/
讓我們透過一個簡單的例子來幫助理解:
```java
package org.arpit.java2blog.java8;
import java.util.Arrays;
import java.util.stream.IntStream;
public class Java8ParallelStreamMain {
public static void main(String[] args) {
System.out.println("=================================");
System.out.println("Using Sequential Stream");
System.out.println("=================================");
int[] array = {1,2,3,4,5,6,7,8,9,10};
IntStream intArrStream = Arrays.stream(array);
intArrStream.forEach(s->
{
System.out.println(s+" "+Thread.currentThread().getName());
});
System.out.println("=================================");
System.out.println("Using Parallel Stream");
System.out.println("=================================");
IntStream intParallelStream=Arrays.stream(array).parallel();
intParallelStream.forEach(s->
{
System.out.println(s+" "+Thread.currentThread().getName());
});
}
}
```
執行上面的程式,輸出結果如下:
```shell
=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 ForkJoinPool.commonPool-worker-3
3 ForkJoinPool.commonPool-worker-1
9 ForkJoinPool.commonPool-worker-2
2 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-2
1 ForkJoinPool.commonPool-worker-3
8 ForkJoinPool.commonPool-worker-2
4 ForkJoinPool.commonPool-worker-1
```
仔細觀察輸出結果可以看到,在順序流情況下,主執行緒將完成所有工作。主執行緒會等待當前迭代完成,然後接著進行下一次迭代。
在[並行流][2]的情況下,會同時產生4個執行緒,在內部使用 `ForkJoinPool` 建立和管理執行緒。並行流透過 `ForkJoinPool.commonPool()` 靜態方法建立 `ForkJoinPool` 實體。
[2]:http://www.java67.com/2018/10/java-8-stream-and-functional-programming-interview-questions-answers.html
並行流會利用所有可用 CPU 核心並行處理任務。如果任務數超過了 CPU 核心數量,那麼其餘任務將等待當前正在執行的任務完成。
並行流很酷,可以一直用嗎?
“當然不是!”
雖然只加上 `.parallel` 就可以把[一個流][3]轉為並行流,但這並不表示所有情況都適用。
[3]:https://java2blog.com/java-8-stream-filter-examples/
在使用並行流時,需要考慮許多因素,否則將會遭遇並行流帶來的負面影響。
並行流的開銷遠高於順序流,而且協調執行緒需要花費大量時間。
[4]:http://www.java67.com/2014/04/java-8-stream-examples-and-tutorial.html
當且僅當以出現下麵的情況時,需要考慮使用並行流:
- 需要處理大量資料;
- 正如你所知道的那樣,Java 使用了 [ForkJoinPool][5] 實現並行。`ForkJoinPool` 會把輸入流拆分後提交執行,因此需要確保輸入流是可拆分的。
[5]:http://javarevisited.blogspot.sg/2016/12/difference-between-executor-framework-and-ForkJoinPool-in-Java.html
例如:
[ArrayList][6] 非常易於拆分,因為可以透過索引找到一個中間元素進行拆分。但 `LinkedList` 很難拆分,而且大多數情況下執行效果並不好。
- 實際使用中會遇到麻煩的效能問題;
- 需要確保執行緒之間的所有共享資源都能正確同步,否則可能會產生意外的結果。
[6]:https://javarevisited.blogspot.com/2011/05/example-of-arraylist-in-java-tutorial.html
衡量並行性最簡單的公式是 Brian Goetz 在他的演講中提到的 “NQ” 模型。
“NQ 模型:“
```
N x Q >10000
```
這裡,
- N = 資料元素數量
- Q = 每個元素執行的工作量
這意味著,如果資料元素數量很大且每個元素執行的工作量較小(如加法操作),並行性可能會讓程式更快地執行,反之亦然。如果資料元素數量較少且每個元素執行的工作量較大(比如做一些複雜的計算工作),並行處理也可能會更快地得到結果。
讓我們看看另一個例子。
下麵的示例將展示在執行長時間計算時,採用並行流或順序流 CPU 的不同表現。我們將做一些耗時的計算讓 CPU 忙碌起來。
```java
package org.arpit.java2blog.java8;
import java.util.ArrayList;
import java.util.List;
public class PerformanceComparisonMain {
public static void main(String[] args) {
long currentTime=System.currentTimeMillis();
List data=new ArrayList();
for (int i = 0; i < 100000; i++) {
data.add(i);
}
long sum=data.stream()
.map(i ->(int)Math.sqrt(i))
.map(number->performComputation(number))
.reduce(0,Integer::sum);
System.out.println(sum);
long endTime=System.currentTimeMillis();
System.out.println("Time taken to complete:"+(endTime-currentTime)/(1000*60)+" minutes");
}
public static int performComputation(int number)
{
int sum=0;
for (int i = 1; i < 1000000; i++) {
int div=(number/i);
sum+=div;
}
return sum;
}
}
```
執行上面的程式結果如下:
>>>
117612733
Time taken to complete:6 minutes
>>>
然而這裡不關心輸出,而是關心在執行操作時 CPU 的表現。
如圖所示,在順序流情況下,CPU 沒有得到充分利用。
讓我們修改第16行使並行流,再次執行程式。
```java
long sum=data.stream()
.parallel()
.map(i ->(int)Math.sqrt(i))
.map(number->performComputation(number))
.reduce(0,Integer::sum);
```
使用並行流執行,輸出的結果中耗時明顯減少。
>>>
117612733
Time taken to complete:3 minutes
>>>
檢視使用並行流執行程式時 CPU 的歷史記錄。
如圖所示,並行流使用了所有4個 CPU 核心進行計算。
以上是有關 Java 並行流的所有介紹。
朋友會在“發現-看一看”看到你“在看”的內容