時間輪算法(TimingWheel)是如何實現的?

前言

我在 2. SOFAJRaft源碼分析—JRaft的定時任務調度器是怎麼做的?

這篇文章裡已經講解過時間輪算法在JRaft中是怎麼應用的,但是我感覺我並沒有講解清楚這個東西,導致看了這篇文章依然和沒看是一樣的,所以我打算重新說透時間輪算法。

時間輪的應用並非 JRaft 獨有,其應用場景還有很多,在 Netty、Akka、Quartz、ZooKeeper 、Kafka等組件中都存在時間輪的蹤影。

我們下面講解的時間輪的實現以JRaft中的為例子進行講解,因為JRaft這部分的代碼是參考Netty的,所以大家也可以去Netty中去尋找源碼實現。

時間輪用來解決什麼問題?

如果一個系統中存在著大量的調度任務,而大量的調度任務如果每一個都使用自己的調度器來管理任務的生命週期的話,浪費cpu的資源並且很低效。

時間輪是一種高效來利用線程資源來進行批量化調度的一種調度模型。把大批量的調度任務全部都綁定到同一個的調度器上面,使用這一個調度器來進行所有任務的管理(manager),觸發(trigger)以及運行(runnable)。能夠高效的管理各種延時任務,週期任務,通知任務等等。

不過,時間輪調度器的時間精度可能不是很高,對於精度要求特別高的調度任務可能不太適合。因為時間輪算法的精度取決於,時間段“指針”單元的最小粒度大小,比如時間輪的格子是一秒跳一次,那麼調度精度小於一秒的任務就無法被時間輪所調度。

時間輪結構

時間輪算法(TimingWheel)是如何實現的?

如圖,JRaft中時間輪(HashedWheelTimer)是一個存儲定時任務的環形隊列,底層採用數組實現,數組中的每個元素可以存放一個定時任務列表(HashedWheelBucket),HashedWheelBucket是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項(HashedWheelTimeout),其中封裝了真正的定時任務(TimerTask)。

時間輪由多個時間格組成,每個時間格代表當前時間輪的基本時間跨度(tickDuration)。時間輪的時間格個數是固定的,可用 wheel.length 來表示。

時間輪還有一個錶盤指針(tick),用來表示時間輪當前指針跳動的次數,可以用tickDuration * (tick + 1)來表示下一次到期的任務,需要處理此時間格所對應的 HashedWheelBucket 中的所有任務。

時間輪運行邏輯

時間輪在啟動的時候會記錄一下當前啟動的時間賦值給startTime。時間輪在添加任務的時候首先會計算延遲時間(deadline),比如一個任務的延遲時間為24ms,那麼會將當前的時間(currentTime)+24ms-時間輪啟動時的時間(startTime)。然後將任務封裝成HashedWheelTimeout加入到timeouts隊列中,作為緩存。

時間輪在運行的時候會將timeouts中緩存的HashedWheelTimeout任務取10萬個出來進行遍歷。

然後需要計算出幾個參數值:

  1. HashedWheelTimeout的總共延遲的次數:將每個任務的延遲時間(deadline)/tickDuration 計算出tick需要總共跳動的次數;
  2. 計算時間輪round次數:根據計算的需要走的(總次數- 當前tick數量)/ 時間格個數(wheel.length)。比如tickDuration為1ms,時間格個數為20個,那麼時間輪走一圈需要20ms,那麼添加進一個延時為24ms的數據,如果當前的tick為0,那麼計算出的輪數為1,指針沒運行一圈就會將round取出來減一,所以需要轉動到第二輪之後才可以將輪數round減為0之後才會運行
  3. 計算出該任務需要放置到時間輪(wheel)的槽位,然後加入到槽位鏈表最後

將timeouts中的數據放置到時間輪wheel中之後,計算出當前時針走到的槽位的位置,並取出槽位中的鏈表數據,將deadline和當前的時間做對比,運行過期的數據。

源碼分析

構造器

<code>public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {

if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
//unit = MILLISECONDS
if (unit == null) {
throw new NullPointerException("unit");

}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}

// Normalize ticksPerWheel to power of two and initialize the wheel.
// 創建一個HashedWheelBucket數組
// 創建時間輪基本的數據結構,一個數組。長度為不小於ticksPerWheel的最小2的n次方
wheel = createWheel(ticksPerWheel);
// 這是一個標示符,用來快速計算任務應該呆的格子。
// 我們知道,給定一個deadline的定時任務,其應該呆的格子=deadline%wheel.length.但是%操作是個相對耗時的操作,所以使用一種變通的位運算代替:
// 因為一圈的長度為2的n次方,mask = 2^n-1後低位將全部是1,然後deadline&mast == deadline%wheel.length
// java中的HashMap在進行hash之後,進行index的hash尋址尋址的算法也是和這個一樣的
mask = wheel.length - 1;

// Convert tickDuration to nanos.
//tickDuration傳入是1的話,這裡會轉換成1000000
this.tickDuration = unit.toNanos(tickDuration);

// Prevent overflow.
// 校驗是否存在溢出。即指針轉動的時間間隔不能太長而導致tickDuration*wheel.length>Long.MAX_VALUE
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE
/ wheel.length));
}
//將worker包裝成thread

workerThread = threadFactory.newThread(worker);
//maxPendingTimeouts = -1
this.maxPendingTimeouts = maxPendingTimeouts;

//如果HashedWheelTimer實例太多,那麼就會打印一個error日誌
if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
&& warnedTooManyInstances.compareAndSet(false, true)) {
reportTooManyInstances();
}
}/<code>

在這個構造器中有幾個細節需要注意:

  1. 調用createWheel方法創建的wheel數組一定是2次方數,比如傳入的ticksPerWheel是6,那麼初始化的wheel長度一定是8。這樣做是為了讓mask & tick 來計算出槽位
  2. tickDuration用的是納秒
  3. 在構造裡面並不會裡面啟動時間輪,而是要等到有第一個任務加入到時間輪的時候才啟動。在構造器裡面會將工作線程worker封裝成workerThread

放入任務到時間輪中

<code>public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}

long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();

throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
+ ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 如果時間輪沒有啟動,則啟動
start();

// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// Guard against overflow.
//在delay為正數的情況下,deadline是不可能為負數
//如果為負數,那麼說明超過了long的最大值
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 這裡定時任務不是直接加到對應的格子中,而是先加入到一個隊列裡,然後等到下一個tick的時候,
// 會從隊列裡取出最多100000個任務加入到指定的格子中
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
//Worker會去處理timeouts隊列裡面的數據
timeouts.add(timeout);
return timeout;
}/<code>
  1. 如果時間輪沒有啟動,那麼就調用start方法啟動時間輪,啟動時間輪之後會為startTime設置為當前時間
  2. 計算延遲時間deadline
  3. 將task任務封裝到HashedWheelTimeout中,然後添加到timeouts隊列中進行緩存

start

<code>private final CountDownLatch                                     startTimeInitialized   = new CountDownLatch(1);

public void start() {
//workerState一開始的時候是0(WORKER_STATE_INIT),然後才會設置為1(WORKER_STATE_STARTED)
switch (workerStateUpdater.get(this)) {
case WORKER_STATE_INIT:
//使用cas來獲取啟動調度的權力,只有競爭到的線程允許來進行實例啟動
if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
//如果成功設置了workerState,那麼就調用workerThread線程
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}

// 等待worker線程初始化時間輪的啟動時間
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
//這裡使用countDownLauch來確保調度的線程已經被啟動
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}/<code>

start方法會根據當前的workerState狀態來啟動時間輪。並且用了startTimeInitialized來控制線程的運行,如果workerThread沒有啟動起來,那麼newTimeout方法會一直阻塞在運行start方法中。如果不阻塞,newTimeout方法會獲取不到startTime。

啟動時間輪

時間輪的啟動在HashedWheelTimer的內部類Worker中。調用workerThread#start方法會調用Worker的run方法啟動時間輪。

下面我們看時間輪啟動做了什麼,下面的分析不考慮任務被取消的情況。

Worker#run

<code>public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}

//HashedWheelTimer的start方法會繼續往下運行
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();

do {
//返回的是當前的nanoTime- startTime
//也就是返回的是 每 tick 一次的時間間隔
final long deadline = waitForNextTick();
if (deadline > 0) {
//算出時間輪的槽位
int idx = (int) (tick & mask);
//移除cancelledTimeouts中的bucket
// 從bucket中移除timeout
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
// 將newTimeout()方法中加入到待處理定時任務隊列中的任務加入到指定的格子中
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;

}
// 校驗如果workerState是started狀態,那麼就一直循環
} while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
//如果有沒有被處理的timeout,那麼加入到unprocessedTimeouts對列中
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
//處理被取消的任務
processCancelledTasks();
}/<code>
  1. 時間輪運行的時候首先會記錄一下啟動時間(startTime),然後調用startTimeInitialized釋放外層的等待線程;
  2. 進入dowhile循環,調用waitForNextTick睡眠等待到下一次的tick指針的跳動,並返回當前時間減去startTime作為deadline
  3. 由於mask= wheel.length -1 ,wheel是2的次方數,所以可以直接用tick & mask 計算出此次在wheel中的槽位
  4. 調用processCancelledTasks將cancelledTimeouts隊列中的任務取出來,並將當前的任務從時間輪中移除
  5. 調用transferTimeoutsToBuckets方法將timeouts隊列中緩存的數據取出加入到時間輪中
  6. 運行目前指針指向的槽位中的bucket鏈表數據

時間輪指針跳動

waitForNextTick

<code>//sleep, 直到下次tick到來, 然後返回該次tick和啟動時間之間的時長
private long waitForNextTick() {
//tickDuration這裡是100000
//tick表示總tick數
long deadline = tickDuration * (tick + 1);

for (;;) {
final long currentTime = System.nanoTime() - startTime;
// 計算需要sleep的時間, 之所以加999999後再除10000000,前面是1所以這裡需要減去1,
// 才能計算準確,還有通過這裡可以看到 其實線程是以睡眠一定的時候再來執行下一個ticket的任務的,
//這樣如果ticket的間隔設置的太小的話,系統會頻繁的睡眠然後啟動,
//其實感覺影響部分的性能,所以為了更好的利用系統資源步長可以稍微設置大點
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
//sleepTimeMs小於零表示走到了下一個時間輪位置
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}

// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (Platform.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}

try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}/<code>

可以想象一下在時鐘的秒鐘上面秒與秒之間的時間是需要等待的,那麼waitForNextTick這個方法就是根據當前的時間計算出跳動到下個時間的間隔時間,並進行sleep操作,然後返回當前時間距離時間輪啟動時間的時間段。

轉移任務到時間輪中

在調用時間輪的方法加入任務的時候並沒有直接加入到時間輪中,而是緩存到了timeouts隊列中,所以在運行的時候需要將timeouts隊列中的任務轉移到時間輪數據的鏈表中

transferTimeoutsToBuckets

<code>private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
// 每次tick只處理10w個任務,以免阻塞worker線程
for (int i = 0; i < 100000; i++) {

HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
//已經被取消了;
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
//calculated = tick 次數
long calculated = timeout.deadline / tickDuration;
// 計算剩餘的輪數, 只有 timer 走夠輪數, 並且到達了 task 所在的 slot, task 才會過期
timeout.remainingRounds = (calculated - tick) / wheel.length;
//如果任務在timeouts隊列裡面放久了, 以至於已經過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 此方法調用完後就會被執行
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
//// 算出任務應該插入的 wheel 的 slot, slotIndex = tick 次數 & mask, mask = wheel.length - 1
int stopIndex = (int) (ticks & mask);

HashedWheelBucket bucket = wheel[stopIndex];
//將timeout加入到bucket鏈表中
bucket.addTimeout(timeout);
}
}/<code>

在這個轉移方法中,寫死了一個循環,每次都只轉移10萬個任務。

然後根據HashedWheelTimeout的deadline延遲時間計算出時間輪需要運行多少次才能運行當前的任務,如果當前的任務延遲時間大於時間輪跑一圈所需要的時間,那麼就計算需要跑幾圈才能到這個任務運行。

最後計算出該任務在時間輪中的槽位,添加到時間輪的鏈表中。

運行時間輪中的任務

當指針跳到時間輪的槽位的時間,會將槽位的HashedWheelBucket取出來,然後遍歷鏈表,運行其中到期的任務。

expireTimeouts

<code>// 過期並執行格子中的到期任務,tick到該格子的時候,worker線程會調用這個方法
//根據deadline和remainingRounds判斷任務是否過期
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;

// process all timeouts
//遍歷格子中的所有定時任務
while (timeout != null) {
// 先保存next,因為移除後next將被設置為null
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
//從bucket鏈表中移除當前timeout,並返回鏈表中下一個timeout
next = remove(timeout);
//如果timeout的時間小於當前的時間,那麼就調用expire執行task
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
//不可能發生的情況,就是說round已經為0了,deadline卻>當前槽的deadline
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)",
timeout.deadline, deadline));
}

} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
//因為當前的槽位已經過了,說明已經走了一圈了,把輪數減一
timeout.remainingRounds--;
}
//把指針放置到下一個timeout
timeout = next;
}
}/<code>

HashedWheelBucket是一個鏈表,所以我們需要從head節點往下進行遍歷。如果鏈表沒有遍歷到鏈表尾部那麼就繼續往下遍歷。

獲取的timeout節點節點,如果剩餘輪數remainingRounds大於0,那麼就說明要到下一圈才能運行,所以將剩餘輪數減一;

如果當前剩餘輪數小於等於零了,那麼就將當前節點從bucket鏈表中移除,並判斷一下當前的時間是否大於timeout的延遲時間,如果是則調用timeout的expire執行任務。


分享到:


相關文章: