(點選上方公眾號,可快速關註)
來源:forever ,
niceaz.com/高效能執行緒間佇列disruptor簡介/#more-189
disruptor簡介
背景
Disruptor是英國外匯交易公司LMAX開發的一個高效能佇列,研發的初衷是解決記憶體佇列的延遲問題。與Kafka(Apache Kafka)、RabbitMQ(RabbitMQ)用於服務間的訊息佇列不同,disruptor一般用於執行緒間訊息的傳遞。基於Disruptor開發的系統單執行緒能支撐每秒600萬訂單,2010年在QCon演講後,獲得了業界關註。2011年,企業應用軟體專家Martin Fowler專門撰寫長文介紹The LMAX Architecture。同年它還獲得了Oracle官方的Duke大獎。其他關於disruptor的背景就不在此多言,可以自己google。
https://martinfowler.com/articles/lmax.html
官方資料
disruptor github wiki有關於disruptor相關概念和原理的介紹,該wiki已經很久沒有更新。像Design and Implementation,對於想瞭解disruptor的人是很有吸引力的,但是隻有題目沒有內容,還是很遺憾的。本文稍後會對其內部原理做一個介紹性的描述。
disruptor github wiki:
Home · LMAX-Exchange/disruptor Wiki
https://github.com/LMAX-Exchange/disruptor/wiki
disruptor github:
LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library
https://github.com/LMAX-Exchange/disruptor
這個地方也有很多不錯的資料:
Disruptor by LMAX-Exchange
https://lmax-exchange.github.io/disruptor/
效能
disruptor是用於一個JVM中多個執行緒之間的訊息佇列,作用與ArrayBlockingQueue有相似之處,但是disruptor從功能、效能都遠好於ArrayBlockingQueue,當多個執行緒之間傳遞大量資料或對效能要求較高時,可以考慮使用disruptor作為ArrayBlockingQueue的替代者。
官方也對disruptor和ArrayBlockingQueue的效能在不同的應用場景下做了對比,本文列出其中一組資料,資料中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue:
完整的官方效能測試資料在Performance Results · LMAX-Exchange/disruptor Wiki可以看到,效能測試的程式碼已經包含在disruptor的程式碼中,你完全可以git下來在自己的主機上測試一下看看
https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
如何使用
單生產者,單消費者
//宣告disruptor中事件型別及對應的事件工廠
private class LongEvent {
private long value;
public LongEvent() {
this.value = 0L;
}
public void set(long value) {
this.value = value;
}
public long get() {
return this.value;
}
}
private EventFactory
eventFactory = new EventFactory () { public LongEvent newInstance() {
return new LongEvent();
}
};
//宣告disruptor,
private int ringBufferSize = 1024;
private Executor executor = Executors.newFixedThreadPool(8);
private Disruptor
disruptor = new Disruptor (eventFactory, ringBufferSize, executor);
//pubisher邏輯,將原始資料轉換為event,publish到ringbuffer
private class Publisher implements EventTranslatorOneArg
{
public void translateTo(LongEvent event, long sequence, String arg0) {
event.set(Long.parseLong(arg0));
}
}
//consumer邏輯,獲取event進行處理
private class Consumer implements EventHandler
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
long value = event.get();
int index = (int) (value % Const.NUM_OF_FILE);
fileWriter[index].write(“” + value + “\n”);
if(value == Long.MAX_VALUE) {
isFinish = true;
}
}
}
//註冊consumer啟動disruptor
disruptor.handleEventsWith(new Consumer());
disruptor.start();
//獲取disruptor的ringbuffer,用於生產資料
private RingBuffer
ringBuffer = disruptor.getRingBuffer(); ringBuffer.publishEvent(new Publisher(), line);
多生產者
多生產者的改動相對簡單,只需將disruptor的宣告換一個建構式即可,但是多生產者ringbuffer的處理邏輯完全不同,只是這些不同對使用者透明,本文將在後邊討論單生產者,多生產者ringbuffer邏輯的不同
private Disruptor
disruptor1 = new Disruptor (eventFactory, ringBufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy());
多消費者
多消費者的情況分為兩類:
-
廣播:對於多個消費者,每條資訊會達到所有的消費者,被多次處理,一般每個消費者業務邏輯不通,用於同一個訊息的不同業務邏輯處理
-
分組:對於同一組內的多個消費者,每條資訊只會被組內一個消費者處理,每個消費者業務邏輯一般相同,用於多消費者併發處理一組訊息
廣播
-
消費者之間無依賴關係
假設目前有handler1,handler2,handler3三個消費者處理一批訊息,每個訊息都要被三個消費者處理到,三個消費者無依賴關係,則如下所示即可
disruptor.handleEventsWith(handler1,handler2,handler3);
-
消費者之間有依賴關係
假設handler3必須在handler1,handler2處理完成後進行處理
disruptor.handleEventsWith(handler1,handler2).then(handler3);
其他情況可視為以上兩種情況的排列組合
分組
分組情況稍微不同,對於消費者,需要實現WorkHandler而不是EventHandler,藉口定義分別如下所示:
public interface EventHandler
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
public interface WorkHandler
{
/**
* Callback to indicate a unit of work needs to be processed.
*
* @param event published to the {@link RingBuffer}
* @throws Exception if the {@link WorkHandler} would like the exception handled further up the chain.
*/
void onEvent(T event) throws Exception;
}
假設handler1,handler2,handler3都實現了WorkHandler,則呼叫以下程式碼就可以實現分組
disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3);
廣播和分組之間也是可以排列組合的
tips
disruptor也提供了函式讓你自定義消費者之間的關係,如
public EventHandlerGroup
當然,必須對disruptor有足夠的瞭解才能正確的在EventProcessor中實現多消費者正確的邏輯
實現原理
為何高效
事件預分配
在定義disruptor的時候我們需要指定事件工廠EventFactory的邏輯,disruptor內部的ringbuffer的資料結構是陣列,EventFactory就用於disruptor初始化時陣列每個元素的填充。生產者開始後,是透過獲取對應位置的Event,呼叫Event的setter函式更新Event達到生產資料的目的的。為什麼這樣?假設使用LinkedList,在生產消費的場景下生產者會產生大量的新節點,新節點被消費後又需要被回收,頻繁的生產消費給GC帶來很大的壓力。使用陣列後,在記憶體中存在的是一塊大小穩定的記憶體,頻繁的生產消費對GC並沒有什麼影響,大大減小了系統的最慢響應時間,更不會因為消費者的滯後導致OOM的發生。因此這種事件預分配的方法對於減輕GC壓力可以說是一種簡單有效的方法,日常工作中的借鑒意義還是很大的。
無鎖演演算法
先看一段ABQ put演演算法的實現:
-
每個物件一個鎖,首先加鎖
-
如果陣列是滿的,加入鎖的notFull條件等待佇列。(notFull的具體機制可以看這裡的一篇文章wait、notify與Condition | forever)
-
元素加入陣列
-
釋放鎖
http://niceaz.com/wait%e3%80%81notify%e4%b8%8econdition/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
透過以上程式碼說明兩點:
-
ABQ是透過lock機制實現的執行緒同步
-
ABQ的所有操作共用同一個lock,故所有操作均是互斥的
這篇文章中講述了一個實驗, 測試程式呼叫了一個函式,該函式會對一個64位的計數器迴圈自增5億次,在2.4G 6核機器上得到瞭如下的實驗資料:
http://mechanitis.blogspot.com/2011/07/dissecting-disruptor-why-its-so-fast.html
實驗資料說明,使用CAS機制比使用lock機制快了一個數量級
另一方面,ABQ的所有操作都是互斥的,這點其實不是必要的,尤其像put和get操作,沒必要共享一個lock,完全可以降低鎖的粒度提高效能。
disruptor則與之不同:
disruptor使用了CAS機制同步執行緒,執行緒同步代價小於lock
disruptor遵守single writer原則,一塊記憶體對應單個執行緒,不僅produce和consume不是互斥的,多執行緒的produce也不是互斥的
偽共享
偽共享一直是一個比較高階的話題,Doug lea在JDK的Concurrent使用了大量的快取行機制避免偽共享,disruptor也是用了這樣的機制。但是對於廣大的碼農而言,實際工作中我們可能很少會需要使用這樣的機制。畢竟對於大部分人而言,與避免偽共享帶來的效能提升而言,最佳化工程架構,演演算法,io等可能會給我們帶來更大的效能提升。所以本文只簡單提到這個話題,並不深入講解,畢竟我也沒有實際的應用經驗去講解這個話題。
單生產者樣式
如圖所示,圖中陣列代表ringbuffer,紅色元素代表已經釋出過的事件槽,綠色元素代表將要釋出的事件槽,白色元素代表尚未利用的事件槽。disruptor生產時間包括三個階段:申請事件槽,更新資料,釋出事件槽。單生產者相對簡單,
-
申請事件槽:此時,ringbuffer會將cursor後的一個事件槽傳回給使用者,但不更新cursor,所以對於消費者而言,該事件還是不可見的。
-
更新資料:生產者對該事件槽資料進行更新,
-
釋出事件槽:釋出的過程就是移動cursor的過程,完成移動cursor後,釋出完成,該事件對生產者可見。
多生產者樣式
多生產者的樣式相對就比較複雜,也體現了disuptor是如何利用CAS機制進行的執行緒間同步,並保證多個生產者的生產不互斥。如圖所示,紅色的代表已經釋出的事件,淡綠色代表生產者1申請的事件槽,淡黃色代表生產者2申請的事件槽。
-
申請事件槽:多生產者生產資料的過程就是移動cursor的過程,多個執行緒同時使用CAS操作更新cursor的值,哪個執行緒成功的更新了cursor的值哪個執行緒就成功申請了事件槽,而其他的執行緒則利用CAS操作繼續嘗試更新cursor的值。申請成功後cursor的值已經發生了改變,那怎麼保證在該事件槽釋出之前對消費者不可見呢?disruptor額外利用了一個陣列,如圖中所示。深黃色代表相應的事件槽已經釋出,白色代表相應的事件槽尚未釋出。disruptor使用了UNSAFE類對該陣列進行操作,從而保證陣列值更新的高效性。
-
更新資料:生產者按序將成功申請到的事件槽資料進行更新
-
釋出事件槽:生產者將對應陣列的標誌位更新
多個生產者生產資料唯一的競爭就發生在cursor值的更新,disruptor使用CAS操作更新cursor的值從而避免使用了鎖。申請資料之後,多個生產者可以併發更新資料,釋出事件槽,互不影響。需要說明的是,如圖中所示,生產者1申請了三個事件槽,釋出了一個事件槽,生產者2申請了兩個事件槽,釋出了一個事件槽。時間上,在生產者1釋出其剩餘的兩個事件槽之前,生產者2釋出的事件槽對於消費則也還是不可見的。所以,每個生產者一定要保證即便發生異常也要釋出事件槽,避免其後的生產者釋出的事件槽對消費者不可見。所以生產則更新資料和釋出事件槽一般是一個try…finally結構。或者使用disruptor提供的EventTranslator機制釋出事件,EventTranslator自動封裝了try…finally結構
tips
消費者的機制與生產者非常類似,本文不再贅述。
使用案例
LMAX應用場景
第一個講LMAX的應用場景,畢竟是催生disruptor的應用場景,所以非常典型。同時,disruptor作為記憶體訊息佇列,怎麼保證宕機的情況下資料不丟失這一關鍵問題在LMAX自身的應用中可以得到一點啟示。
LMAX的機構如圖所示,共包括三部分,Input Disruptor,Business Processor,Output Disruptor。
Input Disruptor從網路接收到訊息,在Business Processor處理之前需要完成三種操作:
-
Journal:將收到的資訊持久化,在Business Processor執行緒崩潰的時候恢復資料
-
Replicate:複製資訊到其他Business Processor節點
-
Unmarshall:重組資訊資料格式,便於Business Processor處理
Business Processor負責業務邏輯處理,並將結果寫入Output Disruptor
Output Disruptor負責讀取Business Processor處理結果,重組資料格式進行網路傳輸。
重點介紹一下Input Disruptor,Input Disruptor的依賴關係如圖所示:
用disruptor的語言編寫就是:
disruptor.handleWith(journal, replacate, unmarshall).then(business)
LMAX為了避免business processor出現異常導致訊息的丟失,在business processor處理前將訊息全部持久化儲存。當business processor出現異常時,重新處理持久化的資料即可。我們可以借鑒LMAX的這種方式,來避免訊息的丟失。更詳細關於LMAX的業務架構介紹可以參考The LMAX Architecture
https://martinfowler.com/articles/lmax.html
log4j 2
以下一段文字取用自Apache log4j 2官網,這段文字足以說明disruptor對log4j 2的效能提升的巨大貢獻。
Log4j 2 contains next-generation Asynchronous Loggers based on the LMAX Disruptor library. In multi-threaded scenarios Asynchronous Loggers have 18 times higher throughput and orders of magnitude lower latency than Log4j 1.x and Logback.
log4j2效能的優越主要體現在非同步日誌記錄方面,以下兩個圖片摘自官網分別從吞吐率和響應時間兩個方面體現了log4j2非同步日誌效能的強悍。
log4j2非同步日誌的實現就是每次呼叫將待記錄的日誌寫入disruptor後迅速傳回,這樣無需等待資訊落盤從而大大提高相應時間。同時,disruptor的事件槽重用機制避免產生大量Java物件,進而避免GC對相應時間和吞吐率的影響,也就是log4j2官網提到的Garbage-free。
檔案hash
還有一種比較常見的應用場景是檔案hash。如圖所示,需要對大檔案進行hash以方便後續處理,由於檔案太大,所以把檔案分給四個執行緒分別處理,每個執行緒讀取相應資訊,計算hash值,寫入相應檔案。
這樣的方法有兩個弊端:
-
同一個執行緒內,讀寫相互依賴,互相等待
-
不同執行緒可能爭奪同一個輸出檔案,需要lock同步
於是改為如下方法,四個執行緒讀取資料,計算hash值,將資訊寫入相應disruptor。每個disruptor對應一個消費者,將disruptor中的資訊落盤持久化。對於四個讀取執行緒而言,只有讀取檔案操作,沒有寫檔案操作,因此不存在讀寫互相依賴的問題。對於寫執行緒而言,只存在寫檔案操作,沒有讀檔案,因此也不存在讀寫互相依賴的問題。同時disruptor的存在又很好的解決了多個執行緒互相競爭同一個檔案的問題,因此可以大大提高程式的吞吐率。
看完本文有收穫?請轉發分享給更多人
關註「ImportNew」,提升Java技能