Disruptor in Practice: Integrating It into an Existing Crawler Framework

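1. Disruptor

Disruptor is a high-performance asynchronous processing framework.

Disruptor is a key component of the LMAX online trading platform, which reportedly uses it to process orders at up to 6 million TPS. It is not limited to finance: ordinary applications can use Disruptor as well, and it can bring a significant performance gain. Disruptor is arguably less a framework than a design approach. For programs that involve concurrency, buffers, the producer-consumer model, and transaction processing, it offers a way to raise throughput (TPS) substantially.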

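2. Practice

NetDiscovery is a crawler framework built on Vert.x, RxJava 2, and other frameworks.

By default NetDiscovery uses the JDK's ConcurrentLinkedQueue as its message queue. Because every component of the crawler framework can be replaced, the rest of this article implements the crawler's Queue on top of Disruptor.

2.1 Wrapping the event

The crawler's request is wrapped in a RequestEvent, which is the event that travels through Disruptor.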

import com.cv4j.netdiscovery.core.domain.Request;
import lombok.Data;

/**
 * Created by tony on 2018/9/1.
 */
@Data
public class RequestEvent {

    private Request request;

    @Override
    public String toString() {
        // String.valueOf avoids a NullPointerException for a slot whose request is not set yet
        return String.valueOf(request);
    }
}

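2.2 Publishing events

Next comes event publishing: obtain from the RingBuffer the sequence of the next writable event, set the request the crawler wants to fetch on that RequestEvent, and finally publish the event to the RingBuffer.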

import com.cv4j.netdiscovery.core.domain.Request;
import com.lmax.disruptor.RingBuffer;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by tony on 2018/9/2.
 */
public class Producer {

    private final RingBuffer<RequestEvent> ringBuffer;
    private final AtomicInteger count = new AtomicInteger(0); // counter

    public Producer(RingBuffer<RequestEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(Request request) {
        long sequence = ringBuffer.next(); // claim the next writable sequence
        try {
            RequestEvent event = ringBuffer.get(sequence);
            event.setRequest(request);
        } finally {
            // a claimed sequence must always be published, otherwise consumers stall on it
            ringBuffer.publish(sequence);
            count.incrementAndGet();
        }
    }

    /**
     * Number of Requests sent to the queue so far.
     */
    public int getCount() {
        return count.get();
    }
}
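
The three steps in pushData() (claim a sequence, fill the slot, publish) can be tried out without Disruptor on the classpath. The following is only a toy sketch: ToyRing and all of its methods are hypothetical names invented for illustration, it supports a single producer, and it does nothing to stop a fast producer from overwriting unread slots, which the real RingBuffer guards against.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Toy single-producer ring (not the Disruptor API) that mirrors the claim, write, publish steps. */
class ToyRing {
    private final String[] slots;
    private final AtomicLong claimed = new AtomicLong(-1); // highest sequence handed out
    private volatile long cursor = -1;                     // highest sequence visible to readers

    ToyRing(int size) {
        slots = new String[size];
    }

    long next() {                                   // step 1: claim the next sequence
        return claimed.incrementAndGet();
    }

    void set(long sequence, String value) {         // step 2: fill the claimed slot
        slots[(int) (sequence % slots.length)] = value;
    }

    void publish(long sequence) {                   // step 3: make the slot visible
        cursor = sequence;
    }

    long getCursor() {
        return cursor;
    }

    String get(long sequence) {
        return slots[(int) (sequence % slots.length)];
    }

    /** Same shape as pushData(): publish in finally so a claimed sequence is never left unpublished. */
    static long push(ToyRing ring, String value) {
        long sequence = ring.next();
        try {
            ring.set(sequence, value);
        } finally {
            ring.publish(sequence);
        }
        return sequence;
    }

    public static void main(String[] args) {
        ToyRing ring = new ToyRing(4);
        push(ring, "http://example.com/a");
        long last = push(ring, "http://example.com/b");
        System.out.println(last + " -> " + ring.get(ring.getCursor()));
    }
}
```

Publishing inside finally matters because readers wait on the published position: a sequence that was claimed but never published would hold up everything behind it.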


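2.3 Consuming events

Once the request has been set on the RequestEvent, a consumer has to handle the event. The Consumer below only logs the consumer's thread name and the request. The real "consumption" still happens when the request is obtained through DisruptorQueue's poll() and then processed in Spider.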

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Created by tony on 2018/9/2.
 */
@Slf4j
public class Consumer implements WorkHandler<RequestEvent> {

    @Override
    public void onEvent(RequestEvent requestEvent) throws Exception {
        // only records which worker thread saw which request
        log.info("consumer:" + Thread.currentThread().getName() + " requestEvent: value=" + requestEvent.toString());
    }
}

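2.4 Implementing DisruptorQueue

Disruptor supports single producer with single consumer, multiple producers, multiple consumers, consumer groups, and other modes.

NetDiscovery uses multiple producers and multiple consumers.

When the RingBuffer is created, ProducerType.MULTI indicates multiple producers. The RingBuffer is created with YieldingWaitStrategy, which is one WaitStrategy implementation; different WaitStrategy implementations have different performance characteristics.

YieldingWaitStrategy is among the best-performing strategies and suits low-latency systems, at the cost of CPU time spent spinning. It is recommended when very high performance is required and the number of event-handling threads is smaller than the number of logical CPU cores, for example when hyper-threading is enabled.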
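
To make the trade-off concrete, here is a rough sketch of the spin-then-yield idea behind such a strategy. It is not Disruptor's implementation: YieldingWaitSketch, waitFor, and the spin budget of 100 are assumptions made for illustration, and a plain AtomicLong stands in for the ring buffer's cursor.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative spin-then-yield wait loop; the names and the spin budget are assumptions. */
class YieldingWaitSketch {
    private static final int SPIN_TRIES = 100; // assumed number of busy spins before yielding

    /** Waits without locks until cursor reaches wanted, then returns the cursor value observed. */
    static long waitFor(long wanted, AtomicLong cursor) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = cursor.get()) < wanted) {
            if (counter > 0) {
                counter--;       // busy-spin first: cheapest when the event is about to arrive
            } else {
                Thread.yield();  // then let other runnable threads use this core
            }
        }
        return available;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong cursor = new AtomicLong(-1);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                cursor.incrementAndGet(); // "publishes" sequences 0..4
            }
        });
        producer.start();
        long seen = waitFor(4, cursor);
        producer.join();
        System.out.println("consumer saw sequence " + seen);
    }
}
```

Because a waiting thread never sleeps, it keeps a core busy. That is why the advice above ties this strategy to having fewer event-handling threads than logical cores.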

ringBuffer = RingBuffer.create(ProducerType.MULTI,
        new EventFactory<RequestEvent>() {
            @Override
            public RequestEvent newInstance() {
                return new RequestEvent();
            }
        },
        ringBufferSize,
        new YieldingWaitStrategy());
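
The buffer size passed to RingBuffer.create must be a power of two. A likely reason, sketched below, is that an ever-growing sequence can then be mapped to a slot with a cheap bit mask instead of a modulo. RingIndex and its methods are hypothetical helpers written for this illustration, not part of Disruptor.

```java
/** Hypothetical helper showing how a sequence maps to a slot when the size is a power of two. */
class RingIndex {

    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    static int slotFor(long sequence, int bufferSize) {
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        // for power-of-two sizes, sequence & (size - 1) equals sequence % size
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        System.out.println(isPowerOfTwo(1024 * 1024)); // the size used in this article
        System.out.println(slotFor(9, 8));             // sequence 9 wraps around to slot 1
    }
}
```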



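An EventProcessor handles the events in Disruptor.

Its implementations include BatchEventProcessor, which processes events in batches on a single thread, and WorkProcessor, which is used to process events across multiple threads.

A WorkerPool manages a group of WorkProcessors. Once the ringBuffer has been created, create the workerPool: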

SequenceBarrier barriers = ringBuffer.newBarrier();
for (int i = 0; i < consumers.length; i++) {
    consumers[i] = new Consumer();
}
workerPool = new WorkerPool<RequestEvent>(ringBuffer,
        barriers,
        new EventExceptionHandler(),
        consumers);
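
In a WorkerPool each event is handled by exactly one of the workers rather than by all of them. One way to picture this, as a sketch only, is a shared counter from which every worker claims the next sequence to process. WorkSharingSketch below uses plain JDK classes and hypothetical names; it does not reproduce WorkProcessor's actual code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative work sharing: workers claim sequences from one shared counter. */
class WorkSharingSketch {

    /** Runs the given number of workers over sequences [0, total) and returns the per-sequence handle count. */
    static int[] process(int total, int workers) {
        AtomicLong workSequence = new AtomicLong(0);               // shared claim counter
        AtomicIntegerArray handled = new AtomicIntegerArray(total);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                long seq;
                // getAndIncrement hands each sequence to exactly one worker
                while ((seq = workSequence.getAndIncrement()) < total) {
                    handled.incrementAndGet((int) seq);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int[] result = new int[total];
        for (int i = 0; i < total; i++) {
            result[i] = handled.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        boolean exactlyOnce = true;
        for (int c : process(1000, 4)) {
            exactlyOnce &= (c == 1);
        }
        System.out.println("every sequence handled exactly once: " + exactlyOnce);
    }
}
```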


Start the workerPool:

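// producers must not overwrite slots the workers have not processed yet
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
// the executor needs at least one thread per consumer
workerPool.start(Executors.newFixedThreadPool(threadNum));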

Finally, here is the complete code of DisruptorQueue:

import com.cv4j.netdiscovery.core.domain.Request;
import com.cv4j.netdiscovery.core.queue.AbstractQueue;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by tony on 2018/9/1.
 */
@Slf4j
public class DisruptorQueue extends AbstractQueue {

    private RingBuffer<RequestEvent> ringBuffer;
    private Consumer[] consumers = null;
    private Producer producer = null;

    private WorkerPool<RequestEvent> workerPool = null;
    private int ringBufferSize = 1024 * 1024; // RingBuffer size, must be a power of 2
    private AtomicInteger consumerCount = new AtomicInteger(0);
    private static final int CONSUME_NUM = 2;
    private static final int THREAD_NUM = 4;

    public DisruptorQueue() {
        this(CONSUME_NUM, THREAD_NUM);
    }

    public DisruptorQueue(int consumerNum, int threadNum) {
        consumers = new Consumer[consumerNum];

        // create the ringBuffer
        ringBuffer = RingBuffer.create(ProducerType.MULTI,
                new EventFactory<RequestEvent>() {
                    @Override
                    public RequestEvent newInstance() {
                        return new RequestEvent();
                    }
                },
                ringBufferSize,
                new YieldingWaitStrategy());

        SequenceBarrier barriers = ringBuffer.newBarrier();
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Consumer();
        }

        workerPool = new WorkerPool<RequestEvent>(ringBuffer,
                barriers,
                new EventExceptionHandler(),
                consumers);

        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        // threadNum must be at least consumerNum, one thread per consumer
        workerPool.start(Executors.newFixedThreadPool(threadNum));

        producer = new Producer(ringBuffer);
    }

    @Override
    protected void pushWhenNoDuplicate(Request request) {
        producer.pushData(request);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        }
    }
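    @Override
    public Request poll(String spiderName) {
        // Only the producer claims sequences, so the n-th pushed request sits at sequence n.
        // Calling ringBuffer.next() here would claim a slot that is never published,
        // leaving an empty slot in the buffer and stalling the consumers.
        int next = consumerCount.get();
        if (next >= producer.getCount()) {
            return null; // nothing left to read
        }
        Request request = ringBuffer.get(next).getRequest();
        consumerCount.incrementAndGet();
        return request;
    }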
    @Override
    public int getLeftRequests(String spiderName) {
        // requests pushed so far minus requests already handed out by poll()
        return producer.getCount() - consumerCount.get();
    }

    @Override
    public int getTotalRequests(String spiderName) {
        return super.getTotalRequests(spiderName);
    }

    static class EventExceptionHandler implements ExceptionHandler {

        @Override
        public void handleEventException(Throwable ex, long sequence, Object event) {
            log.debug("handleEventException:" + ex);
        }

        @Override
        public void handleOnStartException(Throwable ex) {
            log.debug("handleOnStartException:" + ex);
        }

        @Override
        public void handleOnShutdownException(Throwable ex) {
            log.debug("handleOnShutdownException:" + ex);
        }
    }
}


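Here, pushWhenNoDuplicate() sends the request into the ringBuffer, and poll() takes the corresponding request out of the ringBuffer so that the crawler can issue the network request, parse the response, and so on.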
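
The queue contract described here (push unless duplicate, poll, count what is left) can be illustrated with a small stand-in built on the JDK's ConcurrentLinkedQueue. This is only a sketch under stated assumptions: CountingQueueSketch and its methods are hypothetical, plain URL strings replace Request, and the duplicate check by URL is a guess based on the method name rather than NetDiscovery's actual AbstractQueue logic.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a crawler queue; not NetDiscovery's API. */
class CountingQueueSketch {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final Set<String> seen = ConcurrentHashMap.newKeySet(); // assumed duplicate filter
    private final AtomicInteger pushed = new AtomicInteger();
    private final AtomicInteger polled = new AtomicInteger();

    void push(String url) {
        if (seen.add(url)) {          // add() returns false for a URL that was already pushed
            queue.add(url);
            pushed.incrementAndGet();
        }
    }

    String poll() {
        String url = queue.poll();    // null when the queue is empty
        if (url != null) {
            polled.incrementAndGet();
        }
        return url;
    }

    int getLeftRequests() {
        return pushed.get() - polled.get();
    }

    int getTotalRequests() {
        return pushed.get();
    }

    public static void main(String[] args) {
        CountingQueueSketch q = new CountingQueueSketch();
        q.push("http://example.com/a");
        q.push("http://example.com/b");
        q.push("http://example.com/a"); // duplicate, ignored
        String url;
        while ((url = q.poll()) != null) {
            System.out.println("fetch " + url + ", left " + q.getLeftRequests());
        }
    }
}
```

A spider-style loop like the one in main() keeps polling until null comes back, which is the same contract DisruptorQueue's poll() has to honour.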

