Springcloud + RocketMQ 解決分佈式事務


開篇思考

  1. 為什麼要分佈式事務?
  2. 分佈式事務有哪些實現方式?哪種可靠?
  3. 分佈式哪些環節會出問題?出了問題怎麼應對?

站在巨人的肩膀觀察和思考

隨著互聯網時代的高速發展,分佈式成了大型系統的標配,這是時代發展的選擇。大型分佈式系統不是每個公司和開發人員都能夠涉及的領域,因為大型系統後面都 隱藏著眾多代名詞:複雜,昂貴,高科技,人才雲集,大戰略。。。

大部分領頭互聯網公司甚至依託自己的分佈式經驗逐步建立自己的體系,並使用這套體系搭建自己的平臺對內,甚至對外提供服務, 就像現在眾多的雲平臺提供的服務,甚至有些大戰略提出促進發展:大中臺小前臺、大炮臺支援單兵作戰等等。

這裡提到了中臺的概念,這個概念很廣,都是以用戶為中心的,分佈式只是其中的一小部分運用,之所以強行和分佈式掛鉤,是想說明現在的發展趨勢變了, 我們的眼界是有限的,但是完全可以站在巨人的肩膀上,利用他們的高度來提升自己的眼界,來思考,我們到底應該怎麼做,怎麼做適合 我們自身發展。

我認為思考能力永遠是一個程序員魅力所在,善於發現和思考,能夠不斷的幫助我們提升。


Springcloud + RocketMQ 解決分佈式事務


微服務架構的優勢和問題

優勢:

  • 擴展性強:可根據業務需要增加服務,不影響現有的服務架構
  • 單一隔離:服務和服務之前通過遠程調用,單個服務只提供單一職責的功能,開發人員可以一人一服務進行開發,互不影響
  • 高可用:服務可以集群部署,單個應用失敗通過重試和熔斷等措施可以提高穩定性
  • 技術選型靈活:可以根據團隊特點、業務需求選擇合適的技術棧,提高開發效率
  • 突破性能瓶頸:可以集群方式部署,解決單體訪問峰值等處理能力的性能瓶頸
  • 降低運維成本:應對不同的場景,例如雙十一,可以動態增減服務集群數量,做到按需付費,減少開支

問題:

  • 複雜度高:整體架構設計十分複雜,需要考慮到整體性、經濟性、穩定性
  • 容易混淆服務界限:哪些服務需要劃分微服務,哪些可以作為子模塊,需要認真思考
  • 難以確保一致性:因為是鏈路調用,一個請求會經過多個服務,難以確保執行結果達到一致性

CAP & BASE

因為寫過這兩個理論的具體介紹,這裡簡單再說明下。還有疑問的可以看介紹 鏈接


Springcloud + RocketMQ 解決分佈式事務


這兩是分佈式架構發展至今形成的理論基礎,只要分佈式都繞不開的原則。CAP 教會我們如何在設計的時候取捨,是要高可用,還是強一致性? 這個看業務的具體情況和需求分析,如果是關於資金的流轉的 例如:銀行轉賬系統,肯定會要求保證一致性的場景更多,錢嘛,你懂得,不能多也不能少,少了錯了就是大事。如果只是一些簡單的商品查詢,可能用戶更希望是可用性,也就是我隨時查看都可以看到商品,但是商品信息可以在短時間內不一致。

也就是任何的架構設計,都是根據業務場景來分析的。通常 CAP 理論不夠完善明確具體實現,這時候就可以用到 BASE 理論,簡單的說就是通過基本可用和軟狀態,來達到最終一致性的狀態。

舉個例子:秒殺服務,商品秒殺成功,商品庫存 -1,但是訂單入庫需要調用訂單服務,但是我們必須要先執行減庫存操作成功後再執行,這個時候可以先提交,然後發送到可靠消息系統 MQ,MQ 發送到訂單服務進行入庫操作,如果入庫失敗則進行重試

這裡有箇中間狀態,庫存信息入庫而訂單服務可能失敗或者進行重試狀態,這裡要麼重試成功,最終訂單成功入庫;要麼徹底失敗,回滾庫存入庫信息改為初始狀態。

上面的整體流程基本就是下面介紹的基於 RocketMQ 進行的分佈式事務流程。

基於可靠消息(RocketMQ)最終一致性方案


Springcloud + RocketMQ 解決分佈式事務


RocketMQ 是一個來自阿里巴巴的分佈式消息中間件,於 2012 年開源。​RocketMQ 事務消息設計則主要是為了解決 Producer 端的消息發送與本地事務執行的原子性問題,RocketMQ 的設計中 broker 與producer 端的雙向通信能力,使得 broker 天生可以作為一個事務協調者存在;而 RocketMQ 本身提供的存儲機制為事務消息提供了持久化能力;RocketMQ 的高可用機制以及可靠消息設計則為事務消息在系統發生異常時依然能夠保證達成事務的最終一致性。

使用場景分析

流程舉例(中國銀行 -- 轉賬 -- 人民銀行):

  1. 中國銀行要扣款 - 100 ,準備 HalfMsg,消息中攜帶中國銀行 - 100 的信息
  2. 中國銀行 HalfMsg 成功發送後,執行數據庫本地事務,在自己的庫中 - 100
  3. 查看本地事務執行情況,成功則 commit 消息;失敗則回滾不發送消息
  4. 人民銀行系統訂閱了 MQ,MQ 確保消息發送到人民銀行系統
  5. 人民銀行系統執行本地事務,數據庫中 + 100

問題分析:

  1. half 消息沒有發送成功:不會進入本地事務執行邏輯,且 MQ 沒有消息
  2. 本地事務失敗,rollback half 消息:MQ 只有 half 消息,不會被其他服務消費,此時可以回滾清除消息
  3. 本地事務成功,half 消息 commit 失敗:MQ 定時查詢本地事務狀態,本地事務成功則繼續 commit MQ 消息
  4. 消費者消費 MQ 消息失敗:事務沒有執行,或者事務執行失敗,基於 ACK 的重試機制重試,知道消費成功,如果還是不行,記錄日誌持久化,預警查看問題是執行回滾還是更新

如何在項目中使用

以上主要流程都是 RocketMQ 實現,對用戶使用來說,用戶需要實現的部分是:①本地事務執行 ;②本地事務回查方法因此代碼中的實現只需關注本地事務的執行狀態即可。下面貼出具體實現方式

pom 依賴

springcloud pom 的基礎上,添加依賴,完整的 springcloud alibaba pom 可以參考這裡

https://github.com/TorGor/seckill/blob/master/nacos-consumer-one/pom.xml

<code>        <dependency>
<groupid>com.alibaba.cloud/<groupid>
<artifactid>spring-cloud-starter-stream-rocketmq/<artifactid>
/<dependency>
/<code>

模塊添加

<code>@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@MapperScan("com.holy.nacosconsumerone.dao")
@EnableBinding({ MySource.class })
public class NacosConsumerOneApplication {

public static void main(String[] args) {
SpringApplication.run(NacosConsumerOneApplication.class, args);
}
}
/<code>

mysource

<code>public interface MySource {

\t\t@Output("output1")
\t\tMessageChannel output1();


\t\t@Output("output2")
\t\tMessageChannel output2();

\t\t@Output("output3")
\t\tMessageChannel output3();

\t\t@Output("output4")
\t\tMessageChannel output4();

\t}
/<code>

sendservice

<code>/**
* @author
*/
@Service
public class SenderService {

\t@Autowired
\tprivate MySource source;

\tpublic void send(String msg) throws Exception {
\t\tsource.output1().send(MessageBuilder.withPayload(msg).build());
\t}

\tpublic void sendTransactionalMsg(T msg, int num) throws Exception {
\t\tMessageBuilder builder = MessageBuilder.withPayload(msg)
\t\t\t\t.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
\t\tbuilder.setHeader("tx-header-num", String.valueOf(num));
\t\tbuilder.setHeader(RocketMQHeaders.TAGS, "binder");
\t\tMessage message = builder.build();
\t\tsource.output2().send(message);
\t}

}
/<code>

劃重點 : MQ 事務監聽類實現 RocketMQLocalTransactionListener

<code>import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

/**
* 對需要使用分佈式事務的消息發送接口監聽
* 根據事務消息分組來致性
* ①本地事務先執行,根據業務情況執行提交、回滾操作
* ②本地事務回查
*
* @author
*/
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5,
maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

/**
* 執行本地事務
* ①事務執行成功,commit
* ②事務執行失敗,rollback
* ③回查發送消息,unknown
*
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object num = msg.getHeaders().get("tx-header-num");
try {
// 本地業務代碼,事務執行
if ("1".equals(num)) {
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
return RocketMQLocalTransactionState.UNKNOWN;
} else if ("2".equals(num)) {
throw new Exception("Exception for RocketMQ rollback");
}
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
System.out.println(e.getMessage());
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK;
}

}

/**
* 執行本地事務回查,當狀態為 UNKNOW 會執行這個方法,回查間隔時間差不多一分鐘。
*
* 業務代碼用來檢查事務當前狀態,是否執行完成,如果完成就執行 COMMIT
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 檢查本地事務
System.out.println("check: " + new String((byte[]) msg.getPayload()));
return RocketMQLocalTransactionState.COMMIT;
}

}
/<code>

controller 測試:

<code>/**
* @author holy
*/
@RestController
@RequestMapping("/rocketMQ")
public class RocketMQController {

\t@Resource
\tprivate SenderService senderService;

\t@GetMapping(value = "/transactionMsg")
\tpublic Object rocketMQTX(int num,String msg) {
\t\ttry {
\t\t\tsenderService.sendTransactionalMsg(msg,num);
\t\t} catch (Exception e) {
\t\t\te.printStackTrace();
\t\t}
\t\treturn "OK" + num;
\t}

}

/<code>

配置文件:

<code>spring:
application:
name: service-consumer
cloud:
stream:
bindings:
output1:
content-type: application/json
destination: test-topic
output2:
content-type: application/json
destination: TransactionTopic
output3:
content-type: text/plain
destination: pull-topic
rocketmq:
bindings:
output1:
producer:
group: binder-group
sync: true
output2:
producer:
group: myTxProducerGroup
transactional: true
output3:
producer:
group: pull-binder-group
binder:
name-server: 192.168.244.89:987

/<code>

完整的代碼的可以看我的完整項目:項目地址 (https://github.com/TorGor/seckill)

後續思考

本地事務複雜,執行查詢時間太久如何處理?

針對不同的情況,其實我們只要認真分析場景,自然可以設計出對應的解決辦法。有些時候我們更希望通過一張事務執行情況表來判斷事務的整體實行情況,比如業務比較複雜的時候,需要更新很多的表信息,這時候使用事務表根據 TransactionID 來記錄事務執行情況反而更貼合實際的使用場景。

如果我不是用 rocketMQ ,可以通過其他的 MQ 來實現分佈式事務處理嗎?

其實這個也是沒有問題,大體思路就是封裝新服務,專門用來檢查事務執行情況,根據事務狀態來決定是否發送消息到 MQ。

有好的建議歡迎在下方留言,大家一起討論一起進步。優秀的評論會被選出並贈送獎勵

程序領域程序領域(id:think-holy) 作者:holy 程序汪一隻


分享到:


相關文章: