併發隊列之PriorityBlockingQueue

這一篇說一下PriorityBlockingQueue,引用書中的一句話:這就是帶優先級的無界阻塞隊列,每次出隊都返回優先級最高或者最低的元素(這裡規則可以自己制定),內部是使用平衡二叉樹實現的,遍歷不保證有序;

其實也比較容易,就是基於數組實現的一個平衡二叉樹,不瞭解平衡二叉樹的可以先了解一下,別想的太難,原理跟鏈表差不多,只不過鏈表中指向下一個節點的只有一個,而平衡二叉樹中有兩個,一個左,一個右,還有左邊的節點的值小於當前節點的值,右邊節點的值大於當前節點的值;看看平衡二叉樹的增刪改查即可;

一.認識PriorityBlockingQueue

底層是以數組實現的,我們看看幾個重要的屬性:

<code>//隊列默認初始化容量private static final int DEFAULT_INITIAL_CAPACITY = 11;//數組最大容量private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;//底層實現還是數組private transient Object[] queue;//隊列容量private transient int size;//一個比較器,比較元素大小private transient Comparator super E> comparator;//一個獨佔鎖,控制同時只有一個線程在入隊和出隊private final ReentrantLock lock;//如果隊列是空的,還有線程來隊列取數據,就阻塞//這裡只有一個條件變量,因為這個隊列是無界的,向隊列中插入數據的話就用CAS操作就行了private final Condition notEmpty;//一個自旋鎖,CAS使得同時只有一個線程可以進行擴容,0表示沒有進行擴容,1表示正在進行擴容private transient volatile int allocationSpinLock;/<code>

簡單看看構造器:

<code>//默認數組大小是11public PriorityBlockingQueue() {    this(DEFAULT_INITIAL_CAPACITY, null);}//可以指定數組大小public PriorityBlockingQueue(int initialCapacity) {    this(initialCapacity, null);}//初始化數組、鎖、條件變量還有比較器public PriorityBlockingQueue(int initialCapacity,                                Comparator super E> comparator) {    if (initialCapacity < 1)        throw new IllegalArgumentException();    this.lock = new ReentrantLock();    this.notEmpty = lock.newCondition();    this.comparator = comparator;    this.queue = new Object[initialCapacity];}//這個構造器也可以傳入一個集合public PriorityBlockingQueue(Collection extends E> c) {    this.lock = new ReentrantLock();    this.notEmpty = lock.newCondition();    boolean heapify = true; // true if not known to be in heap order    boolean screen = true;  // true if must screen for nulls    if (c instanceof SortedSet>) {        SortedSet extends E> ss = (SortedSet extends E>) c;        this.comparator = (Comparator super E>) ss.comparator();        heapify = false;    }    else if (c instanceof PriorityBlockingQueue>) {        PriorityBlockingQueue extends E> pq =            (PriorityBlockingQueue extends E>) c;        this.comparator = (Comparator super E>) pq.comparator();        screen = false;        if (pq.getClass() == PriorityBlockingQueue.class) // exact match            heapify = false;    }    Object[] a = c.toArray();    int n = a.length;    // If c.toArray incorrectly doesn't return Object[], copy it.    if (a.getClass() != Object[].class)        a = Arrays.copyOf(a, n, Object[].class);    if (screen && (n == 1 || this.comparator != null)) {        for (int i = 0; i < n; ++i)            if (a[i] == null)                throw new NullPointerException();    }    this.queue = a;    this.size = n;    if (heapify)        heapify();}/<code> 

有興趣的可以看看下面這個圖,說的更詳細,個人覺得看重要的地方就行了;

併發隊列之PriorityBlockingQueue

二.offer方法

在隊列中插入一個元素,由於是無界隊列,所以一直返回true;

<code>public boolean offer(E e) {    //如果傳入的是null,就拋異常    if (e == null)        throw new NullPointerException();    final ReentrantLock lock = this.lock;    //獲取鎖    lock.lock();    int n, cap;    Object[] array;    //[1]當前數組中實際數據總數>=數組容量,就進行擴容    while ((n = size) >= (cap = (array = queue).length))        //擴容        tryGrow(array, cap);    try {        Comparator super E> cmp = comparator;        //[2]默認比較器為空時        if (cmp == null)            siftUpComparable(n, e, array);        else        //[3]默認比較器不為空就用我們傳進去的默認比較器            siftUpUsingComparator(n, e, array, cmp);        //數組實際數量加一        size = n + 1;        //喚醒notEmpty條件隊列中的線程        notEmpty.signal();    } finally {        //釋放鎖        lock.unlock();    }    return true;}/<code>

上面的代碼中,我們就關注那三個地方就行了,首先是[1]中擴容:

<code>private void tryGrow(Object[] array, int oldCap) {    //首先釋放獲取的鎖,這裡不釋放也行,只是擴容有的時候很慢,需要花時間,此時入隊和出隊操作就不能進行了,極大地降低了併發性    lock.unlock(); // must release and then re-acquire main lock    Object[] newArray = null;    //自旋鎖為0表示隊列此時沒有進行擴容,然後用CAS將自旋鎖從0該為1    if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {        try {            //用這個算法確定擴容後的數組容量,可以看到如果當前數組容量小於64,新數組容量就是2n+2,大於64,新的容量就是3n/2            int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));            //判斷新的數組容量是不是超過了最大容量,如果超過了,就嘗試在老的數組容量加一,如果還是大於最大容量,就拋異常了            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow                int minCap = oldCap + 1;                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)                    throw new OutOfMemoryError();                newCap = MAX_ARRAY_SIZE;            }            if (newCap > oldCap && queue == array)                newArray = new Object[newCap];        } finally {            //擴容完畢就將自旋鎖變為0            allocationSpinLock = 0;        }    }    //第一個線程在上面的if中執行CAS成功之後,第二個線程就會到這裡,然後執行yield方法讓出CPU,儘量讓第一個線程執行完畢;    if (newArray == null) // back off if another thread is allocating        Thread.yield();    //前面釋放鎖了,這裡要獲取鎖    lock.lock();    //將原來的數組中的元素複製到新數組中    if (newArray != null && queue == array) {        queue = newArray;        System.arraycopy(array, 0, newArray, 0, oldCap);    }}/<code> 

再看[2]中的默認的比較器:

<code>//這裡k表示數組中實際數量,x表示要插入到數組中的數據,array表示存放數據的數組private static  void siftUpComparable(int k, T x, Object[] array) {    //由此可知,我們要放進數組中的數據類型,必須要是實現了Comparable接口的    Comparable super T> key = (Comparable super T>) x;    //這裡判斷數組中有沒有數據,第一次插入數據的時候,k=0,不滿足這個循環條件,那就直接走最下面設置array[0] = key    //滿足這個條件的話,首先獲取父節點的索引,然後取出值,再比較該值和需要插入值的大小,決定是跳出循環還是繼續循環    //這裡比較重要,這個循環就是不斷的調整二叉樹平衡的,下面我們畫圖看看    while (k > 0) {        int parent = (k - 1) >>> 1;        Object e = array[parent];        if (key.compareTo((T) e) >= 0)            break;        array[k] = e;        k = parent;    }    array[k] = key;}/<code>


隨便舉個例子看看怎麼把平衡二叉樹中的元素放到數組中,節點中的數據類型就以Integer了,其實就是將每一層從做到右一次放到數組中存起來,很明顯,在數組中不是從小到大的順序的;

這裡注意一點,平衡二叉樹的存放順序不是唯一的,有很多種情況,跟你的存放順序有關!

併發隊列之PriorityBlockingQueue

所以我們看看siftUpComparable方法中的while循環是怎麼進行的?假設第一次調用offer(3),也就是調用siftUpComparable(0,3,array),這裡假設array有足夠的大小,不考慮擴容,那麼第一次會走到while循環後面執行array[0]=3,下圖所示:

併發隊列之PriorityBlockingQueue

第二次調用offer(1),也就是調用siftUpComparable(1,1,array),k=1,parent=0,所以父節點此時應該是3,然後1<3,不滿足if語句,設置array[1]=3,k=0,然後繼續循環不滿足條件,執行array[0]=1,下圖所示:

併發隊列之PriorityBlockingQueue

第三次調用offer(7),也就是調用siftUpComparable(2,7,array),k=2,parent=0,父節點為索引0的位置也就是1,因為7>1滿足if語句,所以break跳出循環,執行array[2]=7,下圖所示:

併發隊列之PriorityBlockingQueue

第四次調用offer(2),也就是調用siftUpComparable(3,2,array),k=3,parent=(k-1)>>>1=1,所以父節點表示索引為1的位置,也就是3,因為2<3,不滿足if語句,所以設置array[3]=3,k=1,再進行一次循環,parent=0,此時父節點的值是1,2<3,不滿足if,設置array[1]=1,k=0;再繼續循環不滿足循環條件,跳出循環,設置array[0] = 2

併發隊列之PriorityBlockingQueue

還是很容易的,有興趣的話再多試試添加幾個節點啊!其實還有[3]中使用我們自定義的比較器進行比較,其實i和上面代碼一樣的,另外put方法就是調用的offer方法,這裡就不多說了

三.poll方法

poll方法的作用是獲取並刪除隊列內部二叉樹的根節點,如果隊列為空,就返回nul;

<code>public E poll() {    final ReentrantLock lock = this.lock;    //獲取獨佔鎖,說明此時不能有其他線程進行入隊和出隊操作,但是可以進行擴容    lock.lock();    try {        //獲取並刪除根節點,方法如下        return dequeue();    } finally {        //釋放獨佔鎖        lock.unlock();    }}//這個方法可以好好看看,很有意思private E dequeue() {    int n = size - 1;    //如果隊列為空,就返回null    if (n < 0)        return null;    else {        //否則就先取到數組        Object[] array = queue;        //取到第0個元素,這個也就是要返回的根節點        E result = (E) array[0];        //獲取隊列實際數量的最後一個元素,並把該位置賦值為null        E x = (E) array[n];        array[n] = null;        Comparator super E> cmp = comparator;        if (cmp == null)            //默認的比較器,這裡是真正的移除根節點,然後調整在整個平衡二叉樹,使得達到平衡            siftDownComparable(0, x, array, n);        else            //我們傳入的自定義比較器            siftDownUsingComparator(0, x, array, n, cmp);        //然後數量減一        size = n;        //返回根節點        return result;    }}private static  
void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable super T> key = (Comparable super T>)x; //[1] int half = n >>> 1; // loop while a non-leaf //[2] while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; //[3] if (right < n &&((Comparable super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; //[4] if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; }}/<code>

所以我們主要的是看看siftDownComparable方法中是怎麼將一個去掉了根節點的平衡二叉樹調整平衡的;比如現在有如下所示的平衡二叉樹:

併發隊列之PriorityBlockingQueue

調用poll方法,先是把最後一個元素保存起來x=3,然後將最後一個位置設置為null,此時實際調用的是siftDownComparable(0,3,array,3),key=3,half=1,k=0,n=3,滿足[2],於是child=1,c=1,right=2,不滿足[3],不滿足[4],設置array[0]=1,k=1;繼續循環,不滿足循環條件,跳出循環,直接設置array[1]=3,最後poll方法返回的時2,下圖所示:

併發隊列之PriorityBlockingQueue

其實可以簡單的說說,最開始將數組中最後一個值X保存起來在適當時機插入到二叉樹中,什麼時候是適當時機呢?首先去掉根節點之後,得到根節點左子節點和右子節點的值leftVal和rightVal,如果X比leftVal小,那就直接把X放入到根節點的位置,整個平衡二叉樹就平衡了!如果X比leftVal大,那就將leftVal的值設置到根節點中,再以左子節點做遞歸,繼續比較X和左子節點的左節點的大小!仔細看看也沒啥。

四.take方法

這個方法作用是獲取二叉樹中的根節點,也就是數組的第一個節點,隊列為空,就阻塞;

<code>public E take() throws InterruptedException {    //獲取鎖,可中斷    final ReentrantLock lock = this.lock;    lock.lockInterruptibly();    E result;    try {        //如果二叉樹為空了,那麼dequeue方法就會返回null,然後這裡就會阻塞        while ( (result = dequeue()) == null)            notEmpty.await();    } finally {        //釋放鎖        lock.unlock();    }    return result;}//這個方法前面說過,就是刪除根節點,然後調整平衡二叉樹private E dequeue() {    int n = size - 1;    if (n < 0)        return null;    else {        Object[] array = queue;        E result = (E) array[0];        E x = (E) array[n];        array[n] = null;        Comparator super E> cmp = comparator;        if (cmp == null)            siftDownComparable(0, x, array, n);        else            siftDownUsingComparator(0, x, array, n, cmp);        size = n;        return result;    }}/<code>

五.一個簡單的例子

前面看了這個多方法,那就說說怎麼使用吧,看看PriorityBlockingQueue這個阻塞隊列怎麼使用;

<code>package com.example.demo.study;import java.util.Random;import java.util.concurrent.PriorityBlockingQueue;import lombok.Data;public class Study0208 {        @Data    static class MyTask implements Comparable<mytask>{        private int priority=0;                private String taskName;                @Override        public int compareTo(MyTask o) {            if (this.priority>o.getPriority()) {                return 1;            }            return -1;        }        }        public static void main(String[] args) {        PriorityBlockingQueue<mytask> queue = new PriorityBlockingQueue<mytask>();        Random random = new Random();        //往隊列中放是個任務,從TaskName是按照順序放進去的,優先級是隨機的        for (int i = 1; i < 11; i++) {            MyTask task = new MyTask();            task.setPriority(random.nextInt(10));            task.setTaskName("taskName"+i);            queue.offer(task);        }                //從隊列中取出任務,這裡是按照優先級去拿出來的,相當於是根據優先級做了一個排序        while(!queue.isEmpty()) {            MyTask pollTask = queue.poll();            System.out.println(pollTask.toString());        }            }}/<mytask>/<mytask>/<mytask>/<code>
併發隊列之PriorityBlockingQueue


分享到:


相關文章: