高併發場景下請求合併的實踐

前言

項目中一般會請求第三方的接口,也會對外提供接口,可能是RPC,也可能是HTTP等方式。在對外提供接口時,有必要提供相應的批量接口,好的批量實現能夠提升性能。

高併發場景中,調用批量接口相比調用非批量接口有更大的性能優勢。但有時候,請求更多的是單個接口,不能夠直接調用批量接口,如果這個接口是高頻接口,對其做請求合併就很有必要了。比如電影網站的獲取電影詳情接口,APP的一次請求是單個接口調用,用戶量少的時候請求也不多,完全沒問題;但同一時刻往往有大量用戶訪問電影詳情,是個高併發的高頻接口,如果都是單次查詢,後臺就不一定能hold住了。為了優化這個接口,後臺可以將相同的請求進行合併,然後調用批量的查詢接口。如下圖所示

高併發場景下請求合併的實踐

無合併請求

合併請求前,我們一般是調用服務層的單次創建方法。看起來都比較簡單,且易於理解。

創建設備接口為例。

@Reference(check = false)

private DeviceService deviceService;

/**

* 註冊設備

*

* @param productKey 產品key

* @param deviceName 設備名

* @return 設備ID

*/

public R registDevice(String productKey, String deviceName){

log.debug("開始註冊: {}, {}", productKey, deviceName);

DeviceRequestDto deviceCreateQuery = new DeviceRequestDto()

.setProductKey(productKey)

.setName(deviceName);

Long deviceId = deviceService.createDevice(deviceCreateQuery);

return deviceId != null

? R.ok(deviceId)

: R.error(DEVICE_CREATE_ERROR);

}

請求合併

請求合併的好處前面有提到,那不能每次寫接口就做請求合併吧?我們要明白,技術無好壞,要在特定的業務場景下衡量利弊,採用與否需要深思熟慮。合併請求會令代碼變得複雜,也會增加一定的接口延遲,其中還可能存在各種未知的風險。

合併請求是針對高併發場景的一種手段,我們實現請求合併之前,要結合業務場景思考一番,是否值得承受的合併帶來的訪問延遲?用戶體驗是否會打折扣?自身的技術是否足夠hold住請求合併帶來的未知風險?

思路:收到前端的請求時,先存起來,隔段時間批量請求第三方服務批量接口,然後分別通知存起來的請求,並且響應前端。

代碼實現

還是針對上述設備註冊接口,我們對其進行改造,來實現一個簡單的請求合併。

1. 批量接口

首先,我們需要有能夠批量調用的接口。在對外提供接口時,也非常有必要提供相應的批量接口,且內部實現應該是優化過的。

此處我們在服務層模擬了一個批量創建設備的接口, 如下:

  • 方法簽名

/**

* 批量創建設備接口

*

* @param deviceRequestDtoList 入參信息

* @return 創建結果

*/

R> batchCreateDevice(List deviceList);

  • 入參

@Data

publicclassDeviceCreateQueryimplementsSerializable{

/**

* 產品標識

*/

private String productKey;

/**

* 設備名稱

*/

private String name;

/**

* 請求源,一次批量請求保證唯一

*/

private String requestSource;

}

  • 返回值

@Data

publicclassDeviceCreateRespimplementsSerializable{

/**

* 設備ID

*/

private Long deviceId;

/**

* 請求源,一次批量請求保證唯一

*/

private String requestSource;

}

2. 合併單個請求

  • 積攢請求的阻塞隊列

private LinkedBlockingDeque deviceCreateQueue = new LinkedBlockingDeque<>();

  • 積攢請求的自定義結構

@Data

staticclassDeviceCreateRequest{

/** 產品key */

private String productKey;

/** 設備名 */

private String deviceName;

/** 請求源,需保證唯一 */

private String requestSource;

/** CompletableFuture接口 */

private CompletableFuture completedFuture;

}

  • 積攢請求

public R registDevice(String productKey, String deviceName){

log.debug("開始註冊: {}, {}", productKey, deviceName);

// 緩存請求 ====== start

CompletableFuture completedFuture = new CompletableFuture<>();

DeviceCreateRequest deviceCreateRequest = new DeviceCreateRequest();

deviceCreateRequest.setProductKey(productKey);

deviceCreateRequest.setDeviceName(deviceName);

deviceCreateRequest.setRequestSource(UUID.randomUUID().toString());

deviceCreateRequest.setCompletedFuture(completedFuture);

deviceCreateQueue.add(deviceCreateRequest);

// 緩存請求 ====== end

Long deviceId = null;

try {

deviceId = completedFuture.get();

} catch (Exception e) {

log.error("設備註冊失敗", e);

}

return deviceId != null

? R.ok(deviceId)

: R.error(DEVICE_CREATE_ERROR);

}

3. 發送批量請求

此處使用了spring,在init方法中利用定時任務線程池批量分發請求。同時使用了newScheduledThreadPool,其中線程池大小

定時間隔時長需要根據業務量做權衡

/** 積攢請求的阻塞隊列 */

private LinkedBlockingDeque deviceCreateQueue = new LinkedBlockingDeque<>();

/** 線程池數量 */

@Value("${iot.register.merge.device.request.num:100}")

privateint createDeviceMergeNum;

/** 定時間隔時長 */

@Value("${iot.register.merge.device.request.period:30}")

privatelong createDeviceMergePeriod;

@Reference(check = false)

private DeviceService deviceService;

@PostConstruct

publicvoidinit(){

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum);

scheduledExecutorService.scheduleAtFixedRate(() -> {

// 把出queue的請求存儲一次

List questBak = new ArrayList<>();

// 批量創建設備的入參

List deviceCreateQueryList = new ArrayList<>();

int size = deviceCreateQueue.size();

for (int i = 0; i < size; i++) {

DeviceCreateRequest deviceCreateRequest = deviceCreateQueue.poll();

if (Objects.nonNull(deviceCreateRequest)) {

questBak.add(deviceCreateRequest);

deviceCreateQueryList.add(buildDeviceCreateQuery(deviceCreateRequest));

}

}

if (!deviceCreateQueryList.isEmpty()) {

try {

List response = deviceService.batchCreateDevice(deviceCreateQueryList);

Map collect = response.stream()

.collect(Collectors.toMap(

DeviceCreateResp::getRequestSource, DeviceCreateResp::getDeviceId

));

// 通知請求的線程

for (DeviceCreateRequest deviceCreateRequest : questBak) {

deviceCreateRequest.getCompletedFuture().complete(collect.get(deviceCreateRequest.getRequestSource()));

}

} catch (Throwable throwable) {

log.error("批量註冊設備異常", throwable);

// 通知請求的線程-異常

questBak.forEach(deviceCreateRequest -> deviceCreateRequest.getCompletedFuture().obtrudeException(throwable));

}

}

}, 0, createDeviceMergePeriod, TimeUnit.MILLISECONDS);

}

總結

請求合併是解決高併發場景下某些問題的一種思路,本文只做了一個簡單的實現,算是對這塊知識的一次實踐吧。用到了BlockingDeque、CompletableFuture接口,涉及Java多線程相關的知識,實現方式比較野蠻。業界有很多優秀的開源框架做請求合併,比如hystrix,需要花時間好好學習哈哈。

出處:https://www.cnblogs.com/flylinran/p/10177304.html


分享到:


相關文章: