10.22 如何搭建消息中間件應用框架之SpringCloud Stream


目錄

  1. 前言
  2. 什麼是Spring Messaging
  3. 什麼是Sping Integration
  4. SpringCloud Stream
  5. 綁定器
  6. 發佈訂閱
  7. 消費組
  8. 消費分區
  9. 總結

前言

官方對 Spring Cloud Stream 的一段介紹:Spring Cloud Stream 是一個用於構建基於消息的微服務應用框架。其目的是為了簡化消息在 Spring Cloud 應用程序中的開發。

老顧來翻譯一下,就是現在的消息中間件比較多,如:RabbitMQ、Kafka、RocketMq等;使用方法也不一樣,但是他們的本質流程是一樣,都有發佈/訂閱、消費組以及消息分區這三個核心概念。

所以SpringCloud就實現了一套輕量級的消息驅動的微服務框架

;通過使用 Spring Cloud Stream,可以忽略消息中間件的差異,有效簡化開發人員對消息中間件的使用複雜度,讓系統開發人員可以有更多的精力關注於核心業務邏輯的處理。

老顧先帶著小夥伴們瞭解幾個概念,這樣會更方便理解。

什麼是 Spring Messaging

Spring Messaging是Spring Framework中的一個模塊,其作用就是統一消息的編程模型

比如消息Messaging對應的模型就包括一個消息體Payload和消息頭Header


如何搭建消息中間件應用框架之SpringCloud Stream

如何搭建消息中間件應用框架之SpringCloud Stream

  • 消息通道MessageChannel用於接收消息,調用send方法可以將消息發送至該消息通道
如何搭建消息中間件應用框架之SpringCloud Stream

如何搭建消息中間件應用框架之SpringCloud Stream

消息通道里的消息如何被消費呢?

  • 由消息通道的子接口可訂閱的消息通道SubscribableChannel實現,被MessageHandler消息處理器所訂閱
如何搭建消息中間件應用框架之SpringCloud Stream

  • 由MessageHandler真正地
    消費/處理消息
如何搭建消息中間件應用框架之SpringCloud Stream

Spring Messaging在消息模型的基礎上衍生出了其它的一些功能,如:

1、消息接收參數及返回值處理:消息接收參數處理器HandlerMethodArgumentResolver配合@Header, @Payload等註解使用;消息接收後的返回值處理器HandlerMethodReturnValueHandler配合@SendTo註解使用。

2、消息體內容轉換器MessageConverter

3、統一抽象的消息發送模板AbstractMessageSendingTemplate

4、消息通道攔截器ChannelInterceptor

什麼是 Spring Integration

Spring Integration是一個功能強大的EIP(Enterprise Intergration Patterns,即企業集成模式)

是對Spring Messaging的擴展,它提出了不少新的概念,包括消息的路由MessageRoute、消息的分發MessageDispatcher、消息的過濾Filter、消息的轉換Transformer、消息的聚合Aggregator、消息的分割Splitter等等。

總結一句話就是對消息消費時進行額外的處理

1、消息的分割

如何搭建消息中間件應用框架之SpringCloud Stream

2、消息的聚合

如何搭建消息中間件應用框架之SpringCloud Stream

3、消息的過濾

如何搭建消息中間件應用框架之SpringCloud Stream

4、消息的分發

如何搭建消息中間件應用框架之SpringCloud Stream

我們來看一個例子

如何搭建消息中間件應用框架之SpringCloud Stream

1、步驟一先創建一個可訂閱消息通道messageChannel

2、定義一個消息消費者messagehandler,去消費通道里面的消息;用了lammda表達式實現了,簡單的

輸出一句話

3、步驟三發送一個消息

消息最終被消息通道里的 MessageHandler 所消費,最後控制檯打印出:

接收到: 第一條消息內容

我們再進入DirectChannel,內部有一個對象UnicastingDispatcher,這個是消息分發器,會分發到對應的消息通道MessageChannel中;

UnicastingDispatcher 是個單播的分發器只能選擇一個消息通道。那麼如何選擇呢? 內部提供了 LoadBalancingStrategy 負載均衡策略,默認只有輪詢的實現,可以進行擴展。

上面的代碼改動一下

如何搭建消息中間件應用框架之SpringCloud Stream

由於DirectChannel內部的消息分發器是UnicastingDispatcher單播的方式,並且採用輪詢的負載均衡策略,所以這裡兩次的消費分別對應這兩個MessageHandler。控制檯打印出:

如何搭建消息中間件應用框架之SpringCloud Stream

如果我們要實現廣播方式,也就是BroadcastingDispatcher,它被

PublishSubscribeChannel這個消息通道所使用。廣播消息分發器會把消息分發給所有的MessageHandler。

如何搭建消息中間件應用框架之SpringCloud Stream

發送兩個消息,都被所有的MessageHandler所消費。控制檯打印:

如何搭建消息中間件應用框架之SpringCloud Stream

Spring Cloud Stream

SpringCloud Stream(以下簡稱SCS)在 Spring Integration 的基礎上進行了封裝,提出了Binder, Binding, @EnableBinding, @StreamListener 等概念。另外SCS也整合了其他模塊

1、與Spring Boot Actuator整合,提供了 /bindings, /channels endpoint

2、與Spring Boot Externalized Configuration 整合,提供了BindingProperties, BinderProperties等外部化配置類

3、SCS增強了消息發送失敗的和消費失敗情況下的處理邏輯等功能

SCS 是 Spring Integration 的加強,同時與 Spring Boot 體系進行了融合,也是 Spring Cloud Bus 的基礎。它屏蔽了底層消息中間件的實現細節,希望以統一的一套API來進行消息的發送/消費,底層消息中間件的實現細節由各消息中間件的Binder 完成

Binder是提供與外部消息中間件集成的組件,為構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用於構造生產者和消費者。目前官方的實現有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 內部已經實現了RocketMQ Binder。

如何搭建消息中間件應用框架之SpringCloud Stream

從圖中可以看出,Binding是連接應用程序跟消息中間件的橋樑

,用於消息的消費和生產。

綁定器

Binder綁定器是Spring Cloud Stream中一個非常重要的概念。在沒有綁定器這個概念的情況下,我們的Spring Boot應用要直接與消息中間件進行信息交互的時候,由於各消息中間件構建的初衷不同,它們的實現細節上會有較大的差異性,這使得我們實現的消息交互邏輯就會非常笨重,因為對具體的中間件實現細節有太重的依賴,當中間件有較大的變動升級、或是更換中間件的時候,我們就需要付出非常大的代價來實施。

通過定義綁定器作為中間層,完美地實現了應用程序與消息中間件細節之間的隔離。通過嚮應用程序暴露統一的Channel通道,使得應用程序不需要再考慮各種不同的消息中間件實現。當我們需要升級消息中間件,或是更換其他消息中間件產品時,我們要做的就是

更換它們對應的Binder綁定器而不需要修改任何Spring Boot的應用邏輯

發佈-訂閱模式

在Spring Cloud Stream中的消息通信方式遵循了發佈-訂閱模式,當一條消息被投遞到消息中間件之後,它會通過共享的Topic主題進行廣播,消息消費者在訂閱的主題中收到它並觸發自身的業務邏輯處理。這裡所提到的Topic主題是Spring Cloud Stream中的一個抽象概念,用來代表發佈共享消息給消費者的地方。

在不同的消息中間件中,Topic可能對應著不同的概念,比如:在RabbitMQ中的它對應了Exchange、而在Kakfa中則對應了Kafka中的Topic

消費組

雖然Spring Cloud Stream通過發佈-訂閱模式將消息生產者與消費者做了很好的解耦,基於相同主題的消費者可以輕鬆的進行擴展,但是這些擴展都是針對不同的應用實例而言的

,在現實的微服務架構中,我們每一個微服務應用為了實現高可用和負載均衡,實際上都會部署多個實例。很多情況下,消息生產者發送消息給某個具體微服務時,只希望被消費一次,按照上面我們啟動兩個應用的例子,雖然它們同屬一個應用,但是這個消息出現了被重複消費兩次的情況。為了解決這個問題,在Spring Cloud Stream中提供了消費組的概念

如果在同一個主題上的應用需要啟動多個實例的時候,我們可以通過spring.cloud.stream.bindings.input.group屬性為應用指定一個組名,這樣這個應用的多個實例在接收到消息的時候,只會有一個成員真正的收到消息並進行處理。如下圖所示,我們為Service-A和Service-B分別啟動了兩個實例,並且根據服務名進行了分組,這樣當消息進入主題之後,Group-A和Group-B都會收到消息的副本,但是在兩個組中都只會有一個實例對其進行消費。

如何搭建消息中間件應用框架之SpringCloud Stream

消息分區

通過引入消費組的概念,我們已經能夠在多實例的情況下,保障每個消息只被組內一個實例進行消費。通過上面對消費組參數設置後的實驗,我們可以觀察到,消費組並無法控制消息具體被哪個實例消費。也就是說,對於同一條消息,它多次到達之後可能是由不同的實例進行消費的

。但是對於一些業務場景,就需要對於一些具有相同特徵的消息每次都可以被同一個消費實例處理,比如:一些用於監控服務,為了統計某段時間內消息生產者發送的報告內容,監控服務需要在自身內容聚合這些數據,那麼消息生產者可以為消息增加一個固有的特徵ID來進行分區,使得擁有這些ID的消息每次都能被髮送到一個特定的實例上實現累計統計的效果,否則這些數據就會分散到各個不同的節點導致監控結果不一致的情況。

分區概念的引入就是為了解決這樣的問題:當生產者將消息數據發送給多個消費者實例時,保證擁有共同特徵的消息數據始終是由同一個消費者實例接收和處理

Spring Cloud Stream為分區提供了通用的抽象實現,用來在消息中間件的上層實現分區

處理,所以它對於消息中間件自身是否實現了消息分區並不關心,這使得Spring Cloud Stream為不具備分區功能的消息中間件也增加了分區功能擴展

總結

今天我們介紹了SpringCloud Stream概念,說白了就是SCS就是在各個中間件對象基礎上面抽象了一層接口,這樣開發人員只要用SCS實現消息發佈/訂閱,不需要關心到底是用了哪個消息中間件。老顧下一篇就帶著小夥伴們快速實戰入門。


---End---

最近老顧上傳了微服務網關的分享課程,請大家多多支持

1、

2、

3、

4、

5、

6、

7、

8、

9、

10、

11、

12、

13、

14、

15、

16、

17、

18、

19、

20、

21、

22、

23、

24、

25、

26、

27、

28、

29、

30、

31、

32、

33、

34、

35、

36、

37、

38、

39、

40、


分享到:


相關文章: