歡迎光臨
每天分享高質量文章

【死磕Java併發】—–J.U.C之阻塞佇列:LinkedTransferQueue

原文出處http://cmsblogs.com/ 『chenssy

前面提到的各種BlockingQueue對讀或者寫都是鎖上整個佇列,在併發量大的時候,各種鎖是比較耗資源和耗時間的,而前面的SynchronousQueue雖然不會鎖住整個佇列,但它是一個沒有容量的“佇列”,那麼有沒有這樣一種佇列,它即可以像其他的BlockingQueue一樣有容量又可以像SynchronousQueue一樣不會鎖住整個佇列呢?有!答案就是LinkedTransferQueue。

LinkedTransferQueue是基於連結串列的FIFO無界阻塞佇列,它出現在JDK7中。Doug Lea 大神說LinkedTransferQueue是一個聰明的佇列。它是ConcurrentLinkedQueue、SynchronousQueue (公平樣式下)、無界的LinkedBlockingQueues等的超集。既然這麼牛逼,那勢必要弄清楚其中的原理了。

LinkedTransferQueue

看原始碼之前我們先稍微瞭解下它的原理,這樣看原始碼就會有跡可循了。

LinkedTransferQueue採用一種預佔樣式。什麼意思呢?有就直接拿走,沒有就佔著這個位置直到拿到或者超時或者中斷。即消費者執行緒到佇列中取元素時,如果發現佇列為空,則會生成一個null節點,然後park住等待生產者。後面如果生產者執行緒入隊時發現有一個null元素節點,這時生產者就不會入列了,直接將元素填充到該節點上,喚醒該節點的執行緒,被喚醒的消費者執行緒拿東西走人。是不是有點兒SynchronousQueue的味道?

結構

LinkedTransferQueue與其他的BlockingQueue一樣,同樣繼承AbstractQueue類,但是它實現了TransferQueue,TransferQueue介面繼承BlockingQueue,所以TransferQueue算是對BlockingQueue一種擴充,該介面提供了一整套的transfer介面:

  1.    public interface TransferQueue<E> extends BlockingQueue<E> {

  2.        /**

  3.         * 若當前存在一個正在等待獲取的消費者執行緒(使用take()或者poll()函式),使用該方法會即刻轉移/傳輸物件元素e;

  4.         * 若不存在,則傳回false,並且不進入佇列。這是一個不阻塞的操作

  5.         */

  6.        boolean tryTransfer(E e);

  7.        /**

  8.         * 若當前存在一個正在等待獲取的消費者執行緒,即立刻移交之;

  9.         * 否則,會插入當前元素e到佇列尾部,並且等待進入阻塞狀態,到有消費者執行緒取走該元素

  10.         */

  11.        void transfer(E e) throws InterruptedException;

  12.        /**

  13.         * 若當前存在一個正在等待獲取的消費者執行緒,會立即傳輸給它;否則將插入元素e到佇列尾部,並且等待被消費者執行緒獲取消費掉;

  14.         * 若在指定的時間內元素e無法被消費者執行緒獲取,則傳回false,同時該元素被移除。

  15.         */

  16.        boolean tryTransfer(E e, long timeout, TimeUnit unit)

  17.                throws InterruptedException;

  18.        /**

  19.         * 判斷是否存在消費者執行緒

  20.         */

  21.        boolean hasWaitingConsumer();

  22.        /**

  23.         * 獲取所有等待獲取元素的消費執行緒數量

  24.         */

  25.        int getWaitingConsumerCount();

  26.    }

相對於其他的BlockingQueue,LinkedTransferQueue就多了上面幾個方法。這幾個方法在LinkedTransferQueue中起到了核心作用。

LinkedTransferQueue定義的變數如下:

  1.    // 判斷是否為多核

  2.    private static final boolean MP =

  3.            Runtime.getRuntime().availableProcessors() > 1;

  4.    // 自旋次數

  5.    private static final int FRONT_SPINS   = 1 << 7;

  6.    // 前驅節點正在處理,當前節點需要自旋的次數

  7.    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

  8.    static final int SWEEP_THRESHOLD = 32;

  9.    // 頭節點

  10.    transient volatile Node head;

  11.    // 尾節點

  12.    private transient volatile Node tail;

  13.    // 刪除節點失敗的次數

  14.    private transient volatile int sweepVotes;

  15.    /*

  16.     * 呼叫xfer()方法時需要傳入,區分不同處理

  17.     * xfer()方法是LinkedTransferQueue的最核心的方法

  18.     */

  19.    private static final int NOW   = 0; // for untimed poll, tryTransfer

  20.    private static final int ASYNC = 1; // for offer, put, add

  21.    private static final int SYNC  = 2; // for transfer, take

  22.    private static final int TIMED = 3; // for timed poll, tryTransfer

Node節點

Node節點由四個部分構成:

  • isData:表示該節點是存放資料還是獲取資料

  • item:存放資料,isData為false時,該節點為null,為true時,匹配後,該節點會置為null

  • next:指向下一個節點

  • waiter:park住消費者執行緒,執行緒就放在這裡

結構如下:

原始碼如下:

  1.    static final class Node {

  2.        // 表示該節點是存放資料還是獲取資料

  3.        final boolean isData;

  4.        // 存放資料,isData為false時,該節點為null,為true時,匹配後,該節點會置為null

  5.        volatile Object item;

  6.        //指向下一個節點

  7.        volatile Node next;

  8.        // park住消費者執行緒,執行緒就放在這裡

  9.        volatile Thread waiter; // null until waiting

  10.        /**

  11.         * CAS Next域

  12.         */

  13.        final boolean casNext(Node cmp, Node val) {

  14.            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);

  15.        }

  16.        /**

  17.         * CAS itme域

  18.         */

  19.        final boolean casItem(Object cmp, Object val) {

  20.            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);

  21.        }

  22.        /**

  23.         * 建構式

  24.         */

  25.        Node(Object item, boolean isData) {

  26.            UNSAFE.putObject(this, itemOffset, item); // relaxed write

  27.            this.isData = isData;

  28.        }

  29.        /**

  30.         * 將next域指向自身,其實就是剔除節點

  31.         */

  32.        final void forgetNext() {

  33.            UNSAFE.putObject(this, nextOffset, this);

  34.        }

  35.        /**

  36.         *  匹配過或節點被取消的時候會呼叫

  37.         */

  38.        final void forgetContents() {

  39.            UNSAFE.putObject(this, itemOffset, this);

  40.            UNSAFE.putObject(this, waiterOffset, null);

  41.        }

  42.        /**

  43.         * 校驗節點是否匹配過,如果匹配做取消了,item則會發生變化

  44.         */

  45.        final boolean isMatched() {

  46.            Object x = item;

  47.            return (x == this) || ((x == null) == isData);

  48.        }

  49.        /**

  50.         * 是否是一個未匹配的請求節點

  51.         * 如果是的話isData應為false,item == null,因位如果匹配了,item則會有值

  52.         */

  53.        final boolean isUnmatchedRequest() {

  54.            return !isData && item == null;

  55.        }

  56.        /**

  57.         * 如給定節點型別不能掛在當前節點後傳回true

  58.         */

  59.        final boolean cannotPrecede(boolean haveData) {

  60.            boolean d = isData;

  61.            Object x;

  62.            return d != haveData && (x = item) != this && (x != null) == d;

  63.        }

  64.        /**

  65.         * 匹配一個資料節點

  66.         */

  67.        final boolean tryMatchData() {

  68.            // assert isData;

  69.            Object x = item;

  70.            if (x != null && x != this && casItem(x, null)) {

  71.                LockSupport.unpark(waiter);

  72.                return true;

  73.            }

  74.            return false;

  75.        }

  76.        private static final long serialVersionUID = -3375979862319811754L;

  77.        // Unsafe mechanics

  78.        private static final sun.misc.Unsafe UNSAFE;

  79.        private static final long itemOffset;

  80.        private static final long nextOffset;

  81.        private static final long waiterOffset;

  82.        static {

  83.            try {

  84.                UNSAFE = sun.misc.Unsafe.getUnsafe();

  85.                Class> k = Node.class;

  86.                itemOffset = UNSAFE.objectFieldOffset

  87.                        (k.getDeclaredField("item"));

  88.                nextOffset = UNSAFE.objectFieldOffset

  89.                        (k.getDeclaredField("next"));

  90.                waiterOffset = UNSAFE.objectFieldOffset

  91.                        (k.getDeclaredField("waiter"));

  92.            } catch (Exception e) {

  93.                throw new Error(e);

  94.            }

  95.        }

  96.    }

節點Node為LinkedTransferQueue的內部類,其內部結構和公平方式的SynchronousQueue差不多,裡面也同樣提供了一些很重要的方法。

put操作

LinkedTransferQueue提供了add、put、offer三類方法,用於將元素插入佇列中,如下:

  1.    public void put(E e) {

  2.        xfer(e, true, ASYNC, 0);

  3.    }

  4.    public boolean offer(E e, long timeout, TimeUnit unit) {

  5.        xfer(e, true, ASYNC, 0);

  6.        return true;

  7.    }

  8.    public boolean offer(E e) {

  9.        xfer(e, true, ASYNC, 0);

  10.        return true;

  11.    }

  12.    public boolean add(E e) {

  13.        xfer(e, true, ASYNC, 0);

  14.        return true;

  15.    }

由於LinkedTransferQueue是無界的,不會阻塞,所以在呼叫xfer方法是傳入的是ASYNC,同時直接傳回true.

take操作

LinkedTransferQueue提供了poll、take方法用於出列元素:

  1.    public E take() throws InterruptedException {

  2.        E e = xfer(null, false, SYNC, 0);

  3.        if (e != null)

  4.            return e;

  5.        Thread.interrupted();

  6.        throw new InterruptedException();

  7.    }

  8.    public E poll() {

  9.        return xfer(null, false, NOW, 0);

  10.    }

  11.    public E poll(long timeout, TimeUnit unit) throws InterruptedException {

  12.        E e = xfer(null, false, TIMED, unit.toNanos(timeout));

  13.        if (e != null || !Thread.interrupted())

  14.            return e;

  15.        throw new InterruptedException();

  16.    }

這裡和put操作有點不一樣,take()方法傳入的是SYNC,阻塞。poll()傳入的是NOW,poll(long timeout, TimeUnit unit)則是傳入TIMED。

tranfer操作

實現TransferQueue介面,就要實現它的方法:

  1. public boolean tryTransfer(E e, long timeout, TimeUnit unit)

  2.    throws InterruptedException {

  3.    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)

  4.        return true;

  5.    if (!Thread.interrupted())

  6.        return false;

  7.    throw new InterruptedException();

  8. }

  9. public void transfer(E e) throws InterruptedException {

  10.    if (xfer(e, true, SYNC, 0) != null) {

  11.        Thread.interrupted(); // failure possible only due to interrupt

  12.        throw new InterruptedException();

  13.    }

  14. }

  15. public boolean tryTransfer(E e) {

  16.    return xfer(e, true, NOW, 0) == null;

  17. }

xfer()

透過上面幾個核心方法的原始碼我們清楚可以看到,最終都是呼叫xfer()方法,該方法接受四個引數,item或者null的E,put操作為true、take操作為false的havaData,how(有四個值NOW, ASYNC, SYNC, or TIMED,分別表示不同的操作),超時nanos。

  1.    private E xfer(E e, boolean haveData, int how, long nanos) {

  2.        // havaData為true,但是e == null 丟擲空指標

  3.        if (haveData && (e == null))

  4.            throw new NullPointerException();

  5.        Node s = null;                        // the node to append, if needed

  6.        retry:

  7.        for (;;) {

  8.            // 從首節點開始匹配

  9.            // p == null 佇列為空

  10.            for (Node h = head, p = h; p != null;) {

  11.                // 模型,request or data

  12.                boolean isData = p.isData;

  13.                // item域

  14.                Object item = p.item;

  15.                // 找到一個沒有匹配的節點

  16.                // item != p 也就是自身,則表示沒有匹配過

  17.                // (item != null) == isData,表示模型符合

  18.                if (item != p && (item != null) == isData) {

  19.                    // 節點型別和待處理型別一致,這樣肯定是不能匹配的

  20.                    if (isData == haveData)   // can't match

  21.                        break;

  22.                    // 匹配,將E加入到item域中

  23.                    // 如果p 的item為data,那麼e為null,如果p的item為null,那麼e為data

  24.                    if (p.casItem(item, e)) { // match

  25.                        //

  26.                        for (Node q = p; q != h;) {

  27.                            Node n = q.next;  // update by 2 unless singleton

  28.                            if (head == h && casHead(h, n == null ? q : n)) {

  29.                                h.forgetNext();

  30.                                break;

  31.                            }                 // advance and retry

  32.                            if ((h = head)   == null ||

  33.                                    (q = h.next) == null || !q.isMatched())

  34.                                break;        // unless slack < 2

  35.                        }

  36.                        // 匹配後喚醒p的waiter執行緒;reservation則叫人收貨,data則叫null收貨

  37.                        LockSupport.unpark(p.waiter);

  38.                        return LinkedTransferQueue.<E>cast(item);

  39.                    }

  40.                }

  41.                // 如果已經匹配了則向前推進

  42.                Node n = p.next;

  43.                // 如果p的next指向p本身,說明p節點已經有其他執行緒處理過了,只能從head重新開始

  44.                p = (p != n) ? n : (h = head); // Use head if p offlist

  45.            }

  46.            // 如果沒有找到匹配的節點,則進行處理

  47.            // NOW為untimed poll, tryTransfer,不需要入隊

  48.            if (how != NOW) {                 // No matches available

  49.                // s == null,新建一個節點

  50.                if (s == null)

  51.                    s = new Node(e, haveData);

  52.                // 入隊,傳回前驅節點

  53.                Node pred = tryAppend(s, haveData);

  54.                // 傳回的前驅節點為null,那就是有race,被其他的搶了,那就continue 整個for

  55.                if (pred == null)

  56.                    continue retry;

  57.                // ASYNC不需要阻塞等待

  58.                if (how != ASYNC)

  59.                    return awaitMatch(s, pred, e, (how == TIMED), nanos);

  60.            }

  61.            return e;

  62.        }

  63.    }

整個演演算法的核心就是尋找匹配節點找到了就傳回,否則就入隊(NOW直接傳回):

  • matched。判斷匹配條件(isData不一樣,本身沒有匹配),匹配後就casItem,然後unpark匹配節點的waiter執行緒,如果是reservation則叫人收貨,data則叫null收貨。

  • unmatched。如果沒有找到匹配節點,則根據傳入的how來處理,NOW直接傳回,其餘三種先入對,入隊後如果是ASYNC則傳回,SYNC和TIMED則會阻塞等待匹配。

其實相當於SynchronousQueue來說,這個處理邏輯還是比較簡單的。

如果沒有找到匹配節點,且how != NOW會入隊,入隊則是呼叫tryAppend方法:

  1.    private Node tryAppend(Node s, boolean haveData) {

  2.        // 從尾節點tail開始

  3.        for (Node t = tail, p = t;;) {

  4.            Node n, u;

  5.            // 佇列為空則將節點S設定為head

  6.            if (p == null && (p = head) == null) {

  7.                if (casHead(null, s))

  8.                    return s;

  9.            }

  10.            // 如果為data

  11.            else if (p.cannotPrecede(haveData))

  12.                return null;

  13.            // 不是最後一個節點

  14.            else if ((n = p.next) != null)

  15.                p = p != t && t != (u = tail) ? (t = u) : (p != n) ? n : null;

  16.            // CAS失敗,一般來說失敗的原因在於p.next != null,可能有其他增加了tail,向前推薦

  17.            else if (!p.casNext(null, s))

  18.                p = p.next;                   // re-read on CAS failure

  19.            else {

  20.                if (p != t) {                 // update if slack now >= 2

  21.                    while ((tail != t || !casTail(t, s)) &&

  22.                            (t = tail)   != null &&

  23.                            (s = t.next) != null && // advance and retry

  24.                            (s = s.next) != null && s != t);

  25.                }

  26.                return p;

  27.            }

  28.        }

  29.    }

tryAppend方法是將S節點新增到tail上,然後傳回其前驅節點。好吧,我承認這段程式碼我看的有點兒暈!!!

加入佇列後,如果how還不是ASYNC則呼叫awaitMatch()方法阻塞等待:

  1.    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {

  2.        // 超時控制

  3.        final long deadline = timed ? System.nanoTime() + nanos : 0L;

  4.        // 當前執行緒

  5.        Thread w = Thread.currentThread();

  6.        // 自旋次數

  7.        int spins = -1; // initialized after first item and cancel checks

  8.        // 隨機數

  9.        ThreadLocalRandom randomYields = null; // bound if needed

  10.        for (;;) {

  11.            Object item = s.item;

  12.            //匹配了,可能有其他執行緒匹配了執行緒

  13.            if (item != e) {

  14.                // 撤銷該節點

  15.                s.forgetContents();

  16.                return LinkedTransferQueue.<E>cast(item);

  17.            }

  18.            // 執行緒中斷或者超時了。則呼叫將s節點item設定為e,等待取消

  19.            if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) {        // cancel

  20.                // 斷開節點

  21.                unsplice(pred, s);

  22.                return e;

  23.            }

  24.            // 自旋

  25.            if (spins < 0) {

  26.                // 計算自旋次數

  27.                if ((spins = spinsFor(pred, s.isData)) > 0)

  28.                    randomYields = ThreadLocalRandom.current();

  29.            }

  30.            // 自旋

  31.            else if (spins > 0) {

  32.                --spins;

  33.                // 生成的隨機數 == 0 ,停止執行緒?不是很明白....

  34.                if (randomYields.nextInt(CHAINED_SPINS) == 0)

  35.                    Thread.yield();

  36.            }

  37.            // 將當前執行緒設定到節點的waiter域

  38.            // 一開始s.waiter == null 肯定是會成立的,

  39.            else if (s.waiter == null) {

  40.                s.waiter = w;                 // request unpark then recheck

  41.            }

  42.            // 超時阻塞

  43.            else if (timed) {

  44.                nanos = deadline - System.nanoTime();

  45.                if (nanos > 0L)

  46.                    LockSupport.parkNanos(this, nanos);

  47.            }

  48.            else {

  49.                // 不是超時阻塞

  50.                LockSupport.park(this);

  51.            }

  52.        }

  53.    }

整個awaitMatch過程和SynchronousQueue的awaitFulfill沒有很大區別,不過在自旋過程會呼叫Thread.yield();這是幹嘛?

在awaitMatch過程中,如果執行緒中斷了,或者超時了則會呼叫unsplice()方法去除該節點:

  1.    final void unsplice(Node pred, Node s) {

  2.        s.forgetContents(); // forget unneeded fields

  3.        if (pred != null && pred != s && pred.next == s) {

  4.            Node n = s.next;

  5.            if (n == null ||

  6.                    (n != s && pred.casNext(s, n) && pred.isMatched())) {

  7.                for (;;) {               // check if at, or could be, head

  8.                    Node h = head;

  9.                    if (h == pred || h == s || h == null)

  10.                        return;          // at head or list empty

  11.                    if (!h.isMatched())

  12.                        break;

  13.                    Node hn = h.next;

  14.                    if (hn == null)

  15.                        return;          // now empty

  16.                    if (hn != h && casHead(h, hn))

  17.                        h.forgetNext();  // advance head

  18.                }

  19.                if (pred.next != pred && s.next != s) { // recheck if offlist

  20.                    for (;;) {           // sweep now if enough votes

  21.                        int v = sweepVotes;

  22.                        if (v < SWEEP_THRESHOLD) {

  23.                            if (casSweepVotes(v, v + 1))

  24.                                break;

  25.                        }

  26.                        else if (casSweepVotes(v, 0)) {

  27.                            sweep();

  28.                            break;

  29.                        }

  30.                    }

  31.                }

  32.            }

  33.        }

  34.    }

主體流程已經完成,這裡總結下:

  1. 無論是入對、出對,還是交換,最終都會跑到xfer(E e, boolean haveData, int how, long nanos)方法中,只不過傳入的how不同而已

  2. 如果佇列不為空,則嘗試在佇列中尋找是否存在與該節點相匹配的節點,如果找到則將匹配節點的item設定e,然後喚醒匹配節點的waiter執行緒。如果是reservation則叫人收貨,data則叫null收貨

  3. 如果佇列為空,或者沒有找到匹配的節點且how != NOW,則呼叫tryAppend()方法將節點新增到佇列的tail,然後傳回其前驅節點

  4. 如果節點的how != NOW && how != ASYNC,則呼叫awaitMatch()方法阻塞等待,在阻塞等待過程中和SynchronousQuque的awaitFulfill()邏輯差不多,都是先自旋,然後判斷是否需要自旋,如果中斷或者超時了則將該節點從佇列中移出

實體

這段摘自JAVA 1.7併發之LinkedTransferQueue原理理解。感覺看完上面的原始碼後,在結合這個例子會有更好的瞭解,掌握。

1:Head->Data Input->Data
Match: 根據他們的屬性 發現 cannot match ,因為是同類的
處理節點: 所以把新的data放在原來的data後面,然後head往後移一位,Reservation同理
HEAD=DATA->DATA

2:Head->Data Input->Reservation (取資料)
Match: 成功match,就把Data的item變為reservation的值(null,有主了),並且傳回資料。
處理節點: 沒動,head還在原地
HEAD=DATA(用過)

3:Head->Reservation Input->Data(放資料)
Match: 成功match,就把Reservation的item變為Data的值(有主了),並且叫waiter來取
處理節點: 沒動
HEAD=RESERVATION(用過)

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖