JUC 延遲隊列 DelayQueue

JUC 延遲隊列 DelayQueue

DelayQueue 一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。基於 PriorityQueue 實現的延遲隊列,用 ReentrantLock 提供線程安全性。

其元素必須實現 Delayed 接口。

該類可用來實現定時調度的功能,當前時間與任務的下次執行時間的距離作為延遲時間。

實現上採用 Leader_Follower 模式 的變體進行優化:leader 進行限時等待,其他線程作為 follower 無限等待。leader 在等待的過程中可能插入一個更快到期的元素,那麼舊 leader 就會被作廢,如果又有一個線程來獲取,那麼它會作為新的 leader 根據新的隊列頭元素進行限時等待。

public interface Delayed extends Comparable<delayed> {/<delayed>

// 返回與此對象相關的剩餘延遲時間,以給定的時間單位表示。

long getDelay(TimeUnit unit);

}

public class DelayQueue extends AbstractQueue

implements BlockingQueue {

private final transient ReentrantLock lock = new ReentrantLock();

private final PriorityQueue q = new PriorityQueue();

// 該變量被賦值給等待元素的隊列頭結點。這是 Leader_Follower 模式的變體,

// 用於減少限時等待。當一個線程成為 leader 時,它只等待下一個延遲到達,

// 但其他的線程都是無限等待的。

// leader 線程必須在從 take() 或 poll(...) 方法返回前通知其他線程,

// 除非其他線程在這個過程中成為了 leader 。

// 每當隊列的頭被一個更早到期的元素取代時,leader 字段設置為 null 表示作廢,

// 同時,一些等待線程、不一定必須是當前的 leader 被通知。

// 因此等待線程必須準備好在等待過程中獲取和失去 leadership。

private Thread leader = null;

// 當一個新的元素在隊列頭部變的可用或新的線程可能需要成為 leader 時發出通知。

private final Condition available = lock.newCondition();

public boolean offer(E e) {

final ReentrantLock lock = this.lock;

lock.lock();

try {

q.offer(e);

if (q.peek() == e) {

// 新元素成為了隊列的頭,作廢已有的 leader,通知等待線程。

// 問題1:為啥要廢棄 leader?

leader = null;

available.signal();

}

return true;

} finally {

lock.unlock();

}

}

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

for (;;) {

E first = q.peek();

if (first == null)

available.await();

// 隊列為空,只能無限等待

else {

long delay = first.getDelay(NANOSECONDS);

// 隊列頭到期了

// 此時不一定是 leader 獲得了隊列頭元素

if (delay <= 0)

return q.poll();

// 在等待的過程中不持有引用。

// 問題2:為啥專門放棄引用?

first = null;

if (leader != null)

// 如果已經有 leader 了,作為 follower 進行無限等待

available.await();

else {

// 隊列裡有未到期的元素、且沒有 leader,自己成為 leader。

Thread thisThread = Thread.currentThread();

leader = thisThread;

// 作為 leader 進行限時等待

try {

available.awaitNanos(delay);

} finally {

// 作廢 leader。

if (leader == thisThread)

// 問題3:為啥在這裡作廢 leader?

leader = null;

}

}

}

}

} finally {

// leader 不為空說明 leader 線程還在限時等待,不需要喚醒

if (leader == null && q.peek() != null)

available.signal();

lock.unlock();

}

}

}

問題回答

  • 問題1:此時可能是新元素比原來的 leader 的等待時間更短,原來的 leader 失去了 leadership。此處作廢是為了讓舊 leader 等待的元素之前的元素能夠儘快被處理。比如舊 leader 進行限時等待 1000ms,此時連續進來 30ms/50ms 的兩個元素;如果此時不作廢,後續就有線程來獲取元素,會因為有舊 leader 進入無限等待;作廢後,後續的線程可能只需要限時等待 30ms,提高了延遲隊列的準確性。
  • 問題2:因為在等待過程中,這個元素可能被其他線程處理、需要進行垃圾回收,防止被這個線程的棧引用了而沒法垃圾回收。
  • 問題3:首先,leader 可能在等待的過程中變了,因此需要先判斷 leader == thisThread。如果在 return 語句前進行作廢,對於獲取競爭不激烈的場景是不需要作廢,這樣會多出一些沒必要的判斷 leader == thisThread。另外是限時等待到時後應該都能獲得元素,線程到時喚醒後作廢 leader 也是合理的。

小結

引入 leader_follower 模式是為了優化在高併發下的等待,儘量用無限等待替代限時等待,也防止了多個線程同時競爭頭結點。

實現上做到了 GC 友好。


分享到:


相關文章: