延遲任務調度系統(技術選型與設計)

延遲任務調度系統(技術選型與設計)

  • 延遲任務調度系統(技術選型與設計)延遲任務的場景是?
  • 現有的解決方案是?
  • 存在的問題是什麼?
  • 希望達到的目標是?
  • 可以實現的方案有?RabbitMQ實現通過死信和死信路由實現
  • 通過延遲消息插件來實現
  • Redis實現
  • DelayQueue實現
  • 時間輪實現單表時間輪
  • 分層時間輪
  • 之前的設計(DB/DelayQueue/ZooKeeper)
  • 另一種方案(DB/DelayQueue/ZooKeeper/MQ

延遲任務的場景是?

  • 習題考試截止前3天,給未提交用戶發送消息
  • 學習項目開課前2小時,給參與用戶發送通知
  • 問卷開始收集時,才對用戶可見
  • 問卷結束收集時,觸發一些操作
  • 指定時間發佈課件
  • 課程結束時,開始計算用戶結業信息
  • 直播時間到了,給用戶發送消息
  • 用戶下單後,30分鐘內未付款,關閉訂單
  • 用戶付款後,24小時內未發貨,提示發貨
  • 用戶打車後,48小時後自動評價為5星
  • 這類業務的特點是:延遲執行。一種比較簡單的方法是使用後臺線程掃描符合條件的業務數據,逐一處理。 這種方法掃描間隔時間不好設置,間隔時間過大影響精確度,過小則影響效率和性能。

現有的解決方案是?

  • 通過linux的crontab觸發定時任務
  • 掃描業務表,篩選出符合條件的數據對其進行操作

存在的問題是什麼?

  • 由於每種類型的任務都設有掃描間隔,任務不能精確處理
  • 掃描業務庫,影響業務正常操作
  • 任務的執行過於密集,容易導致服務器間隔性壓力
  • 存在系統單點,觸發定時調度的服務掛了,所有任務都不會執行
  • 系統不具容錯能力,一旦錯過了,任務就不會再被執行
  • 沒有統一的視圖來查看任務的執行情況
  • 沒有告警來提示失敗的任務

希望達到的目標是?

  • 精確性(可在指定時間觸發任務處理)
  • 通用性
  • 高性能(集群能力不少於1000TPS)
  • 高可用(支持多實例部署)
  • 可伸縮(增加和減少服務時,任務會重新分配)
  • 可重試(任務失敗可重試)
  • 多協議(支持http\dubbo調用)
  • 可管理(業務使用方可修改、刪除任務)
  • 能告警(失敗次數達到閾值可觸發告警)
  • 統一視圖(方便查看任務執行情況,可手動干預任務執行)

下面所討論技術方案的前提是精確觸發,所以我們不討論目前業界的一些分佈式調度系統如:elastic-job,xxl-job,tbschedule等, 這些系統解決不了延遲任務精確觸發問題。

可以實現的方案有?

RabbitMQ實現

通過死信和死信路由實現

原理如下:

延遲任務調度系統(技術選型與設計)

何為死信:

  • 消息被拒絕
  • 消息已過期
  • 隊列達到最大長度

RabbitMQ可以對隊列和消息設置x-message-tt、expiration來控制消息的存活時間,如果超時,消息變為死信。

何為死信路由:

RabbitMQ可以對隊列設置x-dead-letter-exchange和x-dead-letter-routing-key兩個參數。

當消息在一個隊列中變成死信後會按這兩個參數路由,消息就可以重新被消費。

實例操作:

  1. 創建延遲隊列(設置死信路由)
  2. 創建就緒隊列
  3. 創建死信路由
  4. 綁定死信路由與就緒隊列
  5. 發送延遲消息
  6. 消息過期後進入就緒隊列

優點:

  • 高效,可以利用RabbitMQ的分佈式特性輕易進行橫向擴展,且支持持久化

缺點:

  • 不支持對已發送的消息進行管理
  • 一個消息比在同一隊列中的其他消息提前過期,提前過期的消息也不會優先進入死信隊列。

所以需要確保業務上每個任務的延遲時間是一致的。如果有不同延時的任務,需要為每種不同延遲的任務單獨創建消息隊列,缺乏靈活性。

通過延遲消息插件來實現

原理如下:

延遲任務調度系統(技術選型與設計)

核心代碼流程:

延遲任務調度系統(技術選型與設計)

其原理是延遲消息會被保存到Mnesia表,在Exchange中根據每個message頭設置的延遲時間x-delay,消息過期後才路由到對應隊列。

實例操作:

  1. 下載插件
  2. https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
  3. 安裝插件
 docker-compose.xml(將插件安裝到容器中)
version: '2'
services:
rabbitmq:
hostname: rabbitmq
image: rabbitmq:3.6.8-management
mem_limit: 200m
ports:
- "5672:5672"
- "15672:15672"
volumes:
- ~/dockermapping/rabbitmq/lib:/var/lib/rabbitmq/
- /Users/oldlu/workspace/document/docker-compose/rabbitmq/rabbitmq_delayed_message_exchange-0.0.1.ez:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.8/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez
啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 創建類型為x-delayed-message的路由
  2. 創建就緒隊列
  3. 綁定隊列和路由
  4. 發佈延遲消息(設置x-delay=延遲的毫秒數)

源碼分析

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/blob/master/src/rabbit_delayed_message.erl

核心函數 

消息入隊:internal_delay_message
啟動Timer:maybe_delay_first
消息處理:handle_info

優點:

  • 一個消息比其他消息提前過期,提前過期的消息會被提前路由到隊列,不需要為不同延遲的消息創建單獨的消息隊列。

缺點:

  • 不支持對已發送的消息進行管理
  • 集群中只有一個數據副本(保存在當前節點下的Mnesia表中),如果節點不可用或關閉插件會丟失消息。
  • 目前該插件只支持disk節點,不支持RAM節點
  • 性能比原生差一點(普通的Exchange收到消息後直接路由到隊列,而延遲隊列需要判斷消息是否過期,未過期的需要保存在表中,時間到了再撈出來路由)

Redis實現

有序集合(Sorted Set)是Redis提供的一種數據結構,具有set和hash的特點。

其中每個元素都關聯一個score,並以這個score來排序。

其內部實現用到了兩個數據結構:hash table和 skip list(跳躍表)

延遲任務調度系統(技術選型與設計)

skip list的特點

  • 由很多層結構組成,level是通過一定的概率隨機產生的
  • 每一層都是一個有序的鏈表,默認是升序
  • 最底層的鏈表包含所有元素
  • 如果一個元素出現在Level i的鏈表中,則它在Level i之下的鏈表也都會出現
  • 每個節點包含兩個指針,一個指向同一鏈表中的下一個元素,一個指向下面一層的元素
  • 插入和刪除的時間複雜度是O(logn),當達到了一定的數據規模之後,它的效率與紅黑樹差不多

主要命令

  • zadd:向Sorted Set中添加元素
  • zrem:刪除Sorted Set中的指定元素
  • zrange:按照從小到大的順序返回指定區間內的元素

實現延遲隊列

  1. 將延遲任務加到Sorted Set,將延遲時間設為score
  2. 啟動一個線程不斷判斷Sorted Set中第一個元素的score是否大於當前時間
  3. 如果大於,從Sorted Set中移除任務並添加到執行隊列中
  4. 如果小於,進行短暫休眠後重試

實例操作

root@redis:/usr/local/bin# redis-cli
127.0.0.1:6379> zadd delayqueue 1 task1
(integer) 1
127.0.0.1:6379> zadd delayqueue 2 task2
(integer) 1
127.0.0.1:6379> zadd delayqueue 4 task4
(integer) 1
127.0.0.1:6379> zadd delayqueue 3 task3
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> zrange delayqueue 0 0 withscores
1) "task1"

優點:

  • 實現簡單
  • 任務可管理(可刪除、修改任務)

缺點:

  • 需要有短輪詢線程不斷判斷第一個元素是否過期,造成CPU空耗
  • 分佈式場景中,容易引起多個節點讀取到相同任務

DelayQueue實現

DelayQueue是一個使用優先隊列實現的BlockingQueue,優先隊列比較的是時間,內部存儲的是實現Delayed接口的對象。 只有在對象過期後才能從隊列中獲取對象。

內部結構

  • 可重入鎖
  • 用於根據delay時間排序的優先級隊列
  • 用於優化阻塞通知的線程leader
  • 用於實現阻塞和通知的Condition對象

Leader/Followers

Leader/Followers是多個工作線程輪流進行事件監聽、分發、處理的一種模式。 該模式最大的優點在於,它是自己監聽事件並處理客戶請求,從接收到處理都是在同一線程中完成, 所以不需要在線程之間傳遞數據,解決線程頻繁切換帶來的開銷。

延遲任務調度系統(技術選型與設計)

該模式工作的任何時間點,只有一個線程成為Leader ,負責事件監聽,而其他線程都是Follower,在休眠中等待成為Leader。 該模式的工作線程存在三種狀態,工作線程同一時間只能處於一種狀態,這三種狀態為:

  • Leading:線程處於領導者狀態,負責事件監聽。Leader監聽到事件後,有兩種處理方式:
  • 可以轉移至Processing狀態,自己處理該事件,並調用方法推選新領導者。
  • 也可以指定其他Follower來處理事件,此時Leader狀態不變。
  • Processing:線程正在處理事件,處理完事件如果當前線程集中沒有領導者,它將成為新領導者,否則轉為追隨者。
  • Following:線程處於追隨者狀態,等待成為新的領導者也可能被領導者指定來處理新的事件。

核心源碼分析:

  • 入隊
public boolean offer(E e) { 

final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {//入隊對象延遲時間是隊列中最短的
leader = null;//重置leader
available.signal();//喚醒一個線程去監聽新加入的對象
}
return true;
} finally {
lock.unlock();
}
}
  • 出隊
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();//隊列為空,無限等待
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)//延遲時間已過,直接返回
return q.poll();
else if (leader != null)//已有leader在監聽了,無限等待
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;//當前線程成為leader
try {
available.awaitNanos(delay);//在delay納秒後喚醒
} finally {
if (leader == thisThread)// 入隊一個最小延遲時間的對象時leader會被清空
leader = null;

}
}
}
}
} finally {
if (leader == null && q.peek() != null)//leader不存在且隊列不為空,喚醒一個follower去成為leader去監聽
available.signal();
lock.unlock();
}
}

優點:

  • 效率高,任務觸發時間延遲低

缺點:

  • 數據是保存在內存,需要自己實現持久化
  • 不具備分佈式能力,需要自己實現高可用

時間輪實現

時間輪是一種環形的數據結構,分成多個格。

每個格代表一段時間,時間越短,精度越高。

每個格上用一個鏈表保存在該格的過期任務。

指針隨著時間一格一格轉動,並執行相應格子中的到期任務。

名詞解釋:

  • 時間格:環形結構中用於存放延遲任務的區塊
  • 指針:指向當前操作的時間格,代表當前時間
  • 格數:時間輪中時間格的個數
  • 間隔:每個時間格之間的間隔,代表時間輪能達到的精度
  • 總間隔:當前時間輪總間隔,等於格數*間隔,代表時間輪能表達的時間範圍

單表時間輪

延遲任務調度系統(技術選型與設計)

以上圖為例,假設一個格子是1秒,則整個時間輪能表示的時間段為8s, 如果當前指針指向2,此時需要調度一個3s後執行的任務,需要放到第5個格子(2+3)中,指針再轉3次就可以執行了。

單表時間輪存在的問題是:

格子的數量有限,所能代表的時間有限,當要存放一個10s後到期的任務怎麼辦?這會引起時間輪溢出。

有個辦法是把輪次信息也保存到時間格鏈表的任務上。

延遲任務調度系統(技術選型與設計)

如果任務要在10s後執行,算出輪次10/8 round等1,格子10%8等於2,所以放入第二格。

檢查過期任務時應當只執行round為0的任務,鏈表中其他任務的round減1。

帶輪次單表時間輪存在的問題是:

如果任務的時間跨度很大,數量很大,單層時間輪會造成任務的round很大,單個格子的鏈表很長,每次檢查的量很大,會做很多無效的檢查。怎麼辦?

分層時間輪

延遲任務調度系統(技術選型與設計)

過期任務一定是在底層輪中被執行的,其他時間輪中的任務在接近過期時會不斷的降級進入低一層的時間輪中。

分層時間輪中每個輪都有自己的格數和間隔設置,當最低層的時間輪轉一輪時,高一層的時間輪就轉一個格子。

分層時間輪大大增加了可表示的時間範圍,同時減少了空間佔用。

舉個例子:

上圖的分層時間輪可表達8 8 8=512s的時間範圍,如果用單表時間輪可能需要512個格子, 而分層時間輪只要8+8+8=24個格子,如果要設計一個時間範圍是1天的分層時間輪,三個輪的格子分別用24、60、60即可。

工作原理:

時間輪指針轉動有兩種方式:

  • 根據自己的間隔轉動(秒鐘輪1秒轉1格;分鐘輪1分鐘轉1格;時鐘輪1小時轉1格)
  • 通過下層時間輪推動(秒鐘輪轉1圈,分鐘輪轉1格;分鐘輪轉1圈,時鐘輪轉1格)

指針轉到特定格子時有兩種處理方式:

  • 如果是底層輪,指針指向格子中鏈表上的元素均表示過期
  • 如果是其他輪,將格子上的任務移動到精度細一級的時間輪上,比如時鐘輪的任務移動到分鐘輪上

舉個例子:

  • 添加1個5s後執行的任務
  1. 算出任務應該放在秒鐘輪的第5個格子
  2. 在秒鐘輪指針進行5次轉動後任務會被執行
  • 添加一個50s後執行的任務
  1. 算出該任務的延遲時間已經溢出秒鐘輪
  2. 50/8=6,所以該任務會被保存在分鐘輪的第6個格子
  3. 在秒鐘輪走了6圈(6*8s=48s)之後,分鐘輪的指針指向第6個格子
  4. 此時該格子中的任務會被降級到秒鐘輪,並根據50%8=2,任務會被移動到秒鐘輪的第2個格子
  5. 在秒鐘輪指針又進行2次轉動後(50s)任務會被執行
  • 添加一個250s後執行的任務
  1. 算出該任務的延遲時間已經溢出分鐘輪
  2. 250/8/8=3,所以該任務會被保存在時鐘輪的第3個格子
  3. 在分鐘輪走了3圈(3*64s=192s)之後,時鐘輪的指針指向第3個格子
  4. 此時該格子中的任務會被降級到分鐘輪,並根據(250-192)/8=7,任務會被移動到分鐘輪的第7個格子
  5. 在秒鐘輪走了7圈(7*8s=56s)之後,分鐘輪的指針指向第7個格子
  6. 此時該格子中的任務會被降級到秒鐘輪,並根據(250-192-56)=2,任務會被移動到秒鐘輪的第2個格子
  7. 在秒鐘輪指針又進行2次轉動後任務會被執行

優點:

  • 高性能(插入任務、刪除任務的時間複雜度均為O(1),DelayQueue由於涉及到排序,插入和移除的複雜度是O(logn))

缺點:

  • 數據是保存在內存,需要自己實現持久化
  • 不具備分佈式能力,需要自己實現高可用
  • 延遲任務過期時間受時間輪總間隔限制

對於超出範圍的任務可放在一個緩衝區中(可用隊列、redis或數據庫實現),等最高時間輪轉到下一格子就從緩衝中取出符合範圍的任務落到時間輪中。

比如:

  • 添加一個600s後執行的任務A
  1. 算出該任務的延遲時間已經溢出時間輪
  2. 所以任務被保存到緩衝隊列中
  3. 在時鐘輪走了1格之後,會從緩衝隊列中取滿足範圍的任務落到時間輪中
  4. 緩衝隊列中的所有任務延遲時間均需減去64s,任務A減去64s後是536s,依然大於時間輪範圍,所以不會被移出隊列
  5. 在時鐘輪又走了1格之後,任務A減去64s是536-64=472s,在時間輪範圍內,會被落入時鐘輪

之前的設計(DB/DelayQueue/ZooKeeper)

調度系統提供任務操作接口供業務系統提交任務、取消任務、反饋執行結果等。

針對dubbo調用,將任務抽象成JobCallbackService接口,由業務系統實現並註冊成服務。

整體架構

延遲任務調度系統(技術選型與設計)

數據庫:

  • 負責保存所有的任務數據

內存隊列:

  • 實際為DelayQueue,延遲任務精確觸發的機制由它保證
  • 只存儲未來N分鐘內過期且最多1000個任務

ZooKeeper:

  • 管理整個調度集群
  • 存儲調度節點信息
  • 存儲節點分片信息

主節點:

  • 有新的節點上下線時對數據重新分片

調度節點:

  • 提供dubbo、http接口供業務系統調用,用於提交任務、取消任務、反饋執行結果等
  • 從ZK註冊中心獲取當前節點的分片信息,再從數據庫拉取即將過期的數據放到DelayQueue
  • 調用業務系統註冊的回調服務接口,發起調度請求
  • 接收業務系統的反饋結果,更新執行結果,移除任務或發起重試

業務系統:

  • 作為被調度的服務需要實現回調接口JobCallbackService,並註冊為dubbo服務提供者
  • 在需要延遲任務的場景調用調度系統接口操作任務

數據庫設計

延遲任務調度系統(技術選型與設計)

表說明

  • job_callback_service:服務配置表,配置業務回調服務,包括服務協議、回調服務、重試次數
  • job_delay_task:延遲任務表,用於存儲延遲任務,包括任務分片號、回調服務、調用總次數、失敗數、任務狀態、回調參數等
  • job_delay_task_execlog:延遲任務執行表,記錄調度系統發起的每一次回調
  • job_delay_task_backlog:延遲任務調度結果表,記錄任務最終狀態等信息

主從切換

利用ZooKeeper臨時序列節點特性,序號最小的節點為主節點,其他節點為從節點。

主節點監聽集群狀態,集群狀態發生變化時重新分片。

從節點監聽序號比它小的兄弟節點,兄弟節點發生變化重新尋找和建立監聽關係。

延遲任務調度系統(技術選型與設計)

數據分片

延遲任務調度系統(技術選型與設計)

任務狀態

延遲任務調度系統(技術選型與設計)

  • delay:延遲任務提交後的初始狀態
  • ready:過期時間已到,消息推入就緒隊列的狀態
  • running:業務訂閱消息,收到消息開始處理的狀態
  • finished:業務處理成功
  • failed:業務處理失敗

主要流程

延遲任務調度系統(技術選型與設計)

服務加載

  1. 從DB讀取服務配置
  2. 根據配置動態構造Consumer對象並添加到Spring容器中

提交任務

  1. 業務系統通過dubbo或http接口提交任務
  2. 判斷任務過期時間是否在一個掃描週期內
  3. 如果是,
  4. 設置分片號(從當前節點所負責的分片隨機獲取)
  5. 添加到內存隊列
  6. 任務保存到job_delay_task表
  7. 如果否,
  8. 設置分片號(根據分片總數和隨機算法算出分片號)
  9. 任務保存到delay_task表

定時器

  1. 由一個線程管理
  2. 根據配置的掃描間隔設置定時器的執行週期
  3. 根據當前時間和掃描間隔算出該時段的過期時間X-Delay
  4. 從DB獲取過期時間在X-Delay之前的所有任務,並放到DelayQueue

調度任務

  1. 由一個線程池管理
  2. 所有線程都阻塞在DelayQueue的方法take
  3. take到任務,從DB中獲取任務,判斷是否存在
  4. 如果不在,什麼也不做(任務已執行成功或已被刪除)
  5. 如果存在,判斷調用次數是否超過設置
  6. 如果不超
  7. 調用業務回調服務
  8. 從任務中取出調用的服務配置
  9. 從容器中獲取對應的Consumer對象
  10. 異步調用業務回調服務
  11. 設置下次重試時間,記錄調用日誌job_delay_task_execlog
  12. 如果超過,將任務轉移到job_delay_task_backlog

任務反饋

  1. 更新任務調用結果

優點

  • 功能全面,高可用、易伸縮、可重試

缺點

  • 略微複雜
  • 需要將服務配置動態生成為Consumer對象
  • 增加新的服務需要通知所有調度節點刷新
  • 存在一定的耦合性(直接調用業務服務,協議耦合),如果接入系統是thrift協議呢?
  • 需要處理任務的重試
  • 調度系統直接回調業務服務,如果業務服務不可用可能會造成盲目重試,不能很好的控制流量(調度系統不知道業務服務的處理能力)

如果引入MQ,使用MQ來解耦服務調用的協議,保證任務的重試,並由消費方根據自己的處理能力控制流量會不會更好呢?

另一種方案(DB/DelayQueue/ZooKeeper/MQ)

整體架構

延遲任務調度系統(技術選型與設計)

數據庫設計

延遲任務調度系統(技術選型與設計)

主要流程

延遲任務調度系統(技術選型與設計)

調度任務

  1. 由一個線程池管理
  2. 所有線程都阻塞在DelayQueue的take方法
  3. take到任務,從DB中獲取任務,判斷是否存在
  4. 如果不在,什麼也不做(任務已執行成功或已被刪除)
  5. 如果存在,將任務轉移到job_delay_task_execlog;往消息隊列投遞消息

缺點

需要業務系統依賴於MQ


分享到:


相關文章: