Spring Cloud 微服務入門教程(七):Spring Cloud Stream

上一節的《Spring Cloud 微服務入門教程(六):Spring Cloud BUS 消息總線實現配置中心動態更新配置文件》已經安裝了RabbitMQ消息隊列,並實現了SpringCloudBus消息總線,本節介紹Spring Cloud Stream 消息隊驅動式的微服務。可以使用RabbitMQ、Apache Kafka等,用於微服務之間的異步消息傳遞和接收。

我們先規劃一下打算怎麼做,直接上代碼可能會有點難理解,我們要實現的是DemoClient給DemoService發送一條消息放入消息隊列中,然後DemoService接收消息並且給DemoClient回覆一條消息。我說一下我個人對這個Spring Cloud Stream的理解,消息被分為很多個頻道,你可以接收某個頻道,也可以對某個頻道發送消息,所以你需要知道頻道的名稱,我就統一定義在統一接口中心裡,這個統一接口中心是我自己設計的架構,並不是微服務的。在上一節我們已經配置過RabbitMQ,所以配置RabbitMQ的部分不再贅述。

修改apicenter、demoservice、democlient微服務的pom文件,增加spring-cloud-starter-stream-rabbit依賴,例如:

<code>
<project> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactid>cloud/<artifactid>
<groupid>net.renfei/<groupid>
<version>1.0.0/<version>
/<parent>
<modelversion>4.0.0/<modelversion>
<groupid>net.renfei/<groupid>
<artifactid>apicenter/<artifactid>
<version>1.0.0/<version>
<name>APICenter/<name>
<description>接口中心/<description>
<dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.cloud/<groupid>
<artifactid>spring-cloud-starter-stream-rabbit/<artifactid>
/<dependency>

/<dependencies>
/<project>/<code>

修改遠程Git上的配置文件

修改遠程Git上的application.yml,包括democlient和demoservice的配置,增加:

<code>spring:
stream:
bindings:
demoServiceMQ:
group: demo
content-type: application/json
demoClientMQ:
group: demo
content-type: application/json/<code>

bindings後面的是我們的頻道名稱,這個是自定義的,再後面的group是消費組,消費組防止消息被重複消費,微服務可能會啟動多個實例組,保證每個組中只有一個成員會收到該消息,content-type是告訴框架我們要把對象作為json格式保存,這樣方便我們在消息隊列中查看對象的內容,調試起來會很方便。

統一接口中心新增頻道名

在apicenter中新增一個net.renfei.apicenter.message.MQChannel的interface,用來規範和暴露所有微服務的頻道名稱:

<code>package net.renfei.apicenter.message;

/**
* 消息隊列頻道名稱
*
* @author RenFei
*/
public interface MQChannel {

String DEMOSERVICE = "demoServiceMQ";
String DEMOCLIENT = "demoClientMQ";
}/<code>

消息接收端

在demoservice模塊中新增net.renfei.demoservice.message.DemoServiceMessageClient作為接收客戶端:

<code>package net.renfei.demoservice.message;

import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface DemoServiceMessageClient {
@Input(MQChannel.DEMOSERVICE)
SubscribableChannel input();
}/<code>

在demoservice模塊中新增net.renfei.demoservice.message.DemoClientMessageClient作為發送客戶端:

<code>package net.renfei.demoservice.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface DemoClientMessageClient {
@Output
MessageChannel output();
}/<code>

在demoservice模塊中新增net.renfei.demoservice.message.DemoServiceReceiver作為消息監聽者:

<code>package net.renfei.demoservice.message;

import lombok.extern.slf4j.Slf4j;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
* 服務端監聽消息隊列
*

* @author RenFei
*/
@Slf4j
@Component
@EnableBinding({DemoServiceMessageClient.class, DemoClientMessageClient.class})
public class DemoServiceReceiver {
@StreamListener(MQChannel.DEMOSERVICE)
@SendTo(MQChannel.DEMOCLIENT)
public String process(Object message) {
log.info("Messages received by the DemoService:{}", message);
return "This is DemoServiceReceiver's reply";
}
}/<code>

消息發送端

在democlient中新增net.renfei.democlient.message.DemoClientMessageClient作為消息接收客戶端:

<code>package net.renfei.democlient.message;

import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface DemoClientMessageClient {
@Input(MQChannel.DEMOCLIENT)
SubscribableChannel input();
}/<code>

在democlient中新增net.renfei.democlient.message.DemoServiceMessageClient作為消息發送端:

<code>

在democlient中新增net.renfei.democlient.controller.SendMessageController作為消息發送的觸發入口:

<code>package net.renfei.democlient.controller;

import net.renfei.democlient.message.DemoServiceMessageClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController

public class SendMessageController {
@Autowired
private DemoServiceMessageClient demoServiceMessageClient;

@GetMapping("/sendMessage")
public void sendMessage(){
demoServiceMessageClient.output().send(
MessageBuilder.withPayload("This is a message from democlient").build()
);
}
}/<code>

運行測試

先啟動註冊中心eureka,然後啟動配置中心config,再啟動demoservice服務,最後啟動democlient,訪問我們新建的DemoClientMessageClient,觸發消息發送,演示系統的地址是:http://localhost:18081/sendMessage

Spring Cloud 微服務入門教程(七):Spring Cloud Stream

Spring Cloud 微服務入門教程(七):Spring Cloud Stream

總結

代碼已經陳述完了,做一下總結,@Input SubscribableChannel是訂閱頻道,用於接收消息;@Output MessageChannel是用來發送消息,這樣應用之間可以解耦合降低依賴,比較經典的場景是發送短信,核心的業務不需要等待短信接口的結果,直接給短信服務發送一個消息以後就去幹別的事了,短信服務接收到消息以後逐一執行發送短信的任務。


分享到:


相關文章: