歡迎光臨
每天分享高質量文章

併發程式設計 – Concurrent 使用者指南( 上 )

(點選上方公眾號,可快速關註)


來源:高廣超 ,

www.jianshu.com/p/8cb5d816cb69

譯序

本指南根據 Jakob Jenkov 最新部落格翻譯,請隨時關註部落格更新:http://tutorials.jenkov.com/java-util-concurrent/index.html。

本指南已做成中英文對照閱讀版的 pdf 檔案,有興趣的朋友可以去 Java併發工具包java.util.concurrent使用者指南中英文對照閱讀版.pdf[帶書簽] 進行下載。

http://download.csdn.net/detail/defonds/8469189

1. java.util.concurrent – Java 併發工具包

Java 5 添加了一個新的包到 Java 平臺,java.util.concurrent 包。這個包包含有一系列能夠讓 Java 的併發程式設計變得更加簡單輕鬆的類。在這個包被新增以前,你需要自己去動手實現自己的相關工具類。

本文我將帶你一一認識 java.util.concurrent 包裡的這些類,然後你可以嘗試著如何在專案中使用它們。本文中我將使用 Java 6 版本,我不確定這和 Java 5 版本里的是否有一些差異。我不會去解釋關於 Java 併發的核心問題 – 其背後的原理,也就是說,如果你對那些東西感興趣,參考《Java 併發指南》。

http://tutorials.jenkov.com/java-concurrency/index.html

半成品

本文很大程度上還是個 “半成品”,所以當你發現一些被漏掉的類或介面時,請耐心等待。在我空閑的時候會把它們加進來的。

2. 阻塞佇列 BlockingQueue

java.util.concurrent 包裡的 BlockingQueue 介面表示一個執行緒安放入和提取實體的佇列。本小節我將給你演示如何使用這個 BlockingQueue。本節不會討論如何在 Java 中實現一個你自己的 BlockingQueue。如果你對那個感興趣,參考《Java 併發指南》

BlockingQueue 用法

BlockingQueue 通常用於一個執行緒生產物件,而另外一個執行緒消費這些物件的場景。下圖是對這個原理的闡述:

一個執行緒往裡邊放,另外一個執行緒從裡邊取的一個 BlockingQueue。

一個執行緒將會持續生產新物件並將其插入到佇列之中,直到佇列達到它所能容納的臨界點。也就是說,它是有限的。如果該阻塞佇列到達了其臨界點,負責生產的執行緒將會在往裡邊插入新物件時發生阻塞。它會一直處於阻塞之中,直到負責消費的執行緒從佇列中拿走一個物件。負責消費的執行緒將會一直從該阻塞佇列中拿出物件。如果消費執行緒嘗試去從一個空的佇列中提取物件的話,這個消費執行緒將會處於阻塞之中,直到一個生產執行緒把一個物件丟進佇列。

BlockingQueue 的方法

BlockingQueue 具有 4 組不同的方法用於插入、移除以及對佇列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:

四組不同的行為方式解釋:

  • 拋異常:如果試圖的操作無法立即執行,拋一個異常。

  • 特定值:如果試圖的操作無法立即執行,傳回一個特定的值(常常是 true / false)。

  • 阻塞:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行。

  • 超時:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。傳回一個特定值以告知該操作是否成功(典型的是 true / false)。

無法向一個 BlockingQueue 中插入 null。如果你試圖插入 null,BlockingQueue 將會丟擲一個 NullPointerException。

可以訪問到 BlockingQueue 中的所有元素,而不僅僅是開始和結束的元素。比如說,你將一個物件放入佇列之中以等待處理,但你的應用想要將其取消掉。那麼你可以呼叫諸如 remove(o) 方法來將佇列之中的特定物件進行移除。但是這麼乾效率並不高(譯者註:基於佇列的資料結構,獲取除開始或結束位置的其他物件的效率不會太高),因此你儘量不要用這一類的方法,除非你確實不得不那麼做。

BlockingQueue 的實現

BlockingQueue 是個介面,你需要使用它的實現之一來使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 介面的實現(Java 6):

  • ArrayBlockingQueue

  • DelayQueue

  • LinkedBlockingQueue

  • PriorityBlockingQueue

  • SynchronousQueue

Java 中使用 BlockingQueue 的例子

這裡是一個 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 介面的 ArrayBlockingQueue 實現。

首先,BlockingQueueExample 類分別在兩個獨立的執行緒中啟動了一個 Producer 和 一個 Consumer。

Producer 向一個共享的 BlockingQueue 中註入字串,而 Consumer 則會從中把它們拿出來。

public class BlockingQueueExample {  

 

    public static void main(String[] args) throws Exception {  

 

        BlockingQueue queue = new ArrayBlockingQueue(1024);  

 

        Producer producer = new Producer(queue);  

        Consumer consumer = new Consumer(queue);  

 

        new Thread(producer).start();  

        new Thread(consumer).start();  

 

        Thread.sleep(4000);  

    }  

}

以下是 Producer 類。註意它在每次 put() 呼叫時是如何休眠一秒鐘的。這將導致 Consumer 在等待佇列中物件的時候發生阻塞。

public class Producer implements Runnable{  

 

    protected BlockingQueue queue = null;  

 

    public Producer(BlockingQueue queue) {  

        this.queue = queue;  

    }  

 

    public void run() {  

        try {  

            queue.put(“1”);  

            Thread.sleep(1000);  

            queue.put(“2”);  

            Thread.sleep(1000);  

            queue.put(“3”);  

        } catch (InterruptedException e) {  

            e.printStackTrace();  

        }  

    }  

}

以下是 Consumer 類。它只是把物件從佇列中抽取出來,然後將它們列印到 System.out。

public class Consumer implements Runnable{  

 

    protected BlockingQueue queue = null;  

 

    public Consumer(BlockingQueue queue) {  

        this.queue = queue;  

    }  

 

    public void run() {  

        try {  

            System.out.println(queue.take());  

            System.out.println(queue.take());  

            System.out.println(queue.take());  

        } catch (InterruptedException e) {  

            e.printStackTrace();  

        }  

    }  

}

3. 陣列阻塞佇列 ArrayBlockingQueue

ArrayBlockingQueue 類實現了 BlockingQueue 介面。

ArrayBlockingQueue 是一個有界的阻塞佇列,其內部實現是將物件放到一個陣列裡。有界也就意味著,它不能夠儲存無限多數量的元素。它有一個同一時間能夠儲存元素數量的上限。你可以在對其初始化的時候設定這個上限,但之後就無法對這個上限進行修改了(譯者註:因為它是基於陣列實現的,也就具有陣列的特性:一旦初始化,大小就無法修改)。

‘ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行儲存。佇列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。

以下是在使用 ArrayBlockingQueue 的時候對其初始化的一個示例:

BlockingQueue queue = new ArrayBlockingQueue(1024);  

 

queue.put(“1”);  

 

Object object = queue.take();

以下是使用了 Java 泛型的一個 BlockingQueue 示例。註意其中是如何對 String 元素放入和提取的:

BlockingQueue queue = new ArrayBlockingQueue(1024);  

 

queue.put(“1”);  

 

String string = queue.take();

4. 延遲佇列 DelayQueue

DelayQueue 實現了 BlockingQueue 介面。DelayQueue 對元素進行持有直到一個特定的延遲到期。註入其中的元素必須實現 java.util.concurrent.Delayed 介面,該介面定義:

public interface Delayed extends Comparable

 

 public long getDelay(TimeUnit timeUnit);  

 

}

DelayQueue 將會在每個元素的 getDelay() 方法傳回的值的時間段之後才釋放掉該元素。如果傳回的是 0 或者負值,延遲將被認為過期,該元素將會在 DelayQueue 的下一次 take 被呼叫的時候被釋放掉。傳遞給 getDelay 方法的 getDelay 實體是一個列舉型別,它表明瞭將要延遲的時間段。

TimeUnit 列舉將會取以下值:

DAYS  

HOURS  

MINUTES  

SECONDS  

MILLISECONDS  

MICROSECONDS  

NANOSECONDS  

`

正如你所看到的,Delayed 介面也繼承了 java.lang.Comparable 介面,這也就意味著 Delayed 物件之間可以進行對比。這個可能在對 DelayQueue 佇列中的元素進行排序時有用,因此它們可以根據過期時間進行有序釋放。以下是使用 DelayQueue 的例子:

public class DelayQueueExample {  

 

    public static void main(String[] args) {  

        DelayQueue queue = new DelayQueue();  

 

        Delayed element1 = new DelayedElement();  

 

        queue.put(element1);  

 

        Delayed element2 = queue.take();  

    }  

}

DelayedElement 是我所建立的一個 DelayedElement 介面的實現類,它不在 Java.util.concurrent 包裡。你需要自行建立你自己的 Delayed 介面的實現以使用 DelayQueue 類。

5. 鏈阻塞佇列 LinkedBlockingQueue

LinkedBlockingQueue 類實現了 BlockingQueue 介面。

LinkedBlockingQueue 內部以一個鏈式結構(連結節點)對其元素進行儲存。如果需要的話,這一鏈式結構可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。

LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行儲存。佇列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。

以下是 LinkedBlockingQueue 的初始化和使用示例程式碼:

BlockingQueue unbounded = new LinkedBlockingQueue();  

BlockingQueue bounded   = new LinkedBlockingQueue(1024);  

 

bounded.put(“Value”);  

 

String value = bounded.take();

6. 具有優先順序的阻塞佇列 PriorityBlockingQueue

PriorityBlockingQueue 類實現了 BlockingQueue 介面。

PriorityBlockingQueue 是一個無界的併發佇列。它使用了和類 java.util.PriorityQueue 一樣的排序規則。你無法向這個佇列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 介面。因此該佇列中元素的排序就取決於你自己的 Comparable 實現。註意 PriorityBlockingQueue 對於具有相等優先順序(compare() == 0)的元素並不強制任何特定行為。

同時註意,如果你從一個 PriorityBlockingQueue 獲得一個 Iterator 的話,該 Iterator 並不能保證它對元素的遍歷是以優先順序為序的。

以下是使用 PriorityBlockingQueue 的示例:

BlockingQueue queue   = new PriorityBlockingQueue();  

 

    //String implements java.lang.Comparable  

    queue.put(“Value”);  

 

    String value = queue.take();

7. 同步佇列 SynchronousQueue

SynchronousQueue 類實現了 BlockingQueue 介面。

SynchronousQueue 是一個特殊的佇列,它的內部同時只能夠容納單個元素。如果該佇列已有一元素的話,試圖向佇列中插入一個新元素的執行緒將會阻塞,直到另一個執行緒將該元素從佇列中抽走。同樣,如果該佇列為空,試圖向佇列中抽取一個元素的執行緒將會阻塞,直到另一個執行緒向佇列中插入了一條新的元素。

據此,把這個類稱作一個佇列顯然是誇大其詞了。它更多像是一個匯合點。

8. 阻塞雙端佇列 BlockingDeque

java.util.concurrent 包裡的 BlockingDeque 介面表示一個執行緒安放入和提取實體的雙端佇列。本小節我將給你演示如何使用 BlockingDeque。BlockingDeque 類是一個雙端佇列,在不能夠插入元素時,它將阻塞住試圖插入元素的執行緒;在不能夠抽取元素時,它將阻塞住試圖抽取的執行緒。deque(雙端佇列) 是 “Double Ended Queue” 的縮寫。因此,雙端佇列是一個你可以從任意一端插入或者抽取元素的佇列。

BlockingDeque 的使用

在執行緒既是一個佇列的生產者又是這個佇列的消費者的時候可以使用到 BlockingDeque。如果生產者執行緒需要在佇列的兩端都可以插入資料,消費者執行緒需要在佇列的兩端都可以移除資料,這個時候也可以使用 BlockingDeque。

一個 BlockingDeque – 執行緒在雙端佇列的兩端都可以插入和提取元素。

一個執行緒生產元素,並把它們插入到佇列的任意一端。如果雙端佇列已滿,插入執行緒將被阻塞,直到一個移除執行緒從該佇列中移出了一個元素。如果雙端佇列為空,移除執行緒將被阻塞,直到一個插入執行緒向該佇列插入了一個新元素。

BlockingDeque 的方法

BlockingDeque 具有 4 組不同的方法用於插入、移除以及對雙端佇列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:

四組不同的行為方式解釋:

  • 拋異常:如果試圖的操作無法立即執行,拋一個異常。

  • 特定值:如果試圖的操作無法立即執行,傳回一個特定的值(常常是 true / false)。

  • 阻塞:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行。

  • 超時:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。傳回一個特定值以告知該操作是否成功(典型的是 true / false)。

BlockingDeque 繼承自 BlockingQueue

BlockingDeque 介面繼承自 BlockingQueue 介面。

這就意味著你可以像使用一個 BlockingQueue 那樣使用 BlockingDeque。如果你這麼乾的話,各種插入方法將會把新元素新增到雙端佇列的尾端,而移除方法將會把雙端佇列的首端的元素移除。正如 BlockingQueue 介面的插入和移除方法一樣。

以下是 BlockingDeque 對 BlockingQueue 介面的方法的具體內部實現:

BlockingDeque 的實現

既然 BlockingDeque 是一個介面,那麼你想要使用它的話就得使用它的眾多的實現類的其中一個。java.util.concurrent 包提供了以下 BlockingDeque 介面的實現類:

LinkedBlockingDeque

http://blog.csdn.net/defonds/article/details/44021605#t18

BlockingDeque 程式碼示例

以下是如何使用 BlockingDeque 方法的一個簡短程式碼示例:

BlockingDeque deque = new LinkedBlockingDeque();  

 

deque.addFirst(“1”);  

deque.addLast(“2”);  

 

String two = deque.takeLast();  

String one = deque.takeFirst();

9. 鏈阻塞雙端佇列 LinkedBlockingDeque

LinkedBlockingDeque 類實現了 BlockingDeque 介面。

deque(雙端佇列) 是 “Double Ended Queue” 的縮寫。因此,雙端佇列是一個你可以從任意一端插入或者抽取元素的佇列。(譯者註:唐僧啊,受不了。)LinkedBlockingDeque 是一個雙端佇列,在它為空的時候,一個試圖從中抽取資料的執行緒將會阻塞,無論該執行緒是試圖從哪一端抽取資料。以下是 LinkedBlockingDeque 實體化以及使用的示例:

BlockingDeque deque = new LinkedBlockingDeque();  

 

deque.addFirst(“1”);  

deque.addLast(“2”);  

 

String two = deque.takeLast();  

String one = deque.takeFirst();

10. 併發 Map(對映) ConcurrentMap

java.util.concurrent.ConcurrentMap

java.util.concurrent.ConcurrentMap 介面表示了一個能夠對別人的訪問(插入和提取)進行併發處理的 java.util.Map。ConcurrentMap 除了從其父介面 java.util.Map 繼承來的方法之外還有一些額外的原子性方法。

ConcurrentMap 的實現

既然 ConcurrentMap 是個介面,你想要使用它的話就得使用它的實現類之一。java.util.concurrent 包具備 ConcurrentMap 介面的以下實現類:

ConcurrentHashMap

ConcurrentHashMap

ConcurrentHashMap 和 java.util.HashTable 類很相似,但 ConcurrentHashMap 能夠提供比 HashTable 更好的併發效能。在你從中讀取物件的時候 ConcurrentHashMap 並不會把整個 Map 鎖住。

此外,在你向其中寫入物件的時候,ConcurrentHashMap 也不會鎖住整個 Map。它的內部只是把 Map 中正在被寫入的部分進行鎖定。

另外一個不同點是,在被遍歷的時候,即使是 ConcurrentHashMap 被改動,它也不會拋 ConcurrentModificationException。儘管 Iterator 的設計不是為多個執行緒的同時使用。更多關於 ConcurrentMap 和 ConcurrentHashMap 的細節請參考官方檔案。

ConcurrentMap 例子

以下是如何使用 ConcurrentMap 介面的一個例子。

本示例使用了 ConcurrentHashMap 實現類:

ConcurrentMap concurrentMap = new ConcurrentHashMap();  

concurrentMap.put(“key”, “value”);   

Object value = concurrentMap.get(“key”);

11. 併發導航對映 ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap 是一個支援併發訪問的 java.util.NavigableMap,它還能讓它的子 map 具備併發訪問的能力。所謂的 “子 map” 指的是諸如 headMap(),subMap(),tailMap() 之類的方法傳回的 map。

NavigableMap 中的方法不再贅述,本小節我們來看一下 ConcurrentNavigableMap 新增的方法。

headMap()

headMap(T toKey) 方法傳回一個包含了小於給定 toKey 的 key 的子 map。如果你對原始 map 裡的元素做了改動,這些改動將影響到子 map 中的元素(譯者註:map 集合持有的其實只是物件的取用)。以下示例演示了對 headMap() 方法的使用:

ConcurrentNavigableMap map = new ConcurrentSkipListMap();  

 

map.put(“1”, “one”);  

map.put(“2”, “two”);  

map.put(“3”, “three”);  

 

ConcurrentNavigableMap headMap = map.headMap(“2”);

headMap 將指向一個只含有鍵 “1″ 的 ConcurrentNavigableMap,因為只有這一個鍵小於 “2″。關於這個方法及其多載版本具體是怎麼工作的細節請參考 Java 檔案。

tailMap()

tailMap(T fromKey) 方法傳回一個包含了不小於給定 fromKey 的 key 的子 map。

如果你對原始 map 裡的元素做了改動,這些改動將影響到子 map 中的元素(譯者註:map 集合持有的其實只是物件的取用)。

以下示例演示了對 tailMap() 方法的使用:

ConcurrentNavigableMap map = new ConcurrentSkipListMap();  

 

map.put(“1”, “one”);  

map.put(“2”, “two”);  

map.put(“3”, “three”);  

 

ConcurrentNavigableMap tailMap = map.tailMap(“2”);

tailMap 將擁有鍵 “2″ 和 “3″,因為它們不小於給定鍵 “2″。關於這個方法及其多載版本具體是怎麼工作的細節請參考 Java 檔案。

subMap()

subMap() 方法傳回原始 map 中,鍵介於 from(包含) 和 to (不包含) 之間的子 map。

示例如下:

ConcurrentNavigableMap map = new ConcurrentSkipListMap();  

 

map.put(“1”, “one”);  

map.put(“2”, “two”);  

map.put(“3”, “three”);  

 

ConcurrentNavigableMap subMap = map.subMap(“2”, “3”);

傳回的 submap 只包含鍵 “2″,因為只有它滿足不小於 “2″,比 “3″ 小。

更多方法

ConcurrentNavigableMap 介面還有其他一些方法可供使用,

比如:

  • descendingKeySet()

  • descendingMap()

  • navigableKeySet()

關於這些方法更多資訊參考官方 Java 檔案。

12. 閉鎖 CountDownLatch

java.util.concurrent.CountDownLatch 是一個併發構造,它允許一個或多個執行緒等待一系列指定操作的完成。

CountDownLatch 以一個給定的數量初始化。countDown() 每被呼叫一次,這一數量就減一。透過呼叫 await() 方法之一,執行緒可以阻塞等待這一數量到達零。以下是一個簡單示例。

Decrementer 三次呼叫 countDown() 之後,等待中的 Waiter 才會從 await() 呼叫中釋放出來。

CountDownLatch latch = new CountDownLatch(3);  

 

Waiter      waiter      = new Waiter(latch);  

Decrementer decrementer = new Decrementer(latch);  

 

new Thread(waiter)     .start();  

new Thread(decrementer).start();  

 

Thread.sleep(4000);  

 

public class Waiter implements Runnable{  

 

    CountDownLatch latch = null;  

 

    public Waiter(CountDownLatch latch) {  

        this.latch = latch;  

    }  

 

    public void run() {  

        try {  

            latch.await();  

        } catch (InterruptedException e) {  

            e.printStackTrace();  

        }  

 

        System.out.println(“Waiter Released”);  

    }  

}  

 

public class Decrementer implements Runnable {  

 

    CountDownLatch latch = null;  

 

    public Decrementer(CountDownLatch latch) {  

        this.latch = latch;  

    }  

 

    public void run() {  

 

        try {  

            Thread.sleep(1000);  

            this.latch.countDown();  

 

            Thread.sleep(1000);  

            this.latch.countDown();  

 

            Thread.sleep(1000);  

            this.latch.countDown();  

        } catch (InterruptedException e) {  

            e.printStackTrace();  

        }  

    }  

}

13. 柵欄 CyclicBarrier

java.util.concurrent.CyclicBarrier 類是一種同步機制,它能夠對處理一些演演算法的執行緒實現同步。換句話講,它就是一個所有執行緒必須等待的一個柵欄,直到所有執行緒都到達這裡,然後所有執行緒才可以繼續做其他事情。

圖示如下:

兩個執行緒在柵欄旁等待對方。

透過呼叫 CyclicBarrier 物件的 await() 方法,兩個執行緒可以實現互相等待。一旦 N 個執行緒在等待 CyclicBarrier 達成,所有執行緒將被釋放掉去繼續執行。

建立一個 CyclicBarrier

在建立一個 CyclicBarrier 的時候你需要定義有多少執行緒在被釋放之前等待柵欄。

建立 CyclicBarrier 示例:

CyclicBarrier barrier = new CyclicBarrier(2);

等待一個 CyclicBarrier

以下演示瞭如何讓一個執行緒等待一個 CyclicBarrier:

barrier.await();

當然,你也可以為等待執行緒設定一個超時時間。等待超過了超時時間之後,即便還沒有達成 N 個執行緒等待 CyclicBarrier 的條件,該執行緒也會被釋放出來。以下是定義超時時間示例:

barrier.await(10, TimeUnit.SECONDS);

滿足以下任何條件都可以讓等待 CyclicBarrier 的執行緒釋放:

  • 最後一個執行緒也到達 CyclicBarrier(呼叫 await())

  • 當前執行緒被其他執行緒打斷(其他執行緒呼叫了這個執行緒的 interrupt() 方法)

  • 其他等待柵欄的執行緒被打斷

  • 其他等待柵欄的執行緒因超時而被釋放

  • 外部執行緒呼叫了柵欄的 CyclicBarrier.reset() 方法

CyclicBarrier 行動

CyclicBarrier 支援一個柵欄行動,柵欄行動是一個 Runnable 實體,一旦最後等待柵欄的執行緒抵達,該實體將被執行。你可以在 CyclicBarrier 的構造方法中將 Runnable 柵欄行動傳給它:

Runnable barrierAction = … ; 

CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);

CyclicBarrier 示例

以下程式碼演示瞭如何使用 CyclicBarrier:

Runnable barrier1Action = new Runnable() {  

    public void run() {  

        System.out.println(“BarrierAction 1 executed “);  

    }  

};  

Runnable barrier2Action = new Runnable() {  

    public void run() {  

        System.out.println(“BarrierAction 2 executed “);  

    }  

};  

 

CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);  

CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);  

 

CyclicBarrierRunnable barrierRunnable1 =  

        new CyclicBarrierRunnable(barrier1, barrier2);  

 

CyclicBarrierRunnable barrierRunnable2 =  

        new CyclicBarrierRunnable(barrier1, barrier2);  

 

new Thread(barrierRunnable1).start();  

new Thread(barrierRunnable2).start();

CyclicBarrierRunnable 類:

public class CyclicBarrierRunnable implements Runnable{  

 

    CyclicBarrier barrier1 = null;  

    CyclicBarrier barrier2 = null;  

 

    public CyclicBarrierRunnable(  

            CyclicBarrier barrier1,  

            CyclicBarrier barrier2) {  

 

        this.barrier1 = barrier1;  

        this.barrier2 = barrier2;  

    }  

 

    public void run() {  

        try {  

            Thread.sleep(1000);  

            System.out.println(Thread.currentThread().getName() +  

                                ” waiting at barrier 1″);  

            this.barrier1.await();  

 

            Thread.sleep(1000);  

            System.out.println(Thread.currentThread().getName() +  

                                ” waiting at barrier 2″);  

            this.barrier2.await();  

 

            System.out.println(Thread.currentThread().getName() +  

                                ” done!”);  

 

        } catch (InterruptedException e) {  

            e.printStackTrace();  

        } catch (BrokenBarrierException e) {  

            e.printStackTrace();  

        }  

    }  

}

以上程式碼控制檯輸出如下。註意每個執行緒寫入控制檯的時序可能會跟你實際執行不一樣。比如有時 Thread-0 先列印,有時 Thread-1 先列印。

Thread-0 waiting at barrier 1

Thread-1 waiting at barrier 1

BarrierAction 1 executed

Thread-1 waiting at barrier 2

Thread-0 waiting at barrier 2

BarrierAction 2 executed

Thread-0 done!

Thread-1 done!

14. 交換機 Exchanger

java.util.concurrent.Exchanger 類表示一種兩個執行緒可以進行互相交換物件的會和點。這種機製圖示如下:

兩個執行緒透過一個 Exchanger 交換物件。

交換物件的動作由 Exchanger 的兩個 exchange() 方法的其中一個完成。

以下是一個示例:

Exchanger exchanger = new Exchanger();  

 

ExchangerRunnable exchangerRunnable1 =  

        new ExchangerRunnable(exchanger, “A”);  

 

ExchangerRunnable exchangerRunnable2 =  

        new ExchangerRunnable(exchanger, “B”);  

 

new Thread(exchangerRunnable1).start();  

new Thread(exchangerRunnable2).start();

ExchangerRunnable 程式碼:

public class ExchangerRunnable implements Runnable{  

 

    Exchanger exchanger = null;  

    Object    object    = null;  

 

    public ExchangerRunnable(Exchanger exchanger, Object object) {  

        this.exchanger = exchanger;  

        this.object = object;  

    }  

 

    public void run() {  

        try {  

            Object previous = this.object;  

 

            this.object = this.exchanger.exchange(this.object);  

 

            System.out.println(  

                    Thread.currentThread().getName() +  

                    ” exchanged ” + previous + ” for ” + this.object  

            );  

        } catch (InterruptedException e) {  

            e.printStackTrace();  

        }  

    }  

}

以上程式輸出:Thread-0 exchanged A for BThread-1 exchanged B for A

15. 訊號量 Semaphore

java.util.concurrent.Semaphore 類是一個計數訊號量。這就意味著它具備兩個主要方法:

  • acquire()

  • release()

計數訊號量由一個指定數量的 “許可” 初始化。每呼叫一次 acquire(),一個許可會被呼叫執行緒取走。每呼叫一次 release(),一個許可會被返還給訊號量。因此,在沒有任何 release() 呼叫時,最多有 N 個執行緒能夠透過 acquire() 方法,N 是該訊號量初始化時的許可的指定數量。這些許可只是一個簡單的計數器。這裡沒啥奇特的地方。

Semaphore 用法

訊號量主要有兩種用途:

  1. 保護一個重要(程式碼)部分防止一次超過 N 個執行緒進入。

  2. 在兩個執行緒之間傳送訊號。

保護重要部分

如果你將訊號量用於保護一個重要部分,試圖進入這一部分的程式碼通常會首先嘗試獲得一個許可,然後才能進入重要部分(程式碼塊),執行完之後,再把許可釋放掉。

比如這樣:

Semaphore semaphore = new Semaphore(1);  

 

//critical section  

semaphore.acquire();  

 

…  

 

semaphore.release();

在執行緒之間傳送訊號

如果你將一個訊號量用於在兩個執行緒之間傳送訊號,通常你應該用一個執行緒呼叫 acquire() 方法,而另一個執行緒呼叫 release() 方法。如果沒有可用的許可,acquire() 呼叫將會阻塞,直到一個許可被另一個執行緒釋放出來。同理,如果無法往訊號量釋放更多許可時,一個 release() 呼叫也會阻塞。

透過這個可以對多個執行緒進行協調。比如,如果執行緒 1 將一個物件插入到了一個共享串列(list)之後之後呼叫了 acquire(),而執行緒 2 則在從該串列中獲取一個物件之前呼叫了 release(),這時你其實已經建立了一個阻塞佇列。訊號量中可用的許可的數量也就等同於該阻塞佇列能夠持有的元素個數。

公平

沒有辦法保證執行緒能夠公平地可從訊號量中獲得許可。也就是說,無法擔保掉第一個呼叫 acquire() 的執行緒會是第一個獲得一個許可的執行緒。如果第一個執行緒在等待一個許可時發生阻塞,而第二個執行緒前來索要一個許可的時候剛好有一個許可被釋放出來,那麼它就可能會在第一個執行緒之前獲得許可。如果你想要強制公平,Semaphore 類有一個具有一個布林型別的引數的構造子,透過這個引數以告知 Semaphore 是否要強制公平。強制公平會影響到併發效能,所以除非你確實需要它否則不要啟用它。

以下是如何在公平樣式建立一個 Semaphore 的示例:

Semaphore semaphore = new Semaphore(1, true);

更多方法

java.util.concurrent.Semaphore 類還有很多方法,比如:

  • availablePermits()

  • acquireUninterruptibly()

  • drainPermits()

  • hasQueuedThreads()

  • getQueuedThreads()

  • tryAcquire()

  • 等等

這些方法的細節請參考 Java 檔案。

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂