精品專欄
作者:liuinsect
原文出處:http://www.liuinsect.com/?p=69
在java.util.concurrent包中,有兩個很特殊的工具類,Condition和ReentrantLock,使用過的人都知道,ReentrantLock(重入鎖)是jdk的concurrent包提供的一種獨佔鎖的實現。它繼承自Dong Lea的 AbstractQueuedSynchronizer(同步器),確切的說是ReentrantLock的一個內部類繼承了AbstractQueuedSynchronizer,ReentrantLock只不過是代理了該類的一些方法,可能有人會問為什麼要使用內部類在包裝一層? 我想是安全的關係,因為AbstractQueuedSynchronizer中有很多方法,還實現了共享鎖,Condition(稍候再細說)等功能,如果直接使ReentrantLock繼承它,則很容易出現AbstractQueuedSynchronizer中的API被無用的情況。
言歸正傳,今天,我們討論下Condition工具類的實現。
ReentrantLock和Condition的使用方式通常是這樣的:
public static void main(String[] args) {
final ReentrantLock reentrantLock = new ReentrantLock();
final Condition condition = reentrantLock.newCondition();
Thread thread = new Thread((Runnable) () -> {
try {
reentrantLock.lock();
System.out.println("我要等一個新訊號" + this);
condition.wait();
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("拿到一個訊號!!" + this);
reentrantLock.unlock();
}, "waitThread1");
thread.start();
Thread thread1 = new Thread((Runnable) () -> {
reentrantLock.lock();
System.out.println("我拿到鎖了");
try {
Thread.sleep(3000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();
System.out.println("我發了一個訊號!!");
reentrantLock.unlock();
}, "signalThread");
thread1.start();
}
執行後,結果如下:
我要等一個新訊號lock.ReentrantLockTest$1@a62fc3
我拿到鎖了
我發了一個訊號!!
拿到一個訊號!!
可以看到,
Condition的執行方式,是當在執行緒1中呼叫await方法後,執行緒1將釋放鎖,並且將自己沉睡,等待喚醒,
執行緒2獲取到鎖後,開始做事,完畢後,呼叫Condition的signal方法,喚醒執行緒1,執行緒1恢復執行。
以上說明Condition是一個多執行緒間協調通訊的工具類,使得某個,或者某些執行緒一起等待某個條件(Condition),只有當該條件具備( signal 或者 signalAll方法被帶呼叫)時 ,這些等待執行緒才會被喚醒,從而重新爭奪鎖。
那,它是怎麼實現的呢?
首先還是要明白,reentrantLock.newCondition() 傳回的是Condition的一個實現,該類在AbstractQueuedSynchronizer中被實現,叫做newCondition()
public Condition newCondition() {
return sync.newCondition();
}
它可以訪問AbstractQueuedSynchronizer中的方法和其餘內部類(AbstractQueuedSynchronizer是個抽象類,至於他怎麼能訪問,這裡有個很奇妙的點,後面我專門用demo說明 )
現在,我們一起來看下Condition類的實現,還是從上面的demo入手,
為了方便書寫,我將AbstractQueuedSynchronizer縮寫為AQS
當await被呼叫時,程式碼如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 將當前執行緒包裝下後,
// 新增到Condition自己維護的一個連結串列中。
int savedState = fullyRelease(node);// 釋放當前執行緒佔有的鎖,從demo中看到,
// 呼叫await前,當前執行緒是佔有鎖的
int interruptMode = 0;
while (!isOnSyncQueue(node)) {// 釋放完畢後,遍歷AQS的佇列,看當前節點是否在佇列中,
// 不在 說明它還沒有競爭鎖的資格,所以繼續將自己沉睡。
// 直到它被加入到佇列中,聰明的你可能猜到了,
// 沒有錯,在singal的時候加入不就可以了?
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被喚醒後,重新開始正式競爭鎖,同樣,如果競爭不到還是會將自己沉睡,等待喚醒重新開始競爭。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
回到上面的demo,鎖被釋放後,執行緒1開始沉睡,這個時候執行緒因為執行緒1沉睡時,會喚醒AQS佇列中的頭結點,所所以執行緒2會開始競爭鎖,並獲取到,等待3秒後,執行緒2會呼叫signal方法,“發出”signal訊號,signal方法如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; // firstWaiter為condition自己維護的一個連結串列的頭結點,
// 取出第一個節點後開始喚醒操作
if (first != null)
doSignal(first);
}
說明下,其實Condition內部維護了等待佇列的頭結點和尾節點,該佇列的作用是存放等待signal訊號的執行緒,該執行緒被封裝為Node節點後存放於此。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
關鍵的就在於此,我們知道AQS自己維護的佇列是當前等待資源的佇列,AQS會在資源被釋放後,依次喚醒佇列中從前到後的所有節點,使他們對應的執行緒恢復執行。直到佇列為空。
而Condition自己也維護了一個佇列,該佇列的作用是維護一個等待signal訊號的佇列,兩個佇列的作用是不同,事實上,每個執行緒也僅僅會同時存在以上兩個佇列中的一個,流程是這樣的:
-
執行緒1呼叫reentrantLock.lock時,執行緒被加入到AQS的等待佇列中。
-
執行緒1呼叫await方法被呼叫時,該執行緒從AQS中移除,對應操作是鎖的釋放。
-
接著馬上被加入到Condition的等待佇列中,以為著該執行緒需要signal訊號。
-
執行緒2,因為執行緒1釋放鎖的關係,被喚醒,並判斷可以獲取鎖,於是執行緒2獲取鎖,並被加入到AQS的等待佇列中。
-
執行緒2呼叫signal方法,這個時候Condition的等待佇列中只有執行緒1一個節點,於是它被取出來,並被加入到AQS的等待佇列中。 註意,這個時候,執行緒1 並沒有被喚醒。
-
signal方法執行完畢,執行緒2呼叫reentrantLock.unLock()方法,釋放鎖。這個時候因為AQS中只有執行緒1,於是,AQS釋放鎖後按從頭到尾的順序喚醒執行緒時,執行緒1被喚醒,於是執行緒1回覆執行。
-
直到釋放所整個過程執行完畢。
可以看到,整個協作過程是靠結點在AQS的等待佇列和Condition的等待佇列中來回移動實現的,Condition作為一個條件類,很好的自己維護了一個等待訊號的佇列,併在適時的時候將結點加入到AQS的等待佇列中來實現的喚醒操作。
看到這裡,signal方法的程式碼應該不難理解了。
取出頭結點,然後doSignal
public final void signal() {
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if (first != null) {
doSignal(first);
}
}
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null) // 修改頭結點,完成舊頭結點的移出工作
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 將老的頭結點,加入到AQS的等待佇列中
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or attempt
* to set waitStatus fails, wake up to resync (in which case the
* waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
// 如果該結點的狀態為cancel 或者修改waitStatus失敗,則直接喚醒。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
可以看到,正常情況 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)這個判斷是不會為true的,所以,不會在這個時候喚醒該執行緒。
只有到傳送signal訊號的執行緒呼叫reentrantLock.unlock()後因為它已經被加到AQS的等待佇列中,所以才會被喚醒。
總結:
本文從程式碼的角度說明瞭Condition的實現方式,其中,涉及到了AQS的很多操作,比如AQS的等待佇列實現獨佔鎖功能,不過,這不是本文討論的重點,等有機會再將AQS的實現單獨分享出來。
多執行緒:為什麼在while迴圈中加入System.out.println,執行緒可以停止
你真的瞭解try{ return }finally{}中的return?
END
>>>>>> 加群交流技術 <<<<<<