歡迎光臨
每天分享高質量文章

【死磕Java併發】—– J.U.C之AQS:同步狀態的獲取與釋放

此篇部落格所有原始碼均來自JDK 1.8

在前面提到過,AQS是構建Java同步元件的基礎,我們期待它能夠成為實現大部分同步需求的基礎。AQS的設計樣式採用的模板方法樣式,子類透過繼承的方式,實現它的抽象方法來管理同步狀態,對於子類而言它並沒有太多的活要做,AQS提供了大量的模板方法來實現同步,主要是分為三類:獨佔式獲取和釋放同步狀態、共享式獲取和釋放同步狀態、查詢同步佇列中的等待執行緒情況。自定義子類使用AQS提供的模板方法就可以實現自己的同步語意。

作者:大明哥
原文地址:http://cmsblogs.com/?p=2197

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【大明哥】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【大明哥】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【大明哥】搞基嗨皮。

獨佔式

獨佔式,同一時刻僅有一個執行緒持有同步狀態。

獨佔式同步狀態獲取

acquire(int arg)方法為AQS提供的模板方法,該方法為獨佔式獲取同步狀態,但是該方法對中斷不敏感,也就是說由於執行緒獲取同步狀態失敗加入到CLH同步佇列中,後續對執行緒進行中斷操作時,執行緒不會從同步佇列中移除。程式碼如下:

  1.    public final void acquire(int arg) {

  2.        if (!tryAcquire(arg) &&

  3.            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

  4.            selfInterrupt();

  5.    }

各個方法定義如下:

  1. tryAcquire:去嘗試獲取鎖,獲取成功則設定鎖狀態並傳回true,否則傳回false。該方法自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。

  2. addWaiter:如果tryAcquire傳回FALSE(獲取同步狀態失敗),則呼叫該方法將當前執行緒加入到CLH同步佇列尾部。

  3. acquireQueued:當前執行緒會根據公平性原則來進行阻塞等待(自旋),直到獲取鎖為止;並且傳回當前執行緒在等待過程中有沒有中斷過。

  4. selfInterrupt:產生一個中斷。

acquireQueued方法為一個自旋的過程,也就是說當前執行緒(Node)進入同步佇列後,就會進入一個自旋的過程,每個節點都會自省地觀察,當條件滿足,獲取到同步狀態後,就可以從這個自旋過程中退出,否則會一直執行下去。如下:

  1.    final boolean acquireQueued(final Node node, int arg) {

  2.        boolean failed = true;

  3.        try {

  4.            //中斷標誌

  5.            boolean interrupted = false;

  6.            /*

  7.             * 自旋過程,其實就是一個死迴圈而已

  8.             */

  9.            for (;;) {

  10.                //當前執行緒的前驅節點

  11.                final Node p = node.predecessor();

  12.                //當前執行緒的前驅節點是頭結點,且同步狀態成功

  13.                if (p == head && tryAcquire(arg)) {

  14.                    setHead(node);

  15.                    p.next = null; // help GC

  16.                    failed = false;

  17.                    return interrupted;

  18.                }

  19.                //獲取失敗,執行緒等待--具體後面介紹

  20.                if (shouldParkAfterFailedAcquire(p, node) &&

  21.                        parkAndCheckInterrupt())

  22.                    interrupted = true;

  23.            }

  24.        } finally {

  25.            if (failed)

  26.                cancelAcquire(node);

  27.        }

  28.    }

從上面程式碼中可以看到,當前執行緒會一直嘗試獲取同步狀態,當然前提是隻有其前驅節點為頭結點才能夠嘗試獲取同步狀態,理由:

  1. 保持FIFO同步佇列原則。

  2. 頭節點釋放同步狀態後,將會喚醒其後繼節點,後繼節點被喚醒後需要檢查自己是否為頭節點。

acquire(int arg)方法流程圖如下:

獨佔式獲取響應中斷

AQS提供了acquire(int arg)方法以供獨佔式獲取同步狀態,但是該方法對中斷不響應,對執行緒進行中斷操作後,該執行緒會依然位於CLH同步佇列中等待著獲取同步狀態。為了響應中斷,AQS提供了acquireInterruptibly(int arg)方法,該方法在等待獲取同步狀態時,如果當前執行緒被中斷了,會立刻響應中斷丟擲異常InterruptedException。

  1.    public final void acquireInterruptibly(int arg)

  2.            throws InterruptedException {

  3.        if (Thread.interrupted())

  4.            throw new InterruptedException();

  5.        if (!tryAcquire(arg))

  6.            doAcquireInterruptibly(arg);

  7.    }

首先校驗該執行緒是否已經中斷了,如果是則丟擲InterruptedException,否則執行tryAcquire(int arg)方法獲取同步狀態,如果獲取成功,則直接傳回,否則執行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定義如下:

  1. private void doAcquireInterruptibly(int arg)

  2.        throws InterruptedException {

  3.        final Node node = addWaiter(Node.EXCLUSIVE);

  4.        boolean failed = true;

  5.        try {

  6.            for (;;) {

  7.                final Node p = node.predecessor();

  8.                if (p == head && tryAcquire(arg)) {

  9.                    setHead(node);

  10.                    p.next = null; // help GC

  11.                    failed = false;

  12.                    return;

  13.                }

  14.                if (shouldParkAfterFailedAcquire(p, node) &&

  15.                    parkAndCheckInterrupt())

  16.                    throw new InterruptedException();

  17.            }

  18.        } finally {

  19.            if (failed)

  20.                cancelAcquire(node);

  21.        }

  22.    }

doAcquireInterruptibly(int arg)方法與acquire(int arg)方法僅有兩個差別。1.方法宣告丟擲InterruptedException異常,2.在中斷方法處不再是使用interrupted標誌,而是直接丟擲InterruptedException異常。

獨佔式超時獲取

AQS除了提供上面兩個方法外,還提供了一個增強版的方法:tryAcquireNanos(int arg,long nanos)。該方法為acquireInterruptibly方法的進一步增強,它除了響應中斷外,還有超時控制。即如果當前執行緒沒有在指定時間內獲取同步狀態,則會傳回false,否則傳回true。如下:

  1.   public final boolean tryAcquireNanos(int arg, long nanosTimeout)

  2.            throws InterruptedException {

  3.        if (Thread.interrupted())

  4.            throw new InterruptedException();

  5.        return tryAcquire(arg) ||

  6.            doAcquireNanos(arg, nanosTimeout);

  7.    }

tryAcquireNanos(int arg, long nanosTimeout)方法超時獲取最終是在doAcquireNanos(int arg, long nanosTimeout)中實現的,如下:

  1.    private boolean doAcquireNanos(int arg, long nanosTimeout)

  2.            throws InterruptedException {

  3.        //nanosTimeout <= 0

  4.        if (nanosTimeout <= 0L)

  5.            return false;

  6.        //超時時間

  7.        final long deadline = System.nanoTime() + nanosTimeout;

  8.        //新增Node節點

  9.        final Node node = addWaiter(Node.EXCLUSIVE);

  10.        boolean failed = true;

  11.        try {

  12.            //自旋

  13.            for (;;) {

  14.                final Node p = node.predecessor();

  15.                //獲取同步狀態成功

  16.                if (p == head && tryAcquire(arg)) {

  17.                    setHead(node);

  18.                    p.next = null; // help GC

  19.                    failed = false;

  20.                    return true;

  21.                }

  22.                /*

  23.                 * 獲取失敗,做超時、中斷判斷

  24.                 */

  25.                //重新計算需要休眠的時間

  26.                nanosTimeout = deadline - System.nanoTime();

  27.                //已經超時,傳回false

  28.                if (nanosTimeout <= 0L)

  29.                    return false;

  30.                //如果沒有超時,則等待nanosTimeout納秒

  31.                //註:該執行緒會直接從LockSupport.parkNanos中傳回,

  32.                //LockSupport為JUC提供的一個阻塞和喚醒的工具類,後面做詳細介紹

  33.                if (shouldParkAfterFailedAcquire(p, node) &&

  34.                        nanosTimeout > spinForTimeoutThreshold)

  35.                    LockSupport.parkNanos(this, nanosTimeout);

  36.                //執行緒是否已經中斷了

  37.                if (Thread.interrupted())

  38.                    throw new InterruptedException();

  39.            }

  40.        } finally {

  41.            if (failed)

  42.                cancelAcquire(node);

  43.        }

  44.    }

針對超時控制,程式首先記錄喚醒時間deadline ,deadline = System.nanoTime() + nanosTimeout(時間間隔)。如果獲取同步狀態失敗,則需要計算出需要休眠的時間間隔nanosTimeout(= deadline – System.nanoTime()),如果nanosTimeout <= 0 表示已經超時了,傳回false,如果大於spinForTimeoutThreshold(1000L)則需要休眠nanosTimeout ,如果nanosTimeout <= spinForTimeoutThreshold ,就不需要休眠了,直接進入快速自旋的過程。原因在於 spinForTimeoutThreshold 已經非常小了,非常短的時間等待無法做到十分精確,如果這時再次進行超時等待,相反會讓nanosTimeout 的超時從整體上面表現得不是那麼精確,所以在超時非常短的場景中,AQS會進行無條件的快速自旋。

整個流程如下:

獨佔式同步狀態釋放

當執行緒獲取同步狀態後,執行完相應邏輯後就需要釋放同步狀態。AQS提供了release(int arg)方法釋放同步狀態:

  1.    public final boolean release(int arg) {

  2.        if (tryRelease(arg)) {

  3.            Node h = head;

  4.            if (h != null && h.waitStatus != 0)

  5.                unparkSuccessor(h);

  6.            return true;

  7.        }

  8.        return false;

  9.    }

該方法同樣是先呼叫自定義同步器自定義的tryRelease(int arg)方法來釋放同步狀態,釋放成功後,會呼叫unparkSuccessor(Node node)方法喚醒後繼節點(如何喚醒LZ後面介紹)。

這裡稍微總結下:

在AQS中維護著一個FIFO的同步佇列,當執行緒獲取同步狀態失敗後,則會加入到這個CLH同步佇列的對尾並一直保持著自旋。在CLH同步佇列中的執行緒在自旋時會判斷其前驅節點是否為首節點,如果為首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步佇列。當執行緒執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。

共享式

共享式與獨佔式的最主要區別在於同一時刻獨佔式只能有一個執行緒獲取同步狀態,而共享式在同一時刻可以有多個執行緒獲取同步狀態。例如讀操作可以有多個執行緒同時進行,而寫操作同一時刻只能有一個執行緒進行寫操作,其他操作都會被阻塞。

共享式同步狀態獲取

AQS提供acquireShared(int arg)方法共享式獲取同步狀態:

  1.    public final void acquireShared(int arg) {

  2.        if (tryAcquireShared(arg) < 0)

  3.            //獲取失敗,自旋獲取同步狀態

  4.            doAcquireShared(arg);

  5.    }

從上面程式可以看出,方法首先是呼叫tryAcquireShared(int arg)方法嘗試獲取同步狀態,如果獲取失敗則呼叫doAcquireShared(int arg)自旋方式獲取同步狀態,共享式獲取同步狀態的標誌是傳回 >= 0 的值表示獲取成功。自選式獲取同步狀態如下:

  1.    private void doAcquireShared(int arg) {

  2.        /共享式節點

  3.        final Node node = addWaiter(Node.SHARED);

  4.        boolean failed = true;

  5.        try {

  6.            boolean interrupted = false;

  7.            for (;;) {

  8.                //前驅節點

  9.                final Node p = node.predecessor();

  10.                //如果其前驅節點,獲取同步狀態

  11.                if (p == head) {

  12.                    //嘗試獲取同步

  13.                    int r = tryAcquireShared(arg);

  14.                    if (r >= 0) {

  15.                        setHeadAndPropagate(node, r);

  16.                        p.next = null; // help GC

  17.                        if (interrupted)

  18.                            selfInterrupt();

  19.                        failed = false;

  20.                        return;

  21.                    }

  22.                }

  23.                if (shouldParkAfterFailedAcquire(p, node) &&

  24.                        parkAndCheckInterrupt())

  25.                    interrupted = true;

  26.            }

  27.        } finally {

  28.            if (failed)

  29.                cancelAcquire(node);

  30.        }

  31.    }

tryAcquireShared(int arg)方法嘗試獲取同步狀態,傳回值為int,當其 >= 0 時,表示能夠獲取到同步狀態,這個時候就可以從自旋過程中退出。

acquireShared(int arg)方法不響應中斷,與獨佔式相似,AQS也提供了響應中斷、超時的方法,分別是:acquireSharedInterruptibly(int arg)、tryAcquireSharedNanos(int arg,long nanos),這裡就不做解釋了。

共享式同步狀態釋放

獲取同步狀態後,需要呼叫release(int arg)方法釋放同步狀態,方法如下:

  1.    public final boolean releaseShared(int arg) {

  2.        if (tryReleaseShared(arg)) {

  3.            doReleaseShared();

  4.            return true;

  5.        }

  6.        return false;

  7.    }

因為可能會存在多個執行緒同時進行釋放同步狀態資源,所以需要確保同步狀態安全地成功釋放,一般都是透過CAS和迴圈來完成的。

參考資料

Doug Lea:《Java併發程式設計實戰》
方騰飛:《Java併發程式設計的藝術》


贊(0)

分享創造快樂