DelayQueue 一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。基於 PriorityQueue 實現的延遲隊列,用 ReentrantLock 提供線程安全性。
其元素必須實現 Delayed 接口。
該類可用來實現定時調度的功能,當前時間與任務的下次執行時間的距離作為延遲時間。
實現上採用 Leader_Follower 模式 的變體進行優化:leader 進行限時等待,其他線程作為 follower 無限等待。leader 在等待的過程中可能插入一個更快到期的元素,那麼舊 leader 就會被作廢,如果又有一個線程來獲取,那麼它會作為新的 leader 根據新的隊列頭元素進行限時等待。
public interface Delayed extends Comparable<delayed> {/<delayed>
// 返回與此對象相關的剩餘延遲時間,以給定的時間單位表示。
long getDelay(TimeUnit unit);
}
public class DelayQueue
implements BlockingQueue
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue
// 該變量被賦值給等待元素的隊列頭結點。這是 Leader_Follower 模式的變體,
// 用於減少限時等待。當一個線程成為 leader 時,它只等待下一個延遲到達,
// 但其他的線程都是無限等待的。
// leader 線程必須在從 take() 或 poll(...) 方法返回前通知其他線程,
// 除非其他線程在這個過程中成為了 leader 。
// 每當隊列的頭被一個更早到期的元素取代時,leader 字段設置為 null 表示作廢,
// 同時,一些等待線程、不一定必須是當前的 leader 被通知。
// 因此等待線程必須準備好在等待過程中獲取和失去 leadership。
private Thread leader = null;
// 當一個新的元素在隊列頭部變的可用或新的線程可能需要成為 leader 時發出通知。
private final Condition available = lock.newCondition();
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
// 新元素成為了隊列的頭,作廢已有的 leader,通知等待線程。
// 問題1:為啥要廢棄 leader?
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
// 隊列為空,只能無限等待
else {
long delay = first.getDelay(NANOSECONDS);
// 隊列頭到期了
// 此時不一定是 leader 獲得了隊列頭元素
if (delay <= 0)
return q.poll();
// 在等待的過程中不持有引用。
// 問題2:為啥專門放棄引用?
first = null;
if (leader != null)
// 如果已經有 leader 了,作為 follower 進行無限等待
available.await();
else {
// 隊列裡有未到期的元素、且沒有 leader,自己成為 leader。
Thread thisThread = Thread.currentThread();
leader = thisThread;
// 作為 leader 進行限時等待
try {
available.awaitNanos(delay);
} finally {
// 作廢 leader。
if (leader == thisThread)
// 問題3:為啥在這裡作廢 leader?
leader = null;
}
}
}
}
} finally {
// leader 不為空說明 leader 線程還在限時等待,不需要喚醒
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
}
問題回答
- 問題1:此時可能是新元素比原來的 leader 的等待時間更短,原來的 leader 失去了 leadership。此處作廢是為了讓舊 leader 等待的元素之前的元素能夠儘快被處理。比如舊 leader 進行限時等待 1000ms,此時連續進來 30ms/50ms 的兩個元素;如果此時不作廢,後續就有線程來獲取元素,會因為有舊 leader 進入無限等待;作廢後,後續的線程可能只需要限時等待 30ms,提高了延遲隊列的準確性。
- 問題2:因為在等待過程中,這個元素可能被其他線程處理、需要進行垃圾回收,防止被這個線程的棧引用了而沒法垃圾回收。
- 問題3:首先,leader 可能在等待的過程中變了,因此需要先判斷 leader == thisThread。如果在 return 語句前進行作廢,對於獲取競爭不激烈的場景是不需要作廢,這樣會多出一些沒必要的判斷 leader == thisThread。另外是限時等待到時後應該都能獲得元素,線程到時喚醒後作廢 leader 也是合理的。
小結
引入 leader_follower 模式是為了優化在高併發下的等待,儘量用無限等待替代限時等待,也防止了多個線程同時競爭頭結點。
實現上做到了 GC 友好。
閱讀更多 架構師的修煉之路 的文章