基於RocketMQ分佈式事務

前言

之前我們說到,分佈式事務是一個複雜的技術問題。沒有通用的解決方案,也缺乏簡單高效的手段。

不過,如果我們的系統不追求強一致性,那麼最常用的還是最終一致性方案。今天,我們就基於 RocketMQ來實現消息最終一致性方案的分佈式事務。

本文代碼不只是簡單的demo,考慮到一些異常情況、冪等性消費和死信隊列等情況,儘量向可靠業務場景靠攏。

另外,在最後還有《RocketMQ技術內幕》一書中,關於分佈式事務示例代碼的錯誤流程分析,所以篇幅較長,希望大家耐心觀看。

一、事務消息

在這裡,筆者不想使用大量的文字贅述 RocketMQ事務消息的原理,我們只需要搞明白兩個概念。

  • Half Message,半消息

暫時不能被 Consumer消費的消息。Producer已經把消息發送到 Broker端,但是此消息的狀態被標記為不能投遞,處於這種狀態下的消息稱為半消息。事實上,該狀態下的消息會被放在一個叫做 RMQ_SYS_TRANS_HALF_TOPIC的主題下。

當 Producer端對它二次確認後,也就是 Commit之後,Consumer端才可以消費到;那麼如果是Rollback,該消息則會被刪除,永遠不會被消費到。

  • 事務狀態回查

我們想,可能會因為網絡原因、應用問題等,導致Producer端一直沒有對這個半消息進行確認,那麼這時候 Broker服務器會定時掃描這些半消息,主動找Producer端查詢該消息的狀態。

當然,什麼時候去掃描,包含掃描幾次,我們都可以配置,在後文我們再細說。

簡而言之,RocketMQ事務消息的實現原理就是基於兩階段提交和事務狀態回查,來決定消息最終是提交還是回滾的。

在本文,我們的代碼就以 訂單服務、積分服務 為例。結合上文來看,整體流程如下:


基於RocketMQ分佈式事務 - 完整示例


二、訂單服務

在訂單服務中,我們接收前端的請求創建訂單,保存相關數據到本地數據庫。

1、事務日誌表

在訂單服務中,除了有一張訂單表之外,還需要一個事務日誌表。 它的定義如下:

<code>CREATE TABLE `transaction_log` (
`id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '事務ID',
`business` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '業務標識',
`foreign_key` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '對應業務表中的主鍵',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;/<code>

這張表專門作用於事務狀態回查。當提交業務數據時,此表也插入一條數據,它們共處一個本地事務中。通過事務ID查詢該表,如果返回記錄,則證明本地事務已提交;如果未返回記錄,則本地事務可能是未知狀態或者是回滾狀態。

2、TransactionMQProducer

我們知道,通過 RocketMQ發送消息,需先創建一個消息發送者。值得注意的是,如果發送事務消息,在這裡我們的創建的實例必須是 TransactionMQProducer。

<code>@Component
public class TransactionProducer {
\t
private String producerGroup = "order_trans_group";
private TransactionMQProducer producer;

//用於執行本地事務和事務狀態回查的監聽器
@Autowired
OrderTransactionListener orderTransactionListener;
//執行任務的線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));

@PostConstruct
public void init(){
producer = new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendMsgTimeout(Integer.MAX_VALUE);
producer.setExecutorService(executor);
producer.setTransactionListener(orderTransactionListener);
this.start();
}
private void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
//事務消息發送
public TransactionSendResult send(String data, String topic) throws MQClientException {
Message message = new Message(topic,data.getBytes());
return this.producer.sendMessageInTransaction(message, null);
}
}/<code>

上面的代碼中,主要就是創建事務消息的發送者。在這裡,我們重點關注 OrderTransactionListener,它負責執行本地事務和事務狀態回查。

3、OrderTransactionListener

<code>@Component 

public class OrderTransactionListener implements TransactionListener {

@Autowired
OrderService orderService;

@Autowired
TransactionLogService transactionLogService;

Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
logger.info("開始執行本地事務....");
LocalTransactionState state;
try{
String body = new String(message.getBody());
OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
orderService.createOrder(order,message.getTransactionId());
state = LocalTransactionState.COMMIT_MESSAGE;
logger.info("本地事務已提交。{}",message.getTransactionId());
}catch (Exception e){
logger.info("執行本地事務失敗。{}",e);
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
logger.info("開始回查本地事務狀態。{}",messageExt.getTransactionId());
LocalTransactionState state;
String transactionId = messageExt.getTransactionId();
if (transactionLogService.get(transactionId)>0){
state = LocalTransactionState.COMMIT_MESSAGE;
}else {
state = LocalTransactionState.UNKNOW;
}
logger.info("結束本地事務狀態查詢:{}",state);
return state;
}
}/<code>

在通過 producer.sendMessageInTransaction發送事務消息後,如果消息發送成功,就會調用到這裡的executeLocalTransaction方法,來執行本地事務。在這裡,它會完成訂單數據和事務日誌的插入。

該方法返回值 LocalTransactionState 代表本地事務狀態,它是一個枚舉類。

<code>public enum LocalTransactionState {
//提交事務消息,消費者可以看到此消息
COMMIT_MESSAGE,
//回滾事務消息,消費者不會看到此消息
ROLLBACK_MESSAGE,
//事務未知狀態,需要調用事務狀態回查,確定此消息是提交還是回滾
UNKNOW;
}/<code>

那麼, checkLocalTransaction 方法就是用於事務狀態查詢。在這裡,我們通過事務ID查詢transaction_log這張表,如果可以查詢到結果,就提交事務消息;如果沒有查詢到,就返回未知狀態。

注意,這裡還涉及到另外一個問題。如果是返回未知狀態,RocketMQ Broker服務器會以1分鐘的間隔時間不斷回查,直至達到事務回查最大檢測數,如果超過這個數字還未查詢到事務狀態,則回滾此消息。

當然,事務回查的頻率和最大次數,我們都可以配置。在 Broker 端,可以通過這樣來配置它:

<code>brokerConfig.setTransactionCheckInterval(10000); //回查頻率10秒一次
brokerConfig.setTransactionCheckMax(3); //最大檢測次數為3/<code>

4、業務實現類

<code>@Service
public class OrderServiceImpl implements OrderService {
@Autowired
OrderMapper orderMapper;
@Autowired
TransactionLogMapper transactionLogMapper;
@Autowired
TransactionProducer producer;

Snowflake snowflake = new Snowflake(1,1);
Logger logger = LoggerFactory.getLogger(this.getClass());

//執行本地事務時調用,將訂單數據和事務日誌寫入本地數據庫
@Transactional
@Override
public void createOrder(OrderDTO orderDTO,String transactionId){

//1.創建訂單
Order order = new Order();
BeanUtils.copyProperties(orderDTO,order);
orderMapper.createOrder(order);

//2.寫入事務日誌
TransactionLog log = new TransactionLog();
log.setId(transactionId);
log.setBusiness("order");
log.setForeignKey(String.valueOf(order.getId()));
transactionLogMapper.insert(log);

logger.info("訂單創建完成。{}",orderDTO);
}

//前端調用,只用於向RocketMQ發送事務消息
@Override
public void createOrder(OrderDTO order) throws MQClientException {
order.setId(snowflake.nextId());
order.setOrderNo(snowflake.nextIdStr());
producer.send(JSON.toJSONString(order),"order");
}
}/<code>

在訂單業務服務類中,我們有兩個方法。一個用於向RocketMQ發送事務消息,一個用於真正的業務數據落庫。

至於為什麼這樣做,其實有一些原因的,我們後面再說。

5、調用

<code>@RestController
public class OrderController {

@Autowired
OrderService orderService;
Logger logger = LoggerFactory.getLogger(this.getClass());

@PostMapping("/create_order")
public void createOrder(@RequestBody OrderDTO order) throws MQClientException {
logger.info("接收到訂單數據:{}",order.getCommodityCode());
orderService.createOrder(order);
}
}/<code>

6、總結

目前已經完成了訂單服務的業務邏輯。我們總結流程如下:


基於RocketMQ分佈式事務 - 完整示例


考慮到異常情況,這裡的要點如下:

  • 第一次調用createOrder,發送事務消息。如果發送失敗,導致報錯,則將異常返回,此時不會涉及到任何數據安全。
  • 如果事務消息發送成功,但在執行本地事務時發生異常,那麼訂單數據和事務日誌都不會被保存,因為它們是一個本地事務中。
  • 如果執行完本地事務,但未能及時的返回本地事務狀態或者返回了未知狀態。那麼,會由Broker定時回查事務狀態,然後根據事務日誌表,就可以判斷訂單是否已完成,並寫入到數據庫。

基於這些要素,我們可以說,已經保證了訂單服務和事務消息的一致性。那麼,接下來就是積分服務如何正確的消費訂單數據並完成相應的業務操作。

三、積分服務

在積分服務中,主要就是消費訂單數據,然後根據訂單內容,給相應用戶增加積分。

1、積分記錄表

<code>CREATE TABLE `t_points` (
`id` bigint(16) NOT NULL COMMENT '主鍵',
`user_id` bigint(16) NOT NULL COMMENT '用戶id',
`order_no` bigint(16) NOT NULL COMMENT '訂單編號',
`points` int(4) NOT NULL COMMENT '積分',
`remarks` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '備註',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;/<code>

在這裡,我們重點關注order_no字段,它是實現冪等消費的一種選擇。

2、消費者啟動

<code>@Component
public class Consumer {

String consumerGroup = "consumer-group";
DefaultMQPushConsumer consumer;

@Autowired
OrderListener orderListener;

@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("order","*");
consumer.registerMessageListener(orderListener);
consumer.start();
}
}/<code>

啟動一個消費者比較簡單,我們指定要消費的 topic 和監聽器就好了。

3、消費者監聽器

<code>@Component
public class OrderListener implements MessageListenerConcurrently {

@Autowired
PointsService pointsService;
Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<messageext> list, ConsumeConcurrentlyContext context) {
logger.info("消費者線程監聽到消息。");
try{
for (MessageExt message:list) {
logger.info("開始處理訂單數據,準備增加積分....");
OrderDTO order = JSONObject.parseObject(message.getBody(), OrderDTO.class);
pointsService.increasePoints(order);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
logger.error("處理消費者數據發生異常。{}",e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}/<messageext>/<code>

監聽到消息之後,調用業務服務類處理即可。處理完成則返回CONSUME_SUCCESS以提交,處理失敗則返回RECONSUME_LATER來重試。

4、增加積分

在這裡,主要就是對積分數據入庫。但注意,入庫之前需要先做判斷,來達到冪等性消費。

<code>@Service
public class PointsServiceImpl implements PointsService {

@Autowired
PointsMapper pointsMapper;

Snowflake snowflake = new Snowflake(1,1);
Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void increasePoints(OrderDTO order) {

\t\t
//入庫之前先查詢,實現冪等
if (pointsMapper.getByOrderNo(order.getOrderNo())>0){
logger.info("積分添加完成,訂單已處理。{}",order.getOrderNo());
}else{
Points points = new Points();
points.setId(snowflake.nextId());
points.setUserId(order.getUserId());
points.setOrderNo(order.getOrderNo());
Double amount = order.getAmount();
points.setPoints(amount.intValue()*10);
points.setRemarks("商品消費共【"+order.getAmount()+"】元,獲得積分"+points.getPoints());
pointsMapper.insert(points);
logger.info("已為訂單號碼{}增加積分。",points.getOrderNo());
}
}
}/<code>

5、冪等性消費

實現冪等性消費的方式有很多種,具體怎麼做,根據自己的情況來看。

比如,在本例中,我們直接將訂單號和積分記錄綁定在同一個表中,在增加積分之前,就可以先查詢此訂單是否已處理過。

或者,我們也可以額外創建一張表,來記錄訂單的處理情況。

再者,也可以將這些信息直接放到redis緩存裡,在入庫之前先查詢緩存。

不管以哪種方式來做,總的思路就是在執行業務前,必須先查詢該消息是否被處理過。那麼這裡就涉及到一個數據主鍵問題,在這個例子中,我們以訂單號為主鍵,也可以用事務ID作主鍵,如果是普通消息的話,我們也可以創建唯一的消息ID作為主鍵。

6、消費異常

我們知道,當消費者處理失敗後會返回 RECONSUME_LATER ,讓消息來重試,默認最多重試16次。

那,如果真的由於特殊原因,消息一直不能被正確處理,那怎麼辦 ?

我們考慮兩種方式來解決這個問題。

第一,在代碼中設置消息重試次數,如果達到指定次數,就發郵件或者短信通知業務方人工介入處理。

<code>@Component
public class OrderListener implements MessageListenerConcurrently {

Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<messageext> list, ConsumeConcurrentlyContext context) {
logger.info("消費者線程監聽到消息。");
for (MessageExt message:list) {
if (!processor(message)){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

/**
* 消息處理,第3次處理失敗後,發送郵件通知人工介入
* @param message
* @return
*/
private boolean processor(MessageExt message){
String body = new String(message.getBody());

try {
logger.info("消息處理....{}",body);
int k = 1/0;
return true;
}catch (Exception e){
if(message.getReconsumeTimes()>=3){
logger.error("消息重試已達最大次數,將通知業務人員排查問題。{}",message.getMsgId());
sendMail(message);
return true;
}
return false;
}
}
}/<messageext>/<code>

第二,等待消息重試最大次數後,進入死信隊列。

消息重試最大次數默認是16次,我們也可以在消費者端設置這個次數。

<code>consumer.setMaxReconsumeTimes(3);//設置消息重試最大次數/<code>

死信隊列的主題名稱是 %DLQ% + 消費者組名稱,比如在訂單數據中,我們設置了消費者組名:

String consumerGroup = "order-consumer-group";

那麼這個消費者,對應的死信隊列主題名稱就是%DLQ%order-consumer-group


基於RocketMQ分佈式事務 - 完整示例


如上圖,我們還需要點擊TOPIC配置,來修改裡面的 perm 屬性,改為 6 即可。


基於RocketMQ分佈式事務 - 完整示例


最後就可以通過程序代碼監聽這個主題,來通知人工介入處理或者直接在控制檯查看處理了。通過冪等性消費和對死信消息的處理,基本上就能保證消息一定會被處理。

四、《RocketMQ技術內幕》中的代碼示例

筆者手裡有一本書《RocketMQ技術內幕》,在 9.4 章節有一段分佈式事務的代碼。

不過,筆者在看了之後,感覺它裡面的流程是有問題的,會造成本地事務的不一致,下面我們就來分析一下。

在這裡,我們主要是關注書中訂單業務服務類和事務監聽器的流程。

在書中,訂單下單偽代碼如下:

<code>public Map createOrder(){
Map result = new HashMap();
//執行下訂單相關的業務流程,例如操作本地數據庫落庫相關代碼
//生成事務消息唯一業務標識,將該業務標識組裝到待發送的消息體中,方便消息端進行冪等消費。
//調用消息客戶端API,發送事務prepare消息。
//返回結果,提交事務
return result;
}/<code>

上述是第一步,發送事務消息,接下來需要實現TransactionListener,實現執行本地事務與本地事務回查。

<code>public class OrderTransactionListenerImpl implements TransactionListener {
@Override

public LocalTransactionState executeLocalTransaction(Message message, Object o) {

//從消息體中獲取業務唯一ID
String bizUniNo = message.getUserProperty("bizUniNo");
//將bizUniNo入庫,表名:t_message_transaction,表結構 bizUniNo(主鍵),業務類型。
return LocalTransactionState.UNKNOW;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt message) {
//從消息體中獲取業務唯一ID
String bizUniNo = message.getUserProperty("bizUniNo");
//如果本地事務表(t_message_transaction)存在記錄,則認為提交;如果不存在返回未知。
//如果多次回查還是未查到消息,則回滾。
if (query(bizUniNo)>0){
return LocalTransactionState.COMMIT_MESSAGE;
}else{
return LocalTransactionState.UNKNOW;
}
}
//查詢數據庫是否存在記錄
public int query(String bizUniNo){
//select count(1) from t_message_transaction where biz_uni_no = #{bizUniNo}
return 1;
}
}/<code>

上面的代碼是筆者在這本書裡,抄錄出來的,如果是按照這種做法, 實際上是有問題的,我們來分析一下。

1、下單異常

我們看上面的訂單下單的偽代碼,裡面包含兩個操作:訂單入庫和事務消息發送。

那麼我們繼續思考:

  • 如果訂單入庫的時候發生異常,這個沒問題,因為事務消息也不會發送;
  • 如果訂單入庫執行完畢,但發送事務消息報錯。這個也沒問題,訂單數據會回滾;
  • 如果訂單入庫執行完畢,發送事務消息也沒有報錯。但返回的不是SEND_OK狀態,這個是有問題的。

因為只有發送事務消息成功,並且發送狀態為SEND_OK,才會執行監聽器中的本地事務,向t_message_transaction表寫入事務日誌。

那麼就會造成一個現場:本地訂單數據已經入庫,但是由於沒有返回SEND_OK狀態,導致不會執行本地事務中的事務日誌。那麼這條事務消息早晚會被回滾,最後的問題就是用戶下單成功,但沒有增加積分。

2、本地事務執行異常

事實上,第一個問題也可以規避。那就是在發送完事務消息後,再判斷下發送狀態是不是SEND_OK,如果不是的話,就通過拋異常的方式來回滾訂單數據。

但是,還有第二個問題:

如果訂單數據和事務消息發送都沒有問題,但是在執行本地事務時,寫入事務日誌時發生異常怎麼辦 ?

如果是這樣,也會導致本地訂單數據已經入庫,但是事務日誌沒有寫入,在事務狀態回查的時候一直查詢不到此記錄,最後只能回滾事務消息。最後的現象同樣是用戶下單成功,但沒有增加積分。

但是在書中,作者有這樣一段話:

executeLocalTransaction,該方法主要設置本地事務狀態,與業務代碼在一個事務中。例如在OrderService#createOrder中,只要本地事務提交成功,該方法也會提交成功。故在這裡,主要是向t_message_transaction添加一條記錄,在事務回查時,如果存在記錄,就認為該消息需要提交。

作者這段話的意思,我理解是說他們都處於一個本地事務中。如果createOrder方法執行成功,則executeLocalTransaction方法也會執行成功;如果任何一方出錯,都會回滾事務。

但是,我們從源碼中分析的話,如果本地事務執行報錯,訂單數據是不會回滾的。

3、源碼分析

首先,我們要知道,executeLocalTransaction方法和createOrder方法確實在一個事務裡。

這是因為executeLocalTransaction方法,是在發送事務消息之後,同步調用到的,所以它們在一個事務裡。

我們來看源碼中,事務消息發送的過程:

<code>public TransactionSendResult sendMessageInTransaction(Message msg, 
LocalTransactionExecuter localTransactionExecuter,
Object arg)throws MQClientException {
\t
//發送事務消息返回結果
SendResult sendResult = null;
//如果發送消息失敗,拋出異常
try {
\tsendResult = this.send(msg);
} catch (Exception var11) {
\tthrow new MQClientException("send message Exception", var11);
}
//初始化本地事務狀態:未知狀態
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch(sendResult.getSendStatus()) {
//如果發送事務消息狀態為send_ok
case SEND_OK:
try {
//執行本地事務方法
if (transactionListener != null) {
this.log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
} catch (Throwable var10) {
this.log.info("executeLocalTransactionBranch exception", var10);
this.log.info(msg.toString());
localException = var10;
}
break;

\t//如果發送事務狀態不是send_ok,該事務消息會被回滾
\tcase FLUSH_DISK_TIMEOUT:
\tcase FLUSH_SLAVE_TIMEOUT:
\tcase SLAVE_NOT_AVAILABLE:
\t localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
\t}
\t//結束事務,就是根據本地事務狀態,執行提交、回滾或暫不處理事務
\ttry {
\t this.endTransaction(sendResult, localTransactionState, localException);
\t} catch (Exception var9) {
\t this.log.warn("", var9);
\t}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}/<code>

上面的代碼,就是發送事務消息的過程。我們重點來看,如果事務消息發送成功,並且返回狀態為SEND_OK,那麼就去執行監聽器中的executeLocalTransaction方法,這說明它們在一個事務中。

但是,在執行過程中,它手動捕獲了 Throwable 異常。這就說明,即便執行本地事務失敗,也不會觸發回滾的。

至此,我們已經非常明確了,如果按照書裡的流程來寫代碼,這塊就會成為一個隱患點。

如果想規避這個問題,我們只能修改rocket-client中的代碼,比如:

<code>try {
//執行本地事務方法
if (transactionListener != null) {
this.log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
} catch (Throwable var10) {
this.log.info("executeLocalTransactionBranch exception", var10);
this.log.info(msg.toString());
localException = var10;
throw new MQClientException(e.getMessage(),e);
} /<code>

筆者通過修改源碼,並測試了一下,通過這種手動拋出異常的方式也是可以的。這樣的話如果執行本地事務的時候出錯,也會回滾訂單數據。

到這裡,就能回答筆者本文2.4章節裡的一個問題:

為什麼在訂單業務服務類中,需要有兩個方法。一個用於向RocketMQ發送事務消息,一個用於真正的業務數據落庫。

總結

本文重點闡述了基於RocketMQ來實現最終一致性的分佈式事務案例。

另外,也分享了關於《RocketMQ技術內幕》一書中,分佈式事務示例代碼,可能出現的異常問題。關於這一點,也希望朋友們如果有不同看法,積極留言,共同交流。


作者:清幽之地
鏈接:https://juejin.im/post/5e737d155188254943200ed0


分享到:


相關文章: