(點選上方公眾號,可快速關註)
來源:笨狐狸 ,
blog.csdn.net/liweisnake/article/details/78842176
disruptor經過幾年的發展,似乎已經成為效能最佳化的大殺器,幾乎每個想最佳化效能的專案宣稱自己用上了disruptor,效能都會呈現質的躍進。畢竟,最好的例子就是LMAX自己的架構設計,支撐了600w/s的吞吐。
本文試圖從程式碼層面將關鍵問題做些解答。
基本概念
Disruptor: 實際上就是整個基於ringBuffer實現的生產者消費者樣式的容器。
RingBuffer: 著名的環形佇列,可以類比為BlockingQueue之類的佇列,ringBuffer的使用,使得記憶體被迴圈使用,減少了某些場景的記憶體分配回收擴容等耗時操作。
EventProcessor: 事件處理器,實際上可以理解為消費者模型的框架,實現了執行緒Runnable的run方法,將迴圈判斷等操作封在了裡面。
EventHandler: 事件處置器,與前面處理器的不同是,事件處置器不負責框架內的行為,僅僅是EventProcessor作為消費者框架對外預留的擴充套件點罷了。
Sequencer: 作為RingBuffer生產者的父介面,其直接實現有SingleProducerSequencer和MultiProducerSequencer。
EventTranslator: 事件轉換器。實際上就是新事件向舊事件改寫的介面定義。
SequenceBarrier: 消費者路障。規定了消費者如何向下走。都說disruptor無鎖,事實上,該路障算是變向的鎖。
WaitStrategy: 當生產者生產得太快而消費者消費得太慢時的等待策略。
把上面幾個關鍵概念畫個圖,大概長這樣:
所以接下來主要也就從生產者,消費者以及ringBuffer3個維度去看disruptor是如何玩的。
生產者
生產者釋出訊息的過程從disruptor的publish方法為入口,實際呼叫了ringBuffer的publish方法。publish方法主要做了幾件事,一是先確保能拿到後面的n個sequence;二是使用translator來填充新資料到相應的位置;三是真正的宣告這些位置已經釋出完成。
public void publishEvent(EventTranslator
translator) {
final long sequence = sequencer.next();
translateAndPublish(translator, sequence);
}
public void publishEvents(EventTranslator
[] translators, int batchStartsAt, int batchSize) {
checkBounds(translators, batchStartsAt, batchSize);
final long finalSequence = sequencer.next(batchSize);
translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
}
獲取生產者下一個sequence的方法,細節已經註釋,實際上最終目的就是確保生產者和消費者互相不越界。
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException(“n must be > 0”);
}
//該生產者釋出的最大序列號
long nextValue = this.nextValue;
//該生產者欲釋出的序列號
long nextSequence = nextValue + n;
//改寫點,即該生產者如果釋出了這次的序列號,那它最終會落在哪個位置,實際上是nextSequence做了算術處理以後的值,最終目的是統一計算,否則就要去判絕對值以及取模等麻煩操作
long wrapPoint = nextSequence – bufferSize;
//所有消費者中消費得最慢那個的前一個序列號
long cachedGatingSequence = this.cachedValue;
//這裡兩個判斷條件:一是看生產者生產是不是超過了消費者,所以判斷的是改寫點是否超過了最慢消費者;二是看消費者是否超過了當前生產者的最大序號,判斷的是消費者是不是比生產者還快這種異常情況
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
//改寫點是不是已經超過了最慢消費者和當前生產者序列號的最小者(這兩個有點難理解,實際上就是改寫點不能超過最慢那個生產者,也不能超過當前自身,比如一次釋出超過bufferSize),gatingSequences的處理也是類似算術處理,也可以看成是相對於原點是正還是負
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
//喚醒阻塞的消費者
waitStrategy.signalAllWhenBlocking();
//等上1納秒
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
//把這個最慢消費者快取下來,以便下一次使用
this.cachedValue = minSequence;
}
//把當前序列號更新為欲釋出序列號
this.nextValue = nextSequence;
return nextSequence;
}
translator由使用者在呼叫時自己實現,其實就是預留的一個擴充套件點,將改寫事件預留出來。大部分實現都是將ByteBuffer複製到Event中,參考disruptor github官方例子。
最後宣告新序列號釋出完成,實際上就是設定了cursor,並且通知可能阻塞的消費者,這裡已經釋出完新的Event了,快來消費吧。
public void publish(long sequence)
{
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}
以上就是單生產者的分析,MultiProducerSequencer可以類似分析。
等待策略
等待策略實際上就是用來同步生產者和消費者的方法。SequenceBarrier只有一個實現ProcessingSequenceBarrier,中間就用到了WaitStrategy
BlockingWaitStrategy就是真正的加鎖阻塞策略,採用的就是ReentrantLock以及Condition來控制阻塞與喚醒。
TimeoutBlockingWaitStrategy是BlockingWaitStrategy中條件帶超時的版本。
LiteBlockingWaitStrategy是BlockingWaitStrategy的改進版,走了ReentrantLock和CAS輕量級鎖結合的方式,不過註釋說這算是實驗性質的微效能改進。
BusySpinWaitStrategy算是一個自旋鎖,其實現很有趣,即不停的呼叫Thread類的onSpinWait方法。
YieldingWaitStrategy是自旋鎖的一種改進,自旋鎖對於cpu來說太重,於是YieldingWaitStrategy先自旋100次,如果期間沒有達成退出等待的條件,則主動讓出cpu給其他執行緒作為懲罰。
SleepingWaitStrategy又是YieldingWaitStrategy的一種改進,SleepingWaitStrategy頭100次先自旋,如果期間沒有達成退出條件,則接下來100次主動讓出cpu作為懲罰,如果還沒有達成條件,則不再計數,每次睡1納秒。
PhasedBackoffWaitStrategy相對複雜點,基本上是10000次自旋以後要麼出讓cpu,然後繼續自旋,要麼就採取新的等待策略。
消費者
EventProcessor是整個消費者事件處理框架,其主體就是執行緒的run方法,來看BatchEventProcessor,總體比較簡單。
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException(“Thread is already running”);
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
//等待至少一個可用的sequence出來
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence – nextSequence + 1);
}
//一個一個消費事件
while (nextSequence <= availableSequence)
{
//從ringBuffer裡獲取下一個事件
event = dataProvider.get(nextSequence);
//消費這個事件
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
//當前的sequence推進到availableSequence
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
RingBuffer
RingBuffer這邊程式碼比較簡單,主要就是封裝了一下釋出的api
abstract class RingBufferFields
extends RingBufferPad {
private static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
private static final Unsafe UNSAFE = Util.getUnsafe();
static
{
final int scale = UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale)
{
REF_ELEMENT_SHIFT = 2;
}
else if (8 == scale)
{
REF_ELEMENT_SHIFT = 3;
}
else
{
throw new IllegalStateException(“Unknown pointer size”);
}
// 如果scale是4, BUFFER_PAD則為32
BUFFER_PAD = 128 / scale;
// Including the buffer pad in the array base offset BUFFER_PAD<
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
}
private final long indexMask;
private final Object[] entries;
protected final int bufferSize;
protected final Sequencer sequencer;
RingBufferFields(
EventFactory
eventFactory, Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException(“bufferSize must not be less than 1”);
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException(“bufferSize must be a power of 2”);
}
this.indexMask = bufferSize – 1;
//bufferSize再加兩倍的BUFFER_PAD大小,BUFFER_PAD分別在頭尾
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
private void fill(EventFactory
eventFactory) {
for (int i = 0; i < bufferSize; i++)
{
//初始化整個buffer
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
@SuppressWarnings(“unchecked”)
protected final E elementAt(long sequence)
{
//sequence & indexMask即對sequence取模, 最終算出來的就是基地址+偏移地址
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
}
主體程式碼基本如上。其他程式碼可以自行參考。
下麵介紹下一些常見問題。
1. disruptor應該如何用才能發揮最大功效?
disruptor原本就是事件驅動的設計,其整個架構跟普通的多執行緒很不一樣。比如一種用法,將disruptor作為業務處理,中間帶I/O處理,這種玩法比多執行緒還慢;相反,如果將disruptor做業務處理,需要I/O時採用nio非同步呼叫,不阻塞disruptor消費者執行緒,等到I/O非同步呼叫回來後在回呼方法中將後續處理重新塞到disruptor佇列中,可以看出來,這是典型的事件處理架構,確實能在時間上佔據優勢,加上ringBuffer固有的幾項效能最佳化,能讓disruptor發揮最大功效。
2. disruptor為啥這麼快?
這個問題參考之前的一篇文章 disruptor框架為什麼這麼強大。
http://blog.csdn.net/liweisnake/article/details/9113119
3. 多生產者如何寫入訊息?
多生產者的訊息寫入實際上是透過availableBuffer與消費者來同步最後一個生產者寫入的位置,這樣,消費者永遠不能超越最慢的那個生產者。見如下程式碼段
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
@Override
public boolean isAvailable(long sequence)
{
int index = calculateIndex(sequence);
int flag = calculateAvailabilityFlag(sequence);
long bufferAddress = (index * SCALE) + BASE;
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
@Override
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
{
if (!isAvailable(sequence))
{
return sequence – 1;
}
}
return availableSequence;
}
可以參考這篇文章 RingBuffer多生產者寫入。
http://alicharles.com/article/disruptor-ringbuffer-muti-write/
4. 除了多個消費者重覆處理生產者傳送的訊息,是否可以多消費者不重覆處理生產者傳送的訊息,即各處理各的?
若要多消費者重覆處理生產者的訊息,則使用disruptor.handleEventsWith方法將消費者傳入;而若要消費者不重覆的處理生產者的訊息,則使用disruptor.handleEventsWithWorkerPool方法將消費者傳入。
看完本文有收穫?請轉發分享給更多人
關註「ImportNew」,提升Java技能