點選上方“芋道原始碼”,選擇“置頂公眾號”
技術文章第一時間送達!
原始碼精品專欄
摘要: 原創出處 http://cmsblogs.com/?p=2269 「小明哥」歡迎轉載,保留摘要,謝謝!
-
應用示例
-
實現分析
此篇部落格所有原始碼均來自JDK 1.8
前面三篇部落格分別介紹了CyclicBarrier、CountDownLatch、Semaphore,現在介紹併發工具類中的最後一個Exchange。Exchange是最簡單的也是最複雜的,簡單在於API非常簡單,就一個構造方法和兩個exchange()方法,最複雜在於它的實現是最複雜的(反正我是看暈了的)。
在API是這麼介紹的:可以在對中對元素進行配對和交換的執行緒的同步點。每個執行緒將條目上的某個方法呈現給 exchange 方法,與夥伴執行緒進行匹配,並且在傳回時接收其夥伴的物件。Exchanger 可能被視為 SynchronousQueue 的雙向形式。Exchanger 可能在應用程式(比如遺傳演演算法和管道設計)中很有用。
Exchanger,它允許在併發任務之間交換資料。具體來說,Exchanger類允許在兩個執行緒之間定義同步點。當兩個執行緒都到達同步點時,他們交換資料結構,因此第一個執行緒的資料結構進入到第二個執行緒中,第二個執行緒的資料結構進入到第一個執行緒中。
應用示例
Exchange實現較為複雜,我們先看其怎麼使用,然後再來分析其原始碼。現在我們用Exchange來模擬生產-消費者問題:
public class ExchangerTest {
static class Producer implements Runnable{
//生產者、消費者交換的資料結構
private List buffer;
//步生產者和消費者的交換物件
private Exchanger> exchanger;
Producer(List buffer,Exchanger> exchanger){
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for(int i = 1 ; i 5 ; i++){
System.out.println("生產者第" + i + "次提供");
for(int j = 1 ; j <= 3 ; j++){
System.out.println("生產者裝入" + i + "--" + j);
buffer.add("buffer:" + i + "--" + j);
}
System.out.println("生產者裝滿,等待與消費者交換...");
try {
exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
private List buffer;
private final Exchanger> exchanger;
public Consumer(List buffer, Exchanger> exchanger)
{
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 1; i 5; i++) {
//呼叫exchange()與消費者進行資料交換
try {
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者第" + i + "次提取");
for (int j = 1; j <= 3 ; j++) {
System.out.println("消費者 : " + buffer.get(0));
buffer.remove(0);
}
}
}
}
public static void main(String[] args){
List buffer1 = new ArrayList();
List buffer2 = new ArrayList();
Exchanger> exchanger = new Exchanger>();
Thread producerThread = new Thread(new Producer(buffer1,exchanger));
Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));
producerThread.start();
consumerThread.start();
}
}
執行結果:
[
首先生產者Producer、消費者Consumer首先都建立一個緩衝串列,透過Exchanger來同步交換資料。消費中透過呼叫Exchanger與生產者進行同步來獲取資料,而生產者則透過for迴圈向快取佇列儲存資料並使用exchanger物件消費者同步。到消費者從exchanger哪裡得到資料後,他的緩衝串列中有3個資料,而生產者得到的則是一個空的串列。上面的例子充分展示了消費者-生產者是如何利用Exchanger來完成資料交換的。
在Exchanger中,如果一個執行緒已經到達了exchanger節點時,對於它的夥伴節點的情況有三種:
-
如果它的夥伴節點在該執行緒到達之前已經呼叫了exchanger方法,則它會喚醒它的夥伴然後進行資料交換,得到各自資料傳回。
-
如果它的夥伴節點還沒有到達交換點,則該執行緒將會被掛起,等待它的夥伴節點到達被喚醒,完成資料交換。
-
如果當前執行緒被中斷了則丟擲異常,或者等待超時了,則丟擲超時異常。
實現分析
Exchanger演演算法的核心是透過一個可交換資料的slot,以及一個可以帶有資料item的參與者。原始碼中的描述如下:
for (;;) {
if (slot is empty) { // offer
place item in a Node;
if (can CAS slot from empty to node) {
wait for release;
return matching item in node;
}
}
else if (can CAS slot from node to empty) { // release
get the item in node;
set matching item in node;
release waiting thread;
}
// else retry on CAS failure
}
Exchanger中定義瞭如下幾個重要的成員變數:
private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;
participant的作用是為每個執行緒保留唯一的一個Node節點。
slot為單個槽,arena為陣列槽。他們都是Node型別。在這裡可能會感覺到疑惑,slot作為Exchanger交換資料的場景,應該只需要一個就可以了啊?為何還多了一個Participant 和陣列型別的arena呢?一個slot交換場所原則上來說應該是可以的,但實際情況卻不是如此,多個參與者使用同一個交換場所時,會存在嚴重伸縮性問題。既然單個交換場所存在問題,那麼我們就安排多個,也就是陣列arena。透過陣列arena來安排不同的執行緒使用不同的slot來降低競爭問題,並且可以保證最終一定會成對交換資料。但是Exchanger不是一來就會生成arena陣列來降低競爭,只有當產生競爭是才會生成arena陣列。那麼怎麼將Node與當前執行緒系結呢?Participant ,Participant 的作用就是為每個執行緒保留唯一的一個Node節點,它繼承ThreadLocal,同時在Node節點中記錄在arena中的下標index。
Node定義如下:
@sun.misc.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // Number of CAS failures at current bound
int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
}
-
index:arena的下標;
-
bound:上一次記錄的Exchanger.bound;
-
collides:在當前bound下CAS失敗的次數;
-
hash:偽隨機數,用於自旋;
-
item:這個執行緒的當前項,也就是需要交換的資料;
-
match:做releasing操作的執行緒傳遞的項;
-
parked:掛起時設定執行緒值,其他情況下為null;
在Node定義中有兩個變數值得思考:bound以及collides。前面提到了陣列area是為了避免競爭而產生的,如果系統不存在競爭問題,那麼完全沒有必要開闢一個高效的arena來徒增系統的複雜性。首先透過單個slot的exchanger來交換資料,當探測到競爭時將安排不同的位置的slot來儲存執行緒Node,並且可以確保沒有slot會在同一個快取行上。如何來判斷會有競爭呢?CAS替換slot失敗,如果失敗,則透過記錄衝突次數來擴充套件arena的尺寸,我們在記錄衝突的過程中會跟蹤“bound”的值,以及會重新計算衝突次數在bound的值被改變時。這裡闡述可能有點兒模糊,不著急,我們先有這個概念,後面在arenaExchange中再次做詳細闡述。
我們直接看exchange()方法
exchange(V x)
exchange(V x):等待另一個執行緒到達此交換點(除非當前執行緒被中斷),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。方法定義如下:
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
這個方法比較好理解:arena為陣列槽,如果為null,則執行slotExchange()方法,否則判斷執行緒是否中斷,如果中斷值丟擲InterruptedException異常,沒有中斷則執行arenaExchange()方法。整套邏輯就是:如果slotExchange(Object item, boolean timed, long ns)方法執行失敗了就執行arenaExchange(Object item, boolean timed, long ns)方法,最後傳回結果V。
NULL_ITEM 為一個空節點,其實就是一個Object物件而已,slotExchange()為單個slot交換。
slotExchange(Object item, boolean timed, long ns)
private final Object slotExchange(Object item, boolean timed, long ns) {
// 獲取當前執行緒的節點 p
Node p = participant.get();
// 當前執行緒
Thread t = Thread.currentThread();
// 執行緒中斷,直接傳回
if (t.isInterrupted())
return null;
// 自旋
for (Node q;;) {
//slot != null
if ((q = slot) != null) {
//嘗試CAS替換
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item; // 當前執行緒的項,也就是交換的資料
q.match = item; // 做releasing操作的執行緒傳遞的項
Thread w = q.parked; // 掛起時設定執行緒值
// 掛起執行緒不為null,執行緒掛起
if (w != null)
U.unpark(w);
return v;
}
//如果失敗了,則建立arena
//bound 則是上次Exchanger.bound
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) < }
//如果arena != null,直接傳回,進入arenaExchange邏輯處理
else if (arena != null)
return null;
else {
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}
/*
* 等待 release
* 進入spin+block樣式
*/
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {
if (spins > 0) {
h ^= h <1; h ^= h >>> 3; h ^= h <10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
return v;
}
程式首先透過participant獲取當前執行緒節點Node。檢測是否中斷,如果中斷return null,等待後續丟擲InterruptedException異常。
如果slot不為null,則進行slot消除,成功直接傳回資料V,否則失敗,則建立arena消除陣列。
如果slot為null,但arena不為null,則傳回null,進入arenaExchange邏輯。
如果slot為null,且arena也為null,則嘗試佔領該slot,失敗重試,成功則跳出迴圈進入spin+block(自旋+阻塞)樣式。
在自旋+阻塞樣式中,首先取得結束時間和自旋次數。如果match(做releasing操作的執行緒傳遞的項)為null,其首先嘗試spins+隨機次自旋(改自旋使用當前節點中的hash,並改變之)和退讓。當自旋數為0後,假如slot發生了改變(slot != p)則重置自旋數並重試。否則假如:當前未中斷&arena;為null&(當前不是限時版本或者限時版本+當前時間未結束):阻塞或者限時阻塞。假如:當前中斷或者arena不為null或者當前為限時版本+時間已經結束:不限時版本:置v為null;限時版本:如果時間結束以及未中斷則TIMED_OUT;否則給出null(原因是探測到arena非空或者當前執行緒中斷)。
match不為空時跳出迴圈。
整個slotExchange清晰明瞭。
arenaExchange(Object item, boolean timed, long ns)
private final Object arenaExchange(Object item, boolean timed, long ns) {
Node[] a = arena;
Node p = participant.get();
for (int i = p.index;;) { // access slot at i
int b, m, c; long j; // j is raw array offset
Node q = (Node)U.getObjectVolatile(a, j = (i < if (q != null && U.compareAndSwapObject(a, j, q, null)) {
Object v = q.item; // release
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
else if (i <= (m = (b = bound) & MMASK) && q == null) {
p.item = item; // offer
if (U.compareAndSwapObject(a, j, null, p)) {
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
Thread t = Thread.currentThread(); // wait
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
if (v != null) {
U.putOrderedObject(p, MATCH, null);
p.item = null; // clear for next use
p.hash = h;
return v;
}
else if (spins > 0) {
h ^= h <1; h ^= h >>> 3; h ^= h <10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
}
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1; // descend
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
break; // expired; restart
}
}
}
else
p.item = null; // clear offer
}
else {
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
p.collides = c + 1;
i = (i == 0) ? m : i - 1; // cyclically traverse
}
else
i = m + 1; // grow
p.index = i;
}
}
}
首先透過participant取得當前節點Node,然後根據當前節點Node的index去取arena中相對應的節點node。前面提到過arena可以確保不同的slot在arena中是不會相衝突的,那麼是怎麼保證的呢?我們先看arena的建立:
arena = new Node[(FULL + 2) <
這個arena到底有多大呢?我們先看FULL 和ASHIFT的定義:
static final int FULL = (NCPU >= (MMASK <1)) ? MMASK : NCPU >>> 1;
private static final int ASHIFT = 7;
private static final int NCPU = Runtime.getRuntime().availableProcessors();
private static final int MMASK = 0xff; // 255
假如我的機器NCPU = 8 ,則得到的是768大小的arena陣列。然後透過以下程式碼取得在arena中的節點:
Node q = (Node)U.getObjectVolatile(a, j = (i <
他仍然是透過右移ASHIFT位來取得Node的,ABASE定義如下:
Class> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak) + (1 <
U.arrayBaseOffset獲取物件頭長度,陣列元素的大小可以透過unsafe.arrayIndexScale(T[].class) 方法獲取到。這也就是說要訪問型別為T的第N個元素的話,你的偏移量offset應該是arrayOffset+N*arrayScale。也就是說BASE = arrayOffset+ 128 。其次我們再看Node節點的定義
@sun.misc.Contended static final class Node{
....
}
在Java 8 中我們是可以利用sun.misc.Contended來規避偽共享的。所以說透過 << ASHIFT方式加上sun.misc.Contended,所以使得任意兩個可用Node不會再同一個快取行中。
關於偽共享請參考如下博文:
偽共享(False Sharing)
[ Java8中用sun.misc.Contended避免偽共享(false sharing)](
http://blog.csdn.net/aigoogle/article/details/41518369)
我們再次回到arenaExchange()。取得arena中的node節點後,如果定位的節點q 不為空,且CAS操作成功,則交換資料,傳回交換的資料,喚醒等待的執行緒。
如果q等於null且下標在bound & MMASK範圍之內,則嘗試佔領該位置,如果成功,則採用自旋 + 阻塞的方式進行等待交換資料。
如果下標不在bound & MMASK範圍之內獲取由於q不為null但是競爭失敗的時候:消除p。加入bound 不等於當前節點的bond(b != p.bound),則更新p.bound = b,collides = 0 ,i = m或者m - 1。如果衝突的次數不到m 獲取m 已經為最大值或者修改當前bound的值失敗,則透過增加一次collides以及迴圈遞減下標i的值;否則更新當前bound的值成功:我們令i為m+1即為此時最大的下標。最後更新當前index的值。
Exchanger使用、原理都比較好理解,但是這個原始碼看起來真心有點兒複雜,是真心難看懂,但是這種交換的思路Doug Lea在後續博文中還會提到,例如SynchronousQueue、LinkedTransferQueue。
最後用一個在網上看到的段子結束此篇部落格(http://brokendreams.iteye.com/blog/2253956),博主對其做了一點點修改,以便更加符合在1.8環境下的Exchanger:
其實就是"我"和"你"(可能有多個"我",多個"你")在一個叫Slot的地方做交易(一手交錢,一手交貨),過程分以下步驟:
-
我先到一個叫做Slot的交易場所交易,發現你已經到了,那我就嘗試喊你交易,如果你回應了我,決定和我交易那麼進入第2步;如果別人搶先一步把你喊走了,那我就進入第5步。
-
我拿出錢交給你,你可能會接收我的錢,然後把貨給我,交易結束;也可能嫌我掏錢太慢(超時)或者接個電話(中斷),TM的不賣了,走了,那我只能再找別人買貨了(從頭開始)。
-
我到交易地點的時候,你不在,那我先嘗試把這個交易點給佔了(一屁股做凳子上…),如果我成功搶佔了單間(交易點),那就坐這兒等著你拿貨來交易,進入第4步;如果被別人搶座了,那我只能在找別的地方兒了,進入第5步。
-
你拿著貨來了,喊我交易,然後完成交易;也可能我等了好長時間你都沒來,我不等了,繼續找別人交易去,走的時候我看了一眼,一共沒多少人,弄了這麼多單間(交易地點Slot),太TM浪費了,我喊來交易地點管理員:一共也沒幾個人,搞這麼多單間兒乾毛,給哥撤一個!。然後再找別人買貨(從頭開始);或者我老大給我打了個電話,不讓我買貨了(中斷)。
-
我跑去喊管理員,尼瑪,就一個坑交易個毛啊,然後管理在一個更加開闊的地方開闢了好多個單間,然後我就挨個來看每個單間是否有人。如果有人我就問他是否可以交易,如果回應了我,那我就進入第2步。如果我沒有人,那我就佔著這個單間等其他人來交易,進入第4步。
-
如果我嘗試了幾次都沒有成功,我就會認為,是不是我TM選的這個單間風水不好?不行,得換個地兒繼續(從頭開始);如果我嘗試了多次發現還沒有成功,怒了,把管理員喊來:給哥再開一個單間(Slot),加一個凳子,這麼多人就這麼幾個破凳子夠誰用!
666. 彩蛋
如果你對 Dubbo / Netty 等等原始碼與原理感興趣,歡迎加入我的知識星球一起交流。長按下方二維碼噢:
目前在知識星球更新了《Dubbo 原始碼解析》目錄如下:
01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽
05. 拓展機制 SPI
06. 執行緒池
07. 服務暴露 Export
08. 服務取用 Refer
09. 註冊中心 Registry
10. 動態編譯 Compile
11. 動態代理 Proxy
12. 服務呼叫 Invoke
13. 呼叫特性
14. 過濾器 Filter
15. NIO 伺服器
16. P2P 伺服器
17. HTTP 伺服器
18. 序列化 Serialization
19. 叢集容錯 Cluster
20. 優雅停機
21. 日誌適配
22. 狀態檢查
23. 監控中心 Monitor
24. 管理中心 Admin
25. 運維命令 QOS
26. 鏈路追蹤 Tracing
... 一共 69+ 篇
目前在知識星球更新了《Netty 原始碼解析》目錄如下:
01. 除錯環境搭建
02. NIO 基礎
03. Netty 簡介
04. 啟動 Bootstrap
05. 事件輪詢 EventLoop
06. 通道管道 ChannelPipeline
07. 通道 Channel
08. 位元組緩衝區 ByteBuf
09. 通道處理器 ChannelHandler
10. 編解碼 Codec
11. 工具類 Util
... 一共 61+ 篇
目前在知識星球更新了《資料庫物體設計》目錄如下:
01. 商品模組
02. 交易模組
03. 營銷模組
04. 公用模組
... 一共 17+ 篇
原始碼不易↓↓↓↓↓
點贊支援老艿艿↓↓