基于RocketMq的SpringCloud Stream框架实战入门


前言

老顾前一篇介绍了SpringCloud Stream(以下简称SCS)的基本概念,今天老顾带着小伙伴们进入实战,SCS官方默认是支持Kafka的实现、RabbitMQ的实现。不过今天老顾讲的是RocketMq基于Rocketmq的实现是由alibaba完成了。

为什么基于Rocketmq,也是为了将来讲SpringCloud Alibaba作引子

Binder和Binding

org.springframework.cloud.stream.binder.Binder是Spring Cloud对消息容器的抽象,不同的消息容器有不同的实现,通过它可以屏蔽各消息容器的内部细节

基于RocketMq的SpringCloud Stream框架实战入门

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

POM

主要增加了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq 依赖,

老顾采用最新Spring Boot的2.1.8.RELEASE版本SpringCloud的Greenwich.SR2版本;Spring-Cloud-Alibaba的2.1.0.RELEASE版本。

增加依赖

基于RocketMq的SpringCloud Stream框架实战入门

基于RocketMq的SpringCloud Stream框架实战入门

EnableBinding配置

基于RocketMq的SpringCloud Stream框架实战入门

我们需要通过在配置类上使用@EnableBinding指定需要使用的Binding,它指定的是一个接口,在对应接口中会定义一些标注了@Input或@Output的方法,它们就对应一个Binding了。

@Output注解对应的是org.springframework.messaging.MessageChannel,代表发布者

@Input注解对应的是org.springframework.messaging.SubscribableChannel,代表消费者

Source内置接口

org.springframework.cloud.stream.messaging.Source是内置的Output接口

小伙伴们也可以不用内置的,模仿Source自行定义就行了

基于RocketMq的SpringCloud Stream框架实战入门

它定义了一个OUTPUT类型的Binding,名称为output,当不通过@Output指定Binding的名称时,默认会使用方法名作为Binding的名称。

Sink内置接口

Sink的定义如下,它定义了一个INPUT类型的Binding,名称为input,当不通过@Input指定Binding的名称时,默认会使用方法名作为Binding的名称。

基于RocketMq的SpringCloud Stream框架实战入门

在一个接口中可以同时定义多个Binding,只需要定义多个@Input或@Output标注的方法。Processor接口同时继承了Source和Sink接口,所以当@EnableBinding指定了Processor接口时相当于同时应用了两个Binding。

public interface Processor extends Source, Sink {
}
@EnableBinding({ Processor.class })

生产者

我们来定义一个output类型的Binding

基于RocketMq的SpringCloud Stream框架实战入门

在上面代码中我们指定了@EnableBinding接口为Source接口,即启用了名称为output的OUTPUT类型的Binding。Spring Cloud会自动实现该Binding的实现,也会提供Binding接口的实现,并注册到bean容器中。即可以在程序中自动注入Source类型的bean,也可以注入MessageChannel类型的bean

基于RocketMq的SpringCloud Stream框架实战入门

上面定义了一个生产发布服务,直接注入Source类型的bean,然后通过Source的output()获取MessageChannel实例,通过它的send()方法进行消息发送。

另一种使用方式,就是直接获取MessageChannel,如下代码,效果是一样的。

基于RocketMq的SpringCloud Stream框架实战入门

那发送的消息究竟会发送到哪里呢?这就需要我们来定义对应的Binding和实际消息容器的生产者的映射了。可以通过spring.cloud.stream.bindings.<bindingname>.*的形式定义/<bindingname>Binding的一些属性。

具体有什么属性可查看org.springframework.cloud.stream.config.BindingProperties

这里我们通过其destination属性指定该Binding对应的实际的目的地对应于RocketMQ就是一个Topic。

spring.cloud.stream.bindings.output.destination=test-topic

即我们上面发送的消息将发到RocketMQ的名为test-topic的Topic。

RocketMQ是需要指定NameServer的,所以在发送消息前,还需要基于RocketMQ这个Binder配置其NameServer的地址。

spring.cloud.stream.rocketmq.binder.namesrv-addr=192.168.31.153:9876

在启动了RocketMQ的NameServer和Broker之后,就可以利用上面的代码进行消息发送了。测试代码如下。

在测试的时候可以在启动RocketMQ时指定autoCreateTopicEnable=true

以开启自动创建Topic的功能,如mqbroker -n localhost:9876 autoCreateTopicEnable=true。

基于RocketMq的SpringCloud Stream框架实战入门

继承CommandLineRunner接口,启动就会执行run方法,就是调用ProviderService发送消息。到Rocketmq控制台查看消息

基于RocketMq的SpringCloud Stream框架实战入门

基于RocketMq的SpringCloud Stream框架实战入门

消费者

消费者接收消息和生产者类似,也需要定义相应的Binding,也需要通过@EnableBinding进行指定。Spring Cloud的Sink接口中已经定义好一个名为input的Binding,如果只需要一个接收Binding,可以直接拿来用。

基于RocketMq的SpringCloud Stream框架实战入门

作为消费者的Binding也必须指定对应的目的地,还必须指定一个消费者分组group相同group的消费者可以共同消费相同destination的消息,分担压力。

比如一个作为消费者的应用部署了三份它们的group都是一样的,如果来了三条消息,那么可能三台应用都分别消费了其中的一条消息。而如果部署三份的group都不一样,则每台应用都将消费全部的三条消息

spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group

一般生产中group的名字用项目工程的名字

spring.cloud.stream.bindings.input.group=${spring.application.name}

作为消费者的应用也需要定义Binder的相关信息,如spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876

消费者需要在方法上通过

@StreamListener进行标注表示它将监听消费某个Binding的消息对应的Binding可以通过@StreamListener的value或target属性进行指定

基于RocketMq的SpringCloud Stream框架实战入门

上面的代码指定了消费者对应的Binding是名为input的Binding。而根据上面的配置该Binding对应的destination是test-topic,对于RocketMQ来说就是从名为test-topic的Topic获取消息。

启用消费者,控制台输出

接收消息: Hello Message

基于RocketMq的SpringCloud Stream框架实战入门

总结

到这里老顾分享了SCS基于Rocketmq的快速入门,用起来是非常简单的,我们不需要知道Rocketmq的自身原生的用法,就可以做基本的发布与消费。下一篇文章老顾分享更深入的知识点。谢谢!!!


---End---

最近老顾上传了微服务网关的分享课程,请大家多多支持

1、

2、

3、

4、

5、

6、

7、

8、

9、

10、

11、

12、

13、

14、

15、

16、

17、

18、

19、

20、

21、

22、

23、

24、

25、

26、

27、

28、

29、

30、

31、

32、

33、

34、

35、

36、

37、

38、

39、

40、


分享到:


相關文章: