你真的知道怎麼實現一個延遲隊列嗎?


編者按 在日常的開發中,大家一定經常碰到延遲隊列的需求。事實上,實現方案不止一種,什麼時候該用哪種方式呢?本文作者分享了3種實現方案以及它們的優缺點,希望能幫到大家。


文 | Xie

騰訊互動娛樂 移動遊戲運營開發


你真的知道怎麼實現一個延遲隊列嗎?


延遲隊列是我們日常開發過程中,經常接觸並需要使用到的一種技術方案。前些時間在開發業務需求時,我也遇到了一個需要使用到延遲消息隊列的需求場景,因此我也在網上調研了一系列不同的延遲隊列的實現方案,在此進行了一個總結並且給大家進行分享。


本文大綱:


你真的知道怎麼實現一個延遲隊列嗎?


你真的知道怎麼實現一個延遲隊列嗎?


首先,隊列這種數據結構相信大家都不陌生,它是一種先進先出的數據結構。普通隊列中的元素是有序的,先進入隊列中的元素會被優先取出進行消費。


延時隊列相比於普通隊列最大的區別就體現在其延時的屬性上,普通隊列的元素是先進先出,按入隊順序進行處理,而延時隊列中的元素在入隊時會指定一個延遲時間,表示其希望能夠在經過該指定時間後處理。從某種意義上來講,延遲隊列的結構並不像一個隊列,而更像是一種以時間為權重的有序堆結構。


你真的知道怎麼實現一個延遲隊列嗎?


我在開發業務需求時遇到的使用場景是這樣的,用戶可以在小程序中訂閱不同的微信或者QQ的模板消息,產品同學可以在小程序的管理端新建消息推送計劃,當到達指定的時間節點的時候給所有訂閱模板消息的用戶進行消息推送。


如果僅僅是服務單一的小程序,那也許起個定時任務,或者甚至人工的定時去執行能夠最便捷最快速的去完成這項需求,但我們希望能夠抽象出一個消息訂閱的模塊服務出來給所有業務使用,這時候就需要一種通用的系統的解決方案,這時候便需要使用到延遲隊列了。


除了上述我所遇到的這樣的典型的需求以外,延遲隊列的應用場景其實也非常的廣泛,比如說以下的場景:


1. 新建的訂單,如果用戶在15分鐘內未支付,則自動取消。


2. 公司的會議預定系統,在會議預定成功後,會在會議開始前半小時通知所有預定該會議的用戶。


3. 安全工單超過24小時未處理,則自動拉企業微信群提醒相關責任人。


4. 用戶下單外賣以後,距離超時時間還有10分鐘時提醒外賣小哥即將超時。


對於數據量比較少並且時效性要求不那麼高的場景,一種比較簡單的方式是輪詢數據庫,比如每秒輪詢一下數據庫中所有數據,處理所有到期的數據,比如如果我是公司內部的會議預定系統的開發者,我可能就會採用這種方案,因為整個系統的數據量必然不會很大並且會議開始前提前30分鐘提醒與提前29分鐘提醒的差別並不大。


但是如果需要處理的數據量比較大實時性要求比較高,比如淘寶每天的所有新建訂單15分鐘內未支付的自動超時,數量級高達百萬甚至千萬,這時候如果你還敢輪詢數據庫怕是要被你老闆打死,不被老闆打死估計也要被運維同學打死。


這種場景下,就需要使用到我們今天的主角 —— 延遲隊列了。延遲隊列為我們提供了一種高效的處理大量需要延遲消費消息的解決方案。那麼話不多說,下面我們就來看一下幾種常見的延遲隊列的解決方案以及它們各自的優缺點。


你真的知道怎麼實現一個延遲隊列嗎?


Redis ZSet


我們知道Redis有一個有序集合的數據結構ZSet,ZSet中每個元素都有一個對應Score,ZSet中所有元素是按照其Score進行排序的。


那麼我們可以通過以下這幾個操作使用Redis的ZSet來實現一個延遲隊列:


1. 入隊操作:ZADD KEY timestamp task, 我們將需要處理的任務,按其需要延遲處理時間作為Score加入到ZSet中。Redis的ZAdd的時間複雜度是O(logN),N是ZSet中元素個數,因此我們能相對比較高效的進行入隊操作。


2. 起一個進程定時(比如每隔一秒)通過ZREANGEBYSCORE方法查詢ZSet中Score最小的元素,具體操作為:ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES。查詢結果有兩種情況:


a. 查詢出的分數小於等於當前時間戳,說明到這個任務需要執行的時間了,則去異步處理該任務;


b. 查詢出的分數大於當前時間戳,由於剛剛的查詢操作取出來的是分數最小的元素,所以說明ZSet中所有的任務都還沒有到需要執行的時間,則休眠一秒後繼續查詢;


同樣的,ZRANGEBYSCORE操作的時間複雜度為O(logN + M),其中N為ZSet中元素個數,M為查詢的元素個數,因此我們定時查詢操作也是比較高效的。


這裡從網上搬運了一套Redis實現延遲隊列的後端架構,其在原來Redis的ZSet實現上進行了一系列的優化,使得整個系統更穩定、更健壯,能夠應對高併發場景,並且具有更好的可擴展性,是一個挺不錯的架構設計,其整體架構圖如下:


你真的知道怎麼實現一個延遲隊列嗎?


其核心設計思路:


1. 將延遲的消息任務通過hash算法路由至不同的Redis Key上,這樣做有兩大好處:


a. 避免了當一個KEY在存儲了較多的延時消息後,入隊操作以及查詢操作速度變慢的問題(兩個操作的時間複雜度均為O(logN))。


b. 系統具有了更好的橫向可擴展性,當數據量激增時,我們可以通過增加Redis Key的數量來快速的擴展整個系統,來抗住數據量的增長。


2. 每個Redis Key都對應建立一個處理進程,稱為Event進程,通過上述步驟2中所述的ZRANGEBYSCORE方法輪詢Key,查詢是否有待處理的延遲消息。


3. 所有的Event進程只負責分發消息,具體的業務邏輯通過一個額外的消息隊列異步處理,這麼做的好處也是顯而易見的:


a. 一方面,Event進程只負責分發消息,那麼其處理消息的速度就會非常快,就不太會出現因為業務邏輯複雜而導致消息堆積的情況。


b. 另一方面,採用一個額外的消息隊列後,消息處理的可擴展性也會更好,我們可以通過增加消費者進程數量來擴展整個系統的消息處理能力。


4. Event進程採用Zookeeper選主單進程部署的方式,避免Event進程宕機後,Redis Key中消息堆積的情況。一旦Zookeeper的leader主機宕機,Zookeeper會自動選擇新的leader主機來處理Redis Key中的消息。


從上述的討論中我們可以看到,通過Redis Zset實現延遲隊列是一種理解起來較為直觀,可以快速落地的方案。並且我們可以依賴Redis自身的持久化來實現持久化,使用Redis集群來支持高併發和高可用,是一種不錯的延遲隊列的實現方案。


RabbitMQ


RabbitMQ本身並不直接提供對延遲隊列的支持,我們依靠RabbitMQ的TTL以及死信隊列功能,來實現延遲隊列的效果。那就讓我們首先來了解一下,RabbitMQ的死信隊列以及TTL功能。


死信隊列


死信隊列實際上是一種RabbitMQ的消息處理機制,當RabbmitMQ在生產和消費消息的時候,消息遇到如下的情況,就會變成“死信”:


1. 消息被拒絕basic.reject/ basic.nack 並且不再重新投遞 requeue=false


2. 消息超時未消費,也就是TTL過期了


3. 消息隊列到達最大長度


消息一旦變成一條死信,便會被重新投遞到死信交換機(Dead-Letter-Exchange),然後死信交換機根據綁定規則轉發到對應的死信隊列上,監聽該隊列就可以讓消息被重新消費。


消息生存時間TTL


TTL(Time-To-Live)是RabbitMQ的一種高級特性,表示了一條消息的最大生存時間,單位為毫秒。如果一條消息在TTL設置的時間內沒有被消費,那麼它就會變成一條死信,進入我們上面所說的死信隊列。


有兩種不同的方式可以設置消息的TTL屬性,一種方式是直接在創建隊列的時候設置整個隊列的TTL過期時間,所有進入隊列的消息,都被設置成了統一的過期時間,一旦消息過期,馬上就會被丟棄,進入死信隊列,參考代碼如下:


Map args = new HashMap();

args.put("x-message-ttl", 6000);

channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

在延遲隊列的延遲時間為固定值的時候,比較適合使用這種方式。


另一種方式是針對單條消息設置,參考代碼如下,該消息被設置了6秒的過期時間:


AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

builder.expiration("6000");

AMQP.BasicProperties properties = builder.build();

channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg content".getBytes());

如果需要不同的消息設置不同的延遲時間,上面針對隊列的TTL設置便無法滿足我們的需求,需要使用這種針對單個消息的TTL設置。


不過需要注意的是,使用這種方式設置的TTL,消息可能不會按時死亡,因為RabbitMQ只會檢查第一個消息是否過期。比如這種情況,第一個消息設置了20s的TTL,第二個消息設置了10s的TTL,那麼RabbitMQ會等到第一個消息過期之後,才會讓第二個消息過期。


解決這個問題的方法也很簡單,只需要安裝RabbitMQ的一個插件即可:https://www.rabbitmq.com/community-plugins.html 。安裝好這個插件後,所有的消息就都能按照被設置的TTL過期了。


RabbitMQ實現延遲隊列


好了,介紹完RabbitMQ的死信隊列以及TTL這兩種特性之後,我們離實現延遲隊列就只差一步之遙了。


聰明的讀者可能已經發現了,TTL不就是延遲隊列中消息要延遲的時間麼?如果我們把需要延遲的消息,將TTL設置為其延遲時間,投遞到RabbitMQ的普通隊列中,一直不去消費它,那麼經過TTL的時間後,消息就會自動被投遞到死信隊列,這時候我們使用消費者進程實時地去消費死信隊列中的消息,不就實現了延遲隊列的效果。


從下圖可以直觀的看出使用RabbitMQ實現延遲隊列的整體流程:


你真的知道怎麼實現一個延遲隊列嗎?


使用RabbitMQ來實現延遲隊列,我們可以很好的利用一些RabbitMQ的特性,比如消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過RabbitMQ集群的特性,可以很好的解決單點故障問題,不會因為單個節點掛掉導致延遲隊列不可用或者消息丟失。


TimeWheel


TimeWheel時間輪算法,是一種實現延遲隊列的巧妙且高效的算法,被應用在Netty,Zookeeper,Kafka等各種框架中。


時間輪


你真的知道怎麼實現一個延遲隊列嗎?


如上圖所示,時間輪是一個存儲延遲消息的環形隊列,其底層採用數組實現,可以高效循環遍歷。這個環形隊列中的每個元素對應一個延遲任務列表,這個列表是一個雙向環形鏈表,鏈表中每一項都代表一個需要執行的延遲任務。


時間輪會有錶盤指針,表示時間輪當前所指時間,隨著時間推移,該指針會不斷前進,並處理對應位置上的延遲任務列表。


添加延遲任務


由於時間輪的大小固定,並且時間輪中每個元素都是一個雙向環形鏈表,我們可以在O(1) 的時間複雜度下向時間輪中添加延遲任務。


如下圖,例如我們有一個這樣的時間輪,在錶盤指針指向當前時間為2時,我們需要新添加一個延遲3秒的任務,我們可以快速計算出延遲任務在時間輪中所對應的位置為5,並添加到位置5上任務列表尾部。


你真的知道怎麼實現一個延遲隊列嗎?


多層時間輪


到現在為止一切都非常棒,但是細心的同學可能發現了,上面的時間輪的大小是固定的,只有12秒。如果此時我們有一個需要延遲200秒的任務,我們應該怎麼處理呢?直接擴充整個時間輪的大小嗎?這顯然不可取,因為這樣做的話我們就需要維護一個非常非常大的時間輪,內存是不可接受的,而且底層數組大了之後尋址效率也會降低,影響性能。


為此,Kafka引入了多層時間輪的概念。其實多層時間輪的概念和我們的機械錶上時針、分針、秒針的概念非常類似,當僅使用秒針無法表示當前時間時,就使用分針結合秒針一起表示。同樣的,當任務的到期時間超過了當前時間輪所表示的時間範圍時,就會嘗試添加到上層時間輪中,如下圖所示:


你真的知道怎麼實現一個延遲隊列嗎?


第一層時間輪整個時間輪所表示時間範圍是0-12秒,第二層時間輪每格能表示的時間範圍是整個第一層時間輪所表示的範圍也就是12秒,所以整個第二層時間輪能表示的時間範圍即12*12=144秒,依次類推第三層時間輪能表示的範圍是1728秒,第四層為20736秒等等。


比如現在我們需要添加一個延時為200秒的延遲消息,我們發現其已經超過了第一層時間輪能表示的時間範圍,我們就需要繼續往上層時間輪看,將其添加在第二層時間輪 200/12 = 17的位置,然後我們發現17也超過了第二次時間輪的表示範圍,那麼我們就需要繼續往上層看,將其添加在第三層時間輪的 17/12 = 2 的位置。


Kafka中時間輪算法添加延遲任務以及推動時間輪滾動的核心流程如下,其中Bucket即時間輪中的延遲任務隊列,並且Kafka引入的DelayQueue解決了多數Bucket為空導致的時間輪滾動效率低下的問題:


你真的知道怎麼實現一個延遲隊列嗎?


使用時間輪實現的延遲隊列,能夠支持大量任務的高效觸發。並且在Kafka的時間輪算法的實現方案中,還引入了DelayQueue,使用DelayQueue來推送時間輪滾動,而延遲任務的添加與刪除操作都放在時間輪中,這樣的設計大幅提升了整個延遲隊列的執行效率。


你真的知道怎麼實現一個延遲隊列嗎?


延遲隊列在我們日常開發中應用非常廣泛,本文介紹了三種不同的實現延遲隊列的方案,三種方案各自有各自的特點,例如Redis的實現方案理解起來最為簡單,能夠快速落地,但Redis畢竟是基於內存的,雖然有數據持久化方案,但還是有數據丟失的可能性。而RabbitMQ的實現方案,由於RabbitMQ本身的消息可靠發送、消息可靠投遞、死信隊列等特性,可以保障消息至少被消費一次以及未被正確處理的消息不會被丟棄,讓消息的可靠性有了保障。最後Kafka的時間輪算法,個人覺得是三種實現方案中最難理解但也不失為一種非常巧妙實現方案。最後,希望以上這些內容,能幫助大家在實現自己的延遲隊列時提供一點思路。


分享到:


相關文章: