niceaz.com/高性能線程間隊列disruptor簡介/#more-189
disruptor簡介
背景
Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題。與Kafka(Apache Kafka)、RabbitMQ(RabbitMQ)用於服務間的消息隊列不同,disruptor一般用於線程間消息的傳遞。基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講後,獲得了業界關注。2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹The LMAX Architecture。同年它還獲得了Oracle官方的Duke大獎。其他關於disruptor的背景就不在此多言,可以自己google。
https://martinfowler.com/articles/lmax.html
官方資料
disruptor github wiki有關於disruptor相關概念和原理的介紹,該wiki已經很久沒有更新。像Design and Implementation,對於想了解disruptor的人是很有吸引力的,但是隻有題目沒有內容,還是很遺憾的。本文稍後會對其內部原理做一個介紹性的描述。
disruptor github wiki:
Home · LMAX-Exchange/disruptor Wiki
https://github.com/LMAX-Exchange/disruptor/wiki
disruptor github:
LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library
https://github.com/LMAX-Exchange/disruptor
這個地方也有很多不錯的資料:
Disruptor by LMAX-Exchange
https://lmax-exchange.github.io/disruptor/
性能
disruptor是用於一個JVM中多個線程之間的消息隊列,作用與ArrayBlockingQueue有相似之處,但是disruptor從功能、性能都遠好於ArrayBlockingQueue,當多個線程之間傳遞大量數據或對性能要求較高時,可以考慮使用disruptor作為ArrayBlockingQueue的替代者。
官方也對disruptor和ArrayBlockingQueue的性能在不同的應用場景下做了對比,本文列出其中一組數據,數據中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue:
完整的官方性能測試數據在Performance Results · LMAX-Exchange/disruptor Wiki可以看到,性能測試的代碼已經包含在disruptor的代碼中,你完全可以git下來在自己的主機上測試一下看看
https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
如何使用
單生產者,單消費者
//聲明disruptor中事件類型及對應的事件工廠
private class LongEvent {
private long value;
public LongEvent() {
this.value = 0L;
}
public void set(long value) {
this.value = value;
}
public long get() {
return this.value;
}
}
private EventFactory<longevent> eventFactory = new EventFactory<longevent>() { /<longevent>/<longevent>
public LongEvent newInstance() {
return new LongEvent();
}
};
private int ringBufferSize = 1024;
private Executor executor = Executors.newFixedThreadPool(8);
private Disruptor<longevent> disruptor = new Disruptor<longevent>(eventFactory, ringBufferSize, executor);/<longevent>/<longevent>
//pubisher邏輯,將原始數據轉換為event,publish到ringbuffer
private class Publisher implements EventTranslatorOneArg<longevent> {/<longevent>
public void translateTo(LongEvent event, long sequence, String arg0) {
event.set(Long.parseLong(arg0));
}
}
//consumer邏輯,獲取event進行處理
private class Consumer implements EventHandler<longevent> {/<longevent>
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
long value = event.get();
int index = (int) (value % Const.NUM_OF_FILE);
fileWriter[index].write("" + value + "\\n");
if(value == Long.MAX_VALUE) {
isFinish = true;
}
}
}
//註冊consumer啟動disruptor
disruptor.handleEventsWith(new Consumer());
disruptor.start();
//獲取disruptor的ringbuffer,用於生產數據
private RingBuffer<longevent> ringBuffer = disruptor.getRingBuffer();/<longevent>
ringBuffer.publishEvent(new Publisher(), line);
多生產者
多生產者的改動相對簡單,只需將disruptor的聲明換一個構造函數即可,但是多生產者ringbuffer的處理邏輯完全不同,只是這些不同對使用者透明,本文將在後邊討論單生產者,多生產者ringbuffer邏輯的不同
private Disruptor<longevent> disruptor1 = new Disruptor<longevent>(eventFactory, ringBufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy());/<longevent>/<longevent>
多消費者
多消費者的情況分為兩類:
- 廣播:對於多個消費者,每條信息會達到所有的消費者,被多次處理,一般每個消費者業務邏輯不通,用於同一個消息的不同業務邏輯處理
- 分組:對於同一組內的多個消費者,每條信息只會被組內一個消費者處理,每個消費者業務邏輯一般相同,用於多消費者併發處理一組消息
廣播
- 消費者之間無依賴關係
假設目前有handler1,handler2,handler3三個消費者處理一批消息,每個消息都要被三個消費者處理到,三個消費者無依賴關係,則如下所示即可
disruptor.handleEventsWith(handler1,handler2,handler3);
- 消費者之間有依賴關係
假設handler3必須在handler1,handler2處理完成後進行處理
disruptor.handleEventsWith(handler1,handler2).then(handler3);
其他情況可視為以上兩種情況的排列組合
分組
分組情況稍微不同,對於消費者,需要實現WorkHandler而不是EventHandler,藉口定義分別如下所示:
public interface EventHandler
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
public interface WorkHandler
{
/**
* Callback to indicate a unit of work needs to be processed.
*
* @param event published to the {@link RingBuffer}
* @throws Exception if the {@link WorkHandler} would like the exception handled further up the chain.
*/
void onEvent(T event) throws Exception;
}
假設handler1,handler2,handler3都實現了WorkHandler,則調用以下代碼就可以實現分組
disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3);
廣播和分組之間也是可以排列組合的
tips
disruptor也提供了函數讓你自定義消費者之間的關係,如
public EventHandlerGroup
當然,必須對disruptor有足夠的瞭解才能正確的在EventProcessor中實現多消費者正確的邏輯
實現原理
為何高效
事件預分配
在定義disruptor的時候我們需要指定事件工廠EventFactory的邏輯,disruptor內部的ringbuffer的數據結構是數組,EventFactory就用於disruptor初始化時數組每個元素的填充。生產者開始後,是通過獲取對應位置的Event,調用Event的setter函數更新Event達到生產數據的目的的。為什麼這樣?假設使用LinkedList,在生產消費的場景下生產者會產生大量的新節點,新節點被消費後又需要被回收,頻繁的生產消費給GC帶來很大的壓力。使用數組後,在內存中存在的是一塊大小穩定的內存,頻繁的生產消費對GC並沒有什麼影響,大大減小了系統的最慢響應時間,更不會因為消費者的滯後導致OOM的發生。因此這種事件預分配的方法對於減輕GC壓力可以說是一種簡單有效的方法,日常工作中的借鑑意義還是很大的。
無鎖算法
先看一段ABQ put算法的實現:
- 每個對象一個鎖,首先加鎖
- 如果數組是滿的,加入鎖的notFull條件等待隊列。(notFull的具體機制可以看這裡的一篇文章wait、notify與Condition | forever)
- 元素加入數組
- 釋放鎖
http://niceaz.com/wait%e3%80%81notify%e4%b8%8econdition/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
通過以上代碼說明兩點:
- ABQ是通過lock機制實現的線程同步
- ABQ的所有操作共用同一個lock,故所有操作均是互斥的
這篇文章中講述了一個實驗, 測試程序調用了一個函數,該函數會對一個64位的計數器循環自增5億次,在2.4G 6核機器上得到了如下的實驗數據:
http://mechanitis.blogspot.com/2011/07/dissecting-disruptor-why-its-so-fast.html
實驗數據說明,使用CAS機制比使用lock機制快了一個數量級
另一方面,ABQ的所有操作都是互斥的,這點其實不是必要的,尤其像put和get操作,沒必要共享一個lock,完全可以降低鎖的粒度提高性能。
disruptor則與之不同:
disruptor使用了CAS機制同步線程,線程同步代價小於lock
disruptor遵守single writer原則,一塊內存對應單個線程,不僅produce和consume不是互斥的,多線程的produce也不是互斥的
偽共享
偽共享一直是一個比較高級的話題,Doug lea在JDK的Concurrent使用了大量的緩存行機制避免偽共享,disruptor也是用了這樣的機制。但是對於廣大的碼農而言,實際工作中我們可能很少會需要使用這樣的機制。畢竟對於大部分人而言,與避免偽共享帶來的性能提升而言,優化工程架構,算法,io等可能會給我們帶來更大的性能提升。所以本文只簡單提到這個話題,並不深入講解,畢竟我也沒有實際的應用經驗去講解這個話題。
單生產者模式
如圖所示,圖中數組代表ringbuffer,紅色元素代表已經發布過的事件槽,綠色元素代表將要發佈的事件槽,白色元素代表尚未利用的事件槽。disruptor生產時間包括三個階段:申請事件槽,更新數據,發佈事件槽。單生產者相對簡單,
- 申請事件槽:此時,ringbuffer會將cursor後的一個事件槽返回給用戶,但不更新cursor,所以對於消費者而言,該事件還是不可見的。
- 更新數據:生產者對該事件槽數據進行更新,
- 發佈事件槽:發佈的過程就是移動cursor的過程,完成移動cursor後,發佈完成,該事件對生產者可見。
多生產者模式
多生產者的模式相對就比較複雜,也體現了disuptor是如何利用CAS機制進行的線程間同步,並保證多個生產者的生產不互斥。如圖所示,紅色的代表已經發布的事件,淡綠色代表生產者1申請的事件槽,淡黃色代表生產者2申請的事件槽。
- 申請事件槽:多生產者生產數據的過程就是移動cursor的過程,多個線程同時使用CAS操作更新cursor的值,哪個線程成功的更新了cursor的值哪個線程就成功申請了事件槽,而其他的線程則利用CAS操作繼續嘗試更新cursor的值。申請成功後cursor的值已經發生了改變,那怎麼保證在該事件槽發佈之前對消費者不可見呢?disruptor額外利用了一個數組,如圖中所示。深黃色代表相應的事件槽已經發布,白色代表相應的事件槽尚未發佈。disruptor使用了UNSAFE類對該數組進行操作,從而保證數組值更新的高效性。
- 更新數據:生產者按序將成功申請到的事件槽數據進行更新
- 發佈事件槽:生產者將對應數組的標誌位更新
多個生產者生產數據唯一的競爭就發生在cursor值的更新,disruptor使用CAS操作更新cursor的值從而避免使用了鎖。申請數據之後,多個生產者可以併發更新數據,發佈事件槽,互不影響。需要說明的是,如圖中所示,生產者1申請了三個事件槽,發佈了一個事件槽,生產者2申請了兩個事件槽,發佈了一個事件槽。時間上,在生產者1發佈其剩餘的兩個事件槽之前,生產者2發佈的事件槽對於消費則也還是不可見的。所以,每個生產者一定要保證即便發生異常也要發佈事件槽,避免其後的生產者發佈的事件槽對消費者不可見。所以生產則更新數據和發佈事件槽一般是一個try…finally結構。或者使用disruptor提供的EventTranslator機制發佈事件,EventTranslator自動封裝了try…finally結構
tips
消費者的機制與生產者非常類似,本文不再贅述。
使用案例
LMAX應用場景
第一個講LMAX的應用場景,畢竟是催生disruptor的應用場景,所以非常典型。同時,disruptor作為內存消息隊列,怎麼保證宕機的情況下數據不丟失這一關鍵問題在LMAX自身的應用中可以得到一點啟示。
LMAX的機構如圖所示,共包括三部分,Input Disruptor,Business Processor,Output Disruptor。
Input Disruptor從網絡接收到消息,在Business Processor處理之前需要完成三種操作:
- Journal:將收到的信息持久化,在Business Processor線程崩潰的時候恢復數據
- Replicate:複製信息到其他Business Processor節點
- Unmarshall:重組信息數據格式,便於Business Processor處理
Business Processor負責業務邏輯處理,並將結果寫入Output Disruptor
Output Disruptor負責讀取Business Processor處理結果,重組數據格式進行網絡傳輸。
重點介紹一下Input Disruptor,Input Disruptor的依賴關係如圖所示:
用disruptor的語言編寫就是:
disruptor.handleWith(journal, replacate, unmarshall).then(business)
LMAX為了避免business processor出現異常導致消息的丟失,在business processor處理前將消息全部持久化存儲。當business processor出現異常時,重新處理持久化的數據即可。我們可以借鑑LMAX的這種方式,來避免消息的丟失。更詳細關於LMAX的業務架構介紹可以參考The LMAX Architecture
https://martinfowler.com/articles/lmax.html
log4j 2
以下一段文字引用自Apache log4j 2官網,這段文字足以說明disruptor對log4j 2的性能提升的巨大貢獻。
Log4j 2 contains next-generation Asynchronous Loggers based on the LMAX Disruptor library. In multi-threaded scenarios Asynchronous Loggers have 18 times higher throughput and orders of magnitude lower latency than Log4j 1.x and Logback.
log4j2性能的優越主要體現在異步日誌記錄方面,以下兩個圖片摘自官網分別從吞吐率和響應時間兩個方面體現了log4j2異步日誌性能的強悍。
log4j2異步日誌的實現就是每次調用將待記錄的日誌寫入disruptor後迅速返回,這樣無需等待信息落盤從而大大提高相應時間。同時,disruptor的事件槽重用機制避免產生大量Java對象,進而避免GC對相應時間和吞吐率的影響,也就是log4j2官網提到的Garbage-free。
文件hash
還有一種比較常見的應用場景是文件hash。如圖所示,需要對大文件進行hash以方便後續處理,由於文件太大,所以把文件分給四個線程分別處理,每個線程讀取相應信息,計算hash值,寫入相應文件。
這樣的方法有兩個弊端:
- 同一個線程內,讀寫相互依賴,互相等待
- 不同線程可能爭奪同一個輸出文件,需要lock同步
於是改為如下方法,四個線程讀取數據,計算hash值,將信息寫入相應disruptor。每個disruptor對應一個消費者,將disruptor中的信息落盤持久化。對於四個讀取線程而言,只有讀取文件操作,沒有寫文件操作,因此不存在讀寫互相依賴的問題。對於寫線程而言,只存在寫文件操作,沒有讀文件,因此也不存在讀寫互相依賴的問題。同時disruptor的存在又很好的解決了多個線程互相競爭同一個文件的問題,因此可以大大提高程序的吞吐率。
閱讀更多 程序員BUG 的文章