點擊上方 "程序員小樂"關注, 星標或置頂一起成長
每天凌晨00點00分, 第一時間與你相約
每日英文
If you wait to do everything until you're sure it's right, you'll probably never do much of anything.
如果你等到每件事都確定是對的才去做,那你也許永遠都成不了什麼事。
每日掏心話
生命的最寶貴的,就是你要去每天都有意識的生活,不要夢遊式的過你的生活。也許這個世界上有卑微的生命,但是決不應該有卑微的心靈。
來自:張申傲 | 責編:樂樂
鏈接:blog.csdn.net/weixin_34452850/article/details/88851419
程序員小樂(ID:study_tech)第 843 次推文 圖片來自百度
往日回顧:前後端分離開發,RESTful 接口應該這樣設計
正文
事務消息是RocketMQ提供的非常重要的一個特性,在4.x版本之後開源,可以利用事務消息輕鬆地實現分佈式事務。本文對RocketMQ的事務消息進行詳細介紹,並給出了代碼示例。
一. 相關概念
RocketMQ在其消息定義的基礎上,對事務消息擴展了兩個相關的概念:
- Half(Prepare) Message——半消息(預處理消息)
半消息是一種特殊的消息類型,該狀態的消息暫時不能被Consumer消費。當一條事務消息被成功投遞到Broker上,但是Broker並沒有接收到Producer發出的二次確認時,該事務消息就處於"
暫時不可被消費"狀態,該狀態的事務消息被稱為半消息。- Message Status Check——消息狀態回查
由於網絡抖動、Producer重啟等原因,可能導致Producer向Broker發送的二次確認消息沒有成功送達。如果Broker檢測到某條事務消息長時間處於半消息狀態,則會主動向Producer端發起回查操作,查詢該事務消息在Producer端的事務狀態(Commit 或 Rollback)。可以看出,Message Status Check主要用來解決分佈式事務中的超時問題。
二. 執行流程
上面是官網提供的事務消息執行流程圖,下面對具體流程進行分析:
- Step1:Producer向Broker端發送Half Message;
- Step2:Broker ACK,Half Message發送成功;
- Step3:Producer執行本地事務;
- Step4:本地事務完畢,根據事務的狀態,Producer向Broker發送二次確認消息,確認該Half Message的Commit或者Rollback狀態。Broker收到二次確認消息後,對於Commit狀態,則直接發送到Consumer端執行消費邏輯,而對於Rollback則直接標記為失敗,一段時間後清除,並不會發給Consumer。正常情況下,到此分佈式事務已經完成,剩下要處理的就是超時問題,即一段時間後Broker仍沒有收到Producer的二次確認消息;
- Step5:針對超時狀態,Broker主動向Producer發起消息回查;
- Step6:Producer處理回查消息,返回對應的本地事務的執行結果;
- Step7:Broker針對回查消息的結果,執行Commit或Rollback操作,同Step4。
三. 代碼實例
本節通過一個簡單的場景模擬RocketMQ的事務消息:存在2個微服務,分別是訂單服務和商品服務。訂單服務進行下單處理,併發送消息給商品服務,對於下單成功的商品進行減庫存。
首先是訂單服務:
<code>public
class
OrderService
{public
static
void
main
(String[] args)
throws
Exception { TransactionMQProducer producer =new
TransactionMQProducer(); producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); producer.setProducerGroup(RocketMQConstants.TRANSACTION_PRODUCER_GROUP); ThreadPoolExecutor executor =new
ThreadPoolExecutor(10
,50
,10L
, TimeUnit.SECONDS,new
ArrayBlockingQueue<>(20
), (Runnable r) ->new
Thread("Order Transaction Massage Thread"
)); producer.setExecutorService(executor); producer.setTransactionListener(new
OrderTransactionListener()); producer.start(); System.err.println("OrderService Start"
);for
(int
i =0
;i 10;i++){ String orderId = UUID.randomUUID().toString(); String payload ="下單,orderId: "
+ orderId; String tags ="Tag"
; Message message =new
Message(RocketMQConstants.TRANSACTION_TOPIC_NAME, tags, orderId, payload.getBytes(RemotingHelper.DEFAULT_CHARSET)); TransactionSendResult result = producer.sendMessageInTransaction(message, orderId); System.err.println("發送事務消息,發送結果: "
+ result); } } }/<code>
事務消息需要一個TransactionListener,主要進行本地事務的執行和事務回查,代碼如下:
<code>public
class
OrderTransactionListenerimplements
TransactionListener {private
static
final Map<String
,Boolean
> results =new
ConcurrentHashMap<>();public
LocalTransactionState executeLocalTransaction(Message msg,Object
arg) {String
orderId = (String
) arg;boolean
success = persistTransactionResult(orderId); System.err.println("訂單服務執行本地事務下單,orderId: "
+ orderId +", result: "
+ success);return
success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; }public
LocalTransactionState checkLocalTransaction(MessageExt msg) {String
orderId = msg.getKeys(); System.err.println("執行事務消息回查,orderId: "
+ orderId);return
Boolean
.TRUE.equals(results.get(orderId)) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; }private
boolean
persistTransactionResult(String
orderId) {boolean
success =Math
.abs(Objects.hash(orderId)) %2
==0
; results.put(orderId, success);return
success; } }/<code>
下面是商品服務及監聽器:
<code>public
class
ProductService
{public
static
void
main
(String[] args)
throws
Exception { DefaultMQPushConsumer consumer =new
DefaultMQPushConsumer(); consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); consumer.setConsumerGroup(RocketMQConstants.TRANSACTION_CONSUMER_GROUP); consumer.subscribe(RocketMQConstants.TRANSACTION_TOPIC_NAME,"*"
); consumer.registerMessageListener(new
ProductListener()); consumer.start(); System.err.println("ProductService Start"
); } }/<code>
<code>public
class
ProductListener
implements
MessageListenerConcurrently
{ @Overridepublic
ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { Optional.ofNullable(msgs).orElse(Collections.emptyList()).forEach
(m -> { String orderId = m.getKeys(); System.err.println("監聽到下單消息,orderId: "
+ orderId +", 商品服務減庫存"
); });return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }/<code>
分別運行OrderService和ProductService,可以看出只有事務執行成功的訂單才會通知商品服務進行減庫存。
<code>監聽到下單消息,orderId
: f25a7127-307
e-45
ce-8
f83-6
e0a922ebb94, 商品服務減庫存 監聽到下單消息,orderId
: d960171d-97
c0-4
e13-aa4a-c2b96102de4b, 商品服務減庫存 監聽到下單消息,orderId
:63
aedaa2-ce74-4
cb7-bf58-fb6a73082a73, 商品服務減庫存 監聽到下單消息,orderId
:25764461
-70
b2-44
db-8296
-960211179
e6e, 商品服務減庫存 監聽到下單消息,orderId
: fb319fe7-c8be-4
edf-ae4e-6108898068
ca, 商品服務減庫存 監聽到下單消息,orderId
:4
f61a61a-7254
-458
a-bc10-9
d4006a9f581, 商品服務減庫存/<code>
歡迎在留言區留下你的觀點,一起討論提高。如果今天的文章讓你有新的啟發,學習能力的提升上有新的認識,歡迎轉發 分享給更多人。
猜你還想看
阿里、騰訊、百度、華為、京東最新面試題彙集
5萬字長文!SpringBoot 操作 ElasticSearch 詳解
關於 MyBatis 我總結了 10 種通用的寫法
基於 token 的多平臺身份認證架構設計
<code> 關注訂閱號「程序員小樂」,收看更多精彩內容/<code>
嘿,你在看嗎?