高性能線程間隊列 DISRUPTOR 簡介

niceaz.com/高性能線程間隊列disruptor簡介/#more-189


disruptor簡介


背景


Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題。與Kafka(Apache Kafka)、RabbitMQ(RabbitMQ)用於服務間的消息隊列不同,disruptor一般用於線程間消息的傳遞。基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講後,獲得了業界關注。2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹The LMAX Architecture。同年它還獲得了Oracle官方的Duke大獎。其他關於disruptor的背景就不在此多言,可以自己google。


https://martinfowler.com/articles/lmax.html


官方資料


disruptor github wiki有關於disruptor相關概念和原理的介紹,該wiki已經很久沒有更新。像Design and Implementation,對於想了解disruptor的人是很有吸引力的,但是隻有題目沒有內容,還是很遺憾的。本文稍後會對其內部原理做一個介紹性的描述。


disruptor github wiki:

Home · LMAX-Exchange/disruptor Wiki


https://github.com/LMAX-Exchange/disruptor/wiki


disruptor github:

LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library


https://github.com/LMAX-Exchange/disruptor


這個地方也有很多不錯的資料:


Disruptor by LMAX-Exchange


https://lmax-exchange.github.io/disruptor/


性能


disruptor是用於一個JVM中多個線程之間的消息隊列,作用與ArrayBlockingQueue有相似之處,但是disruptor從功能、性能都遠好於ArrayBlockingQueue,當多個線程之間傳遞大量數據或對性能要求較高時,可以考慮使用disruptor作為ArrayBlockingQueue的替代者。


官方也對disruptor和ArrayBlockingQueue的性能在不同的應用場景下做了對比,本文列出其中一組數據,數據中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue:


高性能線程間隊列 DISRUPTOR 簡介


完整的官方性能測試數據在Performance Results · LMAX-Exchange/disruptor Wiki可以看到,性能測試的代碼已經包含在disruptor的代碼中,你完全可以git下來在自己的主機上測試一下看看


https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results


如何使用


單生產者,單消費者


//聲明disruptor中事件類型及對應的事件工廠

private class LongEvent {

private long value;

public LongEvent() {

this.value = 0L;

}

public void set(long value) {

this.value = value;

}

public long get() {

return this.value;

}

}

private EventFactory<longevent> eventFactory = new EventFactory<longevent>() { /<longevent>/<longevent>

public LongEvent newInstance() {

return new LongEvent();

}

};

private int ringBufferSize = 1024;

private Executor executor = Executors.newFixedThreadPool(8);

private Disruptor<longevent> disruptor = new Disruptor<longevent>(eventFactory, ringBufferSize, executor);/<longevent>/<longevent>

//pubisher邏輯,將原始數據轉換為event,publish到ringbuffer

private class Publisher implements EventTranslatorOneArg<longevent> {/<longevent>

public void translateTo(LongEvent event, long sequence, String arg0) {

event.set(Long.parseLong(arg0));

}

}

//consumer邏輯,獲取event進行處理

private class Consumer implements EventHandler<longevent> {/<longevent>

public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {

long value = event.get();

int index = (int) (value % Const.NUM_OF_FILE);

fileWriter[index].write("" + value + "\\n");

if(value == Long.MAX_VALUE) {

isFinish = true;

}

}

}

//註冊consumer啟動disruptor

disruptor.handleEventsWith(new Consumer());

disruptor.start();

//獲取disruptor的ringbuffer,用於生產數據

private RingBuffer<longevent> ringBuffer = disruptor.getRingBuffer();/<longevent>

ringBuffer.publishEvent(new Publisher(), line);


多生產者


多生產者的改動相對簡單,只需將disruptor的聲明換一個構造函數即可,但是多生產者ringbuffer的處理邏輯完全不同,只是這些不同對使用者透明,本文將在後邊討論單生產者,多生產者ringbuffer邏輯的不同


private Disruptor<longevent> disruptor1 = new Disruptor<longevent>(eventFactory, ringBufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy());/<longevent>/<longevent>


多消費者


多消費者的情況分為兩類:


  • 廣播:對於多個消費者,每條信息會達到所有的消費者,被多次處理,一般每個消費者業務邏輯不通,用於同一個消息的不同業務邏輯處理
  • 分組:對於同一組內的多個消費者,每條信息只會被組內一個消費者處理,每個消費者業務邏輯一般相同,用於多消費者併發處理一組消息


廣播


  • 消費者之間無依賴關係


假設目前有handler1,handler2,handler3三個消費者處理一批消息,每個消息都要被三個消費者處理到,三個消費者無依賴關係,則如下所示即可

disruptor.handleEventsWith(handler1,handler2,handler3);


  • 消費者之間有依賴關係


假設handler3必須在handler1,handler2處理完成後進行處理

disruptor.handleEventsWith(handler1,handler2).then(handler3);

其他情況可視為以上兩種情況的排列組合


分組


分組情況稍微不同,對於消費者,需要實現WorkHandler而不是EventHandler,藉口定義分別如下所示:


public interface EventHandler

{

/**

* Called when a publisher has published an event to the {@link RingBuffer}

*

* @param event published to the {@link RingBuffer}

* @param sequence of the event being processed

* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}

* @throws Exception if the EventHandler would like the exception handled further up the chain.

*/

void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;

}


public interface WorkHandler

{

/**

* Callback to indicate a unit of work needs to be processed.

*

* @param event published to the {@link RingBuffer}

* @throws Exception if the {@link WorkHandler} would like the exception handled further up the chain.

*/

void onEvent(T event) throws Exception;

}


假設handler1,handler2,handler3都實現了WorkHandler,則調用以下代碼就可以實現分組


disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3);


廣播和分組之間也是可以排列組合的


tips


disruptor也提供了函數讓你自定義消費者之間的關係,如

public EventHandlerGroup

handleEventsWith(final EventProcessor… processors)

當然,必須對disruptor有足夠的瞭解才能正確的在EventProcessor中實現多消費者正確的邏輯


實現原理


為何高效


事件預分配


在定義disruptor的時候我們需要指定事件工廠EventFactory的邏輯,disruptor內部的ringbuffer的數據結構是數組,EventFactory就用於disruptor初始化時數組每個元素的填充。生產者開始後,是通過獲取對應位置的Event,調用Event的setter函數更新Event達到生產數據的目的的。為什麼這樣?假設使用LinkedList,在生產消費的場景下生產者會產生大量的新節點,新節點被消費後又需要被回收,頻繁的生產消費給GC帶來很大的壓力。使用數組後,在內存中存在的是一塊大小穩定的內存,頻繁的生產消費對GC並沒有什麼影響,大大減小了系統的最慢響應時間,更不會因為消費者的滯後導致OOM的發生。因此這種事件預分配的方法對於減輕GC壓力可以說是一種簡單有效的方法,日常工作中的借鑑意義還是很大的。


無鎖算法


先看一段ABQ put算法的實現:


  • 每個對象一個鎖,首先加鎖
  • 如果數組是滿的,加入鎖的notFull條件等待隊列。(notFull的具體機制可以看這裡的一篇文章wait、notify與Condition | forever
  • 元素加入數組
  • 釋放鎖


http://niceaz.com/wait%e3%80%81notify%e4%b8%8econdition/


public void put(E e) throws InterruptedException {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == items.length)

notFull.await();

enqueue(e);

} finally {

lock.unlock();

}

}


通過以上代碼說明兩點:


  • ABQ是通過lock機制實現的線程同步
  • ABQ的所有操作共用同一個lock,故所有操作均是互斥的


這篇文章中講述了一個實驗, 測試程序調用了一個函數,該函數會對一個64位的計數器循環自增5億次,在2.4G 6核機器上得到了如下的實驗數據:


http://mechanitis.blogspot.com/2011/07/dissecting-disruptor-why-its-so-fast.html


高性能線程間隊列 DISRUPTOR 簡介


實驗數據說明,使用CAS機制比使用lock機制快了一個數量級


另一方面,ABQ的所有操作都是互斥的,這點其實不是必要的,尤其像put和get操作,沒必要共享一個lock,完全可以降低鎖的粒度提高性能。


disruptor則與之不同:


disruptor使用了CAS機制同步線程,線程同步代價小於lock

disruptor遵守single writer原則,一塊內存對應單個線程,不僅produce和consume不是互斥的,多線程的produce也不是互斥的


偽共享


偽共享一直是一個比較高級的話題,Doug lea在JDK的Concurrent使用了大量的緩存行機制避免偽共享,disruptor也是用了這樣的機制。但是對於廣大的碼農而言,實際工作中我們可能很少會需要使用這樣的機制。畢竟對於大部分人而言,與避免偽共享帶來的性能提升而言,優化工程架構,算法,io等可能會給我們帶來更大的性能提升。所以本文只簡單提到這個話題,並不深入講解,畢竟我也沒有實際的應用經驗去講解這個話題。


單生產者模式


如圖所示,圖中數組代表ringbuffer,紅色元素代表已經發布過的事件槽,綠色元素代表將要發佈的事件槽,白色元素代表尚未利用的事件槽。disruptor生產時間包括三個階段:申請事件槽,更新數據,發佈事件槽。單生產者相對簡單,


  • 申請事件槽:此時,ringbuffer會將cursor後的一個事件槽返回給用戶,但不更新cursor,所以對於消費者而言,該事件還是不可見的。
  • 更新數據:生產者對該事件槽數據進行更新,
  • 發佈事件槽:發佈的過程就是移動cursor的過程,完成移動cursor後,發佈完成,該事件對生產者可見。


高性能線程間隊列 DISRUPTOR 簡介


多生產者模式


多生產者的模式相對就比較複雜,也體現了disuptor是如何利用CAS機制進行的線程間同步,並保證多個生產者的生產不互斥。如圖所示,紅色的代表已經發布的事件,淡綠色代表生產者1申請的事件槽,淡黃色代表生產者2申請的事件槽。


  • 申請事件槽:多生產者生產數據的過程就是移動cursor的過程,多個線程同時使用CAS操作更新cursor的值,哪個線程成功的更新了cursor的值哪個線程就成功申請了事件槽,而其他的線程則利用CAS操作繼續嘗試更新cursor的值。申請成功後cursor的值已經發生了改變,那怎麼保證在該事件槽發佈之前對消費者不可見呢?disruptor額外利用了一個數組,如圖中所示。深黃色代表相應的事件槽已經發布,白色代表相應的事件槽尚未發佈。disruptor使用了UNSAFE類對該數組進行操作,從而保證數組值更新的高效性。
  • 更新數據:生產者按序將成功申請到的事件槽數據進行更新
  • 發佈事件槽:生產者將對應數組的標誌位更新


高性能線程間隊列 DISRUPTOR 簡介


多個生產者生產數據唯一的競爭就發生在cursor值的更新,disruptor使用CAS操作更新cursor的值從而避免使用了鎖。申請數據之後,多個生產者可以併發更新數據,發佈事件槽,互不影響。需要說明的是,如圖中所示,生產者1申請了三個事件槽,發佈了一個事件槽,生產者2申請了兩個事件槽,發佈了一個事件槽。時間上,在生產者1發佈其剩餘的兩個事件槽之前,生產者2發佈的事件槽對於消費則也還是不可見的。所以,每個生產者一定要保證即便發生異常也要發佈事件槽,避免其後的生產者發佈的事件槽對消費者不可見。所以生產則更新數據和發佈事件槽一般是一個try…finally結構。或者使用disruptor提供的EventTranslator機制發佈事件,EventTranslator自動封裝了try…finally結構


tips


消費者的機制與生產者非常類似,本文不再贅述。


使用案例


LMAX應用場景


第一個講LMAX的應用場景,畢竟是催生disruptor的應用場景,所以非常典型。同時,disruptor作為內存消息隊列,怎麼保證宕機的情況下數據不丟失這一關鍵問題在LMAX自身的應用中可以得到一點啟示。


LMAX的機構如圖所示,共包括三部分,Input Disruptor,Business Processor,Output Disruptor。


高性能線程間隊列 DISRUPTOR 簡介


Input Disruptor從網絡接收到消息,在Business Processor處理之前需要完成三種操作:


  • Journal:將收到的信息持久化,在Business Processor線程崩潰的時候恢復數據
  • Replicate:複製信息到其他Business Processor節點
  • Unmarshall:重組信息數據格式,便於Business Processor處理


Business Processor負責業務邏輯處理,並將結果寫入Output Disruptor

Output Disruptor負責讀取Business Processor處理結果,重組數據格式進行網絡傳輸。


重點介紹一下Input Disruptor,Input Disruptor的依賴關係如圖所示:


高性能線程間隊列 DISRUPTOR 簡介


用disruptor的語言編寫就是:

disruptor.handleWith(journal, replacate, unmarshall).then(business)

LMAX為了避免business processor出現異常導致消息的丟失,在business processor處理前將消息全部持久化存儲。當business processor出現異常時,重新處理持久化的數據即可。我們可以借鑑LMAX的這種方式,來避免消息的丟失。更詳細關於LMAX的業務架構介紹可以參考The LMAX Architecture


https://martinfowler.com/articles/lmax.html


log4j 2


以下一段文字引用自Apache log4j 2官網,這段文字足以說明disruptor對log4j 2的性能提升的巨大貢獻。


Log4j 2 contains next-generation Asynchronous Loggers based on the LMAX Disruptor library. In multi-threaded scenarios Asynchronous Loggers have 18 times higher throughput and orders of magnitude lower latency than Log4j 1.x and Logback.


log4j2性能的優越主要體現在異步日誌記錄方面,以下兩個圖片摘自官網分別從吞吐率和響應時間兩個方面體現了log4j2異步日誌性能的強悍。


高性能線程間隊列 DISRUPTOR 簡介

高性能線程間隊列 DISRUPTOR 簡介


log4j2異步日誌的實現就是每次調用將待記錄的日誌寫入disruptor後迅速返回,這樣無需等待信息落盤從而大大提高相應時間。同時,disruptor的事件槽重用機制避免產生大量Java對象,進而避免GC對相應時間和吞吐率的影響,也就是log4j2官網提到的Garbage-free。


文件hash


還有一種比較常見的應用場景是文件hash。如圖所示,需要對大文件進行hash以方便後續處理,由於文件太大,所以把文件分給四個線程分別處理,每個線程讀取相應信息,計算hash值,寫入相應文件。


高性能線程間隊列 DISRUPTOR 簡介


這樣的方法有兩個弊端:


  • 同一個線程內,讀寫相互依賴,互相等待
  • 不同線程可能爭奪同一個輸出文件,需要lock同步


於是改為如下方法,四個線程讀取數據,計算hash值,將信息寫入相應disruptor。每個disruptor對應一個消費者,將disruptor中的信息落盤持久化。對於四個讀取線程而言,只有讀取文件操作,沒有寫文件操作,因此不存在讀寫互相依賴的問題。對於寫線程而言,只存在寫文件操作,沒有讀文件,因此也不存在讀寫互相依賴的問題。同時disruptor的存在又很好的解決了多個線程互相競爭同一個文件的問題,因此可以大大提高程序的吞吐率。


高性能線程間隊列 DISRUPTOR 簡介


分享到:


相關文章: