(點選上方公眾號,可快速關註)
來源:五月的倉頡 ,
www.cnblogs.com/xrq730/p/7067904.html
共享樣式acquire實現流程
上文我們講解了AbstractQueuedSynchronizer獨佔樣式的acquire實現流程,本文趁熱打鐵繼續看一下AbstractQueuedSynchronizer共享樣式acquire的實現流程。連續兩篇文章的學習,也可以對比獨佔樣式acquire和共享樣式acquire的區別,加深對於AbstractQueuedSynchronizer的理解。
先看一下共享樣式acquire的實現,方法為acquireShared和acquireSharedInterruptibly,兩者差別不大,區別就在於後者有中斷處理,以acquireShared為例:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
這裡就能看出第一個差別來了:獨佔樣式acquire的時候子類重寫的方法tryAcquire傳回的是boolean,即是否tryAcquire成功;共享樣式acquire的時候,傳回的是一個int型變數,判斷是否<0。doAcquireShared方法的實現為:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我們來分析一下這段程式碼做了什麼:
-
addWaiter,把所有tryAcquireShared<0的執行緒實體化出一個Node,構建為一個FIFO佇列,這和獨佔鎖是一樣的
-
拿當前節點的前驅節點,只有前驅節點是head的節點才能tryAcquireShared,這和獨佔鎖也是一樣的
-
前驅節點不是head的,執行”shouldParkAfterFailedAcquire() && parkAndCheckInterrupt()”,for(;;)迴圈,”shouldParkAfterFailedAcquire()”方法執行2次,當前執行緒阻塞,這和獨佔鎖也是一樣的
確實,共享樣式下的acquire和獨佔樣式下的acquire大部分邏輯差不多,最大的差別在於tryAcquireShared成功之後,獨佔樣式的acquire是直接將當前節點設定為head節點即可,共享樣式會執行setHeadAndPropagate方法,顧名思義,即在設定head之後多執行了一步propagate操作。setHeadAndPropagate方法原始碼為:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don’t know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
第3行的程式碼設定重設head,第2行的程式碼由於第3行的程式碼要重設head,因此先定義一個Node型變數h獲得原head的地址,這兩行程式碼很簡單。
第19行~第23行的程式碼是獨佔鎖和共享鎖最不一樣的一個地方,我們再看獨佔鎖acquireQueued的程式碼:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這意味著獨佔鎖某個節點被喚醒之後,它只需要將這個節點設定成head就完事了,而共享鎖不一樣,某個節點被設定為head之後,如果它的後繼節點是SHARED狀態的,那麼將繼續透過doReleaseShared方法嘗試往後喚醒節點,實現了共享狀態的向後傳播。
共享樣式release實現流程
上面講了共享樣式下acquire是如何實現的,下麵再看一下release的實現流程,方法為releaseShared:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared方法是子類實現的,如果tryReleaseShared成功,那麼執行doReleaseShared()方法:
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
主要是兩層邏輯:
-
頭結點本身的waitStatus是SIGNAL且能透過CAS演演算法將頭結點的waitStatus從SIGNAL設定為0,喚醒頭結點的後繼節點
-
頭結點本身的waitStatus是0的話,嘗試將其設定為PROPAGATE狀態的,意味著共享狀態可以向後傳播
Condition的await()方法實現原理—-構建等待佇列
我們知道,Condition是用於實現通知/等待機制的,和Object的wait()/notify()一樣,由於本文之前描述AbstractQueuedSynchronizer的共享樣式的篇幅不是很長,加之Condition也是AbstractQueuedSynchronizer的一部分,因此將Condition也放在這裡寫了。
Condition分為await()和signal()兩部分,前者用於等待、後者用於喚醒,首先看一下await()是如何實現的。Condition本身是一個介面,其在AbstractQueuedSynchronizer中的實現為ConditionObject:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
…
}
這裡貼了一些欄位定義,後面都是方法就不貼了,會對重點方法進行分析的。從欄位定義我們可以看到,ConditionObject全域性性地記錄了第一個等待的節點與最後一個等待的節點。
像ReentrantLock每次要使用ConditionObject,直接new一個ConditionObject出來即可。我們關註一下await()方法的實現:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
第2行~第3行的程式碼用於處理中斷,第4行程式碼比較關鍵,新增Condition的等待者,看一下實現:
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
首先拿到佇列(註意資料結構,Condition構建出來的也是一個佇列)中最後一個等待者,緊接著第4行的的判斷,判斷最後一個等待者的waitStatus不是CONDITION的話,執行第5行的程式碼,解綁取消的等待者,因為透過第8行的程式碼,我們看到,new出來的Node的狀態都是CONDITION的。
那麼unlinkCancelledWaiters做了什麼?裡面的流程就不看了,就是一些指標遍歷並判斷狀態的操作,總結一下就是:從頭到尾遍歷每一個Node,遇到Node的waitStatus不是CONDITION的就從佇列中踢掉,該節點的前後節點相連。
接著第8行的程式碼前面說過了,new出來了一個Node,儲存了當前執行緒,waitStatus是CONDITION,接著第9行~第13行的操作很好理解:
-
如果lastWaiter是null,說明FIFO佇列中沒有任何Node,firstWaiter=Node
-
如果lastWaiter不是null,說明FIFO佇列中有Node,原lastWaiter的next指向Node
-
無論如何,新加入的Node程式設計lastWaiter,即新加入的Node一定是在最後面
用一張圖表示一下構建的資料結構就是:
對比學習,我們總結一下Condition構建出來的佇列和AbstractQueuedSynchronizer構建出來的佇列的差別,主要體現在2點上:
-
AbstractQueuedSynchronizer構建出來的佇列,頭節點是一個沒有Thread的空節點,其標識作用,而Condition構建出來的佇列,頭節點就是真正等待的節點
-
AbstractQueuedSynchronizer構建出來的佇列,節點之間有next與pred相互標識該節點的前一個節點與後一個節點的地址,而Condition構建出來的佇列,只使用了nextWaiter標識下一個等待節點的地址
整個過程中,我們看到沒有使用任何CAS操作,firstWaiter和lastWaiter也沒有用volatile修飾,其實原因很簡單:要await()必然要先lock(),既然lock()了就表示沒有競爭,沒有競爭自然也沒必要使用volatile+CAS的機制去保證什麼。
Condition的await()方法實現原理—-執行緒等待
前面我們看了Condition構建等待佇列的過程,接下來我們看一下等待的過程,await()方法的程式碼比較短,再貼一下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
構建完畢佇列之後,執行第5行的fullyRelease方法,顧名思義:fullyRelease方法的作用是完全釋放Node的狀態。方法實現為:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
這裡第4行獲取state,第5行release的時候將整個state傳過去,理由是某執行緒可能多次呼叫了lock()方法,比如呼叫了10次lock,那麼此執行緒就將state加到了10,所以這裡要將10傳過去,將狀態全部釋放,這樣後面的執行緒才能重新從state=0開始競爭鎖,這也是方法被命名為fullyRelease的原因,因為要完全釋放鎖,釋放鎖之後,如果有競爭鎖的執行緒,那麼就喚醒第一個,這都是release方法的邏輯了,前面的文章詳細講解過。
接著看await()方法的第7行判斷”while(!isOnSyncQueue(node))”:
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
註意這裡的判斷是Node是否在AbstractQueuedSynchronizer構建的佇列中而不是Node是否在Condition構建的佇列中,如果Node不在AbstractQueuedSynchronizer構建的佇列中,那麼呼叫LockSupport的park方法阻塞。
至此呼叫await()方法的執行緒構建Condition等待佇列–釋放鎖–等待的過程已經全部分析完畢。
Condition的signal()實現原理
上面的程式碼分析了構建Condition等待佇列–釋放鎖–等待的過程,接著看一下signal()方法通知是如何實現的:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
首先從第2行的程式碼我們看到,要能signal(),當前執行緒必須持有獨佔鎖,否則丟擲異常IllegalMonitorStateException。
那麼真正操作的時候,獲取第一個waiter,如果有waiter,呼叫doSignal方法:
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
第3行~第5行的程式碼很好理解:
-
重新設定firstWaiter,指向第一個waiter的nextWaiter
-
如果第一個waiter的nextWaiter為null,說明當前佇列中只有一個waiter,lastWaiter置空
-
因為firstWaiter是要被signal的,因此它沒什麼用了,nextWaiter置空
接著執行第6行和第7行的程式碼,這裡重點就是第6行的transferForSignal方法:
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
方法本意是將一個節點從Condition佇列轉換為AbstractQueuedSynchronizer佇列,總結一下方法的實現:
-
嘗試將Node的waitStatus從CONDITION置為0,這一步失敗直接傳回false
-
當前節點進入呼叫enq方法進入AbstractQueuedSynchronizer佇列
-
當前節點透過CAS機制將waitStatus置為SIGNAL
最後上面的步驟全部成功,傳回true,傳回true喚醒等待節點成功。從喚醒的程式碼我們可以得出一個重要結論:某個await()的節點被喚醒之後並不意味著它後面的程式碼會立即執行,它會被加入到AbstractQueuedSynchronizer佇列的尾部,只有前面等待的節點獲取鎖全部完畢才能輪到它。
程式碼分析到這裡,我想類似的signalAll方法也沒有必要再分析了,顯然signalAll方法的作用就是將所有Condition佇列中等待的節點逐一佇列中從移除,由CONDITION狀態變為SIGNAL狀態並加入AbstractQueuedSynchronizer佇列的尾部。
程式碼示例
可能大家看了我分析半天程式碼會有點迷糊,這裡最後我貼一段我用於驗證上面Condition結論的示例程式碼,首先建立一個Thread,我將之命名為ConditionThread:
/**
* @author 五月的倉頡http://www.cnblogs.com/xrq730/p/7067904.html
*/
public class ConditionThread implements Runnable {
private Lock lock;
private Condition condition;
public ConditionThread(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
if (“執行緒0”.equals(JdkUtil.getThreadName())) {
thread0Process();
} else if (“執行緒1”.equals(JdkUtil.getThreadName())) {
thread1Process();
} else if (“執行緒2”.equals(JdkUtil.getThreadName())) {
thread2Process();
}
}
private void thread0Process() {
try {
lock.lock();
System.out.println(“執行緒0休息5秒”);
JdkUtil.sleep(5000);
condition.signal();
System.out.println(“執行緒0喚醒等待執行緒”);
} finally {
lock.unlock();
}
}
private void thread1Process() {
try {
lock.lock();
System.out.println(“執行緒1阻塞”);
condition.await();
System.out.println(“執行緒1被喚醒”);
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
}
private void thread2Process() {
try {
System.out.println(“執行緒2想要獲取鎖”);
lock.lock();
System.out.println(“執行緒2獲取鎖成功”);
} finally {
lock.unlock();
}
}
}
這個類裡面的方法就不解釋了,反正就三個方法片段,根據執行緒名判斷,每個線層執行的是其中的一個程式碼片段。寫一段測試程式碼:
/**
* @author 五月的倉頡http://www.cnblogs.com/xrq730/p/7067904.html
*/
@Test
public void testCondition() throws Exception {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// 執行緒0的作用是signal
Runnable runnable0 = new ConditionThread(lock, condition);
Thread thread0 = new Thread(runnable0);
thread0.setName(“執行緒0”);
// 執行緒1的作用是await
Runnable runnable1 = new ConditionThread(lock, condition);
Thread thread1 = new Thread(runnable1);
thread1.setName(“執行緒1”);
// 執行緒2的作用是lock
Runnable runnable2 = new ConditionThread(lock, condition);
Thread thread2 = new Thread(runnable2);
thread2.setName(“執行緒2”);
thread1.start();
Thread.sleep(1000);
thread0.start();
Thread.sleep(1000);
thread2.start();
thread1.join();
}
測試程式碼的意思是:
-
執行緒1先啟動,獲取鎖,呼叫await()方法等待
-
執行緒0後啟動,獲取鎖,休眠5秒準備signal()
-
執行緒2最後啟動,獲取鎖,由於執行緒0未使用完畢鎖,因此執行緒2排隊,可以此時由於執行緒0還未signal(),因此執行緒1在執行緒0執行signal()後,在AbstractQueuedSynchronizer佇列中的順序是在執行緒2後面的
程式碼執行結果為:
1 執行緒1阻塞
2 執行緒0休息5秒
3 執行緒2想要獲取鎖
4 執行緒0喚醒等待執行緒
5 執行緒2獲取鎖成功
6 執行緒1被喚醒
符合我們的結論:signal()並不意味著被喚醒的執行緒立即執行。由於執行緒2先於執行緒0排隊,因此看到第5行列印的內容,執行緒2先獲取鎖。
看完本文有收穫?請轉發分享給更多人
關註「ImportNew」,提升Java技能