歡迎光臨
每天分享高質量文章

【死磕Java併發】—–J.U.C之併發工具類:Semaphore

點選上方“Java技術驛站”,選擇“置頂公眾號”。

有內涵、有價值的文章第一時間送達!

此篇部落格所有原始碼均來自JDK 1.8

訊號量Semaphore是一個控制訪問多個共享資源的計數器,和CountDownLatch一樣,其本質上是一個“共享鎖”。

Semaphore,在API是這麼介紹的:

一個計數訊號量。從概念上講,訊號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。

Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的執行緒數目。

下麵我們就一個停車場的簡單例子來闡述Semaphore:

為了簡單起見我們假設停車場僅有5個停車位,一開始停車場沒有車輛所有車位全部空著,然後先後到來三輛車,停車場車位夠,安排進去停車,然後又來三輛,這個時候由於只有兩個停車位,所有隻能停兩輛,其餘一輛必須在外面候著,直到停車場有空車位,當然以後每來一輛都需要在外面候著。當停車場有車開出去,裡面有空位了,則安排一輛車進去(至於是哪輛 要看選擇的機制是公平還是非公平)。

從程式角度看,停車場就相當於訊號量Semaphore,其中許可數為5,車輛就相對執行緒。當來一輛車時,許可數就會減 1 ,當停車場沒有車位了(許可書 == 0 ),其他來的車輛需要在外面等候著。如果有一輛車開出停車場,許可數 + 1,然後放進來一輛車。

號量Semaphore是一個非負整數(>=1)。當一個執行緒想要訪問某個共享資源時,它必須要先獲取Semaphore,當Semaphore >0時,獲取該資源並使Semaphore – 1。如果Semaphore值 = 0,則表示全部的共享資源已經被其他執行緒全部佔用,執行緒必須要等待其他執行緒釋放資源。當執行緒釋放資源時,Semaphore則+1

實現分析

Semaphore結構如下:

從上圖可以看出Semaphore內部包含公平鎖(FairSync)和非公平鎖(NonfairSync),繼承內部類Sync,其中Sync繼承AQS(再一次闡述AQS的重要性)。

Semaphore提供了兩個建構式:

  1. Semaphore(int permits) :建立具有給定的許可數和非公平的公平設定的 Semaphore。 

  2. Semaphore(int permits, boolean fair) :建立具有給定的許可數和給定的公平設定的 Semaphore。 

實現如下:

  1.    public Semaphore(int permits) {

  2.        sync = new NonfairSync(permits);

  3.    }

  4.    public Semaphore(int permits, boolean fair) {

  5.        sync = fair ? new FairSync(permits) : new NonfairSync(permits);

  6.    }

Semaphore預設選擇非公平鎖。

當訊號量Semaphore = 1 時,它可以當作互斥鎖使用。其中0、1就相當於它的狀態,當=1時表示其他執行緒可以獲取,當=0時,排他,即其他執行緒必須要等待。

訊號量獲取

Semaphore提供了acquire()方法來獲取一個許可。

  1.    public void acquire() throws InterruptedException {

  2.        sync.acquireSharedInterruptibly(1);

  3.    }

內部呼叫AQS的acquireSharedInterruptibly(int arg),該方法以共享樣式獲取同步狀態:

  1.    public final void acquireSharedInterruptibly(int arg)

  2.            throws InterruptedException {

  3.        if (Thread.interrupted())

  4.            throw new InterruptedException();

  5.        if (tryAcquireShared(arg) < 0)

  6.            doAcquireSharedInterruptibly(arg);

  7.    }

在acquireSharedInterruptibly(int arg)中,tryAcquireShared(int arg)由子類來實現,對於Semaphore而言,如果我們選擇非公平樣式,則呼叫NonfairSync的tryAcquireShared(int arg)方法,否則呼叫FairSync的tryAcquireShared(int arg)方法。

公平

  1.    protected int tryAcquireShared(int acquires) {

  2.        for (;;) {

  3.            //判斷該執行緒是否位於CLH佇列的列頭

  4.            if (hasQueuedPredecessors())

  5.                return -1;

  6.            //獲取當前的訊號量許可

  7.            int available = getState();

  8.            //設定“獲得acquires個訊號量許可之後,剩餘的訊號量許可數”

  9.            int remaining = available - acquires;

  10.            //CAS設定訊號量

  11.            if (remaining < 0 ||

  12.                    compareAndSetState(available, remaining))

  13.                return remaining;

  14.        }

  15.    }

非公平

對於非公平而言,因為它不需要判斷當前執行緒是否位於CLH同步佇列列頭,所以相對而言會簡單些。

  1.        protected int tryAcquireShared(int acquires) {

  2.            return nonfairTryAcquireShared(acquires);

  3.        }

  4.        final int nonfairTryAcquireShared(int acquires) {

  5.            for (;;) {

  6.                int available = getState();

  7.                int remaining = available - acquires;

  8.                if (remaining < 0 ||

  9.                    compareAndSetState(available, remaining))

  10.                    return remaining;

  11.            }

  12.        }

訊號量釋放

獲取了許可,當用完之後就需要釋放,Semaphore提供release()來釋放許可。

  1.    public void release() {

  2.        sync.releaseShared(1);

  3.    }

內部呼叫AQS的releaseShared(int arg):

  1.    public final boolean releaseShared(int arg) {

  2.        if (tryReleaseShared(arg)) {

  3.            doReleaseShared();

  4.            return true;

  5.        }

  6.        return false;

  7.    }

releaseShared(int arg)呼叫Semaphore內部類Sync的tryReleaseShared(int arg):

  1.    protected final boolean tryReleaseShared(int releases) {

  2.        for (;;) {

  3.            int current = getState();

  4.            //訊號量的許可數 = 當前訊號許可數 + 待釋放的訊號許可數

  5.            int next = current + releases;

  6.            if (next < current) // overflow

  7.                throw new Error("Maximum permit count exceeded");

  8.            //設定可獲取的訊號許可數為next

  9.            if (compareAndSetState(current, next))

  10.                return true;

  11.        }

  12.    }

對於訊號量的獲取釋放詳細過程,請參考如下部落格:

  1. 【死磕Java併發】-----J.U.C之AQS:CLH同步佇列

  2. 【死磕Java併發】-----J.U.C之AQS:同步狀態的獲取與釋放

  3. 【死磕Java併發】-----J.U.C之AQS:阻塞和喚醒執行緒

  4. 【死磕Java併發】-----J.U.C之重入鎖:ReentrantLock

應用示例

我們已停車為示例:

  1. public class SemaphoreTest {

  2.    static class Parking{

  3.        //訊號量

  4.        private Semaphore semaphore;

  5.        Parking(int count){

  6.            semaphore = new Semaphore(count);

  7.        }

  8.        public void park(){

  9.            try {

  10.                //獲取訊號量

  11.                semaphore.acquire();

  12.                long time = (long) (Math.random() * 10);

  13.                System.out.println(Thread.currentThread().getName() + "進入停車場,停車" + time + "秒..." );

  14.                Thread.sleep(time);

  15.                System.out.println(Thread.currentThread().getName() + "開出停車場...");

  16.            } catch (InterruptedException e) {

  17.                e.printStackTrace();

  18.            } finally {

  19.                semaphore.release();

  20.            }

  21.        }

  22.    }

  23.    static class Car extends Thread {

  24.        Parking parking ;

  25.        Car(Parking parking){

  26.            this.parking = parking;

  27.        }

  28.        @Override

  29.        public void run() {

  30.            parking.park();     //進入停車場

  31.        }

  32.    }

  33.    public static void main(String[] args){

  34.        Parking parking = new Parking(3);

  35.        for(int i = 0 ; i < 5 ; i++){

  36.            new Car(parking).start();

  37.        }

  38.    }

  39. }

執行結果如下:

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖