原文出處http://cmsblogs.com/ 『chenssy』
前面提到的各種BlockingQueue對讀或者寫都是鎖上整個佇列,在併發量大的時候,各種鎖是比較耗資源和耗時間的,而前面的SynchronousQueue雖然不會鎖住整個佇列,但它是一個沒有容量的“佇列”,那麼有沒有這樣一種佇列,它即可以像其他的BlockingQueue一樣有容量又可以像SynchronousQueue一樣不會鎖住整個佇列呢?有!答案就是LinkedTransferQueue。
LinkedTransferQueue是基於連結串列的FIFO無界阻塞佇列,它出現在JDK7中。Doug Lea 大神說LinkedTransferQueue是一個聰明的佇列。它是ConcurrentLinkedQueue、SynchronousQueue (公平樣式下)、無界的LinkedBlockingQueues等的超集。既然這麼牛逼,那勢必要弄清楚其中的原理了。
LinkedTransferQueue
看原始碼之前我們先稍微瞭解下它的原理,這樣看原始碼就會有跡可循了。
LinkedTransferQueue採用一種預佔樣式。什麼意思呢?有就直接拿走,沒有就佔著這個位置直到拿到或者超時或者中斷。即消費者執行緒到佇列中取元素時,如果發現佇列為空,則會生成一個null節點,然後park住等待生產者。後面如果生產者執行緒入隊時發現有一個null元素節點,這時生產者就不會入列了,直接將元素填充到該節點上,喚醒該節點的執行緒,被喚醒的消費者執行緒拿東西走人。是不是有點兒SynchronousQueue的味道?
結構
LinkedTransferQueue與其他的BlockingQueue一樣,同樣繼承AbstractQueue類,但是它實現了TransferQueue,TransferQueue介面繼承BlockingQueue,所以TransferQueue算是對BlockingQueue一種擴充,該介面提供了一整套的transfer介面:
public interface TransferQueue<E> extends BlockingQueue<E> {
/**
* 若當前存在一個正在等待獲取的消費者執行緒(使用take()或者poll()函式),使用該方法會即刻轉移/傳輸物件元素e;
* 若不存在,則傳回false,並且不進入佇列。這是一個不阻塞的操作
*/
boolean tryTransfer(E e);
/**
* 若當前存在一個正在等待獲取的消費者執行緒,即立刻移交之;
* 否則,會插入當前元素e到佇列尾部,並且等待進入阻塞狀態,到有消費者執行緒取走該元素
*/
void transfer(E e) throws InterruptedException;
/**
* 若當前存在一個正在等待獲取的消費者執行緒,會立即傳輸給它;否則將插入元素e到佇列尾部,並且等待被消費者執行緒獲取消費掉;
* 若在指定的時間內元素e無法被消費者執行緒獲取,則傳回false,同時該元素被移除。
*/
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 判斷是否存在消費者執行緒
*/
boolean hasWaitingConsumer();
/**
* 獲取所有等待獲取元素的消費執行緒數量
*/
int getWaitingConsumerCount();
}
相對於其他的BlockingQueue,LinkedTransferQueue就多了上面幾個方法。這幾個方法在LinkedTransferQueue中起到了核心作用。
LinkedTransferQueue定義的變數如下:
// 判斷是否為多核
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
// 自旋次數
private static final int FRONT_SPINS = 1 << 7;
// 前驅節點正在處理,當前節點需要自旋的次數
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
static final int SWEEP_THRESHOLD = 32;
// 頭節點
transient volatile Node head;
// 尾節點
private transient volatile Node tail;
// 刪除節點失敗的次數
private transient volatile int sweepVotes;
/*
* 呼叫xfer()方法時需要傳入,區分不同處理
* xfer()方法是LinkedTransferQueue的最核心的方法
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
Node節點
Node節點由四個部分構成:
-
isData:表示該節點是存放資料還是獲取資料
-
item:存放資料,isData為false時,該節點為null,為true時,匹配後,該節點會置為null
-
next:指向下一個節點
-
waiter:park住消費者執行緒,執行緒就放在這裡
結構如下:
原始碼如下:
static final class Node {
// 表示該節點是存放資料還是獲取資料
final boolean isData;
// 存放資料,isData為false時,該節點為null,為true時,匹配後,該節點會置為null
volatile Object item;
//指向下一個節點
volatile Node next;
// park住消費者執行緒,執行緒就放在這裡
volatile Thread waiter; // null until waiting
/**
* CAS Next域
*/
final boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* CAS itme域
*/
final boolean casItem(Object cmp, Object val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* 建構式
*/
Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
/**
* 將next域指向自身,其實就是剔除節點
*/
final void forgetNext() {
UNSAFE.putObject(this, nextOffset, this);
}
/**
* 匹配過或節點被取消的時候會呼叫
*/
final void forgetContents() {
UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}
/**
* 校驗節點是否匹配過,如果匹配做取消了,item則會發生變化
*/
final boolean isMatched() {
Object x = item;
return (x == this) || ((x == null) == isData);
}
/**
* 是否是一個未匹配的請求節點
* 如果是的話isData應為false,item == null,因位如果匹配了,item則會有值
*/
final boolean isUnmatchedRequest() {
return !isData && item == null;
}
/**
* 如給定節點型別不能掛在當前節點後傳回true
*/
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
/**
* 匹配一個資料節點
*/
final boolean tryMatchData() {
// assert isData;
Object x = item;
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter);
return true;
}
return false;
}
private static final long serialVersionUID = -3375979862319811754L;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
private static final long waiterOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
waiterOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiter"));
} catch (Exception e) {
throw new Error(e);
}
}
}
節點Node為LinkedTransferQueue的內部類,其內部結構和公平方式的SynchronousQueue差不多,裡面也同樣提供了一些很重要的方法。
put操作
LinkedTransferQueue提供了add、put、offer三類方法,用於將元素插入佇列中,如下:
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
由於LinkedTransferQueue是無界的,不會阻塞,所以在呼叫xfer方法是傳入的是ASYNC,同時直接傳回true.
take操作
LinkedTransferQueue提供了poll、take方法用於出列元素:
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll() {
return xfer(null, false, NOW, 0);
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
這裡和put操作有點不一樣,take()方法傳入的是SYNC,阻塞。poll()傳入的是NOW,poll(long timeout, TimeUnit unit)則是傳入TIMED。
tranfer操作
實現TransferQueue介面,就要實現它的方法:
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
xfer()
透過上面幾個核心方法的原始碼我們清楚可以看到,最終都是呼叫xfer()方法,該方法接受四個引數,item或者null的E,put操作為true、take操作為false的havaData,how(有四個值NOW, ASYNC, SYNC, or TIMED,分別表示不同的操作),超時nanos。
private E xfer(E e, boolean haveData, int how, long nanos) {
// havaData為true,但是e == null 丟擲空指標
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) {
// 從首節點開始匹配
// p == null 佇列為空
for (Node h = head, p = h; p != null;) {
// 模型,request or data
boolean isData = p.isData;
// item域
Object item = p.item;
// 找到一個沒有匹配的節點
// item != p 也就是自身,則表示沒有匹配過
// (item != null) == isData,表示模型符合
if (item != p && (item != null) == isData) {
// 節點型別和待處理型別一致,這樣肯定是不能匹配的
if (isData == haveData) // can't match
break;
// 匹配,將E加入到item域中
// 如果p 的item為data,那麼e為null,如果p的item為null,那麼e為data
if (p.casItem(item, e)) { // match
//
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 匹配後喚醒p的waiter執行緒;reservation則叫人收貨,data則叫null收貨
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
// 如果已經匹配了則向前推進
Node n = p.next;
// 如果p的next指向p本身,說明p節點已經有其他執行緒處理過了,只能從head重新開始
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 如果沒有找到匹配的節點,則進行處理
// NOW為untimed poll, tryTransfer,不需要入隊
if (how != NOW) { // No matches available
// s == null,新建一個節點
if (s == null)
s = new Node(e, haveData);
// 入隊,傳回前驅節點
Node pred = tryAppend(s, haveData);
// 傳回的前驅節點為null,那就是有race,被其他的搶了,那就continue 整個for
if (pred == null)
continue retry;
// ASYNC不需要阻塞等待
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e;
}
}
整個演演算法的核心就是尋找匹配節點找到了就傳回,否則就入隊(NOW直接傳回):
-
matched。判斷匹配條件(isData不一樣,本身沒有匹配),匹配後就casItem,然後unpark匹配節點的waiter執行緒,如果是reservation則叫人收貨,data則叫null收貨。
-
unmatched。如果沒有找到匹配節點,則根據傳入的how來處理,NOW直接傳回,其餘三種先入對,入隊後如果是ASYNC則傳回,SYNC和TIMED則會阻塞等待匹配。
其實相當於SynchronousQueue來說,這個處理邏輯還是比較簡單的。
如果沒有找到匹配節點,且how != NOW會入隊,入隊則是呼叫tryAppend方法:
private Node tryAppend(Node s, boolean haveData) {
// 從尾節點tail開始
for (Node t = tail, p = t;;) {
Node n, u;
// 佇列為空則將節點S設定為head
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s;
}
// 如果為data
else if (p.cannotPrecede(haveData))
return null;
// 不是最後一個節點
else if ((n = p.next) != null)
p = p != t && t != (u = tail) ? (t = u) : (p != n) ? n : null;
// CAS失敗,一般來說失敗的原因在於p.next != null,可能有其他增加了tail,向前推薦
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
else {
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
tryAppend方法是將S節點新增到tail上,然後傳回其前驅節點。好吧,我承認這段程式碼我看的有點兒暈!!!
加入佇列後,如果how還不是ASYNC則呼叫awaitMatch()方法阻塞等待:
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 超時控制
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 當前執行緒
Thread w = Thread.currentThread();
// 自旋次數
int spins = -1; // initialized after first item and cancel checks
// 隨機數
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
//匹配了,可能有其他執行緒匹配了執行緒
if (item != e) {
// 撤銷該節點
s.forgetContents();
return LinkedTransferQueue.<E>cast(item);
}
// 執行緒中斷或者超時了。則呼叫將s節點item設定為e,等待取消
if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel
// 斷開節點
unsplice(pred, s);
return e;
}
// 自旋
if (spins < 0) {
// 計算自旋次數
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
// 自旋
else if (spins > 0) {
--spins;
// 生成的隨機數 == 0 ,停止執行緒?不是很明白....
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield();
}
// 將當前執行緒設定到節點的waiter域
// 一開始s.waiter == null 肯定是會成立的,
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
// 超時阻塞
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
// 不是超時阻塞
LockSupport.park(this);
}
}
}
整個awaitMatch過程和SynchronousQueue的awaitFulfill沒有很大區別,不過在自旋過程會呼叫Thread.yield();這是幹嘛?
在awaitMatch過程中,如果執行緒中斷了,或者超時了則會呼叫unsplice()方法去除該節點:
final void unsplice(Node pred, Node s) {
s.forgetContents(); // forget unneeded fields
if (pred != null && pred != s && pred.next == s) {
Node n = s.next;
if (n == null ||
(n != s && pred.casNext(s, n) && pred.isMatched())) {
for (;;) { // check if at, or could be, head
Node h = head;
if (h == pred || h == s || h == null)
return; // at head or list empty
if (!h.isMatched())
break;
Node hn = h.next;
if (hn == null)
return; // now empty
if (hn != h && casHead(h, hn))
h.forgetNext(); // advance head
}
if (pred.next != pred && s.next != s) { // recheck if offlist
for (;;) { // sweep now if enough votes
int v = sweepVotes;
if (v < SWEEP_THRESHOLD) {
if (casSweepVotes(v, v + 1))
break;
}
else if (casSweepVotes(v, 0)) {
sweep();
break;
}
}
}
}
}
}
主體流程已經完成,這裡總結下:
-
無論是入對、出對,還是交換,最終都會跑到xfer(E e, boolean haveData, int how, long nanos)方法中,只不過傳入的how不同而已
-
如果佇列不為空,則嘗試在佇列中尋找是否存在與該節點相匹配的節點,如果找到則將匹配節點的item設定e,然後喚醒匹配節點的waiter執行緒。如果是reservation則叫人收貨,data則叫null收貨
-
如果佇列為空,或者沒有找到匹配的節點且how != NOW,則呼叫tryAppend()方法將節點新增到佇列的tail,然後傳回其前驅節點
-
如果節點的how != NOW && how != ASYNC,則呼叫awaitMatch()方法阻塞等待,在阻塞等待過程中和SynchronousQuque的awaitFulfill()邏輯差不多,都是先自旋,然後判斷是否需要自旋,如果中斷或者超時了則將該節點從佇列中移出
實體
這段摘自JAVA 1.7併發之LinkedTransferQueue原理理解。感覺看完上面的原始碼後,在結合這個例子會有更好的瞭解,掌握。
1:Head->Data Input->Data
Match: 根據他們的屬性 發現 cannot match ,因為是同類的
處理節點: 所以把新的data放在原來的data後面,然後head往後移一位,Reservation同理
HEAD=DATA->DATA
2:Head->Data Input->Reservation (取資料)
Match: 成功match,就把Data的item變為reservation的值(null,有主了),並且傳回資料。
處理節點: 沒動,head還在原地
HEAD=DATA(用過)
3:Head->Reservation Input->Data(放資料)
Match: 成功match,就把Reservation的item變為Data的值(有主了),並且叫waiter來取
處理節點: 沒動
HEAD=RESERVATION(用過)