【Java併發編程】—–“J.U.C”:LinkedBlockingQueue

在前面的文章ArrayBlockingQueue源碼分析中,已經對JDK中的BlockingQueue中的做了一個回顧,同時對ArrayBlockingQueue中的核心方法作了說明,而LinkedBlockingQueue作為JDK中BlockingQueue家族系列中一員,由於其作為固定大小線程池(Executors.newFixedThreadPool())底層所使用的阻塞隊列,分析它的目的主要在於2點: (1) 與ArrayBlockingQueue進行類比學習,加深各種數據結構的理解 (2) 瞭解底層實現,能夠更好地理解每一種阻塞隊列對線程池性能的影響,做到真正的知其然,且知其所以然

  • 源碼分析LinkedBlockingQueue的實現
  • 與ArrayBlockingQueue進行比較
  • 說明為什麼選擇LinkedBlockingQueue作為固定大小的線程池的阻塞隊列
    如發現有分析不對或不準確的地方,請您及時糾正(在此謝過)

1.LinkedBlockingQueue深入分析

LinkedBlockingQueue,見名之意,它是由一個基於鏈表的阻塞隊列,首先看一下的核心組成:

<code>    // 所有的元素都通過Node這個靜態內部類來進行存儲,這與LinkedList的處理方式完全一樣
static class Node {
//使用item來保存元素本身
E item;
//保存當前節點的後繼節點
Node next;
Node(E x) { item = x; }
}
/**
阻塞隊列所能存儲的最大容量
用戶可以在創建時手動指定最大容量,如果用戶沒有指定最大容量
那麼最默認的最大容量為Integer.MAX_VALUE.

*/
private final int capacity;

/**
當前阻塞隊列中的元素數量
PS:如果你看過ArrayBlockingQueue的源碼,你會發現
ArrayBlockingQueue底層保存元素數量使用的是一個
普通的int類型變量。其原因是在ArrayBlockingQueue底層
對於元素的入隊列和出隊列使用的是同一個lock對象。而數
量的修改都是在處於線程獲取鎖的情況下進行操作,因此不
會有線程安全問題。
而LinkedBlockingQueue卻不是,它的入隊列和出隊列使用的是兩個
不同的lock對象,因此無論是在入隊列還是出隊列,都會涉及對元素數
量的併發修改,(之後通過源碼可以更加清楚地看到)因此這裡使用了一個原子操作類
來解決對同一個變量進行併發修改的線程安全問題。
*/
private final AtomicInteger count = new AtomicInteger(0);

/**
* 鏈表的頭部
* LinkedBlockingQueue的頭部具有一個不變性:
* 頭部的元素總是為null,head.item==null
*/
private transient Node head;

/**
* 鏈表的尾部
* LinkedBlockingQueue的尾部也具有一個不變性:
* 即last.next==null
*/
private transient Node last;

/**
元素出隊列時線程所獲取的鎖
當執行take、poll等操作時線程需要獲取的鎖
*/
private final ReentrantLock takeLock = new ReentrantLock();

/**
當隊列為空時,通過該Condition讓從隊列中獲取元素的線程處於等待狀態
*/
private final Condition notEmpty = takeLock.newCondition();

/**
元素入隊列時線程所獲取的鎖
當執行add、put、offer等操作時線程需要獲取鎖
*/
private final ReentrantLock putLock = new ReentrantLock();

/**
當隊列的元素已經達到capactiy,通過該Condition讓元素入隊列的線程處於等待狀態
*/
private final Condition notFull = putLock.newCondition();

/<code>

通過上面的分析,我們可以發現LinkedBlockingQueue在入隊列和出隊列時使用的不是同一個Lock,這也意味著它們之間的操作不會存在互斥操作。在多個CPU的情況下,它們可以做到真正的在同一時刻既消費、又生產,能夠做到並行處理。

下面讓我們看下LinkedBlockingQueue的構造方法:

<code>    /**
* 如果用戶沒有顯示指定capacity的值,默認使用int的最大值
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
可以看到,當隊列中沒有任何元素的時候,此時隊列的頭部就等於隊列的尾部,
指向的是同一個節點,並且元素的內容為null
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node(null);
}

/*
在初始化LinkedBlockingQueue的時候,還可以直接將一個集合
中的元素全部入隊列,此時隊列最大容量依然是int的最大值。
*/
public LinkedBlockingQueue(Collection extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
//獲取鎖
putLock.lock(); // Never contended, but necessary for visibility
try {
//迭代集合中的每一個元素,讓其入隊列,並且更新一下當前隊列中的元素數量
int n = 0;
for (E e : c) {
if (e == null)

throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//參考下面的enqueue分析
enqueue(new Node(e));
++n;
}
count.set(n);
} finally {
//釋放鎖
putLock.unlock();
}
}

/**
* 我去,這代碼其實可讀性不怎麼樣啊。
* 其實下面的代碼等價於如下內容:
* last.next=node;
* last = node;
* 其實也沒有什麼花樣:
就是讓新入隊列的元素成為原來的last的next,讓進入的元素稱為last
*
*/
private void enqueue(Node node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

/<code>

在分析完LinkedBlockingQueue的核心組成之後,下面讓我們再看下核心的幾個操作方法,首先分析一下元素入隊列的過程:

<code>    public void put(E e) throws InterruptedException { 

if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var

/*注意上面這句話,約定所有的put/take操作都會預先設置本地變量,
可以看到下面有一個將putLock賦值給了一個局部變量的操作
*/
int c = -1;
Node node = new Node(e);
/*
在這裡首先獲取到putLock,以及當前隊列的元素數量
即上面所描述的預設置本地變量操作
*/
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
/*
執行可中斷的鎖獲取操作,即意味著如果線程由於獲取
鎖而處於Blocked狀態時,線程是可以被中斷而不再繼
續等待,這也是一種避免死鎖的一種方式,不會因為
發現到死鎖之後而由於無法中斷線程最終只能重啟應用。
*/
putLock.lockInterruptibly();
try {
/*
當隊列的容量到底最大容量時,此時線程將處於等待狀
態,直到隊列有空閒的位置才繼續執行。使用while判
斷依舊是為了放置線程被"偽喚醒”而出現的情況,即當

線程被喚醒時而隊列的大小依舊等於capacity時,線
程應該繼續等待。
*/
while (count.get() == capacity) {
notFull.await();
}
//讓元素進行隊列的末尾,enqueue代碼在上面分析過了
enqueue(node);
//首先獲取原先隊列中的元素個數,然後再對隊列中的元素個數+1.
c = count.getAndIncrement();
/*注:c+1得到的結果是新元素入隊列之後隊列元素的總和。
當前隊列中的總元素個數小於最大容量時,此時喚醒其他執行入隊列的線程
讓它們可以放入元素,如果新加入元素之後,隊列的大小等於capacity,
那麼就意味著此時隊列已經滿了,也就沒有必須要喚醒其他正在等待入隊列的線程,因為喚醒它們之後,它們也還是繼續等待。
*/
if (c + 1 < capacity)
notFull.signal();
} finally {
//完成對鎖的釋放
putLock.unlock();
}
/*當c=0時,即意味著之前的隊列是空隊列,出隊列的線程都處於等待狀態,
現在新添加了一個新的元素,即隊列不再為空,因此它會喚醒正在等待獲取元素的線程。

*/
if (c == 0)
signalNotEmpty();
}

/*
喚醒正在等待獲取元素的線程,告訴它們現在隊列中有元素了
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//通過notEmpty喚醒獲取元素的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

/<code>

看完put方法,下面再看看下offer是如何處理的方法:

<code>    /**
在BlockingQueue接口中除了定義put方法外(當隊列元素滿了之後就會阻塞,
直到隊列有新的空間可以方法線程才會繼續執行),還定義一個offer方法,
該方法會返回一個boolean值,當入隊列成功返回true,入隊列失敗返回false。
該方法與put方法基本操作基本一致,只是有細微的差異。
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
/*

當隊列已經滿了,它不會繼續等待,而是直接返回。
因此該方法是非阻塞的。
*/
if (count.get() == capacity)
return false;
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
/*
當獲取到鎖時,需要進行二次的檢查,因為可能當隊列的大小為capacity-1時,
兩個線程同時去搶佔鎖,而只有一個線程搶佔成功,那麼此時
當線程將元素入隊列後,釋放鎖,後面的線程搶佔鎖之後,此時隊列
大小已經達到capacity,所以將它無法讓元素入隊列。
下面的其餘操作和put都一樣,此處不再詳述
*/
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
/<code>

BlockingQueue還定義了一個限時等待插入操作,即在等待一定的時間內,如果隊列有空間可以插入,那麼就將元素入隊列,然後返回true,如果在過完指定的時間後依舊沒有空間可以插入,那麼就返回false,下面是限時等待操作的分析:

<code>        /**
通過timeout和TimeUnit來指定等待的時長
timeout為時間的長度,TimeUnit為時間的單位
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
//將指定的時間長度轉換為毫秒來進行處理
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//如果等待的剩餘時間小於等於0,那麼直接返回
if (nanos <= 0)
return false;
/*
通過condition來完成等待,此時當前線程會完成鎖的,並且處於等待狀態
直到被其他線程喚醒該線程、或者當前線程被中斷、
等待的時間截至才會返回,該返回值為從方法調用到返回所經歷的時長。
注意:上面的代碼是condition的awitNanos()方法的通用寫法,
可以參看Condition.awaitNaos的API文檔。
下面的其餘操作和put都一樣,此處不再詳述
*/
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node(e));

c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
/<code>

通過上面的分析,我們應該比較清楚地知道了LinkedBlockingQueue的入隊列的操作,其主要是通過獲取到putLock鎖來完成,當隊列的數量達到最大值,此時會導致線程處於阻塞狀態或者返回false(根據具體的方法來看);如果隊列還有剩餘的空間,那麼此時會新創建出一個Node對象,將其設置到隊列的尾部,作為LinkedBlockingQueue的last元素。

在分析完入隊列的過程之後,我們接下來看看LinkedBlockingQueue出隊列的過程;由於BlockingQueue的方法都具有對稱性,此處就只分析take方法的實現,其餘方法的實現都如出一轍:

<code>
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//通過takeLock獲取鎖,並且支持線程中斷
takeLock.lockInterruptibly();
try {
//當隊列為空時,則讓當前線程處於等待
while (count.get() == 0) {
notEmpty.await();

}
//完成元素的出隊列
x = dequeue();
/*
隊列元素個數完成原子化操作-1,可以看到count元素會
在插入元素的線程和獲取元素的線程進行併發修改操作。
*/
c = count.getAndDecrement();
/*
當一個元素出隊列之後,隊列的大小依舊大於1時
當前線程會喚醒其他執行元素出隊列的線程,讓它們也
可以執行元素的獲取
*/
if (c > 1)
notEmpty.signal();
} finally {
//完成鎖的釋放
takeLock.unlock();
}
/*
當c==capaitcy時,即在獲取當前元素之前,
隊列已經滿了,而此時獲取元素之後,隊列就會
空出一個位置,故當前線程會喚醒執行插入操作的線
程通知其他中的一個可以進行插入操作。
*/
if (c == capacity)
signalNotFull();
return x;
}


/**

* 讓頭部元素出隊列的過程
* 其最終的目的是讓原來的head被GC回收,讓其的next成為head
* 並且新的head的item為null.
* 因為LinkedBlockingQueue的頭部具有一致性:即元素為null。
*/
private E dequeue() {
Node h = head;
Node first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

/<code>

LinkedBlockingQueue出隊列大致過程.png

對於LinkedBlockingQueue的源碼分析就到這裡,下面讓我們將LinkedBlockingQueue與ArrayBlockingQueue進行一個比較。

2.LinkedBlockingQueue與ArrayBlockingQueue的比較

ArrayBlockingQueue由於其底層基於數組,並且在創建時指定存儲的大小,在完成後就會立即在內存分配固定大小容量的數組元素,因此其存儲通常有限,故其是一個“有界“的阻塞隊列;而LinkedBlockingQueue可以由用戶指定最大存儲容量,也可以無需指定,如果不指定則最大存儲容量將是Integer.MAX_VALUE,即可以看作是一個

“無界”的阻塞隊列,由於其節點的創建都是動態創建,並且在節點出隊列後可以被GC所回收,因此其具有靈活的伸縮性。但是由於ArrayBlockingQueue的有界性,因此其能夠更好的對於性能進行預測,而LinkedBlockingQueue由於沒有限制大小,當任務非常多的時候,不停地向隊列中存儲,就有可能導致內存溢出的情況發生。

其次,ArrayBlockingQueue中在入隊列和出隊列操作過程中,使用的是同一個lock,所以即使在多核CPU的情況下,其讀取和操作的都無法做到並行,而LinkedBlockingQueue的讀取和插入操作所使用的鎖是兩個不同的lock,它們之間的操作互相不受干擾,因此兩種操作可以並行完成,故LinkedBlockingQueue的吞吐量要高於ArrayBlockingQueue。

3.選擇LinkedBlockingQueue的理由

<code>    /**
下面的代碼是Executors創建固定大小線程池的代碼,其使用了
LinkedBlockingQueue來作為任務隊列。
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>());
}
/<runnable>/<code>

JDK中選用LinkedBlockingQueue作為阻塞隊列的原因就在於其無界性。因為線程大小固定的線程池,其線程的數量是不具備伸縮性的,當任務非常繁忙的時候,就勢必會導致所有的線程都處於工作狀態,如果使用一個有界的阻塞隊列來進行處理,那麼就非常有可能很快導致隊列滿的情況發生,從而導致任務無法提交而拋出RejectedExecutionException,而使用無界隊列由於其良好的存儲容量的伸縮性,可以很好的去緩衝任務繁忙情況下場景,即使任務非常多,也可以進行動態擴容,當任務被處理完成之後,隊列中的節點也會被隨之被GC回收,非常靈活


分享到:


相關文章: