微服務系列之Stream消息驅動(二)

本篇文章為系列文章,未讀第一集的同學請猛戳這裡:

微服務系列之Stream消息驅動(一)

本篇文章講解 Stream 如何實現消息分組和消息分區。


消息分組

  


  如果有多個消息消費者,那麼消息生產者發送的消息會被多個消費者都接收到,這種情況在某些實際場景下是有很大問題的,比如在如下場景中,訂單系統做集群部署,都會從 RabbitMQ 中獲取訂單信息,如果一個訂單消息同時被兩個服務消費,系統肯定會出現問題。為了避免這種情況,Stream 提供了消息分組來解決該問題。

微服務系列之Stream消息驅動(二)

  在 Stream 中處於同一個 group 中的多個消費者是競爭關係,能夠保證消息只會被其中一個應用消費。不同的組是可以消費的,同一個組會發生競爭關係,只有其中一個可以消費。通過 spring.cloud.stream.bindings..group 屬性指定組名。

微服務系列之Stream消息驅動(二)

  

問題演示

  

  在 stream-demo 項目下創建 stream-consumer02 子項目。

  項目代碼使用入門案例中消息消費者的代碼。

  單元測試代碼如下:

<code> package com.example;
 ​
 import com.example.producer.MessageProducer;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 ​
 @SpringBootTest(classes = {StreamProducerApplication.class})
 public class MessageProducerTest {
 ​
     @Autowired
     private MessageProducer messageProducer;
 ​
     @Test
     public void testSend() {
         messageProducer.send("hello spring cloud stream");
     }
 ​
 }/<code>

  

測試

  

  運行單元測試發送消息,兩個消息消費者控制檯打印結果如下:

  stream-consumer 的控制檯:

<code> message = hello spring cloud stream/<code>

  stream-consumer02 的控制檯:

<code> message = hello spring cloud stream/<code>

  通過結果可以看到消息被兩個消費者同時消費了,原因是因為它們屬於不同的分組,默認情況下分組名稱是隨機生成的,通過 RabbitMQ 也可以得知:

微服務系列之Stream消息驅動(二)

  

配置分組

  

  stream-consumer 的分組配置為:group-A。

<code> server:
   port: 8002 # 端口
 ​
 spring:
   application:
     name: stream-consumer # 應用名稱
   rabbitmq:
     host: 192.168.10.101  # 服務器 IP
     port: 5672            # 服務器端口
     username: guest       # 用戶名
     password: guest       # 密碼
     virtual-host: /       # 虛擬主機地址
   cloud:
     stream:
       bindings:
         # 消息接收通道
         # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同
         input:
           destination: stream.message # 綁定的交換機名稱
           group: group-A/<code>

  

  stream-consumer02 的分組配置為:group-A。

<code> server:
   port: 8003 # 端口
 ​
 spring:
   application:
     name: stream-consumer # 應用名稱
   rabbitmq:
     host: 192.168.10.101  # 服務器 IP
     port: 5672            # 服務器端口
     username: guest       # 用戶名
     password: guest       # 密碼
     virtual-host: /       # 虛擬主機地址
   cloud:
     stream:
       bindings:
         # 消息接收通道
         # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同
         input:
           destination: stream.message # 綁定的交換機名稱
           group: group-A/<code>

  

測試

  

  運行單元測試發送消息,此時多個消息消費者只有其中一個可以消費。RabbitMQ 結果如下:

微服務系列之Stream消息驅動(二)

  

消息分區

  

  通過消息分組可以解決消息被重複消費的問題,但在某些場景下分組還不能滿足我們的需求。比如,同時有多條同一個用戶的數據發送過來,我們需要根據用戶統計,但是消息被分散到了不同的集群節點上了,這時我們就可以考慮使用消息分區了。

  當生產者將消息發送給多個消費者時,保證同一消息始終由同一個消費者實例接收和處理。消息分區是對消息分組的一種補充。

微服務系列之Stream消息驅動(二)

問題演示

  

  先給大家演示一下消息未分區的效果,單元測試代碼如下:

<code> package com.example;
 ​
 import com.example.producer.MessageProducer;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 ​
 @SpringBootTest(classes = {StreamProducerApplication.class})
 public class MessageProducerTest {
 ​
     @Autowired
     private MessageProducer messageProducer;
 ​
     @Test
     public void testSend() {
         for (int i = 1; i <= 10; i++) {
             messageProducer.send("hello spring cloud stream");
         }
     }
 ​
 }/<code>

  

測試

  

  運行單元測試發送消息,兩個消息消費者控制檯打印結果如下:

  stream-consumer 的控制檯:

<code> message = hello spring cloud stream
 message = hello spring cloud stream
 message = hello spring cloud stream
 message = hello spring cloud stream
 message = hello spring cloud stream/<code>

  stream-consumer02 的控制檯:

<code> message = hello spring cloud stream
 message = hello spring cloud stream
 message = hello spring cloud stream
 message = hello spring cloud stream
 message = hello spring cloud stream/<code>

  假設這 10 條消息都來自同一個用戶,正確的方式應該都由一個消費者消費所有消息,否則系統肯定會出現問題。為了避免這種情況,Stream 提供了消息分區來解決該問題。

  

配置分區

  

  消息生產者配置

分區的表達式規則消息分區的數量

<code> server:
   port: 8001 # 端口
 ​
 spring:
   application:
     name: stream-producer # 應用名稱
   rabbitmq:
     host: 192.168.10.101  # 服務器 IP
     port: 5672            # 服務器端口
     username: guest       # 用戶名
     password: guest       # 密碼
     virtual-host: /       # 虛擬主機地址
   cloud:
     stream:
       bindings:
         # 消息發送通道
         # 與 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 註解的 value 相同
         output:
           destination: stream.message # 綁定的交換機名稱
           producer:
             partition-key-expression: payload # 配置分區鍵的表達式規則
             partition-count: 2 # 配置消息分區的數量/<code>

  通過 partition-key-expression 參數指定分區鍵的表達式規則,用於區分每個消息被髮送至對應分區的輸出 channel。

  該表達式作用於傳遞給 MessageChannel 的 send 方法的參數,該參數實現 org.springframework.messaging.Message 接口的 GenericMessage 類。

  

  源碼 MessageChannel.java

<code> package org.springframework.messaging;
 ​
 @FunctionalInterface
 public interface MessageChannel {
     long INDEFINITE_TIMEOUT = -1L;
 ​
     default boolean send(Message> message) {
         return this.send(message, -1L);
     }
 ​
     boolean send(Message> var1, long var2);
 }/<code>

  源碼 GenericMessage.java

<code> package org.springframework.messaging.support;
 ​
 import java.io.Serializable;
 import java.util.Map;
 import org.springframework.lang.Nullable;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.util.Assert;
 import org.springframework.util.ObjectUtils;
 ​
 public class GenericMessage implements Message, Serializable {
     private static final long serialVersionUID = 4268801052358035098L;
     private final T payload;
     private final MessageHeaders headers;
     
     ...
 ​
 }/<code>

  

  如果 partition-key-expression 的值是 payload,將會使用所有放在 GenericMessage 中的數據作為分區數據。payload 是消息的實體類型,可以為自定義類型比如 User,Role 等等。

  如果 partition-key-expression 的值是 headers["xxx"],將由 MessageBuilder 類的 setHeader() 方法完成賦值,比如:

<code> package com.example.producer;
 ​
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.cloud.stream.annotation.EnableBinding;
 import org.springframework.cloud.stream.messaging.Source;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.stereotype.Component;
 ​
 /**
  * 消息生產者
  */
 @Component
 @EnableBinding(Source.class)
 public class MessageProducer {
 ​
     @Autowired
     private Source source;
 ​
     /**
      * 發送消息
      *
      * @param message
      */
     public void send(String message) {
         source.output().send(MessageBuilder.withPayload(message).setHeader("xxx", 0).build());
     }
 ​
 }/<code>

  

  消息消費者配置消費者總數當前消費者的索引開啟分區支持

  stream-consumer 的 application.yml

<code> server:
   port: 8002 # 端口
 ​
 spring:
   application:
     name: stream-consumer # 應用名稱
   rabbitmq:
     host: 192.168.10.101  # 服務器 IP
     port: 5672            # 服務器端口
     username: guest       # 用戶名
     password: guest       # 密碼
     virtual-host: /       # 虛擬主機地址
   cloud:
     stream:
       instance-count: 2 # 消費者總數
       instance-index: 0 # 當前消費者的索引
       bindings:
         # 消息接收通道
         # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同
         input:
           destination: stream.message # 綁定的交換機名稱
           group: group-A
           consumer:
             partitioned: true # 開啟分區支持/<code>

  stream-consumer02 的 application.yml

<code> server:
   port: 8003 # 端口
 ​
 spring:
   application:
     name: stream-consumer # 應用名稱
   rabbitmq:
     host: 192.168.10.101  # 服務器 IP
     port: 5672            # 服務器端口
     username: guest       # 用戶名
     password: guest       # 密碼
     virtual-host: /       # 虛擬主機地址
   cloud:
     stream:
       instance-count: 2 # 消費者總數
       instance-index: 1 # 當前消費者的索引
       bindings:
         # 消息接收通道
         # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同
         input:
           destination: stream.message # 綁定的交換機名稱
           group: group-A
           consumer:
             partitioned: true # 開啟分區支持/<code>

  

測試

  

  運行單元測試發送消息,此時多個消息消費者只有其中一個可以消費所有消息。RabbitMQ 結果如下:

微服務系列之Stream消息驅動(二)

  至此 Stream 消息驅動所有的知識點就講解結束了。

微服務系列之Stream消息驅動(二)

您的 點贊 和 轉發 是對我最大的支持。

掃描下方二維碼關注 哈嘍沃德先生「文檔 + 視頻」每篇文章都配有專門視頻講解,學習更輕鬆噢 ~

微服務系列之Stream消息驅動(二)

微服務系列之Stream消息驅動(二)


分享到:


相關文章: