【SpringBoot MQ 系列】RabbitListener 消費基本使用姿勢介紹

【SpringBoot MQ 系列】RabbitListener 消費基本使用姿勢介紹

【MQ 系列】RabbitListener 消費基本使用姿勢介紹

之前介紹了 rabbitmq 的消息發送姿勢,既然有發送,當然就得有消費者,在 SpringBoot 環境下,消費可以說比較簡單了,藉助@RabbitListener註解,基本上可以滿足你 90%以上的業務開發需求

下面我們來看一下@RabbitListener的最最常用使用姿勢

I. 配置

首先創建一個 SpringBoot 項目,用於後續的演示

  • springboot 版本為2.2.1.RELEASE
  • rabbitmq 版本為 3.7.5 (安裝教程可參考: 【MQ 系列】springboot + rabbitmq 初體驗)

依賴配置文件 pom.xml

<code><parent>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-parent/<artifactid>
<version>2.2.1.RELEASE/<version>
<relativepath>
/<parent>

<properties>
<project.build.sourceencoding>UTF-8/<project.build.sourceencoding>
<project.reporting.outputencoding>UTF-8/<project.reporting.outputencoding>

<java.version>1.8/<java.version>
/<properties>

<dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-amqp/<artifactid>
/<dependency>

<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
/<dependencies>

<build>
<pluginmanagement>
<plugins>
<plugin>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-maven-plugin/<artifactid>
/<plugin>
/<plugins>
/<pluginmanagement>
/<build>
<repositories>
<repository>
spring-snapshots
<name>Spring Snapshots/<name>
https://repo.spring.io/libs-snapshot-local
<snapshots>
<enabled>true/<enabled>
/<snapshots>
/<repository>
<repository>
spring-milestones
<name>Spring Milestones/<name>
https://repo.spring.io/libs-milestone-local
<snapshots>
<enabled>false/<enabled>

/<snapshots>
/<repository>
<repository>
spring-releases
<name>Spring Releases/<name>
https://repo.spring.io/libs-release-local
<snapshots>
<enabled>false/<enabled>
/<snapshots>
/<repository>
/<repositories>/<code>

在application.yml配置文件中,添加 rabbitmq 的相關屬性

<code>spring:
rabbitmq:
virtual-host: /
username: admin
password: admin
port: 5672
host: 127.0.0.1/<code>

II. 消費姿勢

本文將目標放在實用性上,將結合具體的場景來演示@RabbitListener的使用姿勢,因此當你發現看完本文之後這個註解裡面有些屬性還是不懂,請不要著急,下一篇會一一道來

0. mock 數據

消費消費,沒有數據,怎麼消費呢?所以我們第一步,先創建一個消息生產者,可以往 exchange 寫數據,供後續的消費者測試使用

本篇的消費主要以 topic 模式來進行說明(其他的幾個模式使用差別不大,如果有需求的話,後續補齊)

<code>@RestController
public class PublishRest {
@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping(path = "publish")
public boolean publish(String exchange, String routing, String data) {
rabbitTemplate.convertAndSend(exchange, routing, data);
return true;
}
}/<code>

提供一個簡單 rest 接口,可以指定往哪個 exchange 推送數據,並制定路由鍵

1. case1: exchange, queue 已存在

對於消費者而言其實是不需要管理 exchange 的創建/銷燬的,它是由發送者定義的;一般來講,消費者更關注的是自己的 queue,包括定義 queue 並與 exchange 綁定,而這一套過程是可以直接通過 rabbitmq 的控制檯操作的哦

【SpringBoot MQ 系列】RabbitListener 消費基本使用姿勢介紹

所以實際開發過程中,exchange 和 queue 以及對應的綁定關係已經存在的可能性是很高的,並不需要再代碼中額外處理;

在這種場景下,消費數據,可以說非常非常簡單了,如下:

<code>/**
* 當隊列已經存在時,直接指定隊列名的方式消費
*
* @param data
*/
@RabbitListener(queues = "topic.a")
public void consumerExistsQueue(String data) {
System.out.println("consumerExistsQueue: " + data);
}/<code>

直接指定註解中的queues參數即可,參數值為對列名(queueName)

2. case2: queue 不存在

當 queue 的 autoDelete 屬性為 false 時,上面的使用場景還是比較合適了;但是,當這個屬性為 true 時,沒有消費者隊列就會自動刪除了,這個時候再用上面的姿勢,可能會得到下面的異常

【SpringBoot MQ 系列】RabbitListener 消費基本使用姿勢介紹

隊列不存在

通常這種場景下,是需要我們來主動創建 Queue,並建立與 Exchange 的綁定關係,下面給出@RabbitListener的推薦使用姿勢

<code>/**
* 隊列不存在時,需要創建一個隊列,並且與exchange綁定
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topic.n1", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC),
key = "r"))
public void consumerNoQueue(String data) {
System.out.println("consumerNoQueue: " + data);
}/<code>

一個註解,內部聲明瞭隊列,並建立綁定關係,就是這麼神奇!!!

注意@QueueBinding註解的三個屬性:

  • value: @Queue 註解,用於聲明隊列,value 為 queueName, durable 表示隊列是否持久化, autoDelete 表示沒有消費者之後隊列是否自動刪除
  • exchange: @Exchange 註解,用於聲明 exchange, type 指定消息投遞策略,我們這裡用的 topic 方式
  • key: 在 topic 方式下,這個就是我們熟知的 routingKey

以上,就是在隊列不存在時的使用姿勢,看起來也不復雜

3. case3: ack

在前面 rabbitmq 的核心知識點學習過程中,會知道為了保證數據的一致性,有一個消息確認機制;

我們這裡的 ack 主要是針對消費端而言,當我們希望更改默認 ack 方式(noack, auto, manual),可以如下處理

<code>/**
* 需要手動ack,但是不ack時
*
* @param data
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n2", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL")
public void consumerNoAck(String data) {
// 要求手動ack,這裡不ack,會怎樣?
System.out.println("consumerNoAck: " + data);
}/<code>

上面的實現也比較簡單,設置ackMode=MANUAL,手動 ack

但是,請注意我們的實現中,沒有任何一個地方體現了手動 ack,這就相當於一致都沒有 ack,在後面的測試中,可以看出這種不 ack 時,會發現數據一直在unacked這一欄,當 Unacked 數量超過限制的時候,就不會再消費新的數據了

4. case4: manual ack

上面雖然選擇 ack 方式,但是還缺一步 ack 的邏輯,接下來我們看一下如何補齊

<code>/**
* 手動ack
*
* @param data
* @param deliveryTag
* @param channel
* @throws IOException
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n3", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL")
public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
throws IOException {
System.out.println("consumerDoAck: " + data);

if (data.contains("success")) {
// RabbitMQ的ack機制中,第二個參數返回true,表示需要將這條消息投遞給其他的消費者重新消費
channel.basicAck(deliveryTag, false);
} else {
// 第三個參數true,表示這個消息會重新進入隊列
channel.basicNack(deliveryTag, false, true);
}
}/<code>

請注意,方法多了兩個參數

  • deliveryTag: 相當於消息的唯一標識,用於 mq 辨別是哪個消息被 ack/nak 了
  • channel: mq 和 consumer 之間的管道,通過它來 ack/nak

當我們正確消費時,通過調用 basicAck 方法即可

<code>// RabbitMQ的ack機制中,第二個參數返回true,表示需要將這條消息投遞給其他的消費者重新消費
channel.basicAck(deliveryTag, false);/<code>

當我們消費失敗,需要將消息重新塞入隊列,等待重新消費時,可以使用 basicNack

<code>// 第三個參數true,表示這個消息會重新進入隊列
channel.basicNack(deliveryTag, false, true);/<code>

5. case5: 併發消費

當消息很多,一個消費者吭哧吭哧的消費太慢,但是我的機器性能又槓槓的,這個時候我就希望並行消費,相當於同時有多個消費者來處理數據

要支持並行消費,如下設置即可

<code>@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n4", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), concurrency = "4")
public void multiConsumer(String data) {
System.out.println("multiConsumer: " + data);
}/<code>
【SpringBoot MQ 系列】RabbitListener 消費基本使用姿勢介紹

請注意註解中的concurrency = "4"屬性,表示固定 4 個消費者;

除了上面這種賦值方式之外,還有一種 m-n 的格式,表示 m 個並行消費者,最多可以有 n 個

(額外說明:這個參數的解釋實在SimpleMessageListenerContainer的場景下的,下一篇文章會介紹它與DirectMessageListenerContainer的區別)

6. 測試

通過前面預留的消息發送接口,我們在瀏覽器中請求: http://localhost:8080/publish?exchange=topic.e&routing=r&data=wahaha

【SpringBoot MQ 系列】RabbitListener 消費基本使用姿勢介紹


然後看一下輸出,五個消費者都接收到了,特別是主動 nak 的那個消費者,一直在接收到消息;

(因為一直打印日誌,所以重啟一下應用,開始下一個測試)

然後再發送一條成功的消息,驗證下手動真確 ack,是否還會出現上面的情況,請求命令: http://localhost:8080/publish?exchange=topic.e&routing=r&data=successMsg

【SpringBoot MQ 系列】RabbitListener 消費基本使用姿勢介紹

然後再關注一下,沒有 ack 的那個隊列,一直有一個 unack 的消息

【SpringBoot MQ 系列】RabbitListener 消費基本使用姿勢介紹


分享到:


相關文章: