分佈式事務之 RocketMQ 事務消息詳解

點擊上方 "程序員小樂"關注, 星標或置頂一起成長

每天凌晨00點00分, 第一時間與你相約


每日英文

If you wait to do everything until you're sure it's right, you'll probably never do much of anything.

如果你等到每件事都確定是對的才去做,那你也許永遠都成不了什麼事。


每日掏心話

生命的最寶貴的,就是你要去每天都有意識的生活,不要夢遊式的過你的生活。也許這個世界上有卑微的生命,但是決不應該有卑微的心靈。

來自:張申傲 | 責編:樂樂

鏈接:blog.csdn.net/weixin_34452850/article/details/88851419

分佈式事務之 RocketMQ 事務消息詳解

程序員小樂(ID:study_tech)第 843 次推文 圖片來自百度


往日回顧:前後端分離開發,RESTful 接口應該這樣設計


正文

事務消息是RocketMQ提供的非常重要的一個特性,在4.x版本之後開源,可以利用事務消息輕鬆地實現分佈式事務。本文對RocketMQ的事務消息進行詳細介紹,並給出了代碼示例。

一. 相關概念

RocketMQ在其消息定義的基礎上,對事務消息擴展了兩個相關的概念:

  1. Half(Prepare) Message——半消息(預處理消息)

半消息是一種特殊的消息類型,該狀態的消息暫時不能被Consumer消費。當一條事務消息被成功投遞到Broker上,但是Broker並沒有接收到Producer發出的二次確認時,該事務消息就處於"

暫時不可被消費"狀態,該狀態的事務消息被稱為半消息。

  1. Message Status Check——消息狀態回查

由於網絡抖動、Producer重啟等原因,可能導致Producer向Broker發送的二次確認消息沒有成功送達。如果Broker檢測到某條事務消息長時間處於半消息狀態,則會主動向Producer端發起回查操作,查詢該事務消息在Producer端的事務狀態(Commit 或 Rollback)。可以看出,Message Status Check主要用來解決分佈式事務中的超時問題。

二. 執行流程

分佈式事務之 RocketMQ 事務消息詳解

上面是官網提供的事務消息執行流程圖,下面對具體流程進行分析:

  1. Step1:Producer向Broker端發送Half Message;
  2. Step2:Broker ACK,Half Message發送成功;
  3. Step3:Producer執行本地事務;
  4. Step4:本地事務完畢,根據事務的狀態,Producer向Broker發送二次確認消息,確認該Half Message的Commit或者Rollback狀態。Broker收到二次確認消息後,對於Commit狀態,則直接發送到Consumer端執行消費邏輯,而對於Rollback則直接標記為失敗,一段時間後清除,並不會發給Consumer。正常情況下,到此分佈式事務已經完成,剩下要處理的就是超時問題,即一段時間後Broker仍沒有收到Producer的二次確認消息;
  5. Step5:針對超時狀態,Broker主動向Producer發起消息回查;
  6. Step6:Producer處理回查消息,返回對應的本地事務的執行結果;
  7. Step7:Broker針對回查消息的結果,執行Commit或Rollback操作,同Step4。

三. 代碼實例

本節通過一個簡單的場景模擬RocketMQ的事務消息:存在2個微服務,分別是訂單服務和商品服務。訂單服務進行下單處理,併發送消息給商品服務,對於下單成功的商品進行減庫存。

首先是訂單服務:

<code> 

public

 

class

 

OrderService

 

{   

public

 

static

 

void

 

main

(String[] args)

 

throws

 Exception 

{     TransactionMQProducer producer = 

new

 TransactionMQProducer();     producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);     producer.setProducerGroup(RocketMQConstants.TRANSACTION_PRODUCER_GROUP);          ThreadPoolExecutor executor = 

new

 ThreadPoolExecutor(

10

50

10L

, TimeUnit.SECONDS, 

new

 ArrayBlockingQueue<>(

20

), (Runnable r) -> 

new

 Thread(

"Order Transaction Massage Thread"

));     producer.setExecutorService(executor);          producer.setTransactionListener(

new

 OrderTransactionListener());     producer.start();     System.err.println(

"OrderService Start"

);     

for

 (

int

 i = 

0

;i 10;i++){       String orderId = UUID.randomUUID().toString();       String payload = 

"下單,orderId: "

 + orderId;       String tags = 

"Tag"

;       Message message = 

new

 Message(RocketMQConstants.TRANSACTION_TOPIC_NAME, tags, orderId, payload.getBytes(RemotingHelper.DEFAULT_CHARSET));              TransactionSendResult result = producer.sendMessageInTransaction(message, orderId);       System.err.println(

"發送事務消息,發送結果: "

 + result);     }   } }/<code>

事務消息需要一個TransactionListener,主要進行本地事務的執行和事務回查,代碼如下:

<code> 

public

 

class

 OrderTransactionListener 

implements

 TransactionListener {   

private

 

static

 final Map<

String

Boolean

> results = 

new

 ConcurrentHashMap<>();      

public

 LocalTransactionState executeLocalTransaction(Message msg, 

Object

 arg) {     

String

 orderId = (

String

) arg;          

boolean

 success = persistTransactionResult(orderId);     System.err.println(

"訂單服務執行本地事務下單,orderId: "

 + orderId + 

", result: "

 + success);     

return

 success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;   }      

public

 LocalTransactionState checkLocalTransaction(MessageExt msg) {     

String

 orderId = msg.getKeys();     System.err.println(

"執行事務消息回查,orderId: "

 + orderId);     

return

 

Boolean

.TRUE.equals(results.get(orderId)) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;   }   

private

 

boolean

 persistTransactionResult(

String

 orderId) {     

boolean

 success = 

Math

.abs(Objects.hash(orderId)) % 

2

 == 

0

;     results.put(orderId, success);     

return

 success;   } }/<code>

下面是商品服務及監聽器:

<code> 

public

 

class

 

ProductService

 

{   

public

 

static

 

void

 

main

(String[] args)

 

throws

 Exception 

{     DefaultMQPushConsumer consumer = 

new

 DefaultMQPushConsumer();     consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);     consumer.setConsumerGroup(RocketMQConstants.TRANSACTION_CONSUMER_GROUP);     consumer.subscribe(RocketMQConstants.TRANSACTION_TOPIC_NAME, 

"*"

);     consumer.registerMessageListener(

new

 ProductListener());     consumer.start();     System.err.println(

"ProductService Start"

);   } }/<code>
<code> 

public

 

class

 

ProductListener

 

implements

 

MessageListenerConcurrently

 

{   @Override   

public

 ConsumeConcurrentlyStatus consumeMessage(

List

 msgs, ConsumeConcurrentlyContext context) {     Optional.ofNullable(msgs).orElse(Collections.emptyList()).

forEach

(m -> {       String orderId = m.getKeys();       System.err.println(

"監聽到下單消息,orderId: "

 + orderId + 

", 商品服務減庫存"

);     });     

return

 ConsumeConcurrentlyStatus.CONSUME_SUCCESS;   } }/<code>

分別運行OrderService和ProductService,可以看出只有事務執行成功的訂單才會通知商品服務進行減庫存。

<code>監聽到下單消息,

orderId

: f25a7127-

307

e-

45

ce-

8

f83-

6

e0a922ebb94, 商品服務減庫存 監聽到下單消息,

orderId

: d960171d-

97

c0-

4

e13-aa4a-c2b96102de4b, 商品服務減庫存 監聽到下單消息,

orderId

63

aedaa2-ce74-

4

cb7-bf58-fb6a73082a73, 商品服務減庫存 監聽到下單消息,

orderId

25764461

-

70

b2-

44

db-

8296

-

960211179

e6e, 商品服務減庫存 監聽到下單消息,

orderId

: fb319fe7-c8be-

4

edf-ae4e-

6108898068

ca, 商品服務減庫存 監聽到下單消息,

orderId

4

f61a61a-

7254

-

458

a-bc10-

9

d4006a9f581, 商品服務減庫存/<code>


分佈式事務之 RocketMQ 事務消息詳解

歡迎在留言區留下你的觀點,一起討論提高。如果今天的文章讓你有新的啟發,學習能力的提升上有新的認識,歡迎轉發

分享給更多人。

猜你還想看


阿里、騰訊、百度、華為、京東最新面試題彙集

5萬字長文!SpringBoot 操作 ElasticSearch 詳解

關於 MyBatis 我總結了 10 種通用的寫法

基於 token 的多平臺身份認證架構設計

<code> 關注訂閱號「程序員小樂」,收看更多精彩內容/<code>

嘿,你在看嗎?


分享到:


相關文章: