Spring Cloud 微服务入门教程(七):Spring Cloud Stream

上一节的《Spring Cloud 微服务入门教程(六):Spring Cloud BUS 消息总线实现配置中心动态更新配置文件》已经安装了RabbitMQ消息队列,并实现了SpringCloudBus消息总线,本节介绍Spring Cloud Stream 消息队驱动式的微服务。可以使用RabbitMQ、Apache Kafka等,用于微服务之间的异步消息传递和接收。

我们先规划一下打算怎么做,直接上代码可能会有点难理解,我们要实现的是DemoClient给DemoService发送一条消息放入消息队列中,然后DemoService接收消息并且给DemoClient回复一条消息。我说一下我个人对这个Spring Cloud Stream的理解,消息被分为很多个频道,你可以接收某个频道,也可以对某个频道发送消息,所以你需要知道频道的名称,我就统一定义在统一接口中心里,这个统一接口中心是我自己设计的架构,并不是微服务的。在上一节我们已经配置过RabbitMQ,所以配置RabbitMQ的部分不再赘述。

修改apicenter、demoservice、democlient微服务的pom文件,增加spring-cloud-starter-stream-rabbit依赖,例如:

<code>
<project> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactid>cloud/<artifactid>
<groupid>net.renfei/<groupid>
<version>1.0.0/<version>
/<parent>
<modelversion>4.0.0/<modelversion>
<groupid>net.renfei/<groupid>
<artifactid>apicenter/<artifactid>
<version>1.0.0/<version>
<name>APICenter/<name>
<description>接口中心/<description>
<dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.cloud/<groupid>
<artifactid>spring-cloud-starter-stream-rabbit/<artifactid>
/<dependency>

/<dependencies>
/<project>/<code>

修改远程Git上的配置文件

修改远程Git上的application.yml,包括democlient和demoservice的配置,增加:

<code>spring:
stream:
bindings:
demoServiceMQ:
group: demo
content-type: application/json
demoClientMQ:
group: demo
content-type: application/json/<code>

bindings后面的是我们的频道名称,这个是自定义的,再后面的group是消费组,消费组防止消息被重复消费,微服务可能会启动多个实例组,保证每个组中只有一个成员会收到该消息,content-type是告诉框架我们要把对象作为json格式保存,这样方便我们在消息队列中查看对象的内容,调试起来会很方便。

统一接口中心新增频道名

在apicenter中新增一个net.renfei.apicenter.message.MQChannel的interface,用来规范和暴露所有微服务的频道名称:

<code>package net.renfei.apicenter.message;

/**
* 消息队列频道名称
*
* @author RenFei
*/
public interface MQChannel {

String DEMOSERVICE = "demoServiceMQ";
String DEMOCLIENT = "demoClientMQ";
}/<code>

消息接收端

在demoservice模块中新增net.renfei.demoservice.message.DemoServiceMessageClient作为接收客户端:

<code>package net.renfei.demoservice.message;

import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface DemoServiceMessageClient {
@Input(MQChannel.DEMOSERVICE)
SubscribableChannel input();
}/<code>

在demoservice模块中新增net.renfei.demoservice.message.DemoClientMessageClient作为发送客户端:

<code>package net.renfei.demoservice.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface DemoClientMessageClient {
@Output
MessageChannel output();
}/<code>

在demoservice模块中新增net.renfei.demoservice.message.DemoServiceReceiver作为消息监听者:

<code>package net.renfei.demoservice.message;

import lombok.extern.slf4j.Slf4j;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
* 服务端监听消息队列
*

* @author RenFei
*/
@Slf4j
@Component
@EnableBinding({DemoServiceMessageClient.class, DemoClientMessageClient.class})
public class DemoServiceReceiver {
@StreamListener(MQChannel.DEMOSERVICE)
@SendTo(MQChannel.DEMOCLIENT)
public String process(Object message) {
log.info("Messages received by the DemoService:{}", message);
return "This is DemoServiceReceiver's reply";
}
}/<code>

消息发送端

在democlient中新增net.renfei.democlient.message.DemoClientMessageClient作为消息接收客户端:

<code>package net.renfei.democlient.message;

import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface DemoClientMessageClient {
@Input(MQChannel.DEMOCLIENT)
SubscribableChannel input();
}/<code>

在democlient中新增net.renfei.democlient.message.DemoServiceMessageClient作为消息发送端:

<code>

在democlient中新增net.renfei.democlient.controller.SendMessageController作为消息发送的触发入口:

<code>package net.renfei.democlient.controller;

import net.renfei.democlient.message.DemoServiceMessageClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController

public class SendMessageController {
@Autowired
private DemoServiceMessageClient demoServiceMessageClient;

@GetMapping("/sendMessage")
public void sendMessage(){
demoServiceMessageClient.output().send(
MessageBuilder.withPayload("This is a message from democlient").build()
);
}
}/<code>

运行测试

先启动注册中心eureka,然后启动配置中心config,再启动demoservice服务,最后启动democlient,访问我们新建的DemoClientMessageClient,触发消息发送,演示系统的地址是:http://localhost:18081/sendMessage

Spring Cloud 微服务入门教程(七):Spring Cloud Stream

Spring Cloud 微服务入门教程(七):Spring Cloud Stream

总结

代码已经陈述完了,做一下总结,@Input SubscribableChannel是订阅频道,用于接收消息;@Output MessageChannel是用来发送消息,这样应用之间可以解耦合降低依赖,比较经典的场景是发送短信,核心的业务不需要等待短信接口的结果,直接给短信服务发送一个消息以后就去干别的事了,短信服务接收到消息以后逐一执行发送短信的任务。


分享到:


相關文章: