Github 博客地址
本文主要講解RabbitMQ的介紹和安裝,Spring Cloud Stream核心概念,Spring Cloud Alibaba RocketMQ學習,異步消息推送與消費
1 審核業務的實現
- com/javaedge/contentcenter/service/content/ShareService.java
假設添加積分操作很耗時,我們的主要操作是審核,而不關心積分,所以可以將其異步化
1.1 Spring實現異步的方法
◆ AsyncRestTemplate
- 參考文檔 Spring 的異步HTTP請求AsyncRestTemplate
◆ @ Async註解
- 參考文檔 https://spring.io/guides/gs/async-method/
◆ WebClient ( Spring 5.0引入 ,為取代AsyncRestTemplate)
- 參考文檔 https://docs.spring.io/spring/docs5.1. RELEASE/spring-framework-reference/web-reactive.html#webflux-client
◆ MQ 我們採用此法
2 引入MQ後的架構演進
3 MQ適用場景
- 異步處理
- 流量削峰填谷
- 解耦微服務4 MQ的選擇流行的MQ那麼多,如何選擇?
- Kafka、RabbitMQ、 RocketMQ、 ActiveMQ...
5 搭建RocketMQ
- 下載與安裝 RocketMQ實戰(一) - 簡介6 搭建RocketMQ控制檯
- 修改pom.xml版本
- 修改代碼
7 Spring消息編程模型
- 推薦Maven依賴版本分析插件
7.1 編寫生產者
content-center
開始拿出三板斧:
- 引入依賴
- 添加註解 無
- 寫配置
- 服務類添加模板類
7.2 編寫消費者
user-center
- 依賴
- 配置
- com.javaedge.contentcenter.rocketmq.AddBonusTransactionListener
小結
- RocketMQ : RocketMQMessageListener
- ActiveMQ/Artemis : JmsListener
- RabbitMQ : RabbitListener
- Kafka : KafkaListener8 分佈式事務流程剖析、概念術語、 如何實現事務呢,我們知道Spring有事務註解,那麼直接就添加@Transaction註解吧!可這樣是萬無一失了嗎?顯然不行,因為消息已經發出,沒法撤回了 那麼看看RocketMQ是怎麼解決分佈式事務問題呢8.1 實現分佈式事務流程
業務流程圖
- 半消息,雖然被存儲到MQserver,但會被標記為暫時不能投遞,所以消費者不會接受到該消息
- 半消息發送成功,開始3
- 開始執行本地事務
- 生產者根據本地事務,發送二次確認請求 MQServer如果從4中接收到的是commit,就把消息置為可投遞,這樣消費者就可消費該消息了rollback:將該消息刪除
- MQServer未收到4中的二次確認消息,就會回查
- 生產者檢查本地事務的執行結果
- 根據本地事務執行結果,發送commit/rollback消息
總體來說,就是生產者把消息發送到MQ,但MQ只是將其標記,不讓消費者消費 然後生產者就執行本地事務,執行完後就知道到底是該投遞還是丟棄該消息了! 這其實就是典型的二次確認 消費回查就是防止二次確認消息發送異常的容錯處理
8.2 關鍵概念
◆ 半消息( Half(Prepare) Message ) 暫時無法消費的消息。生產者將消息發送到了MQ server ,但這個消息會被標記為"暫不能投遞"狀態,先存儲起來;消費者不會去消費這條消息。 並不是消息的狀態,只是一種特殊的消息而已 ◆ 消息回查(Message Status Check ) 網絡斷開或生產者重啟可能導致丟失事務消息的第二次確認。當MQ Server發現消息長時間處於半消息狀態時,將向消息生產者發送請求,詢問該消息的最終狀態(提交或回滾)。
8.3 事務消息三狀態
◆ Commit 提交事務消息,消費者可以消費此消息 ◆ Rollback 回滾事務消息, broker會刪除該消息,消費者不能消費. ◆UNKNOWN broker需要回查確認該消息的狀態
9 分佈式事務 - 編碼實現
- 在內容中心新增事務日誌表rocketmq_transaction_log對照上一小節流程圖,開始code!9.1/2 發半消息改造接口
- 將原先如下代碼刪除
- 改為如下接方法9.3 執行本地事務
- 新建rocketmq包,並在其中創建一個新類AddBonusTransactionListener
- 一定要在該類上加@RocketMQTransactionListener註解 其中的txProducerGroup一定要對應哦
注意這裡的
- msg參數即對應
- args參數即對應
- 回查本地事務執行結果,即通過查詢日誌記錄表,該表在執行完本地事務後更新
- 這即可回查啦
參考
- 消息隊列對比參照表
- RocketMQ vs. ActiveMQ vs. Kafka
- 面向未來微服務:Spring Cloud Alibaba從入門到進階
閱讀更多 Java愛好者哦 的文章