心跳與超時:高併發高性能的時間輪超時器

引言

在許多業務場景中,我們都會碰到延遲任務,定時任務這種需求。特別的,在網絡連接的場景中,常常會出現一些超時控制。由於服務端的連接數量很大,這些超時任務的數量往往也是很龐大的。實現對大量任務的超時管理並不是一個容易的事情。

本章我們將介紹幾種用於實現超時任務的數據結構,並且最後分析 Netty 在超時任務上採取的結構和代碼。

JDK 原生提供的超時任務支持

java.util.Timer

JDK 在 1.3 的時候引入了Timer數據結構用於實現定時任務。Timer的實現思路比較簡單,其內部有兩個主要屬性:

  • TaskQueue:定時任務抽象類TimeTask的列表。
  • TimerThread:用於執行定時任務的線程。

Timer結構還定義了一個抽象類TimerTask並且繼承了Runnable接口。業務系統實現了這個抽象類的run方法用於提供具體的延時任務邏輯。

TaskQueue內部採用大頂堆的方式,依據任務的觸發時間進行排序。而TimerThread則以死循環的方式從TaskQueue獲取隊列頭,等待隊列頭的任務的超時時間到達後觸發該任務,並且將任務從隊列中移除。

Timer的數據結構和算法都很容易理解。所有的超時任務都首先進入延時隊列。後臺超時線程不斷的從延遲隊列中獲取任務並且等待超時時間到達後執行任務。延遲隊列採用大頂堆排序,在延遲任務的場景中有三種操作,分別是:添加任務,提取隊列頭任務,查看隊列頭任務。

查看隊列頭任務的事件複雜度是 O(1) 。而添加任務和提取隊列頭任務的時間複雜度都是 O(Log2n) 。當任務數量較大時,添加和刪除的開銷也是比較大的。此外,由於Timer內部只有一個處理線程,如果有一個延遲任務的處理消耗了較多的時間,會對應的延遲後續任務的處理。

ScheduledThreadPoolExecutor

由於Timer只有一個線程用來處理延遲任務,在任務數量很多的時候顯然是不足夠的。在 JDK1.5 引入線程池接口ExecutorService後,也對應的提供了一個用於處理延時任務的ScheduledExecutorService子類接口。該接口內部也一樣使用了一個使用小頂堆進行排序的延遲隊列存放任務。線程池中的線程會在這個隊列上等待直到有任務可以提取。

ScheduledExecutorService的實現上有一些特殊,只有一個線程能夠提取到延遲隊列頭的任務,並且根據任務的超時時間進行等待。在這個等待期間,其他的線程是無法獲取任務的。這樣的實現是為了避免多個線程同時獲取任務,導致超時時間未到達就任務觸發或者在等待任務超時時間時有新的任務被加入而無法響應。

由於ScheduledExecutorService可以使用多個線程,這樣也緩解了因為個別任務執行時間長導致的後續任務被阻塞的情況。不過延遲隊列也是一樣採用小頂堆的排序方式,因此添加任務和刪除任務的時間複雜度都是 O(Log2n) 。在任務數量很大的情況下,性能表現比較差。

更高效的數據結構

雖然Timer和ScheduledThreadPoolExecutor都提供了對延遲任務的支撐能力,但是由於新增任務和提取任務的時間複雜度都是 O(Log2n) ,在任務數量很大,比如幾萬,十幾萬的時候,性能的開銷就變得很巨大。

那麼,是否存在新增任務和提取任務比 O(Log2n) 複雜度更低的數據結構呢?答案是存在的。在論文《Hashed and Hierarchical Timing Wheels》中設計了一種名為時間輪( Timing Wheels )的數據結構,這種結構在處理延遲任務時,其新增任務和刪除任務的時間複雜度降低到了 O(1) 。

基本原理

時間輪的數據結構很類似於我們鐘錶上的數據指針,故而得名時間輪。其數據結構用圖示意如下

心跳與超時:高併發高性能的時間輪超時器

每一個時間“格子”我們稱之為槽位,槽位中存放著延遲任務隊列。槽位本身代表著一個時間單位,比如 1 秒。時間輪擁有的槽位個數就是該時間輪能夠處理的最大延遲跨度的任務,槽位的時間單位代表著時間輪的精度。這意味著小於時間單位的時間在該時間輪是無法被區分的。

槽位上的延遲任務隊列中的任務都有相同的延遲時間。每一個單位時間,指針都會移動到下一個槽位。當指針指向某一個槽位時,該槽位的延遲任務隊列中的任務都會被觸發。

當有一個延遲任務要插入時間輪時,首先計算其延遲時間與單位時間的餘值,從指針指向的當前槽位移動餘值的個數槽位,就是該延遲任務需要被放入的槽位。

舉個例子,時間輪有8個槽位,編號為 0 ~ 7 。指針當前指向槽位 2 。新增一個延遲時間為 4 秒的延遲任務,4 % 8 = 4,因此該任務會被插入 4 + 2 = 6,也就是槽位6的延遲任務隊列。

時間輪的槽位實現可以採用循環數組的方式達成,也就是讓指針在越過數組的邊界後重新回到起始下標。概括來說,可以將時間輪的算法描述為

用隊列來存儲延遲任務,同一個隊列中的任務,其延遲時間相同。用循環數組的方式來存儲元素,數組中的每一個元素都指向一個延遲任務隊列。

有一個當前指針指向數組中的某一個槽位,每間隔一個單位時間,指針就移動到下一個槽位。被指針指向的槽位的延遲隊列,其中的延遲任務全部被觸發。

在時間輪中新增一個延遲任務,將其延遲時間除以單位時間得到的餘值,從當前指針開始,移動餘值對應個數的槽位,就是延遲任務被放入的槽位。

基於這樣的數據結構,插入一個延遲任務的時間複雜度就下降到 O(1) 。而當指針指向到一個槽位時,該槽位連接的延遲任務隊列中的延遲任務全部被觸發。

延遲任務的觸發和執行不應該影響指針向後移動的時間精確性。因此一般情況下,用於移動指針的線程只負責任務的觸發,任務的執行交由其他的線程來完成。比如,可以將槽位上的延遲任務隊列放入到額外的線程池中執行,然後在槽位上新建一個空白的新的延遲任務隊列用於後續任務的添加。

支撐更多超過範圍的延遲時間

在基本原理中我們分析了時間輪的基礎結構。不過當時我們假設需要插入的延遲任務的時間不會超過時間輪的長度,也就是說每一個槽位上的延遲任務隊列中的任務的延遲時間都是相同的。

在這種情況下,要支持更大時間跨度的延遲任務,要麼增加時間輪的槽位數,要麼減少時間輪的精度,也就是每一個槽位代表的單位時間。時間輪的精度顯然是一個業務上的硬性要求,那麼只能增加槽位數。假設要求精度為 1 秒,要能支持延遲時間為 1 天的延遲任務,時間輪的槽位數需要 60 × 60 × 24 = 86400 。這就需要消耗更多的內存。顯然,單純增加槽位數並不是一個好的解決方案。

在論文中,針對大跨度的延遲任務支持,提供了兩種擴展方案。

方案一:不同輪次的延遲任務共存相同的延遲隊列

在該方案中,算法引入了“輪次”的概念,延遲任務的延遲時間除以時間輪長度得到的商值為輪次。延遲任務的延遲時間除以時間輪長度得到的餘數為要插入的槽位偏移量。

當插入延遲任務時首先計算輪次和槽位偏移量,通過槽位偏移量確定延遲任務插入的槽位。當指針指向某一個槽位時,對槽位指向的延遲任務隊列進行遍歷,其中輪次為0的延遲任務全部觸發,其餘任務則等待下一個週期。

通過引入輪次,就可以在有限的槽位上支持無窮時間範圍的延遲任務。但是雖然插入任務的時間複雜度仍然是 O(1) ,但是在延遲任務觸發時卻需要遍歷延遲任務隊列來確認其輪次是否為0。任務觸發時的時間複雜卻上升為了 O(n) 。

對於這個情況,還有一個變化的細節可以採用,就是將延遲任務隊列按照輪次進行排序,比方說使用小頂堆對延遲任務隊列進行排序。這樣,當指針指向一個槽位觸發延遲任務時,只需要不斷的從隊列頭取出任務進行輪次檢查,一旦任務輪次不等於0就可以停止。任務觸發的時間複雜度下降為 O(1) 。對應的,由於隊列是排序的了,任務插入的時候除了需要定位插入的槽位,還需要定位在隊列中的插入位置。插入的時間複雜度變化為 O(1) 和 O(Log2n) ,n 為該槽位上延遲任務隊列的長度。

方案二:多層次時間輪

看看手錶的設計,有秒針,分針,時針。像秒針與分針,雖然都有 60 格 ,但是各自的格子代表的時間長度不同。參考這個思路,我們可以聲明多個不同層級的時間輪,每一個時間輪的槽位的時間跨度是其次級時間輪的整體時間範圍。

當低層級的時間輪的指針完整的走完一圈,其對應的高層級時間輪對應的移動一個槽位。並且高層級時間輪指針指向的槽位中的任務按照延遲時間計算,重新放入到低層級時間輪的不同槽位中。這樣的方式,保證了每一個時間輪中的每一個槽位的延遲任務隊列中的任務都具備相同時間精度的延遲時間。

以精度為 1 秒,時間範圍為 1 天的時間輪為例子,可以設計三級時間輪:秒級時間輪有 60 個槽位,每個槽位的時間為 1 秒;分鐘級時間輪有 60 個槽位,每個槽位的時間為 60 秒;小時級時間輪有24個槽位,每個槽位的時間為 60 分鐘。當秒級時間輪走完 60 秒後,秒級時間輪的指針再次指向下標為0的槽位,而分鐘級時間輪的指針向後移動一個槽位,並且將該槽位上的延遲任務全部取出並且重新計算後放入秒級時間輪。

總共只需要 60 + 60 + 24 = 144 個槽位即可支撐。對比上面提到的單級時間輪需要 86400 個槽位而言,節省了相當的內存。

層級時間輪有兩種常見的做法:

  • 固定時間範圍:時間輪的個數,以及不同層級的時間輪的槽位數是通過構造方法的入參指定,這意味著時間輪整體能夠支撐的時間範圍是在構造方法的時候被確定。
  • 非固定時間範圍:定義好一個時間輪的槽位個數,以及最小的時間輪的槽位時間。當插入的延遲任務的時間超過時間輪範圍時則動態生成更高層級的時間輪。由於時間輪是在運行期生成,並且根據任務的延遲時間計算,當已經存在的時間輪不滿足其延遲時間範圍要求時,動態生成高層級時間輪,因此整體能夠支撐的時間範圍是沒有上限的。

Netty 的時間輪實現

時間輪算法的核心思想就是通過循環數組和指針移動的方式,將新增延遲任務的時間複雜度下降到 O(1) ,但是在具體實現上,包括如何處理更大時間跨度的延遲任務上,各家不同的實現都會有一些細節上的變化。下面我們以 Netty 中都時間輪實現為例子來進行代碼分析。

接口定義

Netty 的實現自定義了一個超時器的接口io.netty.util.Timer,其方法如下

<code>public interface Timer
{
//新增一個延時任務,入參為定時任務TimerTask,和對應的延遲時間
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
//停止時間輪的運行,並且返回所有未被觸發的延時任務

Set < Timeout > stop();
}
public interface Timeout
{
Timer timer();
TimerTask task();
boolean isExpired();
boolean isCancelled();
boolean cancel();
}/<code>

Timeout接口是對延遲任務的一個封裝,其接口方法說明其實現內部需要維持該延遲任務的狀態。後續我們分析其實現內部代碼時可以更容易的看到。

Timer接口有唯一實現HashedWheelTimer。首先來看其構造方法,如下

構建循環數組

<code>public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts)
{
//省略代碼,省略參數非空檢查內容。
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
//省略代碼,省略槽位時間範圍檢查,避免溢出以及小於 1 毫秒。
workerThread = threadFactory.newThread(worker);
//省略代碼,省略資源洩漏追蹤設置以及時間輪實例個數檢查
}/<code>

首先是方法createWheel,用於創建時間輪的核心數據結構,循環數組。來看下其方法內容

<code>private static HashedWheelBucket[] createWheel(int ticksPerWheel)
{

//省略代碼,確認 ticksPerWheel 處於正確的區間
//將 ticksPerWheel 規範化為 2 的次方冪大小。
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for(int i = 0; i < wheel.length; i++)
{
wheel[i] = new HashedWheelBucket();
}
return wheel;
}/<code>

數組的長度為 2 的次方冪方便進行求商和取餘計算。

HashedWheelBucket內部存儲著由HashedWheelTimeout節點構成的雙向鏈表,並且存儲著鏈表的頭節點和尾結點,方便於任務的提取和插入。

新增延遲任務

方法HashedWheelTimer#newTimeout用於新增延遲任務,下面來看下代碼

<code>public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
{
//省略代碼,用於參數檢查
start();
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
if(delay > 0 && deadline < 0)
{
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}/<code>

可以看到,在新增任務的時候,任務並不是直接進入到循環數組中,而是首先被放入到一個隊列,也就是屬性timeouts,該隊列是一個 MPSC 類型的隊列,採用這個模式主要出於提升併發性能考慮,因為這個隊列只有線程workerThread會進行任務提取操作。

該線程是在構造方法中通過調用workerThread = threadFactory.newThread(worker)被創建。但是創建之後並不是馬上執行線程的start方法,其啟動的時機是這個時間輪第一次新增延遲任務的時候,也就是本方法中的start方法的內容。下面是其代碼

<code>public void start()
{
switch(WORKER_STATE_UPDATER.get(this))
{
case WORKER_STATE_INIT:
if(WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED))
{
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
while(startTime == 0)
{
try
{
startTimeInitialized.await();
}
catch(InterruptedException ignore)
{
// Ignore - it will be ready very soon.
}
}
}/<code>

方法很明顯的分為兩個部分,第一部分為Switch方法塊,通過對狀態變量的 CAS 操作,確保只有一個線程能夠執行workerThread.start()方法來啟動工作線程,避免併發異常。第二部分為阻塞等待,通過CountDownLatch類型變量startTimeInitialized執行阻塞等待,用於等待工作線程workerThread真正進入工作狀態。

從newTimeout方法的角度來看,插入延遲任務首先是放入隊列中,之前分析數據結構的時候也說過任務的觸發是指針指向時間輪中某個槽位時進行,那麼必然存在一個需要將隊列中的延遲任務放入到時間輪的數組之中的工作。這個動作顯然就是就是由workerThread工作線程來完成。下面就來看下這個線程的具體代碼內容。

工作線程workerThread

工作線程是依託於HashedWheelTimer.Worker這個實現了Runnable接口的類進行工作的,那下面看下其對run方法的實現代碼,如下

<code>public void run()
{
{//代碼塊①
startTime = System.nanoTime();
if(startTime == 0)
{
//使用startTime==0 作為線程進入工作狀態模式標識,因此這裡重新賦值為1
startTime = 1;
}
//通知外部初始化工作線程的線程,工作線程已經啟動完畢
startTimeInitialized.countDown();
}
{//代碼塊②
do {
final long deadline = waitForNextTick();
if(deadline > 0)
{
int idx = (int)(tick & mask);
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];

transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
}
{//代碼塊③
for(HashedWheelBucket bucket: wheel)
{
bucket.clearTimeouts(unprocessedTimeouts);
}
for(;;)
{
HashedWheelTimeout timeout = timeouts.poll();
if(timeout == null)
{
break;
}
if(!timeout.isCancelled())
{
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
}/<code>

線程啟動與準備工作

為了方便閱讀,這邊將run方法的內容分為三個代碼塊。首先來看代碼塊①。通過系統調用System.nanoTime為啟動時間startTime設置初始值,該變量代表了時間輪的基線時間,用於後續相對時間的計算。賦值完畢後,通過startTimeInitialized變量對外部的等待線程進行通知。

驅動指針和任務觸發

接著來看代碼塊②。這是主要的工作部分,整體是在一個while循環中,確保工作線程只在時間輪沒有被終止的時候工作。首先來看方法waitForNextTick,在時間輪中,指針移動一次,稱之為一個tick,這個方法顯然內部應該是用於等待指針移動到下一個tick,來看具體代碼,如下

<code>private long waitForNextTick()
{
long deadline = tickDuration * (tick + 1);
for(;;)
{
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if(sleepTimeMs <= 0)
{
if(currentTime == Long.MIN_VALUE)
{
return -Long.MAX_VALUE;
}
else
{
return currentTime;
}
}
if(PlatformDependent.isWindows())
{
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try
{
Thread.sleep(sleepTimeMs);
}
catch(InterruptedException ignored)
{
if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
{
return Long.MIN_VALUE;
}
}
}
}/<code>

整個方法的思路很簡單,前面說過,時間輪每移動一次指針,意味著一個tick。這裡tick可以看成是指針移動的次數。由於槽位的時間範圍是固定的,因此可以簡單的計算出來指針移動到下一個槽位,理論上應該經過的時間,也就是long deadline = tickDuration * (tick + 1) 。之後再計算從時間輪啟動到當前,

實際經過的時間,也就是long currentTime = System.nanoTime() - startTime 。二者的差值就是線程所需要睡眠的時間。

如果差值小於0,意味著實際經過的時間超過了理論時間,此時已經超出了應該休眠的範圍,方法需要立即返回。由於在這個方法的執行過程中,可能會遇到時間輪被停止的情況,因此使用一個特殊值來表達這個事件,也就是Long.MIN_VALUE,這也是為什麼currentTime要避開這個值的原因。

還有一點需要注意,Thread.sleep方法的實現是依託於操作系統提供的中斷檢查,也就是操作系統會在每一箇中斷的時候去檢查是否有線程需要喚醒並且提供CPU資源。默認情況下 Linux 的中斷間隔是 1 毫秒,而 Windows 的中斷間隔是 10 毫秒或者 15 毫秒,具體取決於硬件識別。

如果是在 Windows 平臺下,當方法調用Thread.sleep傳入的參數不是10的整數倍時,其內部會調用系統方法timeBeginPeriod()和timeEndPeriod()來修改中斷週期為 1 毫秒,並且在休眠結束後再次設置回默認值。這樣的目的是為了保證休眠時間的準確性。但是在 Windows 平臺下,頻繁的調用修改中斷週期會導致 Windows 時鐘出現異常,大多數時候的表現是導致時鐘加快。這將導致比如嘗試休眠 10 秒時,實際上只休眠了 9 秒。所以在這裡,通過sleepTimeMs = sleepTimeMs / 10 * 10保證了sleepTimeMs 是 10 的整數倍,從而避免了 Windows 的這個 BUG 。

當方法waitForNextTick返回後,並且返回的值是正數,意味著當前tick的休眠等待已經完成,可以進行延遲任務的觸發處理了。通過int idx = (int)(tick & mask)調用,確定下一個被觸發延遲任務的槽位在循環數組中的下標。在處理觸發任務之前,首先將已經取消的延遲任務從槽位所指向的延遲任務隊列中刪除。每次調用HashedWheelTimer#newTimeout新增延遲任務時都會返回一個Timeout對象,可以通過cancle方法將這個延遲任務取消。當執行取消動作的時候,並不會直接從延遲隊列中刪除,而是將這個對象放入到取消隊列,也就是HashedWheelTimer.cancelledTimeouts屬性。在準備遍歷槽位上延遲任務隊列之前,通過方法processCancelledTasks來遍歷這個取消隊列,將其中的延遲任務從各自槽位上的延遲任務隊列中刪除。使用這種方式的好處在於延遲任務的刪除只有一個線程會進行,避免了多線程帶來的併發干擾,減少了開發難度。

在處理完取消的延遲任務後,調用方法transferTimeoutsToBuckets來將新增延遲任務隊列HashedWheelTimer.timeouts中的延遲任務分別添加到合適其延遲時間的槽位中。方法的代碼很簡單,就是循環不斷從timeouts取出任務,並且計算其延遲時間與時間輪範圍的商值和餘數,結果分別為其輪次與槽位下標。根據槽位下標將該任務添加到槽位對應的延遲任務隊列中。

在這裡可以看到 Netty 作者對時間輪這一結構的併發設計,新增任務是向 MPSC 隊列新增元素實現。而槽位上的延遲任務隊列只有時間輪本身的線程能夠進行新增和刪除,設計為了 SPSC 模式。前者是為了提高無鎖併發下的性能,後者則是通過約束,減少了設計難度。

transferTimeoutsToBuckets方法每次最多隻會轉移 100000 個延遲任務到合適的槽位中,這是為了避免外部循環添加任務導致的餓死。方法執行完畢後,就到了槽位上延遲任務的觸發處理,也就是方法HashedWheelBucket#expireTimeouts的功能,方法內的邏輯也很簡單。遍歷隊列,如果延遲任務的輪次不為 0,則減 1。否則觸發任務執行方法,也就是HashedWheelTimeout#expire。該方法內部依然通過 CAS 方式對狀態進行更新,避免方法的觸發和取消之間的競爭衝突。從這個方法的實現可以看到,Netty 採用了輪次的方式來對超出時間輪範圍的延遲時間進行支持。多層級時間輪的實現相比輪次概念的實現更為複雜,考慮到在網絡IO應用中,超出時間輪範圍的場景比較少,使用輪次的方式去支撐更大的時間,是一個相對容易實現的方案。

當需要被觸發的延遲任務都被觸發後,通過tick加 1 來表達指針移動到下一個槽位。

時間輪停止

外部線程通過調用HashedWheelTimer#stop方法來停止時間輪,停止的方式很簡單,就是通過 CAS 調用來修改時間輪的狀態屬性。而在代碼塊②中通過循環的方式在每一次tick都會檢查這個狀態位。代碼塊③的內容很簡單,遍歷所有的槽位,並且遍歷槽位的延遲任務隊列,將所有未到達延遲時間並且未取消的任務,都放入到一個集合中,最終將這個集合返回。這個集合內存儲的就是所有未能執行的延遲任務。

思考總結

在處理大量延遲任務的場景中,時間輪是一個很高效的算法與數據結構。Netty 在對時間輪的實現上,在添加任務,過期任務,刪除任務等環節進行了一些細節上的調整。實際上,不同中間件中都有對時間輪的一些實現,各自也都有區別,但是核心都是圍繞在循環數組與槽位過期這個概念上。不同的細節變化有各自適合的場景和考量。


分享到:


相關文章: