SpringCloud Stream消息驅動

消息驅動原理

綁定器

通過定義綁定器作為中間層,實現了應用程序與消息中間件細節之間的隔離。通過嚮應用程序暴露統一的Channel通過,是的應用程序不需要再考慮各種不同的消息中間件的實現。當需要升級消息中間件,或者是更換其他消息中間件產品時,我們需要做的就是更換對應的Binder綁定器而不需要修改任何應用邏輯 。


SpringCloud Stream消息驅動

在該模型圖上有如下幾個核心概念:

  • Source: 當需要發送消息時,我們就需要通過Source,Source將會把我們所要發送的消息(POJO對象)進行序列化(默認轉換成JSON格式字符串),然後將這些數據發送到Channel中;
  • Sink: 當我們需要監聽消息時就需要通過Sink來,Sink負責從消息通道中獲取消息,並將消息反序列化成消息對象(POJO對象),然後交給具體的消息監聽處理進行業務處理;
  • Channel: 消息通道是Stream的抽象之一。通常我們向消息中間件發送消息或者監聽消息時需要指定主題(Topic)/消息隊列名稱,但這樣一旦我們需要變更主題名稱的時候需要修改消息發送或者消息監聽的代碼,但是通過Channel抽象,我們的業務代碼只需要對Channel就可以了,具體這個Channel對應的是那個主題,就可以在配置文件中來指定,這樣當主題變更的時候我們就不用對代碼做任何修改,從而實現了與具體消息中間件的解耦;
  • Binder: Stream中另外一個抽象層。通過不同的Binder可以實現與不同消息中間件的整合,比如上面的示例我們所使用的就是針對Kafka的Binder,通過Binder提供統一的消息收發接口,從而使得我們可以根據實際需要部署不同的消息中間件,或者根據實際生產中所部署的消息中間件來調整我們的配置。

什麼是消息驅動?

SpringCloud Stream消息驅動可以簡化開發人員對消息中間件的使用複雜度,讓系統開發人員更多盡力專注與核心業務邏輯的開發。SpringCloud Stream基於SpringBoot實現,自動配置化的功能可以幫助我們快速上手學習,類似與我們之前學習的orm框架,可以平滑的切換多種不同的數據庫。

目前SpringCloud Stream 目前只支持 rabbitMQ和kafka。


生產者環境

Maven依賴信息

<parent>

<groupid>org.springframework.boot/<groupid>

<artifactid>spring-boot-starter-parent/<artifactid>

<version>2.0.1.RELEASE/<version>

<dependencies>

<dependency>

<groupid>org.springframework.boot/<groupid>

<artifactid>spring-boot-starter-web/<artifactid>

<dependency>

<groupid>org.springframework.cloud/<groupid>

<artifactid>spring-cloud-starter-stream-rabbit/<artifactid>

<version>2.0.1.RELEASE/<version>

application.yml信息


server:

port: 9000

spring:

application:

name: spingcloud-stream-producer

# rabbitmq:

# host: 192.168.174.128

# port: 5672

# username: guest

# password: guest


創建管道

// 創建管道接口

public interface SendMessageInterface {

// 創建一個輸出管道,用於發送消息

@Output("my_msg")

SubscribableChannel sendMsg();

}


發送消息

@RestController

public class SendMsgController {

@Autowired

private SendMessageInterface sendMessageInterface;

@RequestMapping("/sendMsg")

public String sendMsg() {

String msg = UUID.randomUUID().toString();

System.out.println("生產者發送內容msg:" + msg);

Message build = MessageBuilder.withPayload(msg.getBytes()).build();

sendMessageInterface.sendMsg().send(build);

return "success";

}

}

啟動服務

@SpringBootApplication

@EnableBinding(SendMessageInterface.class) // 開啟綁定

public class AppProducer {

public static void main(String[] args) {

SpringApplication.run(AppProducer.class, args);

}

}


消費者環境

Maven

<parent>

<groupid>org.springframework.boot/<groupid>

<artifactid>spring-boot-starter-parent/<artifactid>

<version>2.0.1.RELEASE/<version>

<dependencies>

<dependency>

<groupid>org.springframework.boot/<groupid>

<artifactid>spring-boot-starter-web/<artifactid>

<dependency>

<groupid>org.springframework.cloud/<groupid>

<artifactid>spring-cloud-starter-stream-rabbit/<artifactid>

<version>2.0.1.RELEASE/<version>

application.yml

server:

port: 9000

spring:

application:

name: spingcloud-stream-consumer

# rabbitmq:

# host: 192.168.174.128

# port: 5672

# username: guest

# password: guest


管道中綁定消息

public interface RedMsgInterface {

// 從管道中獲取消息

@Input("my_msg")

SubscribableChannel redMsg();

}


消費者獲取消息

@Component

public class

Consumer {

@StreamListener("my_msg")

public void listener(String msg) {

System.out.println("消費者獲取生產消息:" + msg);

}

}


啟動消費者

@SpringBootApplication

@EnableBinding(RedMsgInterface.class)

public class AppConsumer {

public static void main(String[] args) {

SpringApplication.run(AppConsumer.class, args);

}

}

消費組

在現實的業務場景中,每一個微服務應用為了實現高可用和負載均衡,都會集群部署,按照上面我們啟動了兩個應用的實例,消息被重複消費了兩次。為解決這個問題,Spring Cloud Stream 中提供了消費組,通過配置 spring.cloud.stream.bindings.myInput.group 屬性為應用指定一個組名,下面修改下配置文件,

server:

port: 8001

spring:

application:

name: spring-cloud-stream

# rabbitmq:

# host: 192.168.174.128

# port: 5672

# username: guest

# password: guest

cloud:

stream:

bindings:

mymsg: ###指定 管道名稱

#指定該應用實例屬於 stream 消費組

group: stream

修改消費者

@Component

public class Consumer {

@Value("${server.port}")

private String serverPort;

@StreamListener("my_msg")

public void listener(String msg) {

System.out.println("消費者獲取生產消息:" + msg + ",端口號:" + serverPort);

}

}


更改環境為kafka

Maven依賴

<dependency>

<groupid>org.springframework.cloud/<groupid>

<artifactid>spring-cloud-starter-stream-kafka/<artifactid>

<version>2.0.1.RELEASE/<version>


生產者配置

server:

port: 9000

spring:

cloud:

stream:

# 設置成使用kafka

kafka:

binder:

# Kafka的服務端列表,默認localhost

brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092

# Kafka服務端連接的ZooKeeper節點列表,默認localhost

zkNodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181

minPartitionCount: 1

autoCreateTopics: true

autoAddPartitions: true

消費者配置

server:

port: 8000

spring:

application:

name: springcloud_kafka_consumer

cloud:

instance-count: 1

instance-index: 0

stream:

kafka:

binder:

brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092

zk-nodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181

auto-add-partitions: true

auto-create-topics: true

min-partition-count: 1

bindings:

input:

destination: my_msg

group: s1

consumer:

autoCommitOffset: false

concurrency: 1

partitioned: false



分享到:


相關文章: