java 數據結構-隊列詳解

各位志同道合的朋友們大家好,我是一個一直在一線互聯網踩坑十餘年的編碼愛好者,現在將我們的各種經驗以及架構實戰分享出來,如果大家喜歡,就關注我,一起將技術學深學透,我會每一篇分享結束都會預告下一專題

隊列(Queue):與棧相對的一種數據結構, 集合(Collection)的一個子類。隊列允許在一端進行插入操作,而在另一端進行刪除操作的線性表,棧的特點是後進先出,而隊列的特點是先進先出。隊列的用處很大,比如實現消息隊列。

Queue 類關係圖,如下圖所示:

java 數據結構-隊列詳解

java 數據結構-隊列詳解

注:為了讓讀者更直觀地理解,上圖為精簡版的 Queue 類關係圖。本文如無特殊說明,內容都是基於 Java 1.8 版本。

隊列(Queue)

1)Queue 分類

從上圖可以看出 Queue 大體可分為以下三類。

  • 雙端隊列:雙端隊列(Deque)是 Queue 的子類也是 Queue 的補充類,頭部和尾部都支持元素插入和獲取。
  • 阻塞隊列:阻塞隊列指的是在元素操作時(添加或刪除),如果沒有成功,會阻塞等待執行。例如,當添加元素時,如果隊列元素已滿,隊列會阻塞等待直到有空位時再插入。
  • 非阻塞隊列:非阻塞隊列和阻塞隊列相反,會直接返回操作的結果,而非阻塞等待。雙端隊列也屬於非阻塞隊列。

2)Queue 方法說明

Queue 常用方法,如下圖所示:

java 數據結構-隊列詳解

java 數據結構-隊列詳解

方法說明:

  • add(E):添加元素到隊列尾部,成功返回 true
  • offer(E):添加元素到隊列尾部,成功返回 true
  • remove():刪除元素,成功返回 true,失敗返回 false
  • poll():獲取並移除此隊列的第一個元素,若隊列為空,則返回 null
  • peek():獲取但不移除此隊列的第一個元素,若隊列為空,則返回 null
  • element():獲取但不移除此隊列的第一個元素,若隊列為空,則拋異常

3)Queue 使用實例

<code>Queue<string> linkedList = new LinkedList<>();
linkedList.add("Dog");
linkedList.add("Camel");
linkedList.add("Cat");
while (!linkedList.isEmpty()) {
    System.out.println(linkedList.poll());
}/<string>/<code>
java 數據結構-隊列詳解

程序執行結果:

Dog

Camel

Cat

阻塞隊列

1)BlockingQueue

BlockingQueue 在 java.util.concurrent 包下,其他阻塞類都實現自 BlockingQueue 接口,BlockingQueue 提供了線程安全的隊列訪問方式,當向隊列中插入數據時,如果隊列已滿,線程則會阻塞等待隊列中元素被取出後再插入;當從隊列中取數據時,如果隊列為空,則線程會阻塞等待隊列中有新元素再獲取。

BlockingQueue 核心方法

插入方法:

  • add(E):添加元素到隊列尾部,成功返回 true
  • offer(E):添加元素到隊列尾部,成功返回 true
  • put(E):將元素插入到隊列的尾部,如果該隊列已滿,則一直阻塞

刪除方法:

  • remove(Object):移除指定元素,成功返回 true,失敗返回 false
  • poll(): 獲取並移除隊列的第一個元素,如果隊列為空,則返回 null
  • take():獲取並移除隊列第一個元素,如果沒有元素則一直阻塞

檢查方法:

  • peek():獲取但不移除隊列的第一個元素,若隊列為空,則返回 null

2)LinkedBlockingQueue

LinkedBlockingQueue 是一個由鏈表實現的有界阻塞隊列,容量默認值為 Integer.MAX_VALUE,也可以自定義容量,建議指定容量大小,默認大小在添加速度大於刪除速度情況下有造成內存溢出的風險,LinkedBlockingQueue 是先進先出的方式存儲元素。

3)ArrayBlockingQueue

ArrayBlockingQueue 是一個有邊界的阻塞隊列,它的內部實現是一個數組。它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。

ArrayBlockingQueue 也是先進先出的方式存儲數據,ArrayBlockingQueue 內部的阻塞隊列是通過重入鎖 ReenterLock 和 Condition 條件隊列實現的,因此 ArrayBlockingQueue 中的元素存在公平訪問與非公平訪問的區別,對於公平訪問隊列,被阻塞的線程可以按照阻塞的先後順序訪問隊列,即先阻塞的線程先訪問隊列。而非公平隊列,當隊列可用時,阻塞的線程將進入爭奪訪問資源的競爭中,也就是說誰先搶到誰就執行,沒有固定的先後順序。

示例代碼如下:

<code>// 默認非公平阻塞隊列
ArrayBlockingQueue queue = new ArrayBlockingQueue(6);
// 公平阻塞隊列
ArrayBlockingQueue queue2 = new ArrayBlockingQueue(6,true);
 
// ArrayBlockingQueue 源碼展示
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}/<code>
java 數據結構-隊列詳解

4)DelayQueue

DelayQueue 是一個支持延時獲取元素的無界阻塞隊列,隊列中的元素必須實現 Delayed 接口,在創建元素時可以指定延遲時間,只有到達了延遲的時間之後,才能獲取到該元素。

實現了 Delayed 接口必須重寫兩個方法 ,getDelay(TimeUnit) 和 compareTo(Delayed),如下代碼所示:

<code>class DelayElement implements Delayed {
        @Override
        // 獲取剩餘時間
        public long getDelay(TimeUnit unit) {
            // do something
        }
        @Override
        // 隊列裡元素的排序依據
        public int compareTo(Delayed o) {
            // do something
        }
    }/<code>
java 數據結構-隊列詳解

DelayQueue 使用的完整示例,請參考以下代碼:

<code>public class DelayTest {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue delayQueue = new DelayQueue();
        delayQueue.put(new DelayElement(1000));
        delayQueue.put(new DelayElement(3000));
        delayQueue.put(new DelayElement(5000));
        System.out.println("開始時間:" +  DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()){
            System.out.println(delayQueue.take());
        }
        System.out.println("結束時間:" +  DateFormat.getDateTimeInstance().format(new Date()));
    }
 
    static class DelayElement implements Delayed {
        // 延遲截止時間(單面:毫秒)
        long delayTime = System.currentTimeMillis();
        public DelayElement(long delayTime) {
            this.delayTime = (this.delayTime + delayTime);
        }
        @Override
        // 獲取剩餘時間
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        // 隊列裡元素的排序依據
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS)                 return -1;
            } else {
                return 0;
            }
        }
        @Override
        public String toString() {
            return DateFormat.getDateTimeInstance().format(new Date(delayTime));
        }
    }
}/<code>
java 數據結構-隊列詳解

程序執行結果:

開始時間:2019-6-13 20:40:38

2019-6-13 20:40:39

2019-6-13 20:40:41

2019-6-13 20:40:43

結束時間:2019-6-13 20:40:43

非阻塞隊列

ConcurrentLinkedQueue 是一個基於鏈接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當我們添加一個元素的時候,它會添加到隊列的尾部;當我們獲取一個元素時,它會返回隊列頭部的元素。

它的入隊和出隊操作均利用 CAS(Compare And Set)更新,這樣允許多個線程併發執行,並且不會因為加鎖而阻塞線程,使得併發性能更好。

ConcurrentLinkedQueue 使用示例:

<code>ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
concurrentLinkedQueue.add("Dog");
concurrentLinkedQueue.add("Cat");
while (!concurrentLinkedQueue.isEmpty()) {
    System.out.println(concurrentLinkedQueue.poll());
}/<code>
java 數據結構-隊列詳解

執行結果:

Dog

Cat

可以看出不管是阻塞隊列還是非阻塞隊列,使用方法都是類似的,區別是底層的實現方式。

優先級隊列

PriorityQueue 一個基於優先級堆的無界優先級隊列。優先級隊列的元素按照其自然順序進行排序,或者根據構造隊列時提供的 Comparator 進行排序,具體取決於所使用的構造方法。優先級隊列不允許使用 null 元素。

PriorityQueue 代碼使用示例:

<code>Queue<integer> priorityQueue = new PriorityQueue(new Comparator<integer>() {
    @Override
    public int compare(Integer o1, Integer o2) {
        // 非自然排序,數字倒序
        return o2 - o1;
    }
});
priorityQueue.add(3);
priorityQueue.add(1);
priorityQueue.add(2);
while (!priorityQueue.isEmpty()) {
    Integer i = priorityQueue.poll();
    System.out.println(i);
}/<integer>/<integer>/<code>
java 數據結構-隊列詳解

程序執行的結果是:

3

2

1

PriorityQueue 注意的點:

  • PriorityQueue 是非線程安全的,在多線程情況下可使用 PriorityBlockingQueue 類替代;
  • PriorityQueue 不允許插入 null 元素。

相關面試題

1.ArrayBlockingQueue 和 LinkedBlockingQueue 的區別是什麼?

答:ArrayBlockingQueue 和 LinkedBlockingQueue 都實現自阻塞隊列 BlockingQueue,它們的區別主要體現在以下幾個方面:

  • ArrayBlockingQueue 使用時必須指定容量值,LinkedBlockingQueue 可以不用指定;
  • ArrayBlockingQueue 的最大容量值是使用時指定的,並且指定之後就不允許修改;而 LinkedBlockingQueue 最大的容量為 Integer.MAX_VALUE;
  • ArrayBlockingQueue 數據存儲容器是採用數組存儲的;而 LinkedBlockingQueue 採用的是 Node 節點存儲的。

2.LinkedList 中 add() 和 offer() 有什麼關係?

答:add() 和 offer() 都是添加元素到隊列尾部。offer 方法是基於 add 方法實現的,Offer 的源碼如下:

<code>public boolean offer(E e) { 
   return add(e);
}/<code>

3.Queue 和 Deque 有什麼區別?

答:Queue 屬於一般隊列,Deque 屬於雙端隊列。一般隊列是先進先出,也就是隻有先進的才能先出;而雙端隊列則是兩端都能插入和刪除元素。

4.LinkedList 屬於一般隊列還是雙端隊列?

答:LinkedList 實現了 Deque 屬於雙端隊列,因此擁有 addFirst(E)、addLast(E)、getFirst()、getLast() 等方法。

5.以下說法錯誤的是?

A:DelayQueue 內部是基於 PriorityQueue 實現的B:PriorityBlockingQueue 不是先進先出的數據存儲方式C:LinkedBlockingQueue 默認容量是無限大的D:ArrayBlockingQueue 內部的存儲單元是數組,初始化時必須指定隊列容量

答:C

題目解析:LinkedBlockingQueue 默認容量是 Integer.MAX_VALUE,並不是無限大的。

6.關於 ArrayBlockingQueue 說法不正確的是?

A:ArrayBlockingQueue 是線程安全的B:ArrayBlockingQueue 元素允許為 nullC:ArrayBlockingQueue 主要應用場景是“生產者-消費者”模型D:ArrayBlockingQueue 必須顯示地設置容量

答:B

題目解析:ArrayBlockingQueue 不允許元素為 null,如果添加一個 null 元素,會拋 NullPointerException 異常。

7.以下程序執行的結果是什麼?

<code>PriorityQueue priorityQueue = new PriorityQueue();
priorityQueue.add(null);
System.out.println(priorityQueue.size());/<code>
java 數據結構-隊列詳解

答:程序執行報錯,PriorityQueue 不能插入 null。

8.Java 中常見的阻塞隊列有哪些?

答:Java 中常見的阻塞隊列如下:

  • ArrayBlockingQueue,由數組結構組成的有界阻塞隊列;
  • PriorityBlockingQueue,支持優先級排序的無界阻塞隊列;
  • SynchronousQueue,是一個不存儲元素的阻塞隊列,會直接將任務交給消費者,必須等隊列中的添加元素被消費後才能繼續添加新的元素;
  • LinkedBlockingQueue,由鏈表結構組成的阻塞隊列;
  • DelayQueue,支持延時獲取元素的無界阻塞隊列。

9.有界隊列和無界隊列有哪些區別?

答:有界隊列和無界隊列的區別如下。

  • 有界隊列:有固定大小的隊列叫做有界隊列,比如:new ArrayBlockingQueue(6),6 就是隊列的大小。
  • 無界隊列:指的是沒有設置固定大小的隊列,這些隊列的特點是可以直接入列,直到溢出。它們並不是真的無界,它們最大值通常為 Integer.MAX_VALUE,只是平常很少能用到這麼大的容量(超過 Integer.MAX_VALUE),因此從使用者的體驗上,就相當於 “無界”。

10.如何手動實現一個延遲消息隊列?

答:說到延遲消息隊列,我們應該可以第一時間想到要使用 DelayQueue 延遲隊列來解決這個問題。實現思路,消息隊列分為生產者和消費者,生產者用於增加消息,消費者用於獲取並消費消息,我們只需要生產者把消息放入到 DelayQueue 隊列並設置延遲時間,消費者循環使用 take() 阻塞獲取消息即可。完整的實現代碼如下:

<code>public class CustomDelayQueue {
    // 消息編號
    static AtomicInteger MESSAGENO = new AtomicInteger(1);
 
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<delayedelement> delayQueue = new DelayQueue<>();
        // 生產者1
        producer(delayQueue, "生產者1");
        // 生產者2
        producer(delayQueue, "生產者2");
        // 消費者
        consumer(delayQueue);
    }
 

    //生產者
    private static void producer(DelayQueue<delayedelement> delayQueue, String name) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // 產生 1~5 秒的隨機數
                    long time = 1000L * (new Random().nextInt(5) + 1);
                    try {
                        Thread.sleep(time);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 組合消息體
                    String message = String.format("%s,消息編號:%s 發送時間:%s 延遲:%s 秒",
                            name, MESSAGENO.getAndIncrement(), DateFormat.getDateTimeInstance().format(new Date()), time / 1000);
                    // 生產消息
                    delayQueue.put(new DelayedElement(message, time));
                }
            }
        }).start();
    }
 
    //消費者
    private static void consumer(DelayQueue<delayedelement> delayQueue) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    DelayedElement element = null;
                    try {
                        // 消費消息
                        element = delayQueue.take();
                        System.out.println(element);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
 
    // 延遲隊列對象
    static class DelayedElement implements Delayed {
        // 過期時間(單位:毫秒)

        long time = System.currentTimeMillis();
        // 消息體
        String message;
        // 參數:delayTime 延遲時間(單位毫秒)
        public DelayedElement(String message, long delayTime) {
            this.time += delayTime;
            this.message = message;
        }
        @Override
        // 獲取過期時間
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        // 隊列元素排序
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
                return 1;
            else if (this.getDelay(TimeUnit.MILLISECONDS)                 return -1;
            else
                return 0;
        }
        @Override
        public String toString() {
            // 打印消息
            return message + " |執行時間:" + DateFormat.getDateTimeInstance().format(new Date());
        }
    }
}/<delayedelement>/<delayedelement>/<delayedelement>/<code>
java 數據結構-隊列詳解

以上程序支持多生產者,執行的結果如下:

生產者1,消息編號:1 發送時間:2019-6-12 20:38:37 延遲:2 秒 |執行時間:2019-6-12 20:38:39

生產者2,消息編號:2 發送時間:2019-6-12 20:38:37 延遲:2 秒 |執行時間:2019-6-12 20:38:39

生產者1,消息編號:3 發送時間:2019-6-12 20:38:41 延遲:4 秒 |執行時間:2019-6-12 20:38:45

生產者1,消息編號:5 發送時間:2019-6-12 20:38:43 延遲:2 秒 |執行時間:2019-6-12 20:38:45

......

總結

隊列(Queue)按照是否阻塞可分為:阻塞隊列 BlockingQueue 和 非阻塞隊列。其中,雙端隊列 Deque 也屬於非阻塞隊列,雙端隊列除了擁有隊列的先進先出的方法之外,還擁有自己獨有的方法,如 addFirst()、addLast()、getFirst()、getLast() 等,支持首未插入和刪除元素。隊列中比較常用的兩個隊列還有 PriorityQueue(優先級隊列)和 DelayQueue(延遲隊列),可使用延遲隊列來實現延遲消息隊列,這也是面試中比較常考的問題之一。需要面試朋友對延遲隊列一定要做到心中有數,動手寫一個消息隊列也是非常有必要的。

下一篇:Java IO 詳解

往期精選(全集在wx公號)

分佈式數據之緩存技術,一起來揭開其神秘面紗

分佈式數據複製技術,今天就教你真正分身術

數據分佈方式之哈希與一致性哈希,我就是個神算子

分佈式存儲系統三要素,掌握這些就離成功不遠了

想要設計一個好的分佈式系統,必須搞定這個理論

分佈式通信技術之發佈訂閱,乾貨滿滿

分佈式通信技術之遠程調用:RPC

消息隊列Broker主從架構詳細設計方案,這一篇就搞定主從架構

消息中間件路由中心你會設計嗎,不會就來學學

消息隊列消息延遲解決方案,跟著做就行了

秒殺系統每秒上萬次下單請求,我們該怎麼去設計

【分佈式技術】分佈式系統調度架構之單體調度,非掌握不可

CDN加速技術,作為開發的我們真的不需要懂嗎?

煩人的緩存穿透問題,今天教就你如何去解決

分佈式緩存高可用方案,我們都是這麼幹的

每天百萬交易的支付系統,生產環境該怎麼設置JVM堆內存大小

你的成神之路我已替你鋪好,沒鋪你來捶我


分享到:


相關文章: