歡迎光臨
每天分享高質量文章

Java 併發集合的實現原理

(點選上方公眾號,可快速關註)


來源:阿凡盧,

www.cnblogs.com/luxiaoxun/p/4638748.html

本文簡要介紹Java併發程式設計方面常用的類和集合,並介紹下其實現原理。

AtomicInteger

可以用原子方式更新int值。類 AtomicBoolean、AtomicInteger、AtomicLong 和 AtomicReference 的實體各自提供對相應型別單個變數的訪問和更新。基本的原理都是使用CAS操作:

boolean compareAndSet(expectedValue, updateValue);

如果此方法(在不同的類間引數型別也不同)當前保持expectedValue,則以原子方式將變數設定為updateValue,併在成功時報告true。

迴圈CAS,參考AtomicInteger中的實現:

public final int getAndIncrement() {

        for (;;) {

            int current = get();

            int next = current + 1;

            if (compareAndSet(current, next))

                return current;

        }

    }

 

    public final boolean compareAndSet(int expect, int update) {

        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);

    }

ABA問題

因為CAS需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變數前面追加上版本號,每次變數更新的時候把版本號加一,那麼A-B-A 就會變成1A-2B-3A。

從Java1.5開始JDK的atomic包裡提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法作用是首先檢查當前取用是否等於預期取用,並且當前標誌是否等於預期標誌,如果全部相等,則以原子方式將該取用和該標誌的值設定為給定的更新值。

public boolean compareAndSet(

        V      expectedReference,//預期取用

        V      newReference,//更新後的取用

        int    expectedStamp, //預期標誌

        int    newStamp) //更新後的標誌

ArrayBlockingQueue

一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列的頭部是在佇列中存在時間最長的元素。佇列的尾部是在佇列中存在時間最短的元素。新元素插入到佇列的尾部,佇列獲取操作則是從佇列頭部開始獲得元素。這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致操作受阻塞;試圖從空佇列中提取元素將導致類似阻塞。

此類支援對等待的生產者執行緒和使用者執行緒進行排序的可選公平策略。預設情況下,不保證是這種排序。然而,透過將公平性(fairness)設定為true而構造的佇列允許按照 FIFO 順序訪問執行緒。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。

一些原始碼參考:

/** Main lock guarding all access */

    final ReentrantLock lock;

 

    public void put(E e) throws InterruptedException {

        checkNotNull(e);

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            while (count == items.length)

                notFull.await();

            insert(e);

        } finally {

            lock.unlock();

        }

    }

 

    private void insert(E x) {

        items[putIndex] = x;

        putIndex = inc(putIndex);

        ++count;

        notEmpty.signal();

    }

 

    final int inc(int i) {

        return (++i == items.length) ? 0 : i;

    }

 

    public E take() throws InterruptedException {

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            while (count == 0)

                notEmpty.await();

            return extract();

        } finally {

            lock.unlock();

        }

    }

 

    private E extract() {

        final Object[] items = this.items;

        E x = this.cast(items[takeIndex]);

        items[takeIndex] = null;

        takeIndex = inc(takeIndex);

        –count;

        notFull.signal();

        return x;

    }

 

    final int dec(int i) {

        return ((i == 0) ? items.length : i) – 1;

    }

 

    @SuppressWarnings(“unchecked”)

    static E cast(Object item) {

        return (E) item;

    }

ArrayBlockingQueue只使用了一個lock來控制互斥訪問,所有的互斥訪問都在這個lock的try finally中實現。

LinkedBlockingQueue

一個基於已連結節點的、範圍任意的blocking queue。此佇列按 FIFO(先進先出)排序元素。佇列的頭部是在佇列中時間最長的元素。佇列的尾部是在佇列中時間最短的元素。新元素插入到佇列的尾部,並且佇列獲取操作會獲得位於佇列頭部的元素。連結佇列的吞吐量通常要高於基於陣列的佇列,但是在大多數併發應用程式中,其可預知的效能要低。

可選的容量範圍構造方法引數作為防止佇列過度擴充套件的一種方法。如果未指定容量,則它等於Integer.MAX_VALUE。除非插入節點會使佇列超出容量,否則每次插入後會動態地建立連結節點。

如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。

一些實現程式碼:

/** The capacity bound, or Integer.MAX_VALUE if none */

    private final int capacity;

 

    /** Current number of elements */

    private final AtomicInteger count = new AtomicInteger(0);

 

    /** Lock held by take, poll, etc */

    private final ReentrantLock takeLock = new ReentrantLock();

 

    /** Wait queue for waiting takes */

    private final Condition notEmpty = takeLock.newCondition();

 

    /** Lock held by put, offer, etc */

    private final ReentrantLock putLock = new ReentrantLock();

 

    /** Wait queue for waiting puts */

    private final Condition notFull = putLock.newCondition();

 

    public void put(E e) throws InterruptedException {

        if (e == null) throw new NullPointerException();

        // Note: convention in all put/take/etc is to preset local var

        // holding count negative to indicate failure unless set.

        int c = -1;

        Node node = new Node(e);

        final ReentrantLock putLock = this.putLock;

        final AtomicInteger count = this.count;

        putLock.lockInterruptibly();

        try {

            /*

             * Note that count is used in wait guard even though it is

             * not protected by lock. This works because count can

             * only decrease at this point (all other puts are shut

             * out by lock), and we (or some other waiting put) are

             * signalled if it ever changes from capacity. Similarly

             * for all other uses of count in other wait guards.

             */

            while (count.get() == capacity) {

                notFull.await();

            }

            enqueue(node);

            c = count.getAndIncrement();

            if (c + 1 < capacity)

                notFull.signal();

        } finally {

            putLock.unlock();

        }

        if (c == 0)

            signalNotEmpty();

    }

 

    public E take() throws InterruptedException {

        E x;

        int c = -1;

        final AtomicInteger count = this.count;

        final ReentrantLock takeLock = this.takeLock;

        takeLock.lockInterruptibly();

        try {

            while (count.get() == 0) {

                notEmpty.await();

            }

            x = dequeue();

            c = count.getAndDecrement();

            if (c > 1)

                notEmpty.signal();

        } finally {

            takeLock.unlock();

        }

        if (c == capacity)

            signalNotFull();

        return x;

    }

從原始碼實現來看,LinkedBlockingQueue使用了2個lock,一個takelock和一個putlock,讀和寫用不同的lock來控制,這樣併發效率更高。

ConcurrentLinkedQueue

ArrayBlockingQueue和LinkedBlockingQueue都是使用lock來實現的,也就是阻塞式的佇列,而ConcurrentLinkedQueue使用CAS來實現,是非阻塞式的“lock-free”實現。

ConcurrentLinkedQueue原始碼的實現有點複雜,具體的可看這篇文章的分析:

http://www.infoq.com/cn/articles/ConcurrentLinkedQueue

ConcurrentHashMap

HashMap不是執行緒安全的。

HashTable容器使用synchronized來保證執行緒安全,在執行緒競爭激烈的情況下HashTable的效率非常低下。

ConcurrentHashMap採用了Segment分段技術,容器裡有多把鎖,每把鎖用於鎖容器其中一部分資料,那麼當多執行緒訪問容器裡不同資料段的資料時,執行緒間就不會存在鎖競爭,從而可以有效的提高併發訪問效率。

ConcurrentHashMap結構:

ConcurrentHashMap的實現原理分析:

http://www.infoq.com/cn/articles/ConcurrentHashMap

CopyOnWriteArrayList

CopyOnWrite容器即寫時複製的容器。往一個容器新增元素的時候,不直接往當前容器新增,而是先將當前容器進行Copy,複製出一個新的容器,然後新的容器裡新增元素,新增完元素之後,再將原容器的取用指向新的容器。這樣做的好處是可以對CopyOnWrite容器進行併發的讀,而不需要加鎖,因為當前容器不會新增任何元素。所以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不同的容器。類似的有CopyOnWriteArraySet。

public boolean add(T e) {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        Object[] elements = getArray();

        int len = elements.length;

        // 複製出新陣列

        Object[] newElements = Arrays.copyOf(elements, len + 1);

        // 把新元素新增到新陣列裡

        newElements[len] = e;

        // 把原陣列取用指向新陣列

        setArray(newElements);

        return true;

    } finally {

        lock.unlock();

    }

}

  

final void setArray(Object[] a) {

    array = a;

}

讀的時候不需要加鎖,如果讀的時候有多個執行緒正在向ArrayList新增資料,讀還是會讀到舊的資料,因為寫的時候不會鎖住舊的ArrayList。

public E get(int index) {

    return get(getArray(), index);

}

AbstractQueuedSynchronizer

為實現依賴於先進先出 (FIFO) 等待佇列的阻塞鎖和相關同步器(訊號量、事件,等等)提供一個框架。此類的設計標的是成為依靠單個原子 int 值來表示狀態的大多數同步器的一個有用基礎。子類必須定義更改此狀態的受保護方法,並定義哪種狀態對於此物件意味著被獲取或被釋放。假定這些條件之後,此類中的其他方法就可以實現所有排隊和阻塞機制。子類可以維護其他狀態列位,但只是為了獲得同步而只追蹤使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法來操作以原子方式更新的 int 值。

使用示例

以下是一個非再進入的互斥鎖類,它使用值 0 表示未鎖定狀態,使用 1 表示鎖定狀態。當非重入鎖定不嚴格地需要當前擁有者執行緒的記錄時,此類使得使用監視器更加方便。它還支援一些條件並公開了一個檢測方法:

class Mutex implements Lock, java.io.Serializable {

 

    // Our internal helper class

    private static class Sync extends AbstractQueuedSynchronizer {

      // Report whether in locked state

      protected boolean isHeldExclusively() { 

        return getState() == 1; 

      }

 

      // Acquire the lock if state is zero

      public boolean tryAcquire(int acquires) {

        assert acquires == 1; // Otherwise unused

       if (compareAndSetState(0, 1)) {

         setExclusiveOwnerThread(Thread.currentThread());

         return true;

       }

       return false;

      }

 

      // Release the lock by setting state to zero

      protected boolean tryRelease(int releases) {

        assert releases == 1; // Otherwise unused

        if (getState() == 0) throw new IllegalMonitorStateException();

        setExclusiveOwnerThread(null);

        setState(0);

        return true;

      }

        

      // Provide a Condition

      Condition newCondition() { return new ConditionObject(); }

 

      // Deserialize properly

      private void readObject(ObjectInputStream s) 

        throws IOException, ClassNotFoundException {

        s.defaultReadObject();

        setState(0); // reset to unlocked state

      }

    }

 

    // The sync object does all the hard work. We just forward to it.

    private final Sync sync = new Sync();

 

    public void lock()                { sync.acquire(1); }

    public boolean tryLock()          { return sync.tryAcquire(1); }

    public void unlock()              { sync.release(1); }

    public Condition newCondition()   { return sync.newCondition(); }

    public boolean isLocked()         { return sync.isHeldExclusively(); }

    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }

    public void lockInterruptibly() throws InterruptedException { 

      sync.acquireInterruptibly(1);

    }

    public boolean tryLock(long timeout, TimeUnit unit) 

        throws InterruptedException {

      return sync.tryAcquireNanos(1, unit.toNanos(timeout));

    }

 }

ThreadPoolExecutor

ThreadPoolExecutor 的內部工作原理,整個思路總結起來就是 5 句話:

1. 如果當前池大小 poolSize 小於 corePoolSize ,則建立新執行緒執行任務。

2. 如果當前池大小 poolSize 大於 corePoolSize ,且等待佇列未滿,則進入等待佇列

3. 如果當前池大小 poolSize 大於 corePoolSize 且小於 maximumPoolSize ,且等待佇列已滿,則建立新執行緒執行任務。

4. 如果當前池大小 poolSize 大於 corePoolSize 且大於 maximumPoolSize ,且等待佇列已滿,則呼叫拒絕策略來處理該任務。

5. 執行緒池裡的每個執行緒執行完任務後不會立刻退出,而是會去檢查下等待佇列裡是否還有執行緒任務需要執行,如果在 keepAliveTime 裡等不到新的任務了,那麼執行緒就會退出。

排隊有三種通用策略:

直接提交。工作佇列的預設選項是SynchronousQueue,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。

無界佇列。使用無界佇列(例如,不具有預定義容量的LinkedBlockingQueue)將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web 頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。

有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(如ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和背景關係切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。

ThreadFactory 和 RejectedExecutionHandler是ThreadPoolExecutor的兩個屬性,也 可以認為是兩個簡單的擴充套件點. ThreadFactory 是建立執行緒的工廠。預設的執行緒工廠會建立一個帶有“ pool-poolNumber-thread-threadNumber ”為名字的執行緒,如果我們有特別的需要,如執行緒組命名、優先順序等,可以定製自己的ThreadFactory 。

RejectedExecutionHandler 是拒絕的策略。常見有以下幾種:

  • AbortPolicy :不執行,會丟擲 RejectedExecutionException 異常。

  • CallerRunsPolicy :由呼叫者(呼叫執行緒池的主執行緒)執行。

  • DiscardOldestPolicy :拋棄等待佇列中最老的。

  • DiscardPolicy: 不做任何處理,即拋棄當前任務。

ScheduleThreadPoolExecutor 是對ThreadPoolExecutor的整合。增加了定時觸發執行緒任務的功能。需要註意從內部實現看,ScheduleThreadPoolExecutor 使用的是 corePoolSize 執行緒和一個無界佇列的固定大小的池,所以調整 maximumPoolSize 沒有效果。無界佇列是一個內部自定義的 DelayedWorkQueue 。

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {  

    return new ThreadPoolExecutor(nThreads, nThreads,  

                                  0L, TimeUnit.MILLISECONDS,  

                                  new LinkedBlockingQueue());  

}

實際上就是個不支援keepalivetime,且corePoolSize和maximumPoolSize相等的執行緒池。

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {  

    return new FinalizableDelegatedExecutorService  

        (new ThreadPoolExecutor(1, 1,  

                                0L, TimeUnit.MILLISECONDS,  

                                new LinkedBlockingQueue()));  

}

實際上就是個不支援keepalivetime,且corePoolSize和maximumPoolSize都等1的執行緒池。

CachedThreadPool

public static ExecutorService newCachedThreadPool() {  

      return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 

                                  60L, TimeUnit.SECONDS,  

                                  new SynchronousQueue());  

}

實際上就是個支援keepalivetime時間是60秒(執行緒空閑存活時間),且corePoolSize為0,maximumPoolSize無窮大的執行緒池。

SingleThreadScheduledExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {  

    return new DelegatedScheduledExecutorService  

        (new ScheduledThreadPoolExecutor(1, threadFactory));  

}

實際上是個corePoolSize為1的ScheduledExecutor。上文說過ScheduledExecutor採用無界等待佇列,所以maximumPoolSize沒有作用。

ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  

    return new ScheduledThreadPoolExecutor(corePoolSize);  

}

實際上是corePoolSize課設定的ScheduledExecutor。上文說過ScheduledExecutor採用無界等待佇列,所以maximumPoolSize沒有作用。

參考:

  • 《java併發程式設計的藝術》

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂