Delayed Message 插件實現 RabbitMQ 延遲隊列

DLX + TTL 方式存在的時序問題

對於延遲隊列不管是 AMQP 協議或者 RabbitMQ 本身是不支持的,之前有介紹過如何使用 RabbitMQ 死信隊列(DLX) + TTL 的方式來模擬實現延遲隊列,這也是通常的一種做法,可參見我的另一篇文章 利用 RabbitMQ 死信隊列和 TTL 實現定時任務。

今天我想說的是這種方式會存在一個時序問題,看下圖:

Delayed Message 插件實現 RabbitMQ 延遲隊列


左側隊列 queue1 分別兩條消息 msg1、msg2 過期時間都為 1s,輸出順序為 msg1、msg2 是沒問題的

右側隊列 queue2 分別兩條消息 msg1、msg2 注意問題來了,msg2 的消息過期時間為 1S 而 msg1 的消息過期為 2S,你可能想誰先過期就誰先消費唄,顯然不是這樣的,因為這是在同一個隊列,必須前一個消費,第二個才能消費,所以就出現了時序問題

如果你的消息過期時間是有規律的,例如,有的 1S、有的 2S,那麼我們可以以時間為維度設計為兩個隊列,如下所示:

Delayed Message 插件實現 RabbitMQ 延遲隊列


上面我們將 1S 過期的消息拆分為隊列 queue_1s,2S 過期的消息拆分為隊列 queue_2s,事情得到進一步解決。如果此時消息的過期時間不確定或者消息過期時間維度過多,在消費端我們就要去監聽多個消息隊列且對於消息過期時間不確定的也是很難去設計的。

針對消息無序的不妨看下以下解決方案。

Delayed Message 插件

這裡要感謝 @神奇的包子,掘金(juejin.im/user/5bfc1b9d6fb9a049b347a9e2) 提出的 Delayed Message 插件方案。

這裡將使用的是一個 RabbitMQ 延遲消息插件 rabbitmq-delayed-message-exchange,目前維護在 RabbitMQ 插件社區,我們可以聲明 x-delayed-message 類型的 Exchange,消息發送時指定消息頭 x-delay 以毫秒為單位將消息進行延遲投遞。

Delayed Message 插件實現 RabbitMQ 延遲隊列


實現原理

上面使用 DLX + TTL 的模式,消息首先會路由到一個正常的隊列,根據設置的 TTL 進入死信隊列,與之不同的是通過 x-delayed-message 聲明的交換機,它的消息在發佈之後不會立即進入隊列,先將消息保存至 Mnesia(一個分佈式數據庫管理系統,適合於電信和其它需要持續運行和具備軟實時特性的 Erlang 應用。目前資料介紹的不是很多)

這個插件將會嘗試確認消息是否過期,首先要確保消息的延遲範圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設置的範圍為 (2^32)-1 毫秒),如果消息過期通過 x-delayed-type 類型標記的交換機投遞至目標隊列,整個消息的投遞過程也就完成了。

插件安裝

根據你的 RabbitMQ 版本來安裝相應插件版本,RabbitMQ community-plugins 上面有版本對應信息可參考。

注意:需要 RabbitMQ 3.5.3 和更高版本。

<code># 注意要下載至你的 RabbitMQ 服務器的 plugins 目錄下,例如:/usr/local/rabbitmq/plugins

wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip


# 解壓
unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

# 解壓之後得到如下文件
rabbitmq_delayed_message_exchange-20171215-3.6.x.ez/<code>

啟用插件

使用 rabbitmq-plugins enable 命令啟用插件,啟動成功會看到如下提示:

<code>$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
The following plugins have been enabled:
rabbitmq_delayed_message_exchange

Applying plugin configuration to rabbit@xxxxxxxx... started 1 plugin./<code>

管理控制檯聲明 x-delayed-message 交換機

在開始代碼之前先打開 RabbitMQ 的管理 UI 界面,聲明一個 x-delayed-message 類型的交換機,否則你會遇到下面的錯誤:

<code>Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type"/<code>

這個問題困擾我了一會兒,詳情可見 Github Issues rabbitmq-delayed-message-exchange/issues/19,正確操作如下圖所示:

Delayed Message 插件實現 RabbitMQ 延遲隊列


Nodejs 代碼實踐

上面準備工作完成了,開始我們的代碼實踐吧,官方沒有提供 Nodejs 示例,只提供了 Java 示例,對於一個寫過 Spring Boot 項目的 Nodeer 這不是問題(此處,兄得你有點飄了啊 /:xx)其實如果有時間能多瞭解點些,你會發現還是有益的。

構建生產者

幾個注意點:

  • 交換機類型一定要設置為 x-delayed-message
  • 設置 x-delayed-type 為 direct,當然也可以是 topic 等
  • 發送消息時設置消息頭 headers 的 x-delay 屬性,即延遲時間,如果不設置消息將會立即投遞
<code>const amqp = require('amqplib');

async function producer(msg, expiration) {
try {
const connection = await amqp.connect('amqp://localhost:5672');
const exchange = 'my-delayed-exchange';
const exchangeType = 'x-delayed-message'; // x-delayed-message 交換機的類型
const routingKey = 'my-delayed-routingKey';

const ch = await connection.createChannel();
await ch.assertExchange(exchange, exchangeType, {
durable: true,
'x-delayed-type': 'direct'

});

console.log('producer msg:', msg);
await ch.publish(exchange, routingKey, Buffer.from(msg), {
headers: {
'x-delay': expiration, // 一定要設置,否則無效
}
});

ch.close();
} catch(err) {
console.log(err)
}
}

producer('msg0 1S Expire', 1000) // 1S
producer('msg1 30S Expire', 1000 * 30) // 30S
producer('msg2 10S Expire', 1000 * 10) // 10S
producer('msg3 5S Expire', 1000 * 5) // 5S/<code>

構建消費端

消費端改變不大,交換機聲明處同生產者保持一樣,設置交換機類型(x-delayed-message)和 x-delayed-type

<code>const amqp = require('amqplib');

async function consumer() {
const exchange = 'my-delayed-exchange';
const exchangeType = 'x-delayed-message';
const routingKey = 'my-delayed-routingKey';
const queueName = 'my-delayed-queue';

try {
const connection = await amqp.connect('amqp://localhost:5672');
const ch = await connection.createChannel();

await ch.assertExchange(exchange, exchangeType, {
durable: true,
'x-delayed-type': 'direct'
});
await ch.assertQueue(queueName);
await ch.bindQueue(queueName, exchange, routingKey);
await ch.consume(queueName, msg => {
console.log('consumer msg:', msg.content.toString());
}, { noAck: true });

} catch(err) {
console.log('Consumer Error: ', err);
}
}

consumer()/<code>

以上示例源碼地址:

<code>https://github.com/Q-Angelo/project-training/tree/master/rabbitmq/rabbitmq-delayed-message-node/<code>

最後,讓我們對以上程序做個測試,左側窗口展示了生產端信息,右側窗口展示了消費端信息,這次實現了同一個隊列裡不同過期時間的消息,可以按照我們預先設置的 TTL 時間順序性消費,我們的目的達到了。

Delayed Message 插件實現 RabbitMQ 延遲隊列


侷限性

Delayed Message 插件實現 RabbitMQ 延遲隊列這種方式也不完全是一個銀彈,它將延遲消息存在於 Mnesia 表中,並且在當前節點上具有單個磁盤副本,它們將在節點重啟之後倖存。

目前該插件的當前設計並不真正適合包含大量延遲消息(例如數十萬或數百萬)的場景,詳情參見 #/issues/72 另外該插件的一個可變性來源是依賴於 Erlang 計時器,在系統中使用了一定數量的長時間計時器之後,它們開始爭用調度程序資源,並且時間漂移不斷累積。

插件的禁用要慎重,以下方式可以實現將插件禁用,但是注意如果此時還有延遲消息未消費,那麼禁掉此插件後所有的未消費的延遲消息將丟失。

<code>rabbitmq-plugins disable rabbitmq_delayed_message_exchange/<code>

如果你採用了 Delayed Message 插件這種方式來實現,對於消息可用性要求較高的,在發現消息之前可以先落入 DB 打標記,消費之後將消息標記為已消費,中間可以加入定時任務做檢測,這可以進一步保證你的消息的可靠性。

總結

經過一番實踐測試、學習之後發現,DLX + TTLDelayed Message 插件這兩種 RabbitMQ 延遲消息解決方案都有一定的侷限性。

如果你的消息 TTL 是相同的,使用 DLX + TTL 的這種方式是沒問題的,對於我來說目前還是優選。

如果你的消息 TTL 過期值是可變的,可以嘗試下使用 Delayed Message 插件,對於某些應用而言它可能很好用,對於那些可能會達到高容量延遲消息應用而言,則不是很好。

關於 RabbitMQ 延遲隊列,如果你有更多其它實現,歡迎關注公眾號 “Nodejs技術棧” 在後臺取得我的聯繫方式進行討論,我很期待。


作者:五月君
鏈接:https://juejin.im/post/5e81e7956fb9a03c341d9176


分享到:


相關文章: