(給ImportNew加星標,提高Java技能)
作者:Pedlar(本文來自作者投稿)
本文分析的ReentrantLock所對應的Java版本為JDK8。
在閱讀本文前,讀者應該知道什麼是CAS、自旋。
本文大綱
- ReentrantLock公平鎖簡介
- AQS
- lock方法
- unlock方法
1. ReentrantLock公平鎖簡介
ReentrantLock是JUC(java.util.concurrent)包中Lock介面的一個實現類,它是基於AbstractQueuedSynchronizer(下文簡稱AQS)來實現鎖的功能。ReentrantLock的內部類Sync繼承了AbstractQueuedSynchronizer,Sync又有FairSync和NonFairSync兩個子類。FairSync實現了公平鎖相關的操作,NonFairSync實現了非公平鎖相關的操作。它們之間的關係如下:
公平鎖的公平之處主要體現在,對於一個新來的執行緒,如果鎖沒有被佔用,它會判斷等待佇列中是否還有其它的等待執行緒,如果有的話,就加入等待佇列隊尾,否則就去搶佔鎖。
下麵這段程式碼展示了公平鎖的使用方法:
private final Lock lock = new ReentrantLock(true); // 引數true代表建立公平鎖
public void method() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock();
}
}
2. AQS
下麵簡單介紹一下AQS中的Node內部類和幾個重要的成員變數。
2.1 Node
AQS中,維護了一個Node內部類,用於包裝我們的執行緒。我們需要關註Node中的如下屬性:
- pre:當前節點的前驅節點。
- next:當前節點的後繼節點。
- thread:thread表示被包裝的執行緒。
- waitStatus:waitStatus是一個int整型,可以被賦予如下幾種值:
static final int CANCELLED = 1; // 執行緒被取消
static final int SIGNAL = -1; // 後繼節點中的執行緒需要被喚醒
static final int CONDITION = -2; // 暫不關註
static final int PROPAGATE = -3; // 暫不關註
另外,當一個新的Node被建立時,waitStatus是0。
2.2 head
head指向佇列中的隊首元素,可以理解為當前持有鎖的執行緒。
2.3 tail
tail指向佇列中的隊尾元素。
2.4 state
state表示在ReentrantLock中可以理解為鎖的狀態,0表示當前鎖沒有被佔用,大於0的數表示鎖被當前執行緒重入的次數。例如,當state等於2時,表示當前執行緒在這把鎖上進入了兩次。
2.5 exclusiveOwnerThread
表示當前佔用鎖的執行緒。
2.6 等待佇列
下圖簡單展示了AQS中的等待佇列:
3. lock方法
有了上面的AQS的基礎知識後,我們就可以展開對ReentrantLock公平鎖的分析了,先從lock方法入手。
ReentrantLock中的lock方法很簡單,只是呼叫了Sync類(本文研究公平鎖,所以應該是FairSync類)中的lock方法:
public void lock() {
sync.lock();
}
我們跟到FairSync的lock方法,程式碼也很簡單,呼叫了AQS中的acquire方法:
final void lock() {
acquire(1);
}
acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 呼叫tryAcquire嘗試去獲取鎖,如果獲取成功,則方法結束
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果獲取鎖失敗,執行acquireQueued方法,將把當前執行緒排入隊尾
selfInterrupt();
}
tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 獲取鎖的狀態
if (c == 0) { // 如果狀態是0,表示鎖沒有被佔用
if (!hasQueuedPredecessors() && // 判斷是佇列中是否有排隊中的執行緒
compareAndSetState(0, acquires)) { // 佇列中沒有排隊的執行緒,則嘗試用CAS去獲取一下鎖
setExclusiveOwnerThread(current); // 獲取鎖成功,則將當前佔有鎖的執行緒設定為當前執行緒
return true;
}
}
// 鎖被佔用、佇列中有排隊的執行緒或者當前執行緒在獲取鎖的時候失敗將執行下麵的程式碼
else if (current == getExclusiveOwnerThread()) { // 當前執行緒是否是佔有鎖的執行緒
int nextc = c + acquires; // 是的話,表示當前執行緒是重入這把鎖,將鎖的狀態進行加1
if (nextc < 0)
throw new Error("Maximum lock count exceeded"); // 鎖的重入次數超過int能夠表示最大的值,丟擲異常
setState(nextc); // 設定鎖的狀態
return true;
}
return false; // 沒有獲取到鎖
}
hasQueuedPredecessors方法:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && // 佇列中的隊首和隊尾元素不相同
((s = h.next) == null || s.thread != Thread.currentThread()); // 佇列中的第二個元素不為null,且第二個元素中的執行緒不是當前執行緒。這裡如果傳回true,說明佇列中至少存在tail、head兩個節點,就會執行acquireQueued將當前執行緒加入隊尾
}
如果tryAcquire沒有獲取到鎖,將執行:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
我們先分析addWaiter方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 將當前執行緒包裝成Node,mode引數值為null,表示獨佔樣式
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred; // 如果佇列中的尾節點不為空,將當前node的前驅節點設定為之前佇列中的tail
if (compareAndSetTail(pred, node)) { // 用CAS把當前node設定為隊尾元素
pred.next = node; // 成功的話,則將之前隊尾元素的後繼節點設定為當前節點。如果這裡不清楚的話,請結合前面講等待佇列的那張圖進行理解。
return node;
}
}
enq(node); // 隊尾節點為空,或者用CAS設定隊尾元素失敗,則用自旋的方式入隊
return node;
}
enq方法:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node())) // 隊尾元素為空,建立一個空的Node,並設定為隊首
tail = head; // 設定隊首和隊尾為同一個空Node,進入下一次迴圈
} else {
node.prev = t; // 如果佇列中的尾節點不為空,將當前node的前驅節點設定為之前佇列中的tail
if (compareAndSetTail(t, node)) { // 用CAS把當前node設定為隊尾元素
t.next = node; // 成功的話,則將之前隊尾元素的後繼節點設定為當前節點
return t;
}
}
}
}
下麵這張圖反應了上面enq方法的處理流程:
經過上面的方法,當前node已經加入等待佇列的隊尾,接下來將執行acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 獲取node的前驅節點
if (p == head && tryAcquire(arg)) { // 如果node的前驅是head,它將去嘗試獲取鎖(tryAcquire方法在前面已經分析過)
setHead(node); // 獲取成功,則將node設定為head
p.next = null; // 將之前的head的後繼節點置空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 當前node的前驅不是head,將為當前node找到一個能夠將其喚醒的前驅節點;或者當前node的前驅是head,但是獲取鎖失敗
parkAndCheckInterrupt()) // 將當前執行緒掛起
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire方法的作用就是找到一個能夠喚醒當前node的節點:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 開始時是0
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true; // 前驅節點的狀態是-1,會喚醒後繼節點,可以將執行緒掛起
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev; // 前驅節點中的執行緒被取消,那就需要一直迴圈直到找到一個沒有被設定為取消狀態的前驅節點
} while (pred.waitStatus > 0);
pred.next = node; // 從後向前找,將第一個非取消狀態的節點,設定這個節點的後繼節點設定為當前node
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // waitStatus是0或者-3的時候,這時waitStatus都將被設定為-1
// 即後繼節點需要前驅節點喚醒
}
return false; // 上層程式碼再進行一次迴圈,下次進入此方法時,將進入第一個if條件
}
找到了合適的前驅節點,parkAndCheckInterrupt方法當前執行緒掛起:
private final boolean parkAndCheckInterrupt() { // 將執行緒掛起,等待前驅節點的喚醒
LockSupport.park(this);
return Thread.interrupted();
}
4. unlock方法
ReentrantLock的unlock方法呼叫AQS中的release方法:
public void unlock() {
sync.release(1); // 呼叫AQS的release方法
}
release方法:
public final boolean release(int arg) {
if (tryRelease(arg)) { // 嘗試去釋放鎖
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 釋放鎖成功,head不為空,並且head的waitStatus不為0的情況下,將喚醒後繼節點
return true;
}
return false;
}
tryRelease方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 將鎖的狀態減1
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException(); // 準備釋放鎖的執行緒不是持有鎖的執行緒,丟擲異常
boolean free = false;
if (c == 0) {
free = true; // 鎖的狀態是0,說明不存在重入的情況了,可以直接釋放了
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
鎖釋放成功,將喚醒後繼節點,unparkSuccessor方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; // 註意,這個node是head節點
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 當前node的狀態是小於0,將其狀態設定為0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // head節點的後繼節點
if (s == null || s.waitStatus > 0) {
s = null; // 執行到這表示head的後繼節點是1,處於取消的狀態
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t; // 從等待佇列的隊尾向前找,找到倒序的最後一個處於非取消狀態的節點
}
if (s != null)
LockSupport.unpark(s.thread); // 喚醒head後面的處於非取消狀態的第一個(正序)節點
}
到此,全文結束,大家看程式碼的時候結合圖來理解會容易很多。
朋友會在“發現-看一看”看到你“在看”的內容