一個基於 RabbitMQ 的可複用的分佈式事務消息架構方案

作者:Throwable
博客園:https://www.cnblogs.com/throwable/p/12266806.html

前提

分佈式事務是微服務實踐中一個比較棘手的問題,在筆者所實施的微服務實踐方案中,都採用了折中或者規避強一致性的方案。參考Ebay多年前提出的本地消息表方案,基於RabbitMQ和MySQL(JDBC)做了輕量級的封裝,實現了低入侵性的事務消息模塊。本文的內容就是詳細分析整個方案的設計思路和實施。環境依賴如下:

  • JDK1.8+
  • spring-boot-start-web:2.x.x
  • spring-boot-start-jdbc:2.x.x
  • spring-boot-start-amqp:2.x.x
  • HikariCP:3.x.x(spring-boot-start-jdbc自帶)
  • mysql-connector-java:5.1.48
  • redisson:3.12.1

方案設計思路

事務消息原則上只適合弱一致性(或者說最終一致性)的場景,常見的弱一致性場景如:

  • 用戶服務完成了註冊動作,向短信服務推送一條營銷相關的消息。
  • 信貸體系中,訂單服務保存訂單完畢,向審批服務推送一條待審批的訂單記錄信息。
  • ......

強一致性的場景一般不應該選用事務消息。

一個基於 RabbitMQ 的可複用的分佈式事務消息架構方案

一般情況下,要求強一致性說明要嚴格同步,也就是所有操作必須同時成功或者同時失敗,這樣就會引入同步帶來的額外消耗。如果一個事務消息模塊設計合理,補償、查詢、監控等等功能都完畢,由於系統交互是異步的,整體吞吐要比嚴格同步高。在筆者負責的業務系統中基於事務消息使用還定製了一條基本原則:消息內容正確的前提下,消費方出現異常需要自理

簡單來說就是:上游保證了自身的業務正確性,成功推送了正確的消息到RabbitMQ就認為上游義務已經結束。

為了降低代碼的入侵性,事務消息需要藉助Spring的編程式事務或者聲明式事務。編程式事務一般依賴於TransactionTemplate,而聲明式事務依託於AOP模塊,依賴於註解@Transactional。

接著需要自定義一個事務消息功能模塊,新增一個事務消息記錄表(其實就是本地消息表),用於保存每一條需要發送的消息記錄。事務消息功能模塊的主要功能是:

  • 保存消息記錄。
  • 推送消息到RabbitMQ服務端。
  • 消息記錄的查詢、補償推送等等。

事務執行的邏輯單元

在事務執行的邏輯單元裡面,需要進行待推送的事務消息記錄的保存,也就是:本地(業務)邏輯和事務消息記錄保存操作綁定在同一個事務

一個基於 RabbitMQ 的可複用的分佈式事務消息架構方案

發送消息到RabbitMQ服務端這一步需要延後到事務提交之後,這樣才能保證事務提交成功和消息成功發送到RabbitMQ服務端這兩個操作是一致的。

為了把保存待發送的事務消息發送消息到RabbitMQ兩個動作從使用者感知角度合併為一個動作,這裡需要用到Spring特有的事務同步器TransactionSynchronization,這裡分析一下事務同步器的主要方法的回調位置,主要參考AbstractPlatformTransactionManager#commit()或者AbstractPlatformTransactionManager#processCommit()方法:

一個基於 RabbitMQ 的可複用的分佈式事務消息架構方案

上圖僅僅演示了事務正確提交的場景(不包含異常的場景)。這裡可以明確知道,事務同步器TransactionSynchronization的afterCommit()和afterCompletion(int status)方法都在真正的事務提交點AbstractPlatformTransactionManager#doCommit()之後回調,因此可以選用這兩個方法其中之一用於執行推送消息到RabbitMQ服務端,整體的偽代碼如下:

<code>@Transactional
public Dto businessMethod(){
business transaction code block ...
// 保存事務消息
[saveTransactionMessageRecord()]
// 註冊事務同步器 - 在afterCommit()方法中推送消息到RabbitMQ
[register TransactionSynchronization,send message in method afterCommit()]
business transaction code block ...
}
/<code>

上面偽代碼中,保存事務消息註冊事務同步器兩個步驟可以安插在事務方法中的任意位置,也就是說與執行順序無關。

事務消息的補償

雖然之前提到筆者建議下游服務自理自身服務消費異常的場景,但是有些時候迫於無奈還是需要上游把對應的消息重新推送,這個算是特殊的場景。另外還有一個場景需要考慮:事務提交之後觸發事務同步器TransactionSynchronization的afterCommit()方法失敗。這是一個低概率的場景,但是在生產中一定會出現,一個比較典型的原因就是:

事務提交完成後尚未來得及觸發TransactionSynchronization#afterCommit()方法進行推送服務實例就被重啟

如下圖所示:

一個基於 RabbitMQ 的可複用的分佈式事務消息架構方案

為了統一處理補償推送的問題,使用了有限狀態判斷消息是否已經推送成功:

  • 在事務方法內,保存事務消息的時候,標記消息記錄推送狀態為處理中。
  • 事務同步器接口TransactionSynchronization的afterCommit()方法的實現中,推送對應的消息到RabbitMQ,然後更變事務消息記錄的狀態為推送成功。

還有一種極為特殊的情況是RabbitMQ服務端本身出現故障導致消息推送異常,這種情況下需要進行重試(補償推送),經驗證明短時間內的反覆重試是沒有意義的,故障的服務一般不會瞬時恢復,所以可以考慮使用指數退避算法進行重試,同時需要限制最大重試次數。

一個基於 RabbitMQ 的可複用的分佈式事務消息架構方案

指數值、間隔值和最大重試次數上限需要根據實際情況設定,否則容易出現消息延時過大或者重試過於頻繁等問題。

方案實施

引入核心依賴:

<code><properties>
<spring.boot.version>2.2.4.RELEASE/<spring.boot.version>
<redisson.version>3.12.1/<redisson.version>
<mysql.connector.version>5.1.48/<mysql.connector.version>
/<properties>
<dependencymanagement>
<dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-dependencies/<artifactid>
<version>${spring.boot.version}/<version>
<type>pom/<type>
<scope>import/<scope>
/<dependency>
/<dependencies>
/<dependencymanagement>
<dependencies>
<dependency>
<groupid>mysql/<groupid>
<artifactid>mysql-connector-java/<artifactid>
<version>${mysql.connector.version}/<version>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-jdbc/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-aop/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>

<artifactid>spring-boot-starter-amqp/<artifactid>
/<dependency>
<dependency>
<groupid>org.redisson/<groupid>
<artifactid>redisson/<artifactid>
<version>${redisson.version}/<version>
/<dependency>
/<dependencies>
/<code>

spring-boot-starter-jdbc、mysql-connector-java和spring-boot-starter-aop是MySQL事務相關,而spring-boot-starter-amqp是RabbitMQ客戶端的封裝,redisson主要使用其分佈式鎖,用於補償定時任務的加鎖執行(以防止服務多個節點併發執行補償推送)。

表設計

事務消息模塊主要涉及兩張表,以MySQL為例,建表DDL如下:

<code>CREATE TABLE `t_transactional_message`
(
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
creator VARCHAR(20) NOT NULL DEFAULT 'admin',
editor VARCHAR(20) NOT NULL DEFAULT 'admin',
deleted TINYINT NOT NULL DEFAULT 0,
current_retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '當前重試次數',
max_retry_times TINYINT NOT NULL DEFAULT 5 COMMENT '最大重試次數',
queue_name VARCHAR(255) NOT NULL COMMENT '隊列名',
exchange_name VARCHAR(255) NOT NULL COMMENT '交換器名',
exchange_type VARCHAR(8) NOT NULL COMMENT '交換類型',
routing_key VARCHAR(255) COMMENT '路由鍵',
business_module VARCHAR(32) NOT NULL COMMENT '業務模塊',
business_key VARCHAR(255) NOT NULL COMMENT '業務鍵',
next_schedule_time DATETIME NOT NULL COMMENT '下一次調度時間',
message_status TINYINT NOT NULL DEFAULT 0 COMMENT '消息狀態',
init_backoff BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT '退避初始化值,單位為秒',
backoff_factor TINYINT NOT NULL DEFAULT 2 COMMENT '退避因子(也就是指數)',

INDEX idx_queue_name (queue_name),
INDEX idx_create_time (create_time),
INDEX idx_next_schedule_time (next_schedule_time),
INDEX idx_business_key (business_key)
) COMMENT '事務消息表';

CREATE TABLE `t_transactional_message_content`
(
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
message_id BIGINT UNSIGNED NOT NULL COMMENT '事務消息記錄ID',
content TEXT COMMENT '消息內容'
) COMMENT '事務消息內容表';
/<code>

因為此模塊有可能擴展出一個後臺管理模塊,所以要把消息的管理和狀態相關字段和大體積的消息內容分別存放在兩個表,從而避免大批量查詢消息記錄的時候MySQL服務IO使用率過高的問題(這是和上一個公司的DBA團隊商討後得到的一個比較合理的方案)。預留了兩個業務字段business_module和business_key用於標識業務模塊和業務鍵(一般是唯一識別號,例如訂單號)。

一個基於 RabbitMQ 的可複用的分佈式事務消息架構方案

一般情況下,如果服務通過配置自行提前聲明隊列和交換器的綁定關係,那麼發送RabbitMQ消息的時候其實只依賴於exchangeName和routingKey兩個字段(header類型的交換器是特殊的,也比較少用,這裡暫時不用考慮),考慮到服務可能會遺漏聲明操作,發送消息的時候會基於隊列進行首次綁定聲明並且緩存相關的信息(RabbitMQ中的隊列-交換器綁定聲明只要每次聲明綁定關係的參數一致,則不會拋出異常)。

方案代碼設計

下面的方案設計描述中,暫時忽略了消息事務管理後臺的API設計,這些可以在後期補充。

定義模型實體類TransactionalMessage和TransactionalMessageContent:

<code>@Data
public class TransactionalMessage {

private Long id;
private LocalDateTime createTime;
private LocalDateTime editTime;
private String creator;
private String editor;
private Integer deleted;
private Integer currentRetryTimes;
private Integer maxRetryTimes;
private String queueName;
private String exchangeName;
private String exchangeType;
private String routingKey;
private String businessModule;
private String businessKey;
private LocalDateTime nextScheduleTime;
private Integer messageStatus;

private Long initBackoff;
private Integer backoffFactor;
}

@Data
public class TransactionalMessageContent {

private Long id;
private Long messageId;
private String content;
}
/<code>

然後定義dao接口(這裡暫時不展開實現的細節代碼,存儲使用MySQL,如果要替換為其他類型的數據庫,只需要使用不同的實現即可):

<code>public interface TransactionalMessageDao {

void insertSelective(TransactionalMessage record);

void updateStatusSelective(TransactionalMessage record);

List<transactionalmessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime,
LocalDateTime maxScheduleTime,
int limit);
}

public interface TransactionalMessageContentDao {

void insert(TransactionalMessageContent record);

List<transactionalmessagecontent> queryByMessageIds(String messageIds);
}
/<transactionalmessagecontent>/<transactionalmessage>/<code>

接著定義事務消息服務接口TransactionalMessageService:

<code>// 對外提供的服務類接口
public interface TransactionalMessageService {

void sendTransactionalMessage(Destination destination, TxMessage message);
}


@Getter
@RequiredArgsConstructor
public enum ExchangeType {

FANOUT("fanout"),

DIRECT("direct"),

TOPIC("topic"),

DEFAULT(""),

;

private final String type;
}

// 發送消息的目的地
public interface Destination {

ExchangeType exchangeType();

String queueName();

String exchangeName();

String routingKey();
}

@Builder
public class DefaultDestination implements Destination {

private ExchangeType exchangeType;
private String queueName;
private String exchangeName;
private String routingKey;

@Override
public ExchangeType exchangeType() {
return exchangeType;
}

@Override
public String queueName() {
return queueName;
}

@Override
public String exchangeName() {
return exchangeName;

}

@Override
public String routingKey() {
return routingKey;
}
}

// 事務消息
public interface TxMessage {

String businessModule();

String businessKey();

String content();
}

@Builder
public class DefaultTxMessage implements TxMessage {

private String businessModule;
private String businessKey;
private String content;

@Override
public String businessModule() {
return businessModule;
}

@Override
public String businessKey() {
return businessKey;
}

@Override
public String content() {
return content;
}
}

// 消息狀態
@RequiredArgsConstructor
public enum TxMessageStatus {

/**
* 成功
*/
SUCCESS(1),


/**
* 待處理
*/
PENDING(0),

/**
* 處理失敗
*/
FAIL(-1),

;

private final Integer status;
}
/<code>

TransactionalMessageService的實現類是事務消息的核心功能實現,代碼如下:

<code>@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitTransactionalMessageService implements TransactionalMessageService {

private final AmqpAdmin amqpAdmin;
private final TransactionalMessageManagementService managementService;

private static final ConcurrentMap<string> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();

@Override
public void sendTransactionalMessage(Destination destination, TxMessage message) {
String queueName = destination.queueName();
String exchangeName = destination.exchangeName();
String routingKey = destination.routingKey();
ExchangeType exchangeType = destination.exchangeType();
// 原子性的預聲明
QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {
Queue queue = new Queue(queueName);
amqpAdmin.declareQueue(queue);
Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());
amqpAdmin.declareExchange(exchange);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
amqpAdmin.declareBinding(binding);
return true;
});
TransactionalMessage record = new TransactionalMessage();
record.setQueueName(queueName);

record.setExchangeName(exchangeName);
record.setExchangeType(exchangeType.getType());
record.setRoutingKey(routingKey);
record.setBusinessModule(message.businessModule());
record.setBusinessKey(message.businessKey());
String content = message.content();
// 保存事務消息記錄
managementService.saveTransactionalMessageRecord(record, content);
// 註冊事務同步器
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
managementService.sendMessageSync(record, content);
}
});
}
}
/<string>/<code>

消息記錄狀態和內容持久化的管理統一放在TransactionalMessageManagementService中:

<code>@Slf4j
@RequiredArgsConstructor
@Service
public class TransactionalMessageManagementService {

private final TransactionalMessageDao messageDao;
private final TransactionalMessageContentDao contentDao;
private final RabbitTemplate rabbitTemplate;

private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);
private static final long DEFAULT_INIT_BACKOFF = 10L;
private static final int DEFAULT_BACKOFF_FACTOR = 2;
private static final int DEFAULT_MAX_RETRY_TIMES = 5;
private static final int LIMIT = 100;

public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {
record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF,
DEFAULT_BACKOFF_FACTOR, 0));
record.setCurrentRetryTimes(0);
record.setInitBackoff(DEFAULT_INIT_BACKOFF);
record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
messageDao.insertSelective(record);
TransactionalMessageContent messageContent = new TransactionalMessageContent();
messageContent.setContent(content);

messageContent.setMessageId(record.getId());
contentDao.insert(messageContent);
}

public void sendMessageSync(TransactionalMessage record, String content) {
try {
rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);
if (log.isDebugEnabled()) {
log.debug("發送消息成功,目標隊列:{},消息內容:{}", record.getQueueName(), content);
}
// 標記成功
markSuccess(record);
} catch (Exception e) {
// 標記失敗
markFail(record, e);
}
}

private void markSuccess(TransactionalMessage record) {
// 標記下一次執行時間為最大值
record.setNextScheduleTime(END);
record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
record.setEditTime(LocalDateTime.now());
messageDao.updateStatusSelective(record);
}

private void markFail(TransactionalMessage record, Exception e) {
log.error("發送消息失敗,目標隊列:{}", record.getQueueName(), e);
record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
// 計算下一次的執行時間
LocalDateTime nextScheduleTime = calculateNextScheduleTime(
record.getNextScheduleTime(),
record.getInitBackoff(),
record.getBackoffFactor(),
record.getCurrentRetryTimes()
);
record.setNextScheduleTime(nextScheduleTime);
record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
record.setEditTime(LocalDateTime.now());
messageDao.updateStatusSelective(record);
}

/**
* 計算下一次執行時間
*
* @param base 基礎時間
* @param initBackoff 退避基準值
* @param backoffFactor 退避指數
* @param round 輪數
* @return LocalDateTime
*/
private LocalDateTime calculateNextScheduleTime(LocalDateTime base,
long initBackoff,
long backoffFactor,
long round) {
double delta = initBackoff * Math.pow(backoffFactor, round);
return base.plusSeconds((long) delta);
}

/**
* 推送補償 - 裡面的參數應該根據實際場景定製
*/
public void processPendingCompensationRecords() {
// 時間的右值為當前時間減去退避初始值,這裡預防把剛保存的消息也推送了
LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
// 時間的左值為右值減去1小時
LocalDateTime min = max.plusHours(-1);
Map<long> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT)
.stream()
.collect(Collectors.toMap(TransactionalMessage::getId, x -> x));
if (!collect.isEmpty()) {
StringJoiner joiner = new StringJoiner(",", "(", ")");
collect.keySet().forEach(x -> joiner.add(x.toString()));
contentDao.queryByMessageIds(joiner.toString())
.forEach(item -> {
TransactionalMessage message = collect.get(item.getMessageId());
sendMessageSync(message, item.getContent());
});
}
}
}
/<long>/<code>

這裡有一點尚待優化:更新事務消息記錄狀態的方法可以優化為批量更新,在limit比較大的時候,批量更新的效率會更高。最後是定時任務的配置類:

<code>@Slf4j
@RequiredArgsConstructor
@Configuration
@EnableScheduling
public class ScheduleJobAutoConfiguration {

private final TransactionalMessageManagementService managementService;

/**
* 這裡用的是本地的Redis,實際上要做成配置
*/
private final RedissonClient redisson = Redisson.create();

@Scheduled(fixedDelay = 10000)
public void transactionalMessageCompensationTask() throws Exception {
RLock lock = redisson.getLock("transactionalMessageCompensationTask");
// 等待時間5秒,預期300秒執行完畢,這兩個值需要按照實際場景定製
boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);
if (tryLock) {
try {
long start = System.currentTimeMillis();
log.info("開始執行事務消息推送補償定時任務...");
managementService.processPendingCompensationRecords();
long end = System.currentTimeMillis();
long delta = end - start;
// 以防鎖過早釋放
if (delta < 5000) {
Thread.sleep(5000 - delta);
}
log.info("執行事務消息推送補償定時任務完畢,耗時:{} ms...", end - start);
} finally {
lock.unlock();
}
}
}
}
/<code>

基本代碼編寫完,整個項目的結構如下:

一個基於 RabbitMQ 的可複用的分佈式事務消息架構方案

最後添加兩個測試類:

<code>@RequiredArgsConstructor
@Component
public class MockBusinessRunner implements CommandLineRunner {

private final MockBusinessService mockBusinessService;

@Override
public void run(String... args) throws Exception {
mockBusinessService.saveOrder();
}
}

@Slf4j
@RequiredArgsConstructor
@Service

public class MockBusinessService {

private final JdbcTemplate jdbcTemplate;
private final TransactionalMessageService transactionalMessageService;
private final ObjectMapper objectMapper;

@Transactional(rollbackFor = Exception.class)
public void saveOrder() throws Exception {
String orderId = UUID.randomUUID().toString();
BigDecimal amount = BigDecimal.valueOf(100L);
Map<string> message = new HashMap<>();
message.put("orderId", orderId);
message.put("amount", amount);
jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", p -> {
p.setString(1, orderId);
p.setBigDecimal(2, amount);
});
String content = objectMapper.writeValueAsString(message);
transactionalMessageService.sendTransactionalMessage(
DefaultDestination.builder()
.exchangeName("tm.test.exchange")
.queueName("tm.test.queue")
.routingKey("tm.test.key")
.exchangeType(ExchangeType.DIRECT)
.build(),
DefaultTxMessage.builder()
.businessKey(orderId)
.businessModule("SAVE_ORDER")
.content(content)
.build()
);
log.info("保存訂單:{}成功...", orderId);
}
}
/<string>/<code>

某次測試結果如下:

一個基於 RabbitMQ 的可複用的分佈式事務消息架構方案

模擬訂單數據成功保存,而且RabbitMQ消息在事務成功提交後正常發送到RabbitMQ服務端中,如RabbitMQ控制檯數據所示。

小結

事務消息模塊的設計僅僅是使異步消息推送這個功能實現趨向於完備,其實一個合理的異步消息交互系統,一定會提供同步查詢接口,這一點是基於異步消息沒有回調或者沒有響應的特性導致的。一般而言,一個系統的吞吐量和系統的異步化處理佔比成正相關(這一點可以參考Amdahl's Law),所以在系統架構設計實際中應該儘可能使用異步交互,提高系統吞吐量同時減少同步阻塞帶來的無謂等待。事務消息模塊可以擴展出一個後臺管理,甚至可以配合Micrometer、Prometheus和Grafana體系做實時數據監控。

本文demo項目倉庫:rabbit-transactional-message

demo必須本地安裝MySQL、Redis和RabbitMQ才能正常啟動,本地必須新建一個數據庫命名local。


分享到:


相關文章: