此篇部落格所有原始碼均來自JDK 1.8
訊號量Semaphore是一個控制訪問多個共享資源的計數器,和CountDownLatch一樣,其本質上是一個“共享鎖”。
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群討論技術和原始碼。
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群討論技術和原始碼。
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群討論技術和原始碼。
Semaphore,在API是這麼介紹的:
一個計數訊號量。從概念上講,訊號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。
Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的執行緒數目。
下麵我們就一個停車場的簡單例子來闡述Semaphore:
為了簡單起見我們假設停車場僅有5個停車位,一開始停車場沒有車輛所有車位全部空著,然後先後到來三輛車,停車場車位夠,安排進去停車,然後又來三輛,這個時候由於只有兩個停車位,所有隻能停兩輛,其餘一輛必須在外面候著,直到停車場有空車位,當然以後每來一輛都需要在外面候著。當停車場有車開出去,裡面有空位了,則安排一輛車進去(至於是哪輛 要看選擇的機制是公平還是非公平)。
從程式角度看,停車場就相當於訊號量Semaphore,其中許可數為5,車輛就相對執行緒。當來一輛車時,許可數就會減 1 ,當停車場沒有車位了(許可書 == 0 ),其他來的車輛需要在外面等候著。如果有一輛車開出停車場,許可數 + 1,然後放進來一輛車。
號量Semaphore是一個非負整數(>=1)。當一個執行緒想要訪問某個共享資源時,它必須要先獲取Semaphore,當Semaphore >0時,獲取該資源並使Semaphore – 1。如果Semaphore值 = 0,則表示全部的共享資源已經被其他執行緒全部佔用,執行緒必須要等待其他執行緒釋放資源。當執行緒釋放資源時,Semaphore則+1
實現分析
Semaphore結構如下:
從上圖可以看出Semaphore內部包含公平鎖(FairSync)和非公平鎖(NonfairSync),繼承內部類Sync,其中Sync繼承AQS(再一次闡述AQS的重要性)。
Semaphore提供了兩個建構式:
-
Semaphore(int permits) :建立具有給定的許可數和非公平的公平設定的 Semaphore。
-
Semaphore(int permits, boolean fair) :建立具有給定的許可數和給定的公平設定的 Semaphore。
實現如下:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore預設選擇非公平鎖。
當訊號量Semaphore = 1 時,它可以當作互斥鎖使用。其中0、1就相當於它的狀態,當=1時表示其他執行緒可以獲取,當=0時,排他,即其他執行緒必須要等待。
訊號量獲取
Semaphore提供了acquire()方法來獲取一個許可。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
內部呼叫AQS的acquireSharedInterruptibly(int arg),該方法以共享樣式獲取同步狀態:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
在acquireSharedInterruptibly(int arg)中,tryAcquireShared(int arg)由子類來實現,對於Semaphore而言,如果我們選擇非公平樣式,則呼叫NonfairSync的tryAcquireShared(int arg)方法,否則呼叫FairSync的tryAcquireShared(int arg)方法。
公平
protected int tryAcquireShared(int acquires) {
for (;;) {
//判斷該執行緒是否位於CLH佇列的列頭
if (hasQueuedPredecessors())
return -1;
//獲取當前的訊號量許可
int available = getState();
//設定“獲得acquires個訊號量許可之後,剩餘的訊號量許可數”
int remaining = available - acquires;
//CAS設定訊號量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
非公平
對於非公平而言,因為它不需要判斷當前執行緒是否位於CLH同步佇列列頭,所以相對而言會簡單些。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
訊號量釋放
獲取了許可,當用完之後就需要釋放,Semaphore提供release()來釋放許可。
public void release() {
sync.releaseShared(1);
}
內部呼叫AQS的releaseShared(int arg):
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared(int arg)呼叫Semaphore內部類Sync的tryReleaseShared(int arg):
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//訊號量的許可數 = 當前訊號許可數 + 待釋放的訊號許可數
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//設定可獲取的訊號許可數為next
if (compareAndSetState(current, next))
return true;
}
}
對於訊號量的獲取釋放詳細過程,請參考如下部落格:
-
【死磕Java併發】—–J.U.C之AQS:CLH同步佇列
-
【死磕Java併發】—–J.U.C之AQS:同步狀態的獲取與釋放
-
【死磕Java併發】—–J.U.C之AQS:阻塞和喚醒執行緒
-
【死磕Java併發】—–J.U.C之重入鎖:ReentrantLock
應用示例
我們已停車為示例:
public class SemaphoreTest {
static class Parking{
//訊號量
private Semaphore semaphore;
Parking(int count){
semaphore = new Semaphore(count);
}
public void park(){
try {
//獲取訊號量
semaphore.acquire();
long time = (long) (Math.random() * 10);
System.out.println(Thread.currentThread().getName() + "進入停車場,停車" + time + "秒..." );
Thread.sleep(time);
System.out.println(Thread.currentThread().getName() + "開出停車場...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
static class Car extends Thread {
Parking parking ;
Car(Parking parking){
this.parking = parking;
}
@Override
public void run() {
parking.park(); //進入停車場
}
}
public static void main(String[] args){
Parking parking = new Parking(3);
for(int i = 0 ; i < 5 ; i++){
new Car(parking).start();
}
}}
執行結果如下: