歡迎光臨
每天分享高質量文章

高效能執行緒間佇列 DISRUPTOR 簡介

(點選上方公眾號,可快速關註)


來源:forever ,

niceaz.com/高效能執行緒間佇列disruptor簡介/#more-189

disruptor簡介

背景

Disruptor是英國外匯交易公司LMAX開發的一個高效能佇列,研發的初衷是解決記憶體佇列的延遲問題。與Kafka(Apache Kafka)、RabbitMQ(RabbitMQ)用於服務間的訊息佇列不同,disruptor一般用於執行緒間訊息的傳遞。基於Disruptor開發的系統單執行緒能支撐每秒600萬訂單,2010年在QCon演講後,獲得了業界關註。2011年,企業應用軟體專家Martin Fowler專門撰寫長文介紹The LMAX Architecture。同年它還獲得了Oracle官方的Duke大獎。其他關於disruptor的背景就不在此多言,可以自己google。

https://martinfowler.com/articles/lmax.html

官方資料

disruptor github wiki有關於disruptor相關概念和原理的介紹,該wiki已經很久沒有更新。像Design and Implementation,對於想瞭解disruptor的人是很有吸引力的,但是隻有題目沒有內容,還是很遺憾的。本文稍後會對其內部原理做一個介紹性的描述。

disruptor github wiki:

Home · LMAX-Exchange/disruptor Wiki

https://github.com/LMAX-Exchange/disruptor/wiki

disruptor github:

LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library

https://github.com/LMAX-Exchange/disruptor

這個地方也有很多不錯的資料:

Disruptor by LMAX-Exchange

https://lmax-exchange.github.io/disruptor/

效能

disruptor是用於一個JVM中多個執行緒之間的訊息佇列,作用與ArrayBlockingQueue有相似之處,但是disruptor從功能、效能都遠好於ArrayBlockingQueue,當多個執行緒之間傳遞大量資料或對效能要求較高時,可以考慮使用disruptor作為ArrayBlockingQueue的替代者。

官方也對disruptor和ArrayBlockingQueue的效能在不同的應用場景下做了對比,本文列出其中一組資料,資料中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue:

完整的官方效能測試資料在Performance Results · LMAX-Exchange/disruptor Wiki可以看到,效能測試的程式碼已經包含在disruptor的程式碼中,你完全可以git下來在自己的主機上測試一下看看

https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results

如何使用

單生產者,單消費者

//宣告disruptor中事件型別及對應的事件工廠

private class LongEvent {

        private long value;

         

        public LongEvent() {

            this.value = 0L;

        }

         

        public void set(long value) {

            this.value = value;

        }

         

        public long get() {

            return this.value;

        }

    }

private EventFactory eventFactory = new EventFactory() {      

        public LongEvent newInstance() {

            return new LongEvent();

        }

};

//宣告disruptor,

private int ringBufferSize = 1024;

private Executor executor = Executors.newFixedThreadPool(8);

private Disruptor disruptor = new Disruptor(eventFactory, ringBufferSize, executor);

 

//pubisher邏輯,將原始資料轉換為event,publish到ringbuffer

private class Publisher implements EventTranslatorOneArg {

 

        public void translateTo(LongEvent event, long sequence, String arg0) {

            event.set(Long.parseLong(arg0));

        }       

    }

//consumer邏輯,獲取event進行處理

private class Consumer implements EventHandler {

 

        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {

            long value = event.get();           

            int index = (int) (value % Const.NUM_OF_FILE);

            fileWriter[index].write(“” + value + “\n”);

             

            if(value == Long.MAX_VALUE) {

                isFinish = true;

            }

        }

         

    }

//註冊consumer啟動disruptor

disruptor.handleEventsWith(new Consumer());

disruptor.start();

 

//獲取disruptor的ringbuffer,用於生產資料

private RingBuffer ringBuffer = disruptor.getRingBuffer();

ringBuffer.publishEvent(new Publisher(), line);

多生產者

多生產者的改動相對簡單,只需將disruptor的宣告換一個建構式即可,但是多生產者ringbuffer的處理邏輯完全不同,只是這些不同對使用者透明,本文將在後邊討論單生產者,多生產者ringbuffer邏輯的不同

private Disruptor disruptor1 = new Disruptor(eventFactory, ringBufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy());

多消費者

多消費者的情況分為兩類:

  • 廣播:對於多個消費者,每條資訊會達到所有的消費者,被多次處理,一般每個消費者業務邏輯不通,用於同一個訊息的不同業務邏輯處理

  • 分組:對於同一組內的多個消費者,每條資訊只會被組內一個消費者處理,每個消費者業務邏輯一般相同,用於多消費者併發處理一組訊息

廣播

  • 消費者之間無依賴關係

假設目前有handler1,handler2,handler3三個消費者處理一批訊息,每個訊息都要被三個消費者處理到,三個消費者無依賴關係,則如下所示即可

disruptor.handleEventsWith(handler1,handler2,handler3);

  • 消費者之間有依賴關係

假設handler3必須在handler1,handler2處理完成後進行處理

disruptor.handleEventsWith(handler1,handler2).then(handler3);

其他情況可視為以上兩種情況的排列組合

分組

分組情況稍微不同,對於消費者,需要實現WorkHandler而不是EventHandler,藉口定義分別如下所示:

public interface EventHandler

{

    /**

     * Called when a publisher has published an event to the {@link RingBuffer}

     *

     * @param event      published to the {@link RingBuffer}

     * @param sequence   of the event being processed

     * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}

     * @throws Exception if the EventHandler would like the exception handled further up the chain.

     */

    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;

}

public interface WorkHandler

{

    /**

     * Callback to indicate a unit of work needs to be processed.

     *

     * @param event published to the {@link RingBuffer}

     * @throws Exception if the {@link WorkHandler} would like the exception handled further up the chain.

     */

    void onEvent(T event) throws Exception;

}

假設handler1,handler2,handler3都實現了WorkHandler,則呼叫以下程式碼就可以實現分組

disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3);

廣播和分組之間也是可以排列組合的

tips

disruptor也提供了函式讓你自定義消費者之間的關係,如

public EventHandlerGroup handleEventsWith(final EventProcessor… processors)

當然,必須對disruptor有足夠的瞭解才能正確的在EventProcessor中實現多消費者正確的邏輯

實現原理

為何高效

事件預分配

在定義disruptor的時候我們需要指定事件工廠EventFactory的邏輯,disruptor內部的ringbuffer的資料結構是陣列,EventFactory就用於disruptor初始化時陣列每個元素的填充。生產者開始後,是透過獲取對應位置的Event,呼叫Event的setter函式更新Event達到生產資料的目的的。為什麼這樣?假設使用LinkedList,在生產消費的場景下生產者會產生大量的新節點,新節點被消費後又需要被回收,頻繁的生產消費給GC帶來很大的壓力。使用陣列後,在記憶體中存在的是一塊大小穩定的記憶體,頻繁的生產消費對GC並沒有什麼影響,大大減小了系統的最慢響應時間,更不會因為消費者的滯後導致OOM的發生。因此這種事件預分配的方法對於減輕GC壓力可以說是一種簡單有效的方法,日常工作中的借鑒意義還是很大的。

無鎖演演算法

先看一段ABQ put演演算法的實現:

  • 每個物件一個鎖,首先加鎖

  • 如果陣列是滿的,加入鎖的notFull條件等待佇列。(notFull的具體機制可以看這裡的一篇文章wait、notify與Condition | forever

  • 元素加入陣列

  • 釋放鎖

http://niceaz.com/wait%e3%80%81notify%e4%b8%8econdition/

public void put(E e) throws InterruptedException {

        checkNotNull(e);

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            while (count == items.length)

                notFull.await();

            enqueue(e);

        } finally {

            lock.unlock();

        }

    }

透過以上程式碼說明兩點:

  • ABQ是透過lock機制實現的執行緒同步

  • ABQ的所有操作共用同一個lock,故所有操作均是互斥的

這篇文章中講述了一個實驗, 測試程式呼叫了一個函式,該函式會對一個64位的計數器迴圈自增5億次,在2.4G 6核機器上得到瞭如下的實驗資料:

http://mechanitis.blogspot.com/2011/07/dissecting-disruptor-why-its-so-fast.html

實驗資料說明,使用CAS機制比使用lock機制快了一個數量級

另一方面,ABQ的所有操作都是互斥的,這點其實不是必要的,尤其像put和get操作,沒必要共享一個lock,完全可以降低鎖的粒度提高效能。

disruptor則與之不同:

disruptor使用了CAS機制同步執行緒,執行緒同步代價小於lock

disruptor遵守single writer原則,一塊記憶體對應單個執行緒,不僅produce和consume不是互斥的,多執行緒的produce也不是互斥的

偽共享

偽共享一直是一個比較高階的話題,Doug lea在JDK的Concurrent使用了大量的快取行機制避免偽共享,disruptor也是用了這樣的機制。但是對於廣大的碼農而言,實際工作中我們可能很少會需要使用這樣的機制。畢竟對於大部分人而言,與避免偽共享帶來的效能提升而言,最佳化工程架構,演演算法,io等可能會給我們帶來更大的效能提升。所以本文只簡單提到這個話題,並不深入講解,畢竟我也沒有實際的應用經驗去講解這個話題。

單生產者樣式

如圖所示,圖中陣列代表ringbuffer,紅色元素代表已經釋出過的事件槽,綠色元素代表將要釋出的事件槽,白色元素代表尚未利用的事件槽。disruptor生產時間包括三個階段:申請事件槽,更新資料,釋出事件槽。單生產者相對簡單,

  • 申請事件槽:此時,ringbuffer會將cursor後的一個事件槽傳回給使用者,但不更新cursor,所以對於消費者而言,該事件還是不可見的。

  • 更新資料:生產者對該事件槽資料進行更新,

  • 釋出事件槽:釋出的過程就是移動cursor的過程,完成移動cursor後,釋出完成,該事件對生產者可見。

多生產者樣式

多生產者的樣式相對就比較複雜,也體現了disuptor是如何利用CAS機制進行的執行緒間同步,並保證多個生產者的生產不互斥。如圖所示,紅色的代表已經釋出的事件,淡綠色代表生產者1申請的事件槽,淡黃色代表生產者2申請的事件槽。

  • 申請事件槽:多生產者生產資料的過程就是移動cursor的過程,多個執行緒同時使用CAS操作更新cursor的值,哪個執行緒成功的更新了cursor的值哪個執行緒就成功申請了事件槽,而其他的執行緒則利用CAS操作繼續嘗試更新cursor的值。申請成功後cursor的值已經發生了改變,那怎麼保證在該事件槽釋出之前對消費者不可見呢?disruptor額外利用了一個陣列,如圖中所示。深黃色代表相應的事件槽已經釋出,白色代表相應的事件槽尚未釋出。disruptor使用了UNSAFE類對該陣列進行操作,從而保證陣列值更新的高效性。

  • 更新資料:生產者按序將成功申請到的事件槽資料進行更新

  • 釋出事件槽:生產者將對應陣列的標誌位更新

多個生產者生產資料唯一的競爭就發生在cursor值的更新,disruptor使用CAS操作更新cursor的值從而避免使用了鎖。申請資料之後,多個生產者可以併發更新資料,釋出事件槽,互不影響。需要說明的是,如圖中所示,生產者1申請了三個事件槽,釋出了一個事件槽,生產者2申請了兩個事件槽,釋出了一個事件槽。時間上,在生產者1釋出其剩餘的兩個事件槽之前,生產者2釋出的事件槽對於消費則也還是不可見的。所以,每個生產者一定要保證即便發生異常也要釋出事件槽,避免其後的生產者釋出的事件槽對消費者不可見。所以生產則更新資料和釋出事件槽一般是一個try…finally結構。或者使用disruptor提供的EventTranslator機制釋出事件,EventTranslator自動封裝了try…finally結構

tips

消費者的機制與生產者非常類似,本文不再贅述。

使用案例

LMAX應用場景

第一個講LMAX的應用場景,畢竟是催生disruptor的應用場景,所以非常典型。同時,disruptor作為記憶體訊息佇列,怎麼保證宕機的情況下資料不丟失這一關鍵問題在LMAX自身的應用中可以得到一點啟示。

LMAX的機構如圖所示,共包括三部分,Input Disruptor,Business Processor,Output Disruptor。

Input Disruptor從網路接收到訊息,在Business Processor處理之前需要完成三種操作:

  • Journal:將收到的資訊持久化,在Business Processor執行緒崩潰的時候恢復資料

  • Replicate:複製資訊到其他Business Processor節點

  • Unmarshall:重組資訊資料格式,便於Business Processor處理

Business Processor負責業務邏輯處理,並將結果寫入Output Disruptor

Output Disruptor負責讀取Business Processor處理結果,重組資料格式進行網路傳輸。

重點介紹一下Input Disruptor,Input Disruptor的依賴關係如圖所示:

用disruptor的語言編寫就是:

disruptor.handleWith(journal, replacate, unmarshall).then(business)

LMAX為了避免business processor出現異常導致訊息的丟失,在business processor處理前將訊息全部持久化儲存。當business processor出現異常時,重新處理持久化的資料即可。我們可以借鑒LMAX的這種方式,來避免訊息的丟失。更詳細關於LMAX的業務架構介紹可以參考The LMAX Architecture

https://martinfowler.com/articles/lmax.html

log4j 2

以下一段文字取用自Apache log4j 2官網,這段文字足以說明disruptor對log4j 2的效能提升的巨大貢獻。

Log4j 2 contains next-generation Asynchronous Loggers based on the LMAX Disruptor library. In multi-threaded scenarios Asynchronous Loggers have 18 times higher throughput and orders of magnitude lower latency than Log4j 1.x and Logback.

log4j2效能的優越主要體現在非同步日誌記錄方面,以下兩個圖片摘自官網分別從吞吐率和響應時間兩個方面體現了log4j2非同步日誌效能的強悍。

log4j2非同步日誌的實現就是每次呼叫將待記錄的日誌寫入disruptor後迅速傳回,這樣無需等待資訊落盤從而大大提高相應時間。同時,disruptor的事件槽重用機制避免產生大量Java物件,進而避免GC對相應時間和吞吐率的影響,也就是log4j2官網提到的Garbage-free。

檔案hash

還有一種比較常見的應用場景是檔案hash。如圖所示,需要對大檔案進行hash以方便後續處理,由於檔案太大,所以把檔案分給四個執行緒分別處理,每個執行緒讀取相應資訊,計算hash值,寫入相應檔案。

這樣的方法有兩個弊端:

  • 同一個執行緒內,讀寫相互依賴,互相等待

  • 不同執行緒可能爭奪同一個輸出檔案,需要lock同步

於是改為如下方法,四個執行緒讀取資料,計算hash值,將資訊寫入相應disruptor。每個disruptor對應一個消費者,將disruptor中的資訊落盤持久化。對於四個讀取執行緒而言,只有讀取檔案操作,沒有寫檔案操作,因此不存在讀寫互相依賴的問題。對於寫執行緒而言,只存在寫檔案操作,沒有讀檔案,因此也不存在讀寫互相依賴的問題。同時disruptor的存在又很好的解決了多個執行緒互相競爭同一個檔案的問題,因此可以大大提高程式的吞吐率。

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂