原文出處http://cmsblogs.com/ 『chenssy』
【註】:SynchronousQueue實現演演算法看的暈乎乎的,寫了好久才寫完,如果當中有什麼錯誤之處,忘各位指正
作為BlockingQueue中的一員,SynchronousQueue與其他BlockingQueue有著不同特性:
-
SynchronousQueue沒有容量。與其他BlockingQueue不同,SynchronousQueue是一個不儲存元素的BlockingQueue。每一個put操作必須要等待一個take操作,否則不能繼續新增元素,反之亦然。
-
因為沒有容量,所以對應 peek, contains, clear, isEmpty … 等方法其實是無效的。例如clear是不執行任何操作的,contains始終傳回false,peek始終傳回null。
-
SynchronousQueue分為公平和非公平,預設情況下採用非公平性訪問策略,當然也可以透過建構式來設定為公平性訪問策略(為true即可)。
-
若使用 TransferQueue, 則佇列中永遠會存在一個 dummy node(這點後面詳細闡述)。
SynchronousQueue非常適合做交換工作,生產者的執行緒和消費者的執行緒同步以傳遞某些資訊、事件或者任務。
SynchronousQueue
與其他BlockingQueue一樣,SynchronousQueue同樣繼承AbstractQueue和實現BlockingQueue介面:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
SynchronousQueue提供了兩個建構式:
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
// 透過 fair 值來決定公平性和非公平性
// 公平性使用TransferQueue,非公平性採用TransferStack
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferQueue、TransferStack繼承Transferer,Transferer為SynchronousQueue的內部類,它提供了一個方法transfer(),該方法定義了轉移資料的規範,如下:
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
transfer()方法主要用來完成轉移資料的,如果e != null,相當於將一個資料交給消費者,如果e == null,則相當於從一個生產者接收一個消費者交出的資料。
SynchronousQueue採用佇列TransferQueue來實現公平性策略,採用堆疊TransferStack來實現非公平性策略,他們兩種都是透過連結串列實現的,其節點分別為QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演著非常重要的作用,SynchronousQueue的put、take操作都是委託這兩個類來實現的。
TransferQueue
TransferQueue是實現公平性策略的核心類,其節點為QNode,其定義如下:
static final class TransferQueue<E> extends Transferer<E> {
/** 頭節點 */
transient volatile QNode head;
/** 尾節點 */
transient volatile QNode tail;
// 指向一個取消的結點
//當一個節點中最後一個插入時,它被取消了但是可能還沒有離開佇列
transient volatile QNode cleanMe;
/**
* 省略很多程式碼O(∩_∩)O
*/
}
在TransferQueue中除了頭、尾節點外還存在一個cleanMe節點。該節點主要用於標記,當刪除的節點是尾節點時則需要使用該節點。
同時,對於TransferQueue需要註意的是,其佇列永遠都存在一個dummy node,在構造時建立:
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
在TransferQueue中定義了QNode類來表示佇列中的節點,QNode節點定義如下:
static final class QNode {
// next 域
volatile QNode next;
// item資料項
volatile Object item;
// 等待執行緒,用於park/unpark
volatile Thread waiter; // to control park/unpark
//樣式,表示當前是資料還是請求,只有當匹配的樣式相匹配時才會交換
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
/**
* CAS next域,在TransferQueue中用於向next推進
*/
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* CAS itme資料項
*/
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* 取消本結點,將item域設定為自身
*/
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
/**
* 是否被取消
* 與tryCancel相照應只需要判斷item釋放等於自身即可
*/
boolean isCancelled() {
return item == this;
}
boolean isOffList() {
return next == this;
}
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面程式碼沒啥好看的,需要註意的一點就是isData,該屬性在進行資料交換起到關鍵性作用,兩個執行緒進行資料交換的時候,必須要兩者的樣式保持一致。
TransferStack
TransferStack用於實現非公平性,定義如下:
static final class TransferStack<E> extends Transferer<E> {
static final int REQUEST = 0;
static final int DATA = 1;
static final int FULFILLING = 2;
volatile SNode head;
/**
* 省略一堆程式碼 O(∩_∩)O~
*/
}
TransferStack中定義了三個狀態:REQUEST表示消費資料的消費者,DATA表示生產資料的生產者,FULFILLING,表示匹配另一個生產者或消費者。任何執行緒對TransferStack的操作都屬於上述3種狀態中的一種(對應著SNode節點的mode)。同時還包含一個head域,表示頭結點。
內部節點SNode定義如下:
static final class SNode {
// next 域
volatile SNode next;
// 相匹配的節點
volatile SNode match;
// 等待的執行緒
volatile Thread waiter;
// item 域
Object item; // data; or null for REQUESTs
// 模型
int mode;
/**
* item域和mode域不需要使用volatile修飾,因為它們在volatile/atomic操作之前寫,之後讀
*/
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* 將s結點與本結點進行匹配,匹配成功,則unpark等待執行緒
*/
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面簡單介紹了TransferQueue、TransferStack,由於SynchronousQueue的put、take操作都是呼叫Transfer的transfer()方法,只不過是傳遞的引數不同而已,put傳遞的是e引數,所以樣式為資料(公平isData = true,非公平mode= DATA),而take操作傳遞的是null,所以樣式為請求(公平isData = false,非公平mode = REQUEST),如下:
// put操作
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
// take操作
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
公平樣式
公平性呼叫TransferQueue的transfer方法:
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
// 當前節點樣式
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// 頭、尾節點 為null,沒有初始化
if (t == null || h == null)
continue;
// 頭尾節點相等(佇列為null) 或者當前節點和佇列節點樣式一樣
if (h == t || t.isData == isData) {
// tn = t.next
QNode tn = t.next;
// t != tail表示已有其他執行緒操作了,修改了tail,重新再來
if (t != tail)
continue;
// tn != null,表示已經有其他執行緒添加了節點,tn 推進,重新處理
if (tn != null) {
// 當前執行緒幫忙推進尾節點,就是嘗試將tn設定為尾節點
advanceTail(t, tn);
continue;
}
// 呼叫的方法的 wait 型別的, 並且 超時了, 直接傳回 null
// timed 在take操作闡述
if (timed && nanos <= 0)
return null;
// s == null,構建一個新節點Node
if (s == null)
s = new QNode(e, isData);
// 將新建的節點加入到佇列中,如果不成功,繼續處理
if (!t.casNext(null, s))
continue;
// 替換尾節點
advanceTail(t, s);
// 呼叫awaitFulfill, 若節點是 head.next, 則進行自旋
// 若不是的話, 直接 block, 直到有其他執行緒 與之匹配, 或它自己進行執行緒的中斷
Object x = awaitFulfill(s, e, timed, nanos);
// 若傳回的x == s表示,當前執行緒已經超時或者中斷,不然的話s == null或者是匹配的節點
if (x == s) {
// 清理節點S
clean(t, s);
return null;
}
// isOffList:用於判斷節點是否已經從佇列中離開了
if (!s.isOffList()) {
// 嘗試將S節點設定為head,移出t
advanceHead(t, s);
if (x != null)
s.item = s;
// 釋放執行緒 ref
s.waiter = null;
}
// 傳回
return (x != null) ? (E)x : e;
}
// 這裡是從head.next開始,因為TransferQueue總是會存在一個dummy node節點
else {
// 節點
QNode m = h.next;
// 不一致讀,重新開始
// 有其他執行緒更改了執行緒結構
if (t != tail || m == null || h != head)
continue;
/**
* 生產者producer和消費者consumer匹配操作
*/
Object x = m.item;
// isData == (x != null):判斷isData與x的樣式是否相同,相同表示已經匹配了
// x == m :m節點被取消了
// !m.casItem(x, e):如果嘗試將資料e設定到m上失敗
if (isData == (x != null) || x == m || !m.casItem(x, e)) {
// 將m設定為頭結點,h出列,然後重試
advanceHead(h, m);
continue;
}
// 成功匹配了,m設定為頭結點h出列,向前推進
advanceHead(h, m);
// 喚醒m上的等待執行緒
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
整個transfer的演演算法如下:
-
如果佇列為null或者尾節點樣式與當前節點樣式一致,則嘗試將節點加入到等待佇列中(採用自旋的方式),直到被匹配或、超時或者取消。匹配成功的話要麼傳回null(producer傳回的)要麼傳回真正傳遞的值(consumer傳回的),如果傳回的是node節點本身則表示當前執行緒超時或者取消了。
-
如果佇列不為null,且佇列的節點是當前節點匹配的節點,則進行資料的傳遞匹配並傳回匹配節點的資料
-
在整個過程中都會檢測並幫助其他執行緒推進
當佇列為空時,節點入列然後透過呼叫awaitFulfill()方法自旋,該方法主要用於自旋/阻塞節點,直到節點被匹配傳回或者取消、中斷。
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 超時控制
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次數
// 如果節點Node恰好是head節點,則自旋一段時間,這裡主要是為了效率問題,如果裡面阻塞,會存在喚醒、執行緒背景關係切換的問題
// 如果生產者、消費者者裡面到來的話,就避免了這個阻塞的過程
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 自旋
for (;;) {
// 執行緒中斷了,剔除當前節點
if (w.isInterrupted())
s.tryCancel(e);
// 如果執行緒進行了阻塞 -> 喚醒或者中斷了,那麼x != e 肯定成立,直接傳回當前節點即可
Object x = s.item;
if (x != e)
return x;
// 超時判斷
if (timed) {
nanos = deadline - System.nanoTime();
// 如果超時了,取消節點,continue,在if(x != e)肯定會成立,直接傳回x
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 自旋- 1
if (spins > 0)
--spins;
// 等待執行緒
else if (s.waiter == null)
s.waiter = w;
// 進行沒有超時的 park
else if (!timed)
LockSupport.park(this);
// 自旋次數過了, 直接 + timeout 方式 park
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
在自旋/阻塞過程中做了一點最佳化,就是判斷當前節點是否為對頭元素,如果是的則先自旋,如果自旋次數過了,則才阻塞,這樣做的主要目的就在如果生產者、消費者立馬來匹配了則不需要阻塞,因為阻塞、喚醒會消耗資源。在整個自旋的過程中會不斷判斷是否超時或者中斷了,如果中斷或者超時了則呼叫tryCancel()取消該節點。
tryCancel
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
取消過程就是將節點的item設定為自身(itemOffset是item的偏移量)。所以在呼叫awaitFulfill()方法時,如果當前執行緒被取消、中斷、超時了那麼傳回的值肯定時S,否則傳回的則是匹配的節點。如果傳回值是節點S,那麼if(x == s)必定成立,如下:
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
如果傳回的x == s成立,則呼叫clean()方法清理節點S:
void clean(QNode pred, QNode s) {
//
s.waiter = null;
while (pred.next == s) {
QNode h = head;
QNode hn = h.next;
// hn節點被取消了,向前推進
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
// 佇列為空,直接return null
QNode t = tail;
if (t == h)
return;
QNode tn = t.next;
// 不一致,說明有其他執行緒改變了tail節點,重新開始
if (t != tail)
continue;
// tn != null 推進tail節點,重新開始
if (tn != null) {
advanceTail(t, tn);
continue;
}
// s 不是尾節點 移出
if (s != t) {
QNode sn = s.next;
// 如果s已經被移除退出迴圈,否則嘗試斷開s
if (sn == s || pred.casNext(s, sn))
return;
}
// s是尾節點,則有可能會有其他執行緒在新增新節點,則cleanMe出場
QNode dp = cleanMe;
// 如果dp不為null,說明是前一個被取消節點,將其移除
if (dp != null) {
QNode d = dp.next;
QNode dn;
if (d == null || // 節點d已經刪除
d == dp || // 原來的節點 cleanMe 已經透過 advanceHead 進行刪除
!d.isCancelled() || // 原來的節點 s已經刪除
(d != t && // d 不是tail節點
(dn = d.next) != null && //
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
// 清除 cleanMe 節點, 這裡的 dp == pred 若成立, 說明清除節點s,成功, 直接return, 不然的話要再次迴圈
casCleanMe(dp, null);
if (dp == pred)
return;
} else if (casCleanMe(null, pred)) // 原來的 cleanMe 是 null, 則將 pred 標記為 cleamMe 為下次 清除 s 節點做標識
return;
}
}
這個clean()方法感覺有點兒難度,我也看得不是很懂。這裡是取用http://www.jianshu.com/p/95cb570c8187
-
刪除的節點不是queue尾節點, 這時 直接 pred.casNext(s, s.next) 方式來進行刪除(和ConcurrentLikedQueue中差不多)
-
刪除的節點是隊尾節點
-
此時 cleanMe == null, 則 前繼節點pred標記為 cleanMe, 為下次刪除做準備
-
此時 cleanMe != null, 先刪除上次需要刪除的節點, 然後將 cleanMe至null, 讓後再將 pred 賦值給 cleanMe
非公平樣式
非公平樣式transfer方法如下:
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// 棧為空或者當前節點樣式與頭節點樣式一樣,將節點壓入棧內,等待匹配
if (h == null || h.mode == mode) {
// 超時
if (timed && nanos <= 0) {
// 節點被取消了,向前推進
if (h != null && h.isCancelled())
// 重新設定頭結點(彈出之前的頭結點)
casHead(h, h.next);
else
return null;
}
// 不超時
// 生成一個SNode節點,並嘗試替換掉頭節點head (head -> s)
else if (casHead(h, s = snode(s, e, h, mode))) {
// 自旋,等待執行緒匹配
SNode m = awaitFulfill(s, timed, nanos);
// 傳回的m == s 表示該節點被取消了或者超時、中斷了
if (m == s) {
// 清理節點S,return null
clean(s);
return null;
}
// 因為透過前面一步將S替換成了head,如果h.next == s ,則表示有其他節點插入到S前面了,變成了head
// 且該節點就是與節點S匹配的節點
if ((h = head) != null && h.next == s)
// 將s.next節點設定為head,相當於取消節點h、s
casHead(h, s.next);
// 如果是請求則傳回匹配的域,否則傳回節點S的域
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
// 如果棧不為null,且兩者樣式不匹配(h != null && h.mode != mode)
// 說明他們是一隊對等匹配的節點,嘗試用當前節點s來滿足h節點
else if (!isFulfilling(h.mode)) {
// head 節點已經取消了,向前推進
if (h.isCancelled())
casHead(h, h.next);
// 嘗試將當前節點打上"正在匹配"的標記,並設定為head
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 迴圈loop
for (;;) {
// s為當前節點,m是s的next節點,
// m節點是s節點的匹配節點
SNode m = s.next;
// m == null,其他節點把m節點匹配走了
if (m == null) {
// 將s彈出
casHead(s, null);
// 將s置空,下輪迴圈的時候還會新建
s = null;
// 退出該迴圈,繼續主迴圈
break;
}
// 獲取m的next節點
SNode mn = m.next;
// 嘗試匹配
if (m.tryMatch(s)) {
// 匹配成功,將s 、 m彈出
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 如果沒有匹配成功,說明有其他執行緒已經匹配了,把m移出
s.casNext(m, mn);
}
}
}
// 到這最後一步說明節點正在匹配階段
else {
// head 的next的節點,是正在匹配的節點,m 和 h配對
SNode m = h.next;
// m == null 其他執行緒把m節點搶走了,彈出h節點
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}
整個處理過程分為三種情況,具體如下:
-
如果當前棧為空獲取節點樣式與棧頂樣式一樣,則嘗試將節點加入棧內,同時透過自旋方式等待節點匹配,最後傳回匹配的節點或者null(被取消)
-
如果棧不為空且節點的樣式與首節點樣式匹配,則嘗試將該節點打上FULFILLING標記,然後加入棧中,與相應的節點匹配,成功後將這兩個節點彈出棧並傳回匹配節點的資料
-
如果有節點在匹配,那麼幫助這個節點完成匹配和出棧操作,然後在主迴圈中繼續執行
當節點加入棧內後,透過呼叫awaitFulfill()方法自旋等待節點匹配:
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 超時
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 當前執行緒
Thread w = Thread.currentThread();
// 自旋次數
// shouldSpin 用於檢測當前節點是否需要自旋
// 如果棧為空、該節點是首節點或者該節點是匹配節點,則先採用自旋,否則阻塞
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 執行緒中斷了,取消該節點
if (w.isInterrupted())
s.tryCancel();
// 匹配節點
SNode m = s.match;
// 如果匹配節點m不為空,則表示匹配成功,直接傳回
if (m != null)
return m;
// 超時
if (timed) {
nanos = deadline - System.nanoTime();
// 節點超時,取消
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 自旋;每次自旋的時候都需要檢查自身是否滿足自旋條件,滿足就 - 1,否則為0
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
// 第一次阻塞時,會將當前執行緒設定到s上
else if (s.waiter == null)
s.waiter = w;
// 阻塞 當前執行緒
else if (!timed)
LockSupport.park(this);
// 超時
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
awaitFulfill()方法會一直自旋/阻塞直到匹配節點。在S節點阻塞之前會先呼叫shouldSpin()方法判斷是否採用自旋方式,為的就是如果有生產者或者消費者馬上到來,就不需要阻塞了,在多核條件下這種最佳化是有必要的。同時在呼叫park()阻塞之前會將當前執行緒設定到S節點的waiter上。匹配成功,傳回匹配節點m。
shouldSpin()方法如下:
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
同時在阻塞過程中會一直檢測當前執行緒是否中斷了,如果中斷了,則呼叫tryCancel()方法取消該節點,取消過程就是將當前節點的math設定為當前節點。所以如果執行緒中斷了,那麼在傳回m時一定是S節點自身。
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
awaitFullfill()方法如果傳回的m == s,則表示當前節點已經中斷取消了,則需要呼叫clean()方法,清理節點S:
void clean(SNode s) {
// 清理item域
s.item = null;
// 清理waiter域
s.waiter = null;
// past節點
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// 從棧頂head節點,取消從棧頂head到past節點之間所有已經取消的節點
// 註意:這裡如果遇到一個節點沒有取消,則會退出while
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next); // 如果p節點已經取消了,則剔除該節點
// 如果經歷上面while p節點還沒有取消,則再次迴圈取消掉所有p 到past之間的取消節點
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
clean()方法就是將head節點到S節點之間所有已經取消的節點全部移出。【不清楚為何要用兩個while,一個不行麼】
至此,SynchronousQueue的原始碼分析完成了,說下我個人感覺吧:個人感覺SynchronousQueue實現好複雜(可能是自己智商不夠吧~~~~(>_