歡迎光臨
每天分享高質量文章

怎麼理解Condition?

精品專欄

 

作者:liuinsect
原文出處:http://www.liuinsect.com/?p=69


在java.util.concurrent包中,有兩個很特殊的工具類,Condition和ReentrantLock,使用過的人都知道,ReentrantLock(重入鎖)是jdk的concurrent包提供的一種獨佔鎖的實現。它繼承自Dong Lea的 AbstractQueuedSynchronizer(同步器),確切的說是ReentrantLock的一個內部類繼承了AbstractQueuedSynchronizer,ReentrantLock只不過是代理了該類的一些方法,可能有人會問為什麼要使用內部類在包裝一層? 我想是安全的關係,因為AbstractQueuedSynchronizer中有很多方法,還實現了共享鎖,Condition(稍候再細說)等功能,如果直接使ReentrantLock繼承它,則很容易出現AbstractQueuedSynchronizer中的API被無用的情況。

言歸正傳,今天,我們討論下Condition工具類的實現。

ReentrantLock和Condition的使用方式通常是這樣的:

  1. public static void main(String[] args) {

  2.    final ReentrantLock reentrantLock = new ReentrantLock();

  3.    final Condition condition = reentrantLock.newCondition();

  4.    Thread thread = new Thread((Runnable) () -> {

  5.            try {

  6.                reentrantLock.lock();

  7.                System.out.println("我要等一個新訊號" + this);

  8.                condition.wait();

  9.            }

  10.            catch (InterruptedException e) {

  11.                e.printStackTrace();

  12.            }

  13.            System.out.println("拿到一個訊號!!" + this);

  14.            reentrantLock.unlock();

  15.    }, "waitThread1");

  16.    thread.start();

  17.    Thread thread1 = new Thread((Runnable) () -> {

  18.            reentrantLock.lock();

  19.            System.out.println("我拿到鎖了");

  20.            try {

  21.                Thread.sleep(3000);

  22.            }

  23.            catch (InterruptedException e) {

  24.                e.printStackTrace();

  25.            }

  26.            condition.signalAll();

  27.            System.out.println("我發了一個訊號!!");

  28.            reentrantLock.unlock();

  29.    }, "signalThread");

  30.    thread1.start();

  31. }

執行後,結果如下:

  1. 我要等一個新訊號lock.ReentrantLockTest$1@a62fc3

  2. 我拿到鎖了

  3. 我發了一個訊號!!

  4. 拿到一個訊號!!

可以看到,

Condition的執行方式,是當在執行緒1中呼叫await方法後,執行緒1將釋放鎖,並且將自己沉睡,等待喚醒,

執行緒2獲取到鎖後,開始做事,完畢後,呼叫Condition的signal方法,喚醒執行緒1,執行緒1恢復執行。

以上說明Condition是一個多執行緒間協調通訊的工具類,使得某個,或者某些執行緒一起等待某個條件(Condition),只有當該條件具備( signal 或者 signalAll方法被帶呼叫)時 ,這些等待執行緒才會被喚醒,從而重新爭奪鎖。

那,它是怎麼實現的呢?

首先還是要明白,reentrantLock.newCondition() 傳回的是Condition的一個實現,該類在AbstractQueuedSynchronizer中被實現,叫做newCondition()

  1. public Condition newCondition()   {

  2.  return sync.newCondition();

  3. }

它可以訪問AbstractQueuedSynchronizer中的方法和其餘內部類(AbstractQueuedSynchronizer是個抽象類,至於他怎麼能訪問,這裡有個很奇妙的點,後面我專門用demo說明 )

現在,我們一起來看下Condition類的實現,還是從上面的demo入手,

為了方便書寫,我將AbstractQueuedSynchronizer縮寫為AQS

當await被呼叫時,程式碼如下:

  1. public final void await() throws InterruptedException {

  2.    if (Thread.interrupted())

  3.        throw new InterruptedException();

  4.    Node node = addConditionWaiter(); // 將當前執行緒包裝下後,

  5.                                      // 新增到Condition自己維護的一個連結串列中。

  6.    int savedState = fullyRelease(node);// 釋放當前執行緒佔有的鎖,從demo中看到,

  7.                                        // 呼叫await前,當前執行緒是佔有鎖的

  8.    int interruptMode = 0;

  9.    while (!isOnSyncQueue(node)) {// 釋放完畢後,遍歷AQS的佇列,看當前節點是否在佇列中,

  10.        // 不在 說明它還沒有競爭鎖的資格,所以繼續將自己沉睡。

  11.        // 直到它被加入到佇列中,聰明的你可能猜到了,

  12.        // 沒有錯,在singal的時候加入不就可以了?

  13.        LockSupport.park(this);

  14.        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

  15.            break;

  16.    }

  17.    // 被喚醒後,重新開始正式競爭鎖,同樣,如果競爭不到還是會將自己沉睡,等待喚醒重新開始競爭。

  18.    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

  19.        interruptMode = REINTERRUPT;

  20.    if (node.nextWaiter != null)

  21.        unlinkCancelledWaiters();

  22.    if (interruptMode != 0)

  23.        reportInterruptAfterWait(interruptMode);

  24. }

回到上面的demo,鎖被釋放後,執行緒1開始沉睡,這個時候執行緒因為執行緒1沉睡時,會喚醒AQS佇列中的頭結點,所所以執行緒2會開始競爭鎖,並獲取到,等待3秒後,執行緒2會呼叫signal方法,“發出”signal訊號,signal方法如下:

  1. public final void signal() {

  2.    if (!isHeldExclusively())

  3.        throw new IllegalMonitorStateException();

  4.    Node first = firstWaiter; // firstWaiter為condition自己維護的一個連結串列的頭結點,

  5.                              // 取出第一個節點後開始喚醒操作

  6.    if (first != null)

  7.        doSignal(first);

  8. }

說明下,其實Condition內部維護了等待佇列的頭結點和尾節點,該佇列的作用是存放等待signal訊號的執行緒,該執行緒被封裝為Node節點後存放於此。

  1. public class ConditionObject implements Condition, java.io.Serializable {

  2.    private static final long serialVersionUID = 1173984872572414699L;

  3.    /** First node of condition queue. */

  4.    private transient Node firstWaiter;

  5.    /** Last node of condition queue. */

  6.    private transient Node lastWaiter;

關鍵的就在於此,我們知道AQS自己維護的佇列是當前等待資源的佇列,AQS會在資源被釋放後,依次喚醒佇列中從前到後的所有節點,使他們對應的執行緒恢復執行。直到佇列為空。

而Condition自己也維護了一個佇列,該佇列的作用是維護一個等待signal訊號的佇列,兩個佇列的作用是不同,事實上,每個執行緒也僅僅會同時存在以上兩個佇列中的一個,流程是這樣的:

  • 執行緒1呼叫reentrantLock.lock時,執行緒被加入到AQS的等待佇列中。

  • 執行緒1呼叫await方法被呼叫時,該執行緒從AQS中移除,對應操作是鎖的釋放。

  • 接著馬上被加入到Condition的等待佇列中,以為著該執行緒需要signal訊號。

  • 執行緒2,因為執行緒1釋放鎖的關係,被喚醒,並判斷可以獲取鎖,於是執行緒2獲取鎖,並被加入到AQS的等待佇列中。

  • 執行緒2呼叫signal方法,這個時候Condition的等待佇列中只有執行緒1一個節點,於是它被取出來,並被加入到AQS的等待佇列中。 註意,這個時候,執行緒1 並沒有被喚醒。

  • signal方法執行完畢,執行緒2呼叫reentrantLock.unLock()方法,釋放鎖。這個時候因為AQS中只有執行緒1,於是,AQS釋放鎖後按從頭到尾的順序喚醒執行緒時,執行緒1被喚醒,於是執行緒1回覆執行。

  • 直到釋放所整個過程執行完畢。

可以看到,整個協作過程是靠結點在AQS的等待佇列和Condition的等待佇列中來回移動實現的,Condition作為一個條件類,很好的自己維護了一個等待訊號的佇列,併在適時的時候將結點加入到AQS的等待佇列中來實現的喚醒操作。

看到這裡,signal方法的程式碼應該不難理解了。

取出頭結點,然後doSignal

  1. public final void signal() {

  2.    if (!isHeldExclusively()) {

  3.        throw new IllegalMonitorStateException();

  4.    }

  5.    Node first = firstWaiter;

  6.    if (first != null) {

  7.        doSignal(first);

  8.    }

  9. }

  10. private void doSignal(Node first) {

  11.    do {

  12.        if ((firstWaiter = first.nextWaiter) == null) // 修改頭結點,完成舊頭結點的移出工作

  13.            lastWaiter = null;

  14.        first.nextWaiter = null;

  15.    } while (!transferForSignal(first) && // 將老的頭結點,加入到AQS的等待佇列中

  16.             (first = firstWaiter) != null);

  17. }

  18. final boolean transferForSignal(Node node) {

  19.    /*

  20.     * If cannot change waitStatus, the node has been cancelled.

  21.     */

  22.    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

  23.        return false;

  24.    /*

  25.     * Splice onto queue and try to set waitStatus of predecessor to

  26.     * indicate that thread is (probably) waiting. If cancelled or attempt

  27.     * to set waitStatus fails, wake up to resync (in which case the

  28.     * waitStatus can be transiently and harmlessly wrong).

  29.     */

  30.    Node p = enq(node);

  31.    int ws = p.waitStatus;

  32.    // 如果該結點的狀態為cancel 或者修改waitStatus失敗,則直接喚醒。

  33.    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

  34.        LockSupport.unpark(node.thread);

  35.    return true;

  36. }

可以看到,正常情況 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)這個判斷是不會為true的,所以,不會在這個時候喚醒該執行緒。

只有到傳送signal訊號的執行緒呼叫reentrantLock.unlock()後因為它已經被加到AQS的等待佇列中,所以才會被喚醒。

總結:

本文從程式碼的角度說明瞭Condition的實現方式,其中,涉及到了AQS的很多操作,比如AQS的等待佇列實現獨佔鎖功能,不過,這不是本文討論的重點,等有機會再將AQS的實現單獨分享出來。

如何擴充套件和最佳化執行緒池?

多執行緒:為什麼在while迴圈中加入System.out.println,執行緒可以停止

快速排序演演算法到底有多快?

微服務實戰:使用API Gateway

MySQL的索引是什麼?怎麼最佳化?

在一個千萬級的資料庫查尋中,如何提高查詢效率?

你真的瞭解try{ return }finally{}中的return?

推薦四十多條純乾貨 Java 程式碼最佳化建議

一個致命的 Redis 命令,導致公司損失 400 萬!!

美團三面:一個執行緒OOM,行程裡其他執行緒還能執行麼?

END


>>>>>> 加群交流技術 <<<<<<

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖