本文介紹瞭如何使用 Java Stream `Collectors` 工廠方法與自定義 `Spliterator` 對 Stream 進行 Shuffle(混排),支援 Eager 與 Lazy 兩種樣式。
1. Eager Shuffle Collector
Heinz [在這篇文章][1]中給出了一種解決方案:將整個 Stream 轉換為 list,對 list 執行 `Collections#shuffle`,再轉為 Stream。像下麵這樣封裝成一個複合操作:
[1]:https://www.javaspecialists.eu/archive/Issue258.html
```java
public static Collector> toEagerShuffledStream() {
return Collectors.collectingAndThen(
toList(),
list -> {
Collections.shuffle(list);
return list.stream();
});
}
```
這種方法適用於對 Steam 中所有元素進行混排。由於會提前對集合中所有元素進行 Shuffle,如果只處理其中一部分則效果不佳,極端情況比如 Stream 只包含1個元素。
讓我們來看看一個簡單基準測試的執行結果:
```java
@State(Scope.Benchmark)
public class RandomSpliteratorBenchmark {
private List source;
@Param({"1", "10", "100", "1000", "10000", "10000"})
public int limit;
@Param({"100000"})
public int size;
@Setup(Level.Iteration)
public void setUp() {
source = IntStream.range(0, size)
.boxed()
.map(Object::toString)
.collect(Collectors.toList());
}
@Benchmark
public List eager() {
return source.stream()
.collect(toEagerShuffledStream())
.limit(limit)
.collect(Collectors.toList());
}
```
```shell
(limit) Mode Cnt Score Error Units
eager 1 thrpt 5 467.796 ± 9.074 ops/s
eager 10 thrpt 5 467.694 ± 17.166 ops/s
eager 100 thrpt 5 459.765 ± 8.048 ops/s
eager 1000 thrpt 5 467.934 ± 43.095 ops/s
eager 10000 thrpt 5 449.471 ± 5.549 ops/s
eager 100000 thrpt 5 331.111 ± 5.626 ops/s
```
從上面的資料可以看出,儘管執行結果 Stream 中元素不斷增加,執行效果還是相當不錯。因此,對整個集合提前混排太浪費了,尤其是元素較少的時候得分很差。
讓我們看看來有什麼好辦法。
2. Lazy Shuffle Collector
為了節省 CPU 資源,與其對集合中所有元素預處理,不如根據需要只處理其中一部分。
為了達到這個效果,需要自定義一個 Spliterator 對所有對元素隨機遍歷,然後透過 `StreamSupport.stream` 構造建立一個 Stream 物件:
```java
public class RandomSpliterator implements Spliterator {
// ...
public static Collector> toLazyShuffledStream() {
return Collectors.collectingAndThen(
toList(),
list -> StreamSupport.stream(
new ShuffledSpliterator<>(list), false));
}
}
```
3. 實現細節
即使只取出一個隨機元素,也不能避免計算整個 Steam 中的元素(這意味著不支援無限序列)。因此,可以用 `List` 初始化 `RandomSpliterator`。“註意,這裡有一個陷阱”。
如果給定 `List` 不支援在常量時間內完成隨機訪問,這種方案要比 Eager 方案慢得多。為了避免這種情況,可以在實體化 `Spliterator` 的時候進行簡單檢查:
```java
private RandomSpliterator(
List source, Supplier extends Random> random) {
if (source.isEmpty()) { ... } // throw
this.source = source instanceof RandomAccess
? source
: new ArrayList<>(source);
this.random = random.get();
}
```
相比隨機訪問時間複雜度不是 O(1) 的實現,建立 `ArrayList` 的成本可以忽略不計。
現在重寫最重要的 `tryAdvance()` 方法。實現很簡單,每次迭代都從 `source` 集合中隨機挑選並刪除一個元素。
不必擔心 `source` 發生改變。這裡不釋出 `RandomSpliterator`,只傳回基於它的一個 `Collector`:
```java
@Override
public boolean tryAdvance(Consumer super T> action) {
int remaining = source.size();
if (remaining > 0 ) {
action.accept(source.remove(random.nextInt(remaining)));
return true;
} else {
return false;
}
}
```
除此之外,還需要實現其它3個方法:
```java
@Override
public Spliterator trySplit() {
return null; // 表示 split 可不行
}
@Override
public long estimateSize() {
return source.size();
}
@Override
public int characteristics() {
return SIZED;
}
```
現在檢查一下是否有效果:
```java
IntStream.range(0, 10).boxed()
.collect(toLazyShuffledStream())
.forEach(System.out::println);
```
結果如下:
```shell
3
4
8
1
7
6
5
0
2
9
```
4. 效能考慮
在這個實現中,我們把大小為 N 的陣列換成 M 查詢或刪除:
- N:集合大小
- M:挑選元素的數量
從 `ArrayList` 中查詢或刪除單個元素通常比交換開銷大,因此方案的可擴充套件性不夠好。但是對於 M 值較小的時候效能會好很多。
現在對比 Eager 方案(都包含100000個物件):
```shell
(limit) Mode Cnt Score Error Units
eager 1 thrpt 5 467.796 ± 9.074 ops/s
eager 10 thrpt 5 467.694 ± 17.166 ops/s
eager 100 thrpt 5 459.765 ± 8.048 ops/s
eager 1000 thrpt 5 467.934 ± 43.095 ops/s
eager 10000 thrpt 5 449.471 ± 5.549 ops/s
eager 100000 thrpt 5 331.111 ± 5.626 ops/s
lazy 1 thrpt 5 1530.763 ± 72.096 ops/s
lazy 10 thrpt 5 1462.305 ± 23.860 ops/s
lazy 100 thrpt 5 823.212 ± 119.771 ops/s
lazy 1000 thrpt 5 166.786 ± 16.306 ops/s
lazy 10000 thrpt 5 19.475 ± 4.052 ops/s
lazy 100000 thrpt 5 4.097 ± 0.416 ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart.png)
可以明顯看到,如果資料流元素較少,新方案的效能優於前者。但隨著“處理數量/集合大小”增加,吞吐量急劇下降。
這是因為從 `ArrayList` 中移除元素會帶來額外開銷,每次移除都會呼叫 `System#arraycopy` 對內部陣列執行移位操作,開銷較大。
對於較大的集合(1000000個元素)可以看到類似的樣式:
```shell
(limit) (size) Mode Cnt Score Err Units
eager 1 10000000 thrpt 5 0.915 ops/s
eager 10 10000000 thrpt 5 0.783 ops/s
eager 100 10000000 thrpt 5 0.965 ops/s
eager 1000 10000000 thrpt 5 0.936 ops/s
eager 10000 10000000 thrpt 5 0.860 ops/s
lazy 1 10000000 thrpt 5 4.338 ops/s
lazy 10 10000000 thrpt 5 3.149 ops/s
lazy 100 10000000 thrpt 5 2.060 ops/s
lazy 1000 10000000 thrpt 5 0.370 ops/s
lazy 10000 10000000 thrpt 5 0.05 ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-2.png)
在更小集合(128個元素)上的表現:
```shell
(limit) (size) Mode Cnt Score Error Units
eager 2 128 thrpt 5 246439.459 ops/s
eager 4 128 thrpt 5 333866.936 ops/s
eager 8 128 thrpt 5 340296.188 ops/s
eager 16 128 thrpt 5 345533.673 ops/s
eager 32 128 thrpt 5 231725.156 ops/s
eager 64 128 thrpt 5 314324.265 ops/s
eager 128 128 thrpt 5 270451.992 ops/s
lazy 2 128 thrpt 5 765989.718 ops/s
lazy 4 128 thrpt 5 659421.041 ops/s
lazy 8 128 thrpt 5 652685.515 ops/s
lazy 16 128 thrpt 5 470346.570 ops/s
lazy 32 128 thrpt 5 324174.691 ops/s
lazy 64 128 thrpt 5 186472.090 ops/s
lazy 128 128 thrpt 5 108105.699 ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-3.png)
能不能進一步最佳化?
5. 進一步提高效能
不幸的是,現有的解決方案擴充套件性不盡如人意,讓我們試著改進。但在此之前,先對現有操作進行測評:
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.36.58.png)
不出意外,`Arraylist#remove` 是開銷最大的操作之一。換句話說,從 `ArrayList` 中刪除元素耗費了大量 CPU 資源。
為什麼呢?從 `ArrayList` 中刪除元素會對底層實現的陣列執行移除操作。問題是,Java 陣列不會自動調整大小,每次移除都會建立一個更小的新陣列:
```java
private void fastRemove(Object[] es, int i) {
modCount++;
final int newSize;
if ((newSize = size - 1) > i)
System.arraycopy(es, i + 1, es, i, newSize - i);
es[size = newSize] = null;
}
```
接下來該怎麼辦?避免從 `ArrayList` 中移除元素。
為了達到這個效果,可以用一個陣列儲存剩餘的元素並記錄它的大小:
```java
public class ImprovedRandomSpliterator implements Spliterator {
private final Random random;
private final T[] source;
private int size;
private ImprovedRandomSpliterator(
List source, Supplier extends Random> random) {
if (source.isEmpty()) {
throw new IllegalArgumentException(...);
}
this.source = (T[]) source.toArray();
this.random = random.get();
this.size = this.source.length;
}
}
```
幸運的是,由於 `Spliterator` 的實體不會在執行緒之間共享,因此不會遇到併發問題。
現在嘗試移除元素時,實際上不需要建立縮小後的新陣列。相反,只要減小 `size` 並忽略陣列的其餘部分即可。
在此之前,把最後一個元素與傳回的元素交換:
```java
@Override
public boolean tryAdvance(Consumer super T> action) {
if (size > 0) {
int nextIdx = random.nextInt(size);
int lastIdx = size - 1;
action.accept(source[nextIdx]);
source[nextIdx] = source[lastIdx];
source[lastIdx] = null; // let object be GCed
size--;
return true;
} else {
return false;
}
}
```
對改進後的方案進行評測,可以看到開銷最大的呼叫已經消失了:
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.38.47.png)
準備在此執行基準測試進行比較:
```shell
(limit) (size) Mode Cnt Score Error Units
eager 1 100000 thrpt 3 456.811 ± 20.585 ops/s
eager 10 100000 thrpt 3 469.635 ± 23.281 ops/s
eager 100 100000 thrpt 3 466.486 ± 68.820 ops/s
eager 1000 100000 thrpt 3 454.459 ± 13.103 ops/s
eager 10000 100000 thrpt 3 443.640 ± 96.929 ops/s
eager 100000 100000 thrpt 3 335.134 ± 21.944 ops/s
lazy 1 100000 thrpt 3 1587.536 ± 389.128 ops/s
lazy 10 100000 thrpt 3 1452.855 ± 406.879 ops/s
lazy 100 100000 thrpt 3 814.978 ± 242.077 ops/s
lazy 1000 100000 thrpt 3 167.825 ± 129.559 ops/s
lazy 10000 100000 thrpt 3 19.782 ± 8.596 ops/s
lazy 100000 100000 thrpt 3 3.970 ± 0.408 ops/s
lazy_improved 1 100000 thrpt 3 1509.264 ± 170.423 ops/s
lazy_improved 10 100000 thrpt 3 1512.150 ± 143.927 ops/s
lazy_improved 100 100000 thrpt 3 1463.093 ± 593.370 ops/s
lazy_improved 1000 100000 thrpt 3 1451.007 ± 58.948 ops/s
lazy_improved 10000 100000 thrpt 3 1148.581 ± 232.218 ops/s
lazy_improved 100000 100000 thrpt 3 383.022 ± 97.082 ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-5.png)
從上面的結果可以看出,改進後的方案效能受元素數量變化影響顯著減小。
實際上,即使遇到最差情況,改進方案的效能也比基於 `Collections#shuffle` 的方案略好一些。
6. 完整示例
完整示例可以在 [GitHub][2] 上找到。
[2]:https://github.com/pivovarit/articles/tree/master/java-random-stream
```java
package com.pivovarit.stream;
import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ImprovedRandomSpliterator implements Spliterator {
private final Random random;
private final T[] source;
private int size;
ImprovedRandomSpliterator(List source, Supplier extends Random> random) {
if (source.isEmpty()) {
throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
}
this.source = (T[]) source.toArray();
this.random = random.get();
this.size = this.source.length;
}
@Override
public boolean tryAdvance(Consumer super T> action) {
if (size > 0) {
int nextIdx = random.nextInt(size);
int lastIdx = size - 1;
action.accept(source[nextIdx]);
source[nextIdx] = source[lastIdx];
source[lastIdx] = null; // let object be GCed
size--;
return true;
} else {
return false;
}
}
@Override
public Spliterator trySplit() {
return null;
}
@Override
public long estimateSize() {
return source.length;
}
@Override
public int characteristics() {
return SIZED;
}
}
```
```java
package com.pivovarit.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.toCollection;
public final class RandomCollectors {
private RandomCollectors() {
}
public static Collector> toImprovedLazyShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> !list.isEmpty()
? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
: Stream.empty());
}
public static Collector> toLazyShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> !list.isEmpty()
? StreamSupport.stream(new RandomSpliterator<>(list, Random::new), false)
: Stream.empty());
}
public static Collector> toEagerShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> {
Collections.shuffle(list);
return list.stream();
});
}
}
```