一:簡介
使用場景:調用第三方接口時如果調用失敗需要隔幾秒再去嘗試下一次調用,直到調用N次還失敗就停止調用。最常用的場景就是支付成功異步通知第三方支付成功。
1. 為什麼要調用多次?
如果調用1次就成功了就停止調用,如果失敗可能由於網絡原因沒有請求到服務器需要再次嘗試,第二次很可能就會調用成功了。
2. 為什麼要間隔幾秒再嘗試下次調用?
如果是因為網絡原因沒有請求到服務器如果再立刻調用,很可能此時網絡還是沒有好,可能等幾秒後網絡就恢復了,此時再去調用就好了。
實現效果類似於支付寶中的回調延遲重試:
二: 實現原理
在介紹具體的實現思路之前,我們先來介紹一下RabbitMQ的兩個特性,一個是Time-To-Live Extensions,另一個是Dead Letter Exchanges。
Time-To-Live Extensions
RabbitMQ允許我們為消息或者隊列設置TTL(time to live),也就是過期時間。TTL表明了一條消息可在隊列中存活的最大時間,單位為毫秒。也就是說,當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在經過TTL秒後“死亡”,成為Dead Letter。如果既配置了消息的TTL,又配置了隊列的TTL,那麼較小的那個值會被取用。更多資料請查閱官方文檔。
Dead Letter Exchange
剛才提到了,被設置了TTL的消息在過期後會成為Dead Letter。其實在RabbitMQ中,一共有三種消息的“死亡”形式:
消息被拒絕。通過調用basic.reject或者basic.nack並且設置的requeue參數為false。消息因為設置了TTL而過期。消息進入了一條已經達到最大長度的隊列。 如果隊列設置了Dead Letter Exchange(DLX),那麼這些Dead Letter就會被重新publish到Dead Letter Exchange,通過Dead Letter Exchange路由到其他隊列。更多資料請查閱官方文檔。
通過RabbitMQ的TTL和DLX特性結合在一起,實現一個延遲隊列。
針對於上述的延遲隊列的兩個場景,我們分別有以下兩種流程圖:
1. 延遲消費
延遲消費是延遲隊列最為常用的使用模式。如下圖所示,生產者產生的消息首先會進入緩衝隊列(圖中紅色隊列)。通過RabbitMQ提供的TTL擴展,這些消息會被設置過期時間,也就是延遲消費的時間。等消息過期之後,這些消息會通過配置好的DLX轉發到實際消費隊列(圖中藍色隊列),以此達到延遲消費的效果。
2. 延遲重試
延遲重試本質上也是延遲消費的一種,但是這種模式的結構與普通的延遲消費的流程圖較為不同,所以單獨拎出來介紹。
如下圖所示,消費者發現該消息處理出現了異常,比如是因為網絡波動引起的異常。那麼如果不等待一段時間,直接就重試的話,很可能會導致在這期間內一直無法成功,造成一定的資源浪費。那麼我們可以將其先放在緩衝隊列中(圖中紅色隊列),等消息經過一段的延遲時間後再次進入實際消費隊列中(圖中藍色隊列),此時由於已經過了“較長”的時間了,異常的一些波動通常已經恢復,這些消息可以被正常地消費。
三:代碼示例
功能示例:每隔2秒、4秒、8秒、16秒去重試調用接口,總共調用4次。
- pom.xml
<code> <dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-amqp/<artifactid>
/<dependency>
<dependency>
<groupid>org.apache.httpcomponents/<groupid>
<artifactid>httpclient/<artifactid>
<version>4.5.6/<version>
/<dependency>
<dependency>
<groupid>com.alibaba/<groupid>
<artifactid>fastjson/<artifactid>
<version>1.2.62/<version>
/<dependency>
<dependency>
<groupid>org.projectlombok/<groupid>
<artifactid>lombok/<artifactid>
<optional>true/<optional>
/<dependency>
/<code>
- application.yml
<code> spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
## http component
http:
maxTotal: 100 #最大連接數
defaultMaxPerRoute: 20 # 併發數
connectTimeout: 1000 #創建連接的最長時間
connectionRequestTimeout: 500 #從連接池中獲取到連接的最長時間
socketTimeout: 10000 #數據傳輸的最長時間
validateAfterInactivity: 1000
/<code>
- HttpClient
<code> import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HttpClientConfig {
@Value("${http.maxTotal}")
private Integer maxTotal;
@Value("${http.defaultMaxPerRoute}")
private Integer defaultMaxPerRoute;
@Value("${http.connectTimeout}")
private Integer connectTimeout;
@Value("${http.connectionRequestTimeout}")
private Integer connectionRequestTimeout;
@Value("${http.socketTimeout}")
private Integer socketTimeout;
@Value("${http.validateAfterInactivity}")
private Integer validateAfterInactivity;
@Bean
public PoolingHttpClientConnectionManager poolingHttpClientConnectionManager(){
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(maxTotal);
connectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute);
connectionManager.setValidateAfterInactivity(validateAfterInactivity);
return connectionManager;
}
@Bean
public HttpClientBuilder httpClientBuilder(PoolingHttpClientConnectionManager poolingHttpClientConnectionManager){
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
httpClientBuilder.setConnectionManager(poolingHttpClientConnectionManager);
return httpClientBuilder;
}
@Bean
public CloseableHttpClient closeableHttpClient(HttpClientBuilder httpClientBuilder){
return httpClientBuilder.build();
}
@Bean
public RequestConfig.Builder builder(){
RequestConfig.Builder builder = RequestConfig.custom();
return builder.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout)
.setSocketTimeout(socketTimeout);
}
@Bean
public RequestConfig requestConfig(RequestConfig.Builder builder){
return builder.build();
}
}
/<code>
<code> import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
@Slf4j
@Component
public class HttpClient {
/** 默認字符集 */
public static final String DEFAULT_CHARSET = "UTF-8";
@Autowired
private CloseableHttpClient closeableHttpClient;
@Autowired
private RequestConfig config;
publicT doPost(String url, Map<string> requestParameter, Class /<code>clazz) throws Exception { /<string>
HttpResponse httpResponse = this.doPost(url, requestParameter);
if (clazz == String.class) {
return (T) httpResponse.getBody();
}
T response = JSONObject.parseObject(httpResponse.getBody(), clazz);
return response;
}
public HttpResponse doPost(String url, Map<string> requestParameter) throws Exception {
HttpPost httpPost = new HttpPost(url);
httpPost.setConfig(config);
httpPost.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
if (requestParameter != null) {
String requestBody = JSONObject.toJSONString(requestParameter);
StringEntity postEntity = new StringEntity(requestBody, "UTF-8");
httpPost.setEntity(postEntity);
}
CloseableHttpResponse response = this.closeableHttpClient.execute(httpPost);
// 對請求的響應進行簡單的包裝成自定義的類型
return new HttpResponse(response.getStatusLine().getStatusCode(), EntityUtils.toString(
response.getEntity(), DEFAULT_CHARSET));
}
/**
* 封裝請求的響應碼和響應的內容
*/
public class HttpResponse {
/** http status */
private Integer code;
/** http response content */
private String body;
public HttpResponse() { }
public HttpResponse(Integer code, String body) {
this.code = code;
this.body = body;
}
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this.code = code;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
}
/<string>
- MessagePostProcessor
<code> import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
/**
* 設置消息過期時間
*/
public class ExpirationMessagePostProcessor implements MessagePostProcessor{
private final Long ttl;
public ExpirationMessagePostProcessor(Long ttl) {
this.ttl = ttl;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 設置失效時間
message.getMessageProperties().setExpiration(ttl.toString());
return message;
}
}
/<code>
- 聲明隊列和交換機
<code> package com.example.rabbitmq.retry;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 延遲隊列
*/
@Configuration
public class DelayQueueConfig {
/** 緩衝隊列名稱:過期時間針對於每個消息 */
public final static String DELAY_QUEUE_PER_MESSAGE_TTL_NAME = "delay_queue_per_message_ttl";
/** 死亡交換機:過期消息將通過該死亡交換機放入到實際消費的隊列中 */
public final static String DELAY_EXCHANGE_NAME = "delay_exchange";
/** 死亡交換機對應的路由鍵,通過該路由鍵路由到實際消費的隊列 */
public final static String DELAY_PROCESS_QUEUE_NAME = "delay_process_queue";
/** 路由到 delay_queue_per_message_ttl(統一失效時間的隊列)的exchange(用於隊列延遲重試) */
public final static String PER_MESSAGE_TTL_EXCHANGE_NAME = "per_message_ttl_exchange";
/**
* delay_queue_per_message_ttl
* 每個消息都可以控制自己的失效時間
* x-dead-letter-exchange聲明瞭隊列裡的死信轉發到的DLX名稱
* x-dead-letter-routing-key聲明瞭這些死信在轉發時攜帶的routing-key名稱
*/
@Bean
Queue delayQueuePerMessageTTL() {
return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
.withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME)
.build();
}
/**
* 死亡交換機 DLX
* @return
*/
@Bean
DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
/**
* 實際消費隊列:過期之後會進入到該隊列中來
* @return
*/
@Bean
Queue delayProcessQueue() {
return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME).build();
}
/**
* 將死亡交換機delayExchange和實際消費隊列delay_process_queue綁定在一起,並攜帶路由鍵delay_process_queue
* @param delayProcessQueue
* @param delayExchange
* @return
*/
@Bean
Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
return BindingBuilder.bind(delayProcessQueue)
.to(delayExchange)
.with(DELAY_PROCESS_QUEUE_NAME);
}
/**
* 重試交換機:消費失敗後通過該交換機轉發到隊列中
* @return
*/
@Bean
DirectExchange perMessageTtlExchange() {
return new DirectExchange(PER_MESSAGE_TTL_EXCHANGE_NAME);
}
/**
* 重試交換機和緩衝隊列綁定
* @param delayQueuePerMessageTTL 緩衝隊列
* @param perMessageTtlExchange 重試交換機
* @return
*/
@Bean
Binding messageTtlBinding(Queue delayQueuePerMessageTTL, DirectExchange perMessageTtlExchange) {
return BindingBuilder.bind(delayQueuePerMessageTTL)
.to(perMessageTtlExchange)
.with(DELAY_QUEUE_PER_MESSAGE_TTL_NAME);
}
}
/<code>
- 消費消息
<code> import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Map;
import java.util.Objects;
/**
* 支付回調重試
* 2秒、4秒、8秒後分別重試,算上0秒這次總共重試4次
*/
@Slf4j
@Component
public class CallbackConsumer {
@Autowired
private AmqpTemplate rabbitTemplate;
@Autowired
private HttpClient httpClient;
/** 最大重試次數 */
private Integer maxRetryTime = 4;
@RabbitListener(queues = DelayQueueConfig.DELAY_PROCESS_QUEUE_NAME)
public void process(String msg) {
log.info("----------date = {}, msg ={}", new Date(), msg);
Map<string> callbackRequestMap = JSONObject.parseObject(msg, Map.class);
// 重試次數
Integer retryTime = (Integer)callbackRequestMap.get("retryTime");
String notifyUrl = (String)callbackRequestMap.get("notifyUrl");
try {
if (maxRetryTime <= 4) {
log.info("################Callback retry time = {}, date = {}################", retryTime, new Date());
String response = httpClient.doPost(notifyUrl, callbackRequestMap, String.class);
if (Objects.equals("SUCCESS", response)) {
log.info("ok");
} else {
// 下一次重試
log.error("error request={} response={}", callbackRequestMap, response);
if (retryTime == maxRetryTime) {
lastTimeRetryResult(response, null);
return;
}
retryCallback(callbackRequestMap);
}
}
} catch (Exception e) {
if (maxRetryTime < maxRetryTime) {
retryCallback(callbackRequestMap);
} else {
lastTimeRetryResult(null, e);
}
}
}
/**
* 嘗試下一次回調
* @param callbackRequestMap 請求參數
*/
private void retryCallback(Map<string> callbackRequestMap) {
// 下次重試,重試次數+1
Integer retryTime = (Integer)callbackRequestMap.get("retryTime");
callbackRequestMap.put("retryTime", retryTime + 1);
// 下一次延遲是上一次延遲的2倍時間
long expiration = retryTime * 2000;
String callbackMessage = JSONObject.toJSONString(callbackRequestMap);
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,
(Object) callbackMessage,
new ExpirationMessagePostProcessor(expiration));
}
/**
* 記錄最後一次重試的結果
* @param response
* @param e
*/
private void lastTimeRetryResult(String response, Exception e) {
log.error("last time retry, response={}", response);
}
}
/<string>/<string>/<code>
- 模擬回調
<code> package com.example.rabbitmq.retry;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
@RequestMapping("/mock")
public class MockController {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/callback")
public void callback(String notifyUrl) {
Map<string> requestMap = new HashMap<>();
requestMap.put("retryTime", 1);
requestMap.put("notifyUrl", notifyUrl);
requestMap.put("orderCode", UUID.randomUUID().toString());
Object callbackMessage = JSONObject.toJSONString(requestMap);
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME, callbackMessage, new ExpirationMessagePostProcessor(0L));
}
}
/<string>/<code>
歡迎關注Java實用技術,每天發佈一篇實用技術文章。
閱讀更多 Java實用技術 的文章