本篇文章為系列文章,未讀第一集的同學請猛戳這裡:
微服務系列之Stream消息驅動(一)本篇文章講解 Stream 如何實現消息分組和消息分區。
消息分組
如果有多個消息消費者,那麼消息生產者發送的消息會被多個消費者都接收到,這種情況在某些實際場景下是有很大問題的,比如在如下場景中,訂單系統做集群部署,都會從 RabbitMQ 中獲取訂單信息,如果一個訂單消息同時被兩個服務消費,系統肯定會出現問題。為了避免這種情況,Stream 提供了消息分組來解決該問題。
在 Stream 中處於同一個 group 中的多個消費者是競爭關係,能夠保證消息只會被其中一個應用消費。不同的組是可以消費的,同一個組會發生競爭關係,只有其中一個可以消費。通過 spring.cloud.stream.bindings..group 屬性指定組名。
問題演示
在 stream-demo 項目下創建 stream-consumer02 子項目。
項目代碼使用入門案例中消息消費者的代碼。
單元測試代碼如下:
<code> package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { messageProducer.send("hello spring cloud stream"); } }/<code>
測試
運行單元測試發送消息,兩個消息消費者控制檯打印結果如下:
stream-consumer 的控制檯:
<code> message = hello spring cloud stream/<code>
stream-consumer02 的控制檯:
<code> message = hello spring cloud stream/<code>
通過結果可以看到消息被兩個消費者同時消費了,原因是因為它們屬於不同的分組,默認情況下分組名稱是隨機生成的,通過 RabbitMQ 也可以得知:
配置分組
stream-consumer 的分組配置為:group-A。
<code> server: port: 8002 # 端口 spring: application: name: stream-consumer # 應用名稱 rabbitmq: host: 192.168.10.101 # 服務器 IP port: 5672 # 服務器端口 username: guest # 用戶名 password: guest # 密碼 virtual-host: / # 虛擬主機地址 cloud: stream: bindings: # 消息接收通道 # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同 input: destination: stream.message # 綁定的交換機名稱 group: group-A/<code>
stream-consumer02 的分組配置為:group-A。
<code> server: port: 8003 # 端口 spring: application: name: stream-consumer # 應用名稱 rabbitmq: host: 192.168.10.101 # 服務器 IP port: 5672 # 服務器端口 username: guest # 用戶名 password: guest # 密碼 virtual-host: / # 虛擬主機地址 cloud: stream: bindings: # 消息接收通道 # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同 input: destination: stream.message # 綁定的交換機名稱 group: group-A/<code>
測試
運行單元測試發送消息,此時多個消息消費者只有其中一個可以消費。RabbitMQ 結果如下:
消息分區
通過消息分組可以解決消息被重複消費的問題,但在某些場景下分組還不能滿足我們的需求。比如,同時有多條同一個用戶的數據發送過來,我們需要根據用戶統計,但是消息被分散到了不同的集群節點上了,這時我們就可以考慮使用消息分區了。
當生產者將消息發送給多個消費者時,保證同一消息始終由同一個消費者實例接收和處理。消息分區是對消息分組的一種補充。
問題演示
先給大家演示一下消息未分區的效果,單元測試代碼如下:
<code> package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { for (int i = 1; i <= 10; i++) { messageProducer.send("hello spring cloud stream"); } } }/<code>
測試
運行單元測試發送消息,兩個消息消費者控制檯打印結果如下:
stream-consumer 的控制檯:
<code> message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream/<code>
stream-consumer02 的控制檯:
<code> message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream/<code>
假設這 10 條消息都來自同一個用戶,正確的方式應該都由一個消費者消費所有消息,否則系統肯定會出現問題。為了避免這種情況,Stream 提供了消息分區來解決該問題。
配置分區
消息生產者配置
分區鍵的表達式規則和消息分區的數量。<code> server: port: 8001 # 端口 spring: application: name: stream-producer # 應用名稱 rabbitmq: host: 192.168.10.101 # 服務器 IP port: 5672 # 服務器端口 username: guest # 用戶名 password: guest # 密碼 virtual-host: / # 虛擬主機地址 cloud: stream: bindings: # 消息發送通道 # 與 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 註解的 value 相同 output: destination: stream.message # 綁定的交換機名稱 producer: partition-key-expression: payload # 配置分區鍵的表達式規則 partition-count: 2 # 配置消息分區的數量/<code>
通過 partition-key-expression 參數指定分區鍵的表達式規則,用於區分每個消息被髮送至對應分區的輸出 channel。
該表達式作用於傳遞給 MessageChannel 的 send 方法的參數,該參數實現 org.springframework.messaging.Message 接口的 GenericMessage 類。
源碼 MessageChannel.java
<code> package org.springframework.messaging; @FunctionalInterface public interface MessageChannel { long INDEFINITE_TIMEOUT = -1L; default boolean send(Message> message) { return this.send(message, -1L); } boolean send(Message> var1, long var2); }/<code>
源碼 GenericMessage.java
<code> package org.springframework.messaging.support; import java.io.Serializable; import java.util.Map; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; public class GenericMessage implements Message, Serializable { private static final long serialVersionUID = 4268801052358035098L; private final T payload; private final MessageHeaders headers; ... }/<code>
如果 partition-key-expression 的值是 payload,將會使用所有放在 GenericMessage 中的數據作為分區數據。payload 是消息的實體類型,可以為自定義類型比如 User,Role 等等。
如果 partition-key-expression 的值是 headers["xxx"],將由 MessageBuilder 類的 setHeader() 方法完成賦值,比如:
<code> package com.example.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生產者 */ @Component @EnableBinding(Source.class) public class MessageProducer { @Autowired private Source source; /** * 發送消息 * * @param message */ public void send(String message) { source.output().send(MessageBuilder.withPayload(message).setHeader("xxx", 0).build()); } }/<code>
消息消費者配置消費者總數和當前消費者的索引並開啟分區支持。
stream-consumer 的 application.yml
<code> server: port: 8002 # 端口 spring: application: name: stream-consumer # 應用名稱 rabbitmq: host: 192.168.10.101 # 服務器 IP port: 5672 # 服務器端口 username: guest # 用戶名 password: guest # 密碼 virtual-host: / # 虛擬主機地址 cloud: stream: instance-count: 2 # 消費者總數 instance-index: 0 # 當前消費者的索引 bindings: # 消息接收通道 # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同 input: destination: stream.message # 綁定的交換機名稱 group: group-A consumer: partitioned: true # 開啟分區支持/<code>
stream-consumer02 的 application.yml
<code> server: port: 8003 # 端口 spring: application: name: stream-consumer # 應用名稱 rabbitmq: host: 192.168.10.101 # 服務器 IP port: 5672 # 服務器端口 username: guest # 用戶名 password: guest # 密碼 virtual-host: / # 虛擬主機地址 cloud: stream: instance-count: 2 # 消費者總數 instance-index: 1 # 當前消費者的索引 bindings: # 消息接收通道 # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同 input: destination: stream.message # 綁定的交換機名稱 group: group-A consumer: partitioned: true # 開啟分區支持/<code>
測試
運行單元測試發送消息,此時多個消息消費者只有其中一個可以消費所有消息。RabbitMQ 結果如下:
至此 Stream 消息驅動所有的知識點就講解結束了。
您的 點贊 和 轉發 是對我最大的支持。
掃描下方二維碼關注 哈嘍沃德先生「文檔 + 視頻」每篇文章都配有專門視頻講解,學習更輕鬆噢 ~
關鍵字: hello message springframework