Springboot + RabbitMQ實現消息延遲重試具體實現

一:簡介

使用場景:調用第三方接口時如果調用失敗需要隔幾秒再去嘗試下一次調用,直到調用N次還失敗就停止調用。最常用的場景就是支付成功異步通知第三方支付成功。

1. 為什麼要調用多次?

如果調用1次就成功了就停止調用,如果失敗可能由於網絡原因沒有請求到服務器需要再次嘗試,第二次很可能就會調用成功了。

2. 為什麼要間隔幾秒再嘗試下次調用?

如果是因為網絡原因沒有請求到服務器如果再立刻調用,很可能此時網絡還是沒有好,可能等幾秒後網絡就恢復了,此時再去調用就好了。

實現效果類似於支付寶中的回調延遲重試:

純乾貨 | Springboot + RabbitMQ實現消息延遲重試具體實現

二: 實現原理

在介紹具體的實現思路之前,我們先來介紹一下RabbitMQ的兩個特性,一個是Time-To-Live Extensions,另一個是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允許我們為消息或者隊列設置TTL(time to live),也就是過期時間。TTL表明了一條消息可在隊列中存活的最大時間,單位為毫秒。也就是說,當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在經過TTL秒後“死亡”,成為Dead Letter。如果既配置了消息的TTL,又配置了隊列的TTL,那麼較小的那個值會被取用。更多資料請查閱官方文檔。

Dead Letter Exchange

剛才提到了,被設置了TTL的消息在過期後會成為Dead Letter。其實在RabbitMQ中,一共有三種消息的“死亡”形式:

消息被拒絕。通過調用basic.reject或者basic.nack並且設置的requeue參數為false。消息因為設置了TTL而過期。消息進入了一條已經達到最大長度的隊列。 如果隊列設置了Dead Letter Exchange(DLX),那麼這些Dead Letter就會被重新publish到Dead Letter Exchange,通過Dead Letter Exchange路由到其他隊列。更多資料請查閱官方文檔。

通過RabbitMQ的TTL和DLX特性結合在一起,實現一個延遲隊列。

針對於上述的延遲隊列的兩個場景,我們分別有以下兩種流程圖:

1. 延遲消費

延遲消費是延遲隊列最為常用的使用模式。如下圖所示,生產者產生的消息首先會進入緩衝隊列(圖中紅色隊列)。通過RabbitMQ提供的TTL擴展,這些消息會被設置過期時間,也就是延遲消費的時間。等消息過期之後,這些消息會通過配置好的DLX轉發到實際消費隊列(圖中藍色隊列),以此達到延遲消費的效果。

純乾貨 | Springboot + RabbitMQ實現消息延遲重試具體實現

2. 延遲重試

延遲重試本質上也是延遲消費的一種,但是這種模式的結構與普通的延遲消費的流程圖較為不同,所以單獨拎出來介紹。

如下圖所示,消費者發現該消息處理出現了異常,比如是因為網絡波動引起的異常。那麼如果不等待一段時間,直接就重試的話,很可能會導致在這期間內一直無法成功,造成一定的資源浪費。那麼我們可以將其先放在緩衝隊列中(圖中紅色隊列),等消息經過一段的延遲時間後再次進入實際消費隊列中(圖中藍色隊列),此時由於已經過了“較長”的時間了,異常的一些波動通常已經恢復,這些消息可以被正常地消費。

純乾貨 | Springboot + RabbitMQ實現消息延遲重試具體實現


三:代碼示例

功能示例:每隔2秒、4秒、8秒、16秒去重試調用接口,總共調用4次。

純乾貨 | Springboot + RabbitMQ實現消息延遲重試具體實現

  1. pom.xml
<code>   <dependency>

<groupid>org.springframework.boot/<groupid>

<artifactid>spring-boot-starter-amqp/<artifactid>

/<dependency>

<dependency>

<groupid>org.apache.httpcomponents/<groupid>

<artifactid>httpclient/<artifactid>

<version>4.5.6/<version>

/<dependency>

<dependency>

<groupid>com.alibaba/<groupid>

<artifactid>fastjson/<artifactid>

<version>1.2.62/<version>

/<dependency>

<dependency>

<groupid>org.projectlombok/<groupid>

<artifactid>lombok/<artifactid>

<optional>true/<optional>

/<dependency>
/<code>
  1. application.yml
<code>   spring:

rabbitmq:

host: localhost

port: 5672

username: guest

password: guest




## http component

http:

maxTotal: 100 #最大連接數

defaultMaxPerRoute: 20 # 併發數

connectTimeout: 1000 #創建連接的最長時間

connectionRequestTimeout: 500 #從連接池中獲取到連接的最長時間

socketTimeout: 10000 #數據傳輸的最長時間

validateAfterInactivity: 1000
/<code>
  1. HttpClient
<code>   import org.apache.http.client.config.RequestConfig;

import org.apache.http.impl.client.CloseableHttpClient;

import org.apache.http.impl.client.HttpClientBuilder;

import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;



@Configuration

public class HttpClientConfig {



@Value("${http.maxTotal}")

private Integer maxTotal;



@Value("${http.defaultMaxPerRoute}")

private Integer defaultMaxPerRoute;



@Value("${http.connectTimeout}")

private Integer connectTimeout;



@Value("${http.connectionRequestTimeout}")

private Integer connectionRequestTimeout;



@Value("${http.socketTimeout}")

private Integer socketTimeout;



@Value("${http.validateAfterInactivity}")

private Integer validateAfterInactivity;





@Bean

public PoolingHttpClientConnectionManager poolingHttpClientConnectionManager(){

PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();

connectionManager.setMaxTotal(maxTotal);

connectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute);

connectionManager.setValidateAfterInactivity(validateAfterInactivity);

return connectionManager;

}



@Bean

public HttpClientBuilder httpClientBuilder(PoolingHttpClientConnectionManager poolingHttpClientConnectionManager){

HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();

httpClientBuilder.setConnectionManager(poolingHttpClientConnectionManager);



return httpClientBuilder;

}



@Bean

public CloseableHttpClient closeableHttpClient(HttpClientBuilder httpClientBuilder){

return httpClientBuilder.build();

}





@Bean

public RequestConfig.Builder builder(){

RequestConfig.Builder builder = RequestConfig.custom();

return builder.setConnectTimeout(connectTimeout)

.setConnectionRequestTimeout(connectionRequestTimeout)

.setSocketTimeout(socketTimeout);

}



@Bean

public RequestConfig requestConfig(RequestConfig.Builder builder){

return builder.build();

}

}
/<code>
<code>   import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;

import org.apache.http.HttpHeaders;

import org.apache.http.HttpStatus;

import org.apache.http.client.config.RequestConfig;

import org.apache.http.client.methods.CloseableHttpResponse;

import org.apache.http.client.methods.HttpGet;

import org.apache.http.client.methods.HttpPost;

import org.apache.http.client.utils.URIBuilder;

import org.apache.http.entity.ContentType;

import org.apache.http.entity.StringEntity;

import org.apache.http.impl.client.CloseableHttpClient;

import org.apache.http.util.EntityUtils;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;



import java.util.Map;





@Slf4j


@Component

public class HttpClient {



/** 默認字符集 */

public static final String DEFAULT_CHARSET = "UTF-8";



@Autowired

private CloseableHttpClient closeableHttpClient;



@Autowired

private RequestConfig config;







public T doPost(String url, Map<string> requestParameter, Class clazz) throws Exception {

HttpResponse httpResponse = this.doPost(url, requestParameter);

if (clazz == String.class) {

return (T) httpResponse.getBody();

}

T response = JSONObject.parseObject(httpResponse.getBody(), clazz);

return response;

}



public HttpResponse doPost(String url, Map<string> requestParameter) throws Exception {


HttpPost httpPost = new HttpPost(url);

httpPost.setConfig(config);

httpPost.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());



if (requestParameter != null) {

String requestBody = JSONObject.toJSONString(requestParameter);

StringEntity postEntity = new StringEntity(requestBody, "UTF-8");

httpPost.setEntity(postEntity);

}



CloseableHttpResponse response = this.closeableHttpClient.execute(httpPost);

// 對請求的響應進行簡單的包裝成自定義的類型

return new HttpResponse(response.getStatusLine().getStatusCode(), EntityUtils.toString(

response.getEntity(), DEFAULT_CHARSET));

}







/**

* 封裝請求的響應碼和響應的內容

*/

public class HttpResponse {

/** http status */

private Integer code;

/** http response content */

private String body;



public HttpResponse() { }



public HttpResponse(Integer code, String body) {

this.code = code;

this.body = body;

}



public Integer getCode() {

return code;

}



public void setCode(Integer code) {

this.code = code;

}



public String getBody() {

return body;

}



public void setBody(String body) {

this.body = body;

}

}

}
/<string>
/<string>
/<code>
  1. MessagePostProcessor
<code>   import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;



/**

* 設置消息過期時間

*/

public class ExpirationMessagePostProcessor implements MessagePostProcessor{



private final Long ttl;



public ExpirationMessagePostProcessor(Long ttl) {

this.ttl = ttl;

}



@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 設置失效時間

message.getMessageProperties().setExpiration(ttl.toString());

return message;

}

}
/<code>
  1. 聲明隊列和交換機
<code>   package com.example.rabbitmq.retry;



import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;



/**

* 延遲隊列

*/

@Configuration

public class DelayQueueConfig {



/** 緩衝隊列名稱:過期時間針對於每個消息 */

public final static String DELAY_QUEUE_PER_MESSAGE_TTL_NAME = "delay_queue_per_message_ttl";



/** 死亡交換機:過期消息將通過該死亡交換機放入到實際消費的隊列中 */


public final static String DELAY_EXCHANGE_NAME = "delay_exchange";



/** 死亡交換機對應的路由鍵,通過該路由鍵路由到實際消費的隊列 */

public final static String DELAY_PROCESS_QUEUE_NAME = "delay_process_queue";



/** 路由到 delay_queue_per_message_ttl(統一失效時間的隊列)的exchange(用於隊列延遲重試) */

public final static String PER_MESSAGE_TTL_EXCHANGE_NAME = "per_message_ttl_exchange";





/**

* delay_queue_per_message_ttl

* 每個消息都可以控制自己的失效時間

* x-dead-letter-exchange聲明瞭隊列裡的死信轉發到的DLX名稱

* x-dead-letter-routing-key聲明瞭這些死信在轉發時攜帶的routing-key名稱

*/

@Bean

Queue delayQueuePerMessageTTL() {

return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)

.withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME)

.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME)


.build();

}



/**

* 死亡交換機 DLX

* @return

*/

@Bean

DirectExchange delayExchange() {

return new DirectExchange(DELAY_EXCHANGE_NAME);

}



/**

* 實際消費隊列:過期之後會進入到該隊列中來

* @return

*/

@Bean

Queue delayProcessQueue() {

return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME).build();

}





/**

* 將死亡交換機delayExchange和實際消費隊列delay_process_queue綁定在一起,並攜帶路由鍵delay_process_queue


* @param delayProcessQueue

* @param delayExchange

* @return

*/

@Bean

Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {

return BindingBuilder.bind(delayProcessQueue)

.to(delayExchange)

.with(DELAY_PROCESS_QUEUE_NAME);

}





/**

* 重試交換機:消費失敗後通過該交換機轉發到隊列中

* @return

*/

@Bean

DirectExchange perMessageTtlExchange() {

return new DirectExchange(PER_MESSAGE_TTL_EXCHANGE_NAME);

}





/**

* 重試交換機和緩衝隊列綁定


* @param delayQueuePerMessageTTL 緩衝隊列

* @param perMessageTtlExchange 重試交換機

* @return

*/

@Bean

Binding messageTtlBinding(Queue delayQueuePerMessageTTL, DirectExchange perMessageTtlExchange) {

return BindingBuilder.bind(delayQueuePerMessageTTL)

.to(perMessageTtlExchange)

.with(DELAY_QUEUE_PER_MESSAGE_TTL_NAME);

}

}
/<code>
純乾貨 | Springboot + RabbitMQ實現消息延遲重試具體實現

  1. 消費消息
<code>   import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;



import java.util.Date;

import java.util.Map;

import java.util.Objects;



/**

* 支付回調重試

* 2秒、4秒、8秒後分別重試,算上0秒這次總共重試4次

*/

@Slf4j

@Component

public class CallbackConsumer {



@Autowired

private AmqpTemplate rabbitTemplate;




@Autowired

private HttpClient httpClient;



/** 最大重試次數 */

private Integer maxRetryTime = 4;





@RabbitListener(queues = DelayQueueConfig.DELAY_PROCESS_QUEUE_NAME)

public void process(String msg) {

log.info("----------date = {}, msg ={}", new Date(), msg);

Map<string> callbackRequestMap = JSONObject.parseObject(msg, Map.class);

// 重試次數

Integer retryTime = (Integer)callbackRequestMap.get("retryTime");

String notifyUrl = (String)callbackRequestMap.get("notifyUrl");



try {

if (maxRetryTime <= 4) {

log.info("################Callback retry time = {}, date = {}################", retryTime, new Date());

String response = httpClient.doPost(notifyUrl, callbackRequestMap, String.class);

if (Objects.equals("SUCCESS", response)) {

log.info("ok");

} else {

// 下一次重試

log.error("error request={} response={}", callbackRequestMap, response);

if (retryTime == maxRetryTime) {

lastTimeRetryResult(response, null);

return;

}

retryCallback(callbackRequestMap);

}

}

} catch (Exception e) {

if (maxRetryTime < maxRetryTime) {

retryCallback(callbackRequestMap);

} else {

lastTimeRetryResult(null, e);

}

}

}





/**

* 嘗試下一次回調

* @param callbackRequestMap 請求參數

*/

private void retryCallback(Map<string> callbackRequestMap) {

// 下次重試,重試次數+1

Integer retryTime = (Integer)callbackRequestMap.get("retryTime");

callbackRequestMap.put("retryTime", retryTime + 1);



// 下一次延遲是上一次延遲的2倍時間

long expiration = retryTime * 2000;

String callbackMessage = JSONObject.toJSONString(callbackRequestMap);

rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,

(Object) callbackMessage,

new ExpirationMessagePostProcessor(expiration));

}



/**

* 記錄最後一次重試的結果

* @param response

* @param e

*/

private void lastTimeRetryResult(String response, Exception e) {

log.error("last time retry, response={}", response);

}

}
/<string>/<string>/<code>
  1. 模擬回調
<code>   package com.example.rabbitmq.retry;



import com.alibaba.fastjson.JSONObject;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;



import java.util.HashMap;

import java.util.Map;

import java.util.UUID;



@RestController

@RequestMapping("/mock")

public class MockController {



@Autowired

private AmqpTemplate rabbitTemplate;



@RequestMapping("/callback")

public void callback(String notifyUrl) {

Map<string> requestMap = new HashMap<>();

requestMap.put("retryTime", 1);

requestMap.put("notifyUrl", notifyUrl);

requestMap.put("orderCode", UUID.randomUUID().toString());

Object callbackMessage = JSONObject.toJSONString(requestMap);





rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME, callbackMessage, new ExpirationMessagePostProcessor(0L));

}

}
/<string>/<code>
純乾貨 | Springboot + RabbitMQ實現消息延遲重試具體實現

純乾貨 | Springboot + RabbitMQ實現消息延遲重試具體實現

歡迎關注Java實用技術,每天發佈一篇實用技術文章。


分享到:


相關文章: