基於RocketMq的SpringCloud Stream框架實戰入門


前言

老顧前一篇介紹了SpringCloud Stream(以下簡稱SCS)的基本概念,今天老顧帶著小夥伴們進入實戰,SCS官方默認是支持Kafka的實現、RabbitMQ的實現。不過今天老顧講的是RocketMq基於Rocketmq的實現是由alibaba完成了。

為什麼基於Rocketmq,也是為了將來講SpringCloud Alibaba作引子

Binder和Binding

org.springframework.cloud.stream.binder.Binder是Spring Cloud對消息容器的抽象,不同的消息容器有不同的實現,通過它可以屏蔽各消息容器的內部細節

基於RocketMq的SpringCloud Stream框架實戰入門

Binder可以生成Binding,Binding用來綁定消息容器的生產者和消費者,它有兩種類型,INPUT和OUTPUT,INPUT對應於消費者,OUTPUT對應於生產者。

POM

主要增加了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq 依賴,

老顧採用最新Spring Boot的2.1.8.RELEASE版本SpringCloud的Greenwich.SR2版本;Spring-Cloud-Alibaba的2.1.0.RELEASE版本。

增加依賴

基於RocketMq的SpringCloud Stream框架實戰入門

基於RocketMq的SpringCloud Stream框架實戰入門

EnableBinding配置

基於RocketMq的SpringCloud Stream框架實戰入門

我們需要通過在配置類上使用@EnableBinding指定需要使用的Binding,它指定的是一個接口,在對應接口中會定義一些標註了@Input或@Output的方法,它們就對應一個Binding了。

@Output註解對應的是org.springframework.messaging.MessageChannel,代表發佈者

@Input註解對應的是org.springframework.messaging.SubscribableChannel,代表消費者

Source內置接口

org.springframework.cloud.stream.messaging.Source是內置的Output接口

小夥伴們也可以不用內置的,模仿Source自行定義就行了

基於RocketMq的SpringCloud Stream框架實戰入門

它定義了一個OUTPUT類型的Binding,名稱為output,當不通過@Output指定Binding的名稱時,默認會使用方法名作為Binding的名稱。

Sink內置接口

Sink的定義如下,它定義了一個INPUT類型的Binding,名稱為input,當不通過@Input指定Binding的名稱時,默認會使用方法名作為Binding的名稱。

基於RocketMq的SpringCloud Stream框架實戰入門

在一個接口中可以同時定義多個Binding,只需要定義多個@Input或@Output標註的方法。Processor接口同時繼承了Source和Sink接口,所以當@EnableBinding指定了Processor接口時相當於同時應用了兩個Binding。

public interface Processor extends Source, Sink {
}
@EnableBinding({ Processor.class })

生產者

我們來定義一個output類型的Binding

基於RocketMq的SpringCloud Stream框架實戰入門

在上面代碼中我們指定了@EnableBinding接口為Source接口,即啟用了名稱為output的OUTPUT類型的Binding。Spring Cloud會自動實現該Binding的實現,也會提供Binding接口的實現,並註冊到bean容器中。即可以在程序中自動注入Source類型的bean,也可以注入MessageChannel類型的bean

基於RocketMq的SpringCloud Stream框架實戰入門

上面定義了一個生產發佈服務,直接注入Source類型的bean,然後通過Source的output()獲取MessageChannel實例,通過它的send()方法進行消息發送。

另一種使用方式,就是直接獲取MessageChannel,如下代碼,效果是一樣的。

基於RocketMq的SpringCloud Stream框架實戰入門

那發送的消息究竟會發送到哪裡呢?這就需要我們來定義對應的Binding和實際消息容器的生產者的映射了。可以通過spring.cloud.stream.bindings.<bindingname>.*的形式定義/<bindingname>Binding的一些屬性。

具體有什麼屬性可查看org.springframework.cloud.stream.config.BindingProperties

這裡我們通過其destination屬性指定該Binding對應的實際的目的地對應於RocketMQ就是一個Topic。

spring.cloud.stream.bindings.output.destination=test-topic

即我們上面發送的消息將發到RocketMQ的名為test-topic的Topic。

RocketMQ是需要指定NameServer的,所以在發送消息前,還需要基於RocketMQ這個Binder配置其NameServer的地址。

spring.cloud.stream.rocketmq.binder.namesrv-addr=192.168.31.153:9876

在啟動了RocketMQ的NameServer和Broker之後,就可以利用上面的代碼進行消息發送了。測試代碼如下。

在測試的時候可以在啟動RocketMQ時指定autoCreateTopicEnable=true

以開啟自動創建Topic的功能,如mqbroker -n localhost:9876 autoCreateTopicEnable=true。

基於RocketMq的SpringCloud Stream框架實戰入門

繼承CommandLineRunner接口,啟動就會執行run方法,就是調用ProviderService發送消息。到Rocketmq控制檯查看消息

基於RocketMq的SpringCloud Stream框架實戰入門

基於RocketMq的SpringCloud Stream框架實戰入門

消費者

消費者接收消息和生產者類似,也需要定義相應的Binding,也需要通過@EnableBinding進行指定。Spring Cloud的Sink接口中已經定義好一個名為input的Binding,如果只需要一個接收Binding,可以直接拿來用。

基於RocketMq的SpringCloud Stream框架實戰入門

作為消費者的Binding也必須指定對應的目的地,還必須指定一個消費者分組group相同group的消費者可以共同消費相同destination的消息,分擔壓力。

比如一個作為消費者的應用部署了三份它們的group都是一樣的,如果來了三條消息,那麼可能三臺應用都分別消費了其中的一條消息。而如果部署三份的group都不一樣,則每臺應用都將消費全部的三條消息

spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group

一般生產中group的名字用項目工程的名字

spring.cloud.stream.bindings.input.group=${spring.application.name}

作為消費者的應用也需要定義Binder的相關信息,如spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876

消費者需要在方法上通過

@StreamListener進行標註表示它將監聽消費某個Binding的消息對應的Binding可以通過@StreamListener的value或target屬性進行指定

基於RocketMq的SpringCloud Stream框架實戰入門

上面的代碼指定了消費者對應的Binding是名為input的Binding。而根據上面的配置該Binding對應的destination是test-topic,對於RocketMQ來說就是從名為test-topic的Topic獲取消息。

啟用消費者,控制檯輸出

接收消息: Hello Message

基於RocketMq的SpringCloud Stream框架實戰入門

總結

到這裡老顧分享了SCS基於Rocketmq的快速入門,用起來是非常簡單的,我們不需要知道Rocketmq的自身原生的用法,就可以做基本的發佈與消費。下一篇文章老顧分享更深入的知識點。謝謝!!!


---End---

最近老顧上傳了微服務網關的分享課程,請大家多多支持

1、

2、

3、

4、

5、

6、

7、

8、

9、

10、

11、

12、

13、

14、

15、

16、

17、

18、

19、

20、

21、

22、

23、

24、

25、

26、

27、

28、

29、

30、

31、

32、

33、

34、

35、

36、

37、

38、

39、

40、


分享到:


相關文章: