java 線程池問題,我相信很多人都一知半解的,包括我自己在仔仔細細看源碼之前,也有許多的不解,甚至有些地方我一直都沒有理解到位。
說到線程池實現,那麼就不得不涉及到各種 BlockingQueue 的實現,那麼我想就 BlockingQueue 的問題和大家分享分享我瞭解的一些知識。
本文沒有像之前分析 AQS 那樣一行一行源碼分析了,不過還是把其中最重要和最難理解的代碼說了一遍,所以不免篇幅略長。本文涉及到比較多的 Doug Lea 對 BlockingQueue 的設計思想,希望有心的讀者真的可以有一些收穫,我覺得自己還是寫了一些乾貨的。
本文直接參考 Doug Lea 寫的 Java doc 和註釋,這也是我們在學習 java 併發包時最好的材料了。希望大家能有所思、有所悟,學習 Doug Lea 的代碼風格,並將其優雅、嚴謹的作風應用到我們寫的每一行代碼中。
目錄:
- BlockingQueue
- BlockingQueue 實現之 ArrayBlockingQueue
- BlockingQueue 實現之 LinkedBlockingQueue
- BlockingQueue 實現之 SynchronousQueue
- BlockingQueue 實現之 PriorityBlockingQueue
- 總結
BlockingQueue
開篇先介紹下 BlockingQueue 這個接口的規則,後面再看其實現。
首先,最基本的來說, BlockingQueue 是一個先進先出的隊列(Queue),為什麼說是阻塞(Blocking)的呢?是因為 BlockingQueue 支持當獲取隊列元素但是隊列為空時,會阻塞等待隊列中有元素再返回;也支持添加元素時,如果隊列已滿,那麼等到隊列可以放入新元素時再放入。
BlockingQueue 是一個接口,繼承自 Queue,所以其實現類也可以作為 Queue 的實現來使用,而 Queue 又繼承自 Collection 接口。
BlockingQueue 對插入操作、移除操作、獲取元素操作提供了四種不同的方法用於不同的場景中使用:1、拋出異常;2、返回特殊值(null 或 true/false,取決於具體的操作);3、阻塞等待此操作,直到這個操作成功;4、阻塞等待此操作,直到成功或者超時指定時間。總結如下:
Throws exceptionSpecial valueBlocksTimes outInsertadd(e)offer(e)put(e)offer(e, time, unit)Removeremove()poll()take() poll(time, unit)Examineelement()peek()not applicablenot applicable
BlockingQueue 的各個實現都遵循了這些規則,當然我們也不用死記這個表格,知道有這麼回事,然後寫代碼的時候根據自己的需要去看方法的註釋來選取合適的方法即可。
對於 BlockingQueue,我們的關注點應該在 put(e) 和 take() 這兩個方法,因為這兩個方法是帶阻塞的。
BlockingQueue 不接受 null 值的插入,相應的方法在碰到 null 的插入時會拋出 NullPointerException 異常。null 值在這裡通常用於作為特殊值返回(表格中的第三列),代表 poll 失敗。所以,如果允許插入 null 值的話,那獲取的時候,就不能很好地用 null 來判斷到底是代表失敗,還是獲取的值就是 null 值。
一個 BlockingQueue 可能是有界的,如果在插入的時候,發現隊列滿了,那麼 put 操作將會阻塞。通常,在這裡我們說的無界隊列也不是說真正的無界,而是它的容量是 Integer.MAX_VALUE(21億多)。
BlockingQueue 是設計用來實現生產者-消費者隊列的,當然,你也可以將它當做普通的 Collection 來用,前面說了,它實現了 java.util.Collection 接口。例如,我們可以用 remove(x) 來刪除任意一個元素,但是,這類操作通常並不高效,所以儘量只在少數的場合使用,比如一條消息已經入隊,但是需要做取消操作的時候。
BlockingQueue 的實現都是線程安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作。如 addAll(c) 有可能在添加了一些元素後中途拋出異常,此時 BlockingQueue 中已經添加了部分元素,這個是允許的,取決於具體的實現。
BlockingQueue 不支持 close 或 shutdown 等關閉操作,因為開發者可能希望不會有新的元素添加進去,此特性取決於具體的實現,不做強制約束。
最後,BlockingQueue 在生產者-消費者的場景中,是支持多消費者和多生產者的,說的其實就是線程安全問題。
相信上面說的每一句都很清楚了,BlockingQueue 是一個比較簡單的線程安全容器,下面我會分析其具體的在 JDK 中的實現,這裡又到了 Doug Lea 表演時間了。
BlockingQueue 實現之 ArrayBlockingQueue
ArrayBlockingQueue 是 BlockingQueue 接口的有界隊列實現類,底層採用數組來實現。
其併發控制採用可重入鎖來控制,不管是插入操作還是讀取操作,都需要獲取到鎖才能進行操作。
如果讀者看過我之前寫的《一行一行源碼分析清楚 AbstractQueuedSynchronizer(二)》 的關於 Condition 的文章的話,那麼你一定能很容易看懂 ArrayBlockingQueue 的源碼,它採用一個 ReentrantLock 和相應的兩個 Condition 來實現。
ArrayBlockingQueue 共有以下幾個屬性:
- // 用於存放元素的數組
- final Object[] items;
- // 下一次讀取操作的位置
- int takeIndex;
- // 下一次寫入操作的位置
- int putIndex;
- // 隊列中的元素數量
- int count;
- // 以下幾個就是控制併發用的同步器
- final ReentrantLock lock;
- private final Condition notEmpty;
- private final Condition notFull;
我們用個示意圖來描述其同步機制:
ArrayBlockingQueue 實現併發同步的原理就是,讀操作和寫操作都需要獲取到 AQS 獨佔鎖才能進行操作。如果隊列為空,這個時候讀操作的線程進入到讀線程隊列排隊,等待寫線程寫入新的元素,然後喚醒讀線程隊列的第一個等待線程。如果隊列已滿,這個時候寫操作的線程進入到寫線程隊列排隊,等待讀線程將隊列元素移除騰出空間,然後喚醒寫線程隊列的第一個等待線程。
對於 ArrayBlockingQueue,我們可以在構造的時候指定以下三個參數:
- 隊列容量,其限制了隊列中最多允許的元素個數;
- 指定獨佔鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高,公平鎖可以保證每次都是等待最久的線程獲取到鎖;
- 可以指定用一個集合來初始化,將此集合中的元素在構造方法期間就先添加到隊列中。
更具體的源碼我就不進行分析了,因為它就是 AbstractQueuedSynchronizer 中 Condition 的使用,感興趣的讀者請看我寫的《一行一行源碼分析清楚 AbstractQueuedSynchronizer(二)》,因為只要看懂了那篇文章,ArrayBlockingQueue 的代碼就沒有分析的必要了,當然,如果你完全不懂 Condition,那麼基本上也就可以說看不懂 ArrayBlockingQueue 的源碼了。
BlockingQueue 實現之 LinkedBlockingQueue
底層基於單向鏈表實現的阻塞隊列,可以當做無界隊列也可以當做有界隊列來使用。看構造方法:
- // 傳說中的無界隊列
- public LinkedBlockingQueue() {
- this(Integer.MAX_VALUE);
- }
- // 傳說中的有界隊列
- public LinkedBlockingQueue(int capacity) {
- if (capacity <= 0) throw new IllegalArgumentException();
- this.capacity = capacity;
- last = head = new Node
(null); - }
我們看看這個類有哪些屬性:
- // 隊列容量
- private final int capacity;
- // 隊列中的元素數量
- private final AtomicInteger count = new AtomicInteger(0);
- // 隊頭
- private transient Node
head; - // 隊尾
- private transient Node
last; - // take, poll, peek 等讀操作的方法需要獲取到這個鎖
- private final ReentrantLock takeLock = new ReentrantLock();
- // 如果讀操作的時候隊列是空的,那麼等待 notEmpty 條件
- private final Condition notEmpty = takeLock.newCondition();
- // put, offer 等寫操作的方法需要獲取到這個鎖
- private final ReentrantLock putLock = new ReentrantLock();
- // 如果寫操作的時候隊列是滿的,那麼等待 notFull 條件
- private final Condition notFull = putLock.newCondition();
這裡用了兩個鎖,兩個 Condition,簡單介紹如下:
takeLock 和 notEmpty 怎麼搭配:如果要獲取(take)一個元素,需要獲取 takeLock 鎖,但是獲取了鎖還不夠,如果隊列此時為空,還需要隊列不為空(notEmpty)這個條件(Condition)。
putLock 需要和 notFull 搭配:如果要插入(put)一個元素,需要獲取 putLock 鎖,但是獲取了鎖還不夠,如果隊列此時已滿,還需要隊列不是滿的(notFull)這個條件(Condition)。
首先,這裡用一個示意圖來看看 LinkedBlockingQueue 的併發讀寫控制,然後再開始分析源碼:
看懂這個示意圖,源碼也就簡單了,讀操作是排好隊的,寫操作也是排好隊的,唯一的併發問題在於一個寫操作和一個讀操作同時進行,只要控制好這個就可以了。
先上構造方法:
- public LinkedBlockingQueue(int capacity) {
- if (capacity <= 0) throw new IllegalArgumentException();
- this.capacity = capacity;
- last = head = new Node
(null); - }
注意,這裡會初始化一個空的頭結點,那麼第一個元素入隊的時候,隊列中就會有兩個元素。讀取元素時,也總是獲取頭節點後面的一個節點。count 的計數值不包括這個頭節點。
我們來看下 put 方法是怎麼將元素插入到隊尾的:
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- // 如果你糾結這裡為什麼是 -1,可以看看 offer 方法。這就是個標識成功、失敗的標誌而已。
- int c = -1;
- Node
node = new Node(e); - final ReentrantLock putLock = this.putLock;
- final AtomicInteger count = this.count;
- // 必須要獲取到 putLock 才可以進行插入操作
- putLock.lockInterruptibly();
- try {
- // 如果隊列滿,等待 notFull 的條件滿足。
- while (count.get() == capacity) {
- notFull.await();
- }
- // 入隊
- enqueue(node);
- // count 原子加 1,c 還是加 1 前的值
- c = count.getAndIncrement();
- // 如果這個元素入隊後,還有至少一個槽可以使用,調用 notFull.signal() 喚醒等待線程。
- // 哪些線程會等待在 notFull 這個 Condition 上呢?
- if (c + 1 < capacity)
- notFull.signal();
- } finally {
- // 入隊後,釋放掉 putLock
- putLock.unlock();
- }
- // 如果 c == 0,那麼代表隊列在這個元素入隊前是空的(不包括head空節點),
- // 那麼所有的讀線程都在等待 notEmpty 這個條件,等待喚醒,這裡做一次喚醒操作
- if (c == 0)
- signalNotEmpty();
- }
- // 入隊的代碼非常簡單,就是將 last 屬性指向這個新元素,並且讓原隊尾的 next 指向這個元素
- // 這裡入隊沒有併發問題,因為只有獲取到 putLock 獨佔鎖以後,才可以進行此操作
- private void enqueue(Node
node) { - // assert putLock.isHeldByCurrentThread();
- // assert last.next == null;
- last = last.next = node;
- }
- // 元素入隊後,如果需要,調用這個方法喚醒讀線程來讀
- private void signalNotEmpty() {
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- }
我們再看看 take 方法:
- public E take() throws InterruptedException {
- E x;
- int c = -1;
- final AtomicInteger count = this.count;
- final ReentrantLock takeLock = this.takeLock;
- // 首先,需要獲取到 takeLock 才能進行出隊操作
- takeLock.lockInterruptibly();
- try {
- // 如果隊列為空,等待 notEmpty 這個條件滿足再繼續執行
- while (count.get() == 0) {
- notEmpty.await();
- }
- // 出隊
- x = dequeue();
- // count 進行原子減 1
- c = count.getAndDecrement();
- // 如果這次出隊後,隊列中至少還有一個元素,那麼調用 notEmpty.signal() 喚醒其他的讀線程
- if (c > 1)
- notEmpty.signal();
- } finally {
- // 出隊後釋放掉 takeLock
- takeLock.unlock();
- }
- // 如果 c == capacity,那麼說明在這個 take 方法發生的時候,隊列是滿的
- // 既然出隊了一個,那麼意味著隊列不滿了,喚醒寫線程去寫
- if (c == capacity)
- signalNotFull();
- return x;
- }
- // 取隊頭,出隊
- private E dequeue() {
- // assert takeLock.isHeldByCurrentThread();
- // assert head.item == null;
- // 之前說了,頭結點是空的
- Node
h = head; - Node
first = h.next; - h.next = h; // help GC
- // 設置這個為新的頭結點
- head = first;
- E x = first.item;
- first.item = null;
- return x;
- }
- // 元素出隊後,如果需要,調用這個方法喚醒寫線程來寫
- private void signalNotFull() {
- final ReentrantLock putLock = this.putLock;
- putLock.lock();
- try {
- notFull.signal();
- } finally {
- putLock.unlock();
- }
- }
源碼分析就到這裡結束了吧,畢竟還是比較簡單的源碼,基本上只要讀者認真點都看得懂。
BlockingQueue 實現之 SynchronousQueue
它是一個特殊的隊列,它的名字其實就蘊含了它的特徵 - - 同步的隊列。為什麼說是同步的呢?這裡說的並不是多線程的併發問題,而是因為當一個線程往隊列中寫入一個元素時,寫入操作不會立即返回,需要等待另一個線程來將這個元素拿走;同理,當一個讀線程做讀操作的時候,同樣需要一個相匹配的寫線程的寫操作。這裡的 Synchronous 指的就是讀線程和寫線程需要同步,一個讀線程匹配一個寫線程。
我們比較少使用到 SynchronousQueue 這個類,不過它在線程池的實現類 ScheduledThreadPoolExecutor 中得到了應用,感興趣的讀者可以在看完這個後去看看相應的使用。
雖然上面我說了隊列,但是 SynchronousQueue 的隊列其實是虛的,其不提供任何空間(一個都沒有)來存儲元素。數據必須從某個寫線程交給某個讀線程,而不是寫到某個隊列中等待被消費。
你不能在 SynchronousQueue 中使用 peek 方法(在這裡這個方法直接返回 null),peek 方法的語義是隻讀取不移除,顯然,這個方法的語義是不符合 SynchronousQueue 的特徵的。SynchronousQueue 也不能被迭代,因為根本就沒有元素可以拿來迭代的。雖然 SynchronousQueue 間接地實現了 Collection 接口,但是如果你將其當做 Collection 來用的話,那麼集合是空的。當然,這個類也是不允許傳遞 null 值的(併發包中的容器類好像都不支持插入 null 值,因為 null 值往往用作其他用途,比如用於方法的返回值代表操作失敗)。
接下來,我們來看看具體的源碼實現吧,它的源碼不是很簡單的那種,我們需要先搞清楚它的設計思想。
源碼加註釋大概有 1200 行,我們先看大框架:
- // 構造時,我們可以指定公平模式還是非公平模式,區別之後再說
- public SynchronousQueue(boolean fair) {
- transferer = fair ? new TransferQueue() : new TransferStack();
- }
- abstract static class Transferer {
- // 從方法名上大概就知道,這個方法用於轉移元素,從生產者手上轉到消費者手上
- // 也可以被動地,消費者調用這個方法來從生產者手上取元素
- // 第一個參數 e 如果不是 null,代表場景為:將元素從生產者轉移給消費者
- // 如果是 null,代表消費者等待生產者提供元素,然後返回值就是相應的生產者提供的元素
- // 第二個參數代表是否設置超時,如果設置超時,超時時間是第三個參數的值
- // 返回值如果是 null,代表超時,或者中斷。具體是哪個,可以通過檢測中斷狀態得到。
- abstract Object transfer(Object e, boolean timed, long nanos);
- }
Transferer 有兩個內部實現類,是因為構造 SynchronousQueue 的時候,我們可以指定公平策略。公平模式意味著,所有的讀寫線程都遵守先來後到,FIFO 嘛,對應 TransferQueue。而非公平模式則對應 TransferStack。
我們先採用公平模式分析源碼,然後再說說公平模式和非公平模式的區別。
接下來,我們看看 put 方法和 take 方法:
- // 寫入值
- public void put(E o) throws InterruptedException {
- if (o == null) throw new NullPointerException();
- if (transferer.transfer(o, false, 0) == null) { // 1
- Thread.interrupted();
- throw new InterruptedException();
- }
- }
- // 讀取值並移除
- public E take() throws InterruptedException {
- Object e = transferer.transfer(null, false, 0); // 2
- if (e != null)
- return (E)e;
- Thread.interrupted();
- throw new InterruptedException();
- }
我們看到,寫操作 put(E o) 和讀操作 take() 都是調用 Transferer.transfer(…) 方法,區別在於第一個參數是否為 null 值。
我們來看看 transfer 的設計思路,其基本算法如下:
- 當調用這個方法時,如果隊列是空的,或者隊列中的節點和當前的線程操作類型一致(如當前操作是 put 操作,而隊列中的元素也都是寫線程)。這種情況下,將當前線程加入到等待隊列即可。
- 如果隊列中有等待節點,而且與當前操作可以匹配(如隊列中都是讀操作線程,當前線程是寫操作線程,反之亦然)。這種情況下,匹配等待隊列的隊頭,出隊,返回相應數據。
其實這裡有個隱含的條件被滿足了,隊列如果不為空,肯定都是同種類型的節點,要麼都是讀操作,要麼都是寫操作。這個就要看到底是讀線程積壓了,還是寫線程積壓了。
我們可以假設出一個男女配對的場景:一個男的過來,如果一個人都沒有,那麼他需要等待;如果發現有一堆男的在等待,那麼他需要排到隊列後面;如果發現是一堆女的在排隊,那麼他直接牽走隊頭的那個女的。
既然這裡說到了等待隊列,我們先看看其實現,也就是 QNode:
- static final class QNode {
- volatile QNode next; // 可以看出來,等待隊列是單向鏈表
- volatile Object item; // CAS'ed to or from null
- volatile Thread waiter; // 將線程對象保存在這裡,用於掛起和喚醒
- final boolean isData; // 用於判斷是寫線程節點(isData == true),還是讀線程節點
- QNode(Object item, boolean isData) {
- this.item = item;
- this.isData = isData;
- }
- ......
相信說了這麼多以後,我們再來看 transfer 方法的代碼就輕鬆多了。
- /**
- * Puts or takes an item.
- */
- Object transfer(Object e, boolean timed, long nanos) {
- QNode s = null; // constructed/reused as needed
- boolean isData = (e != null);
- for (;;) {
- QNode t = tail;
- QNode h = head;
- if (t == null || h == null) // saw uninitialized value
- continue; // spin
- // 隊列空,或隊列中節點類型和當前節點一致,
- // 即我們說的第一種情況,將節點入隊即可。讀者要想著這塊 if 裡面方法其實就是入隊
- if (h == t || t.isData == isData) { // empty or same-mode
- QNode tn = t.next;
- // t != tail 說明剛剛有節點入隊,continue 即可
- if (t != tail) // inconsistent read
- continue;
- // 有其他節點入隊,但是 tail 還是指向原來的,此時設置 tail 即可
- if (tn != null) { // lagging tail
- // 這個方法就是:如果 tail 此時為 t 的話,設置為 tn
- advanceTail(t, tn);
- continue;
- }
- //
- if (timed && nanos <= 0) // can't wait
- return null;
- if (s == null)
- s = new QNode(e, isData);
- // 將當前節點,插入到 tail 的後面
- if (!t.casNext(null, s)) // failed to link in
- continue;
- // 將當前節點設置為新的 tail
- advanceTail(t, s); // swing tail and wait
- // 看到這裡,請讀者先往下滑到這個方法,看完了以後再回來這裡,思路也就不會斷了
- Object x = awaitFulfill(s, e, timed, nanos);
- // 到這裡,說明之前入隊的線程被喚醒了,準備往下執行
- if (x == s) { // wait was cancelled
- clean(t, s);
- return null;
- }
- if (!s.isOffList()) { // not already unlinked
- advanceHead(t, s); // unlink if head
- if (x != null) // and forget fields
- s.item = s;
- s.waiter = null;
- }
- return (x != null) ? x : e;
- // 這裡的 else 分支就是上面說的第二種情況,有相應的讀或寫相匹配的情況
- } else { // complementary-mode
- QNode m = h.next; // node to fulfill
- if (t != tail || m == null || h != head)
- continue; // inconsistent read
- Object x = m.item;
- if (isData == (x != null) || // m already fulfilled
- x == m || // m cancelled
- !m.casItem(x, e)) { // lost CAS
- advanceHead(h, m); // dequeue and retry
- continue;
- }
- advanceHead(h, m); // successfully fulfilled
- LockSupport.unpark(m.waiter);
- return (x != null) ? x : e;
- }
- }
- }
- void advanceTail(QNode t, QNode nt) {
- if (tail == t)
- UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
- }
- // 自旋或阻塞,直到滿足條件,這個方法返回
- Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
- long lastTime = timed ? System.nanoTime() : 0;
- Thread w = Thread.currentThread();
- // 判斷需要自旋的次數,
- int spins = ((head.next == s) ?
- (timed ? maxTimedSpins : maxUntimedSpins) : 0);
- for (;;) {
- // 如果被中斷了,那麼取消這個節點
- if (w.isInterrupted())
- // 就是將當前節點 s 中的 item 屬性設置為 this
- s.tryCancel(e);
- Object x = s.item;
- // 這裡是這個方法的唯一的出口
- if (x != e)
- return x;
- // 如果需要,檢測是否超時
- if (timed) {
- long now = System.nanoTime();
- nanos -= now - lastTime;
- lastTime = now;
- if (nanos <= 0) {
- s.tryCancel(e);
- continue;
- }
- }
- if (spins > 0)
- --spins;
- // 如果自旋達到了最大的次數,那麼檢測
- else if (s.waiter == null)
- s.waiter = w;
- // 如果自旋到了最大的次數,那麼線程掛起,等待喚醒
- else if (!timed)
- LockSupport.park(this);
- // spinForTimeoutThreshold 這個之前講 AQS 的時候其實也說過,剩餘時間小於這個閾值的時候,就
- // 不要進行掛起了,自旋的性能會比較好
- else if (nanos > spinForTimeoutThreshold)
- LockSupport.parkNanos(this, nanos);
- }
- }
Doug Lea 的巧妙之處在於,將各個代碼湊在了一起,使得代碼非常簡潔,當然也同時增加了我們的閱讀負擔,看代碼的時候,還是得仔細想想各種可能的情況。
下面,再說說前面說的公平模式和非公平模式的區別。
相信大家心裡面已經有了公平模式的工作流程的概念了,我就簡單說說 TransferStack 的算法,就不分析源碼了。
- 當調用這個方法時,如果隊列是空的,或者隊列中的節點和當前的線程操作類型一致(如當前操作是 put 操作,而棧中的元素也都是寫線程)。這種情況下,將當前線程加入到等待棧中,等待配對。然後返回相應的元素,或者如果被取消了的話,返回 null。
- 如果棧中有等待節點,而且與當前操作可以匹配(如棧裡面都是讀操作線程,當前線程是寫操作線程,反之亦然)。將當前節點壓入棧頂,和棧中的節點進行匹配,然後將這兩個節點出棧。配對和出棧的動作其實也不是必須的,因為下面的一條會執行同樣的事情。
- 如果棧頂是進行匹配而入棧的節點,幫助其進行匹配並出棧,然後再繼續操作。
應該說,TransferStack 的源碼要比 TransferQueue 的複雜一些,如果讀者感興趣,請自行進行源碼閱讀。
BlockingQueue 實現之 PriorityBlockingQueue
帶排序的 BlockingQueue 實現,其併發控制採用的是 ReentrantLock,隊列為無界隊列(ArrayBlockingQueue 是有界隊列,LinkedBlockingQueue 也可以通過在構造函數中傳入 capacity 指定隊列最大的容量,但是 PriorityBlockingQueue 只能指定初始的隊列大小,後面插入元素的時候,如果空間不夠的話會自動擴容)。
簡單地說,它就是 PriorityQueue 的線程安全版本。不可以插入 null 值,同時,插入隊列的對象必須是可比較大小的(comparable),否則報 ClassCastException 異常。它的插入操作 put 方法不會 block,因為它是無界隊列(take 方法在隊列為空的時候會阻塞)。
它的源碼相對比較簡單,本節將介紹其核心源碼部分。
我們來看看它有哪些屬性:
- // 構造方法中,如果不指定大小的話,默認大小為 11
- private static final int DEFAULT_INITIAL_CAPACITY = 11;
- // 數組的最大容量
- private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
- // 這個就是存放數據的數組
- private transient Object[] queue;
- // 隊列當前大小
- private transient int size;
- // 大小比較器,如果按照自然序排序,那麼此屬性可設置為 null
- private transient Comparator super E> comparator;
- // 併發控制所用的鎖,所有的 public 且涉及到線程安全的方法,都必須先獲取到這個鎖
- private final ReentrantLock lock;
- // 這個很好理解,其實例由上面的 lock 屬性創建
- private final Condition notEmpty;
- // 這個也是用於鎖,用於數組擴容的時候,需要先獲取到這個鎖,才能進行擴容操作
- // 其使用 CAS 操作
- private transient volatile int allocationSpinLock;
- // 用於序列化和反序列化的時候用,對於 PriorityBlockingQueue 我們應該比較少使用到序列化
- private PriorityQueue q;
此類實現了 Collection 和 Iterator 接口中的所有接口方法,對其對象進行迭代並遍歷時,不能保證有序性。如果你想要實現有序遍歷,建議採用 Arrays.sort(queue.toArray()) 進行處理。PriorityBlockingQueue 提供了 drainTo 方法用於將部分或全部元素有序地填充(準確說是轉移,會刪除原隊列中的元素)到另一個集合中。還有一個需要說明的是,如果兩個對象的優先級相同(compare 方法返回 0),此隊列並不保證它們之間的順序。
PriorityBlockingQueue 使用了基於數組的二叉堆來存放元素,所有的 public 方法採用同一個 lock 進行併發控制。
二叉堆:一顆完全二叉樹,它非常適合用數組進行存儲,對於數組中的元素 a[i],其左子節點為 a[2i+1],其右子節點為 a[2\i + 2],其父節點為 a[(i-1)/2],其堆序性質為,每個節點的值都小於其左右子節點的值。二叉堆中最小的值就是根節點,但是刪除根節點是比較麻煩的,因為需要調整樹。
簡單用個圖解釋一下二叉堆,我就不說太多專業的嚴謹的術語了,這種數據結構的優點是一目瞭然的,最小的元素一定是根元素,它是一棵滿的樹,除了最後一層,最後一層的節點從左到右緊密排列。
下面開始 PriorityBlockingQueue 的源碼分析,首先我們來看看構造方法:
- // 默認構造方法,採用默認值(11)來進行初始化
- public PriorityBlockingQueue() {
- this(DEFAULT_INITIAL_CAPACITY, null);
- }
- // 指定數組的初始大小
- public PriorityBlockingQueue(int initialCapacity) {
- this(initialCapacity, null);
- }
- // 指定比較器
- public PriorityBlockingQueue(int initialCapacity,
- Comparator super E> comparator) {
- if (initialCapacity < 1)
- throw new IllegalArgumentException();
- this.lock = new ReentrantLock();
- this.notEmpty = lock.newCondition();
- this.comparator = comparator;
- this.queue = new Object[initialCapacity];
- }
- // 在構造方法中就先填充指定的集合中的元素
- public PriorityBlockingQueue(Collection extends E> c) {
- this.lock = new ReentrantLock();
- this.notEmpty = lock.newCondition();
- //
- boolean heapify = true; // true if not known to be in heap order
- boolean screen = true; // true if must screen for nulls
- if (c instanceof SortedSet>) {
- SortedSet extends E> ss = (SortedSet extends E>) c;
- this.comparator = (Comparator super E>) ss.comparator();
- heapify = false;
- }
- else if (c instanceof PriorityBlockingQueue>) {
- PriorityBlockingQueue extends E> pq =
- (PriorityBlockingQueue extends E>) c;
- this.comparator = (Comparator super E>) pq.comparator();
- screen = false;
- if (pq.getClass() == PriorityBlockingQueue.class) // exact match
- heapify = false;
- }
- Object[] a = c.toArray();
- int n = a.length;
- // If c.toArray incorrectly doesn't return Object[], copy it.
- if (a.getClass() != Object[].class)
- a = Arrays.copyOf(a, n, Object[].class);
- if (screen && (n == 1 || this.comparator != null)) {
- for (int i = 0; i < n; ++i)
- if (a[i] == null)
- throw new NullPointerException();
- }
- this.queue = a;
- this.size = n;
- if (heapify)
- heapify();
- }
接下來,我們來看看其內部的自動擴容實現:
- private void tryGrow(Object[] array, int oldCap) {
- // 這邊做了釋放鎖的操作
- lock.unlock(); // must release and then re-acquire main lock
- Object[] newArray = null;
- // 用 CAS 操作將 allocationSpinLock 由 0 變為 1,也算是獲取鎖
- if (allocationSpinLock == 0 &&
- UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
- 0, 1)) {
- try {
- // 如果節點個數小於 64,那麼增加的 oldCap + 2 的容量
- // 如果節點數大於等於 64,那麼增加 oldCap 的一半
- // 所以節點數較小時,增長得快一些
- int newCap = oldCap + ((oldCap < 64) ?
- (oldCap + 2) :
- (oldCap >> 1));
- // 這裡有可能溢出
- if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
- int minCap = oldCap + 1;
- if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
- throw new OutOfMemoryError();
- newCap = MAX_ARRAY_SIZE;
- }
- // 如果 queue != array,那麼說明有其他線程給 queue 分配了其他的空間
- if (newCap > oldCap && queue == array)
- // 分配一個新的大數組
- newArray = new Object[newCap];
- } finally {
- // 重置,也就是釋放鎖
- allocationSpinLock = 0;
- }
- }
- // 如果有其他的線程也在做擴容的操作
- if (newArray == null) // back off if another thread is allocating
- Thread.yield();
- // 重新獲取鎖
- lock.lock();
- // 將原來數組中的元素複製到新分配的大數組中
- if (newArray != null && queue == array) {
- queue = newArray;
- System.arraycopy(array, 0, newArray, 0, oldCap);
- }
- }
擴容方法對併發的控制也非常的巧妙,釋放了原來的獨佔鎖 lock,這樣的話,擴容操作和讀操作可以同時進行,提高吞吐量。
下面,我們來分析下寫操作 put 方法和讀操作 take 方法。
- public void put(E e) {
- // 直接調用 offer 方法,因為前面我們也說了,在這裡,put 方法不會阻塞
- offer(e);
- }
- public boolean offer(E e) {
- if (e == null)
- throw new NullPointerException();
- final ReentrantLock lock = this.lock;
- // 首先獲取到獨佔鎖
- lock.lock();
- int n, cap;
- Object[] array;
- // 如果當前隊列中的元素個數 >= 數組的大小,那麼需要擴容了
- while ((n = size) >= (cap = (array = queue).length))
- tryGrow(array, cap);
- try {
- Comparator super E> cmp = comparator;
- // 節點添加到二叉堆中
- if (cmp == null)
- siftUpComparable(n, e, array);
- else
- siftUpUsingComparator(n, e, array, cmp);
- // 更新 size
- size = n + 1;
- // 喚醒等待的讀線程
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
- return true;
- }
對於二叉堆而言,插入一個節點是簡單的,插入的節點如果比父節點小,交換它們,然後繼續和父節點比較。
- // 這個方法就是將數據 x 插入到數組 array 的位置 k 處,然後再調整樹
- private static
void siftUpComparable(int k, T x, Object[] array) { - Comparable super T> key = (Comparable super T>) x;
- while (k > 0) {
- // 二叉堆中 a[k] 節點的父節點位置
- int parent = (k - 1) >>> 1;
- Object e = array[parent];
- if (key.compareTo((T) e) >= 0)
- break;
- array[k] = e;
- k = parent;
- }
- array[k] = key;
- }
我們用圖來示意一下,我們接下來要將 11 插入到隊列中,看看 siftUp 是怎麼操作的。
我們再看看 take 方法:
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- // 獨佔鎖
- lock.lockInterruptibly();
- E result;
- try {
- // dequeue 出隊
- while ( (result = dequeue()) == null)
- notEmpty.await();
- } finally {
- lock.unlock();
- }
- return result;
- }
- private E dequeue() {
- int n = size - 1;
- if (n < 0)
- return null;
- else {
- Object[] array = queue;
- // 隊頭,用於返回
- E result = (E) array[0];
- // 隊尾元素先取出
- E x = (E) array[n];
- // 隊尾置空
- array[n] = null;
- Comparator super E> cmp = comparator;
- if (cmp == null)
- siftDownComparable(0, x, array, n);
- else
- siftDownUsingComparator(0, x, array, n, cmp);
- size = n;
- return result;
- }
- }
dequeue 方法返回隊頭,並調整二叉堆的樹,調用這個方法必須先獲取獨佔鎖。
廢話不多說,出隊是非常簡單的,因為隊頭就是最小的元素,對應的是數組的第一個元素。難點是隊頭出隊後,需要調整樹。
- private static
void siftDownComparable(int k, T x, Object[] array, - int n) {
- if (n > 0) {
- Comparable super T> key = (Comparable super T>)x;
- // 這裡得到的 half 肯定是非葉節點
- // a[n] 是最後一個元素,其父節點是 a[(n-1)/2]。所以 n >>> 1 代表的節點肯定不是葉子節點
- // 下面,我們結合圖來一行行分析,這樣比較直觀簡單
- // 此時 k 為 0, x 為 17,n 為 9
- int half = n >>> 1; // 得到 half = 4
- while (k < half) {
- // 先取左子節點
- int child = (k << 1) + 1; // 得到 child = 1
- Object c = array[child]; // c = 12
- int right = child + 1; // right = 2
- // 如果右子節點存在,而且比左子節點小
- // 此時 array[right] = 20,所以條件不滿足
- if (right < n &&
- ((Comparable super T>) c).compareTo((T) array[right]) > 0)
- c = array[child = right];
- // key = 17, c = 12,所以條件不滿足
- if (key.compareTo((T) c) <= 0)
- break;
- // 把 12 填充到根節點
- array[k] = c;
- // k 賦值後為 1
- k = child;
- // 一輪過後,我們發現,12 左邊的子樹和剛剛的差不多,都是缺少根節點,接下來處理就簡單了
- }
- array[k] = key;
- }
- }
記住二叉堆是一棵完全二叉樹,那麼根節點 10 拿掉後,最後面的元素 17 必須找到合適的地方放置。首先,17 和 10 不能直接交換,那麼先將根節點 10 的左右子節點中較小的節點往上滑,即 12 往上滑,然後原來 12 留下了一個空節點,然後再把這個空節點的較小的子節點往上滑,即 13 往上滑,最後,留出了位子,17 補上即可。
我稍微調整下這個樹,以便讀者能更明白:
好了, PriorityBlockingQueue 我們也說完了。
總結
我知道本文過長,相信一字不漏看完的讀者肯定是少數。那就給點福利吧,關注筆者,私信回覆“資料”感謝閱讀。
ArrayBlockingQueue 底層是數組,有界隊列,如果我們要使用生產者-消費者模式,這是非常好的選擇。
LinkedBlockingQueue 底層是鏈表,可以當做無界和有界隊列來使用,所以大家不要以為它就是無界隊列。
SynchronousQueue 本身不帶有空間來存儲任何元素,使用上可以選擇公平模式和非公平模式。
PriorityBlockingQueue 是無界隊列,基於數組,數據結構為二叉堆,數組第一個也是樹的根節點總是最小值
閱讀更多 JavaSpring高級進階 的文章