ArrayBlockingQueue源碼分析及使用

BlockingQueue介紹與常用方法


ArrayBlockingQueue源碼分析及使用


BlockingQueue是一個阻塞隊列。在高併發場景是用得非常多的,在線程池中。如果運行線程數目大於核心線程數目時,也會嘗試把新加入的線程放到一個BlockingQueue中去。隊列的特性就是先進先出很容易理解,在java裡頭它的實現類主要有下圖的幾種,其中最常用到的是ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue這三種。


ArrayBlockingQueue源碼分析及使用

它主要的方法有


ArrayBlockingQueue源碼分析及使用

BlockingQueue的核心方法:

1、放入數據

(1) add(object)

隊列沒滿的話,放入成功。否則拋出異常。

(2)offer(object):

表示如果可能的話,將object加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法的線程)

(3)offer(E o, long timeout, TimeUnit unit)

可以設定等待的時間,如果在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。

(4)put(object)

把object加到BlockingQueue裡,如果BlockQueue沒有空間,則調用此方法的線程阻塞。直到BlockingQueue裡面有空間再繼續.

2、獲取數據

(1)poll(time)

取走BlockingQueue裡排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null;

(2)poll(long timeout, TimeUnit unit)

從BlockingQueue取出一個隊首的對象,如果在指定時間內,隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。

(3)take()

取走BlockingQueue裡排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入;

(4)drainTo()

一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。

ArrayBlockingQueue

一個由數組支持的有界阻塞隊列。它的本質是一個基於數組的BlockingQueue的實現。

它的容納大小是固定的。此隊列按 FIFO(先進先出)原則對元素進行排序。

隊列的頭部 是在隊列中存在時間最長的元素。隊列的尾部 是在隊列中存在時間最短的元素。

新元素插入到隊列的尾部,隊列檢索操作則是從隊列頭部開始獲得元素。

這是一個典型的“有界緩存區”,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。

一旦創建了這樣的緩存區,就不能再增加其容量。

試圖向已滿隊列中放入元素會導致放入操作受阻塞,直到BlockingQueue裡有新的喚空間才會被醒繼續操作;

試圖從空隊列中檢索元素將導致類似阻塞,直到BlocingkQueue進了新貨才會被喚醒。

此類支持對等待的生產者線程和使用者線程進行排序的可選公平策略。

默認情況下,不保證是這種排序。然而,通過在構造函數將公平性 (fairness) 設置為 true 而構造的隊列允許按照 FIFO 順序訪問線程。

公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。

此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選 方法。

注意1:它是有界阻塞隊列。它是數組實現的,是一個典型的“有界緩存區”。數組大小在構造函數指定,而且從此以後不可改變。

注意2:是它線程安全的,是阻塞的,具體參考BlockingQueue的“注意4”。

注意3:不接受 null 元素

注意4:公平性 (fairness)可以在構造函數中指定。


ArrayBlockingQueue源碼分析及使用

如果為true,則按照 FIFO 順序訪問插入或移除時受阻塞線程的隊列;如果為 false,則訪問順序是不確定的。

注意5:它實現了BlockingQueue接口。

注意6:此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選 方法。

注意7:其容量在構造函數中指定。容量不可以自動擴展,也沒提供手動擴展的接口。

注意8:在JDK5/6中,LinkedBlockingQueue和ArrayBlocingQueue等對象的poll(long timeout, TimeUnit unit)存在內存洩露

Leak的對象是AbstractQueuedSynchronizer.Node,

據稱JDK5會在Update12裡Fix,JDK6會在Update2裡Fix。

源碼分析:

一個基本數組的阻塞隊列。可以設置列隊的大小。

它的基本原理實際還是數組,只不過存、取、刪時都要做隊列是否滿或空的判斷。然後加鎖訪問。

<code>package java.util.concurrent;  
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.lang.ref.WeakReference;
import java.util.Spliterators;
import java.util.Spliterator;


public class ArrayBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {

private static final long serialVersionUID = -817911632652898426L;

/** 真正存入數據的數組*/

final Object[] items;

/** take, poll, peek or remove的下一個索引 */
int takeIndex;

/** put, offer, or add的下一個索引 */
int putIndex;

/**隊列中元素個數*/
int count;


/**可重入鎖 */
final ReentrantLock lock;

/** 隊列不為空的條件 */
private final Condition notEmpty;

/** 隊列未滿的條件 */
private final Condition notFull;

transient Itrs itrs = null;


/**
*當前元素個數-1
*/
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}

/**
* 返回對應索引上的元素
*/
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}

/**
* 非空檢查
*
* @param v the element
*/
private static void checkNotNull(Object v) {

if (v == null)
throw new NullPointerException();
}

/**
* 元素放入隊列,注意調用這個方法時都要先加鎖
*
*/
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;//當前擁有元素個數加1
notEmpty.signal();//有一個元素加入成功,那肯定隊列不為空
}

/**
* 元素出隊,注意調用這個方法時都要先加鎖
*
*/
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;/當前擁有元素個數減1
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//有一個元素取出成功,那肯定隊列不滿
return x;
}

/**
* 指定刪除索引上的元素
*
*/
void removeAt(final int removeIndex) {

final Object[] items = this.items;
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();//有一個元素刪除成功,那肯定隊列不滿
}

/**
*
* 構造函數,設置隊列的初始容量
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

/**
* 構造函數。capacity設置數組大小 ,fair設置是否為公平鎖
* capacity and the specified access policy.
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();

this.items = new Object[capacity];
lock = new ReentrantLock(fair);//是否為公平鎖,如果是的話,那麼先到的線程先獲得鎖對象。
//否則,由操作系統調度由哪個線程獲得鎖,一般為false,性能會比較高
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

/**
*構造函數,帶有初始內容的隊列
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection extends E> c) {
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); //要給數組設置內容,先上鎖
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;//依次拷貝內容
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;//如果putIndex大於數組大小 ,那麼從0重新開始
} finally {
lock.unlock();//最後一定要釋放鎖
}
}

/**
* 添加一個元素,其實super.add裡面調用了offer方法
*/
public boolean add(E e) {
return super.add(e);

}

/**
*加入成功返回true,否則返回false
*
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();//上鎖
try {
if (count == items.length) //超過數組的容量
return false;
else {
enqueue(e); //放入元素
return true;
}
} finally {
lock.unlock();
}
}

/**
* 如果隊列已滿的話,就會等待
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//和lock()方法的區別是讓它在阻塞時也可拋出異常跳出
try {
while (count == items.length)
notFull.await(); //這裡就是阻塞了,要注意。如果運行到這裡,那麼它會釋放上面的鎖,一直等到notify
enqueue(e);
} finally {
lock.unlock();
}
}

/**
* 帶有超時時間的插入方法,unit表示是按秒、分、時哪一種

*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);//帶有超時等待的阻塞方法
}
enqueue(e);//入隊
return true;
} finally {
lock.unlock();
}
}

//實現的方法,如果當前隊列為空,返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//實現的方法,如果當前隊列為空,一直阻塞
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();//隊列為空,阻塞方法
return dequeue();
} finally {
lock.unlock();
}
}
//帶有超時時間的取元素方法,否則返回Null

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);//超時等待
}
return dequeue();//取得元素
} finally {
lock.unlock();
}
}
//只是看一個隊列最前面的元素,取出是不刪除隊列中的原來元素。隊列為空時返回null
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // 隊列為空時返回null
} finally {
lock.unlock();
}
}

/**
* 返回隊列當前元素個數
*
*/
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}

/**
* 返回當前隊列再放入多少個元素就滿隊
*/

public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}

/**
* 從隊列中刪除一個元素的方法。刪除成功返回true,否則返回false
*/
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i); //真正刪除的方法
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);//一直不斷的循環取出來做判斷
}
return false;
} finally {
lock.unlock();
}
}

/**
* 是否包含一個元素
*/
public boolean contains(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();

try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}

/**
* 清空隊列
*
*/
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}

/**
* 取出所有元素到集合

*/
public int drainTo(Collection super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}

/**
* 取出所有元素到集合
*/
public int drainTo(Collection super E> c, int maxElements) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
while (i < n) {
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x);
items[take] = null;
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}

}


}
/<code>

使用實例:

生產者-消費者模型

大量的實現ArrayBlockingQueue已經做掉了,包括判空,線程掛起等操作都封裝在ArrayBlockingQueue中。生產者只需要關心生產,消費者只需要關心消費。而如果不使用ArrayBlockingQueue的話,具體的生產者還需要去通知消費者,還需要關心整個容器是否滿了。從這裡可以看出ArrayBlockingQueue是一種比較好的實現方式,高度的內聚。

Producer.java

<code>public class Producer implements Runnable{  

//容器
private final ArrayBlockingQueue<bread> queue;

public Producer(ArrayBlockingQueue<bread> queue){
this.queue = queue;
}

/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
while(true){
produce();
}
}

public void produce(){
/**

* put()方法是如果容器滿了的話就會把當前線程掛起
* offer()方法是容器如果滿的話就會返回false。
*/
try {
Bread bread = new Bread();
queue.put(bread);
System.out.println("Producer:"+bread);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} /<bread>/<bread>/<code>

Consumer.java

<code>public class Consumer implements Runnable{  

//容器
private final ArrayBlockingQueue<bread> queue;

public Consumer(ArrayBlockingQueue<bread> queue){
this.queue = queue;
}

/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
while(true){
consume();
}
}

public void consume(){
/**
* take()方法和put()方法是對應的,從中拿一個數據,如果拿不到線程掛起
* poll()方法和offer()方法是對應的,從中拿一個數據,如果沒有直接返回null
*/
try {
Bread bread = queue.take();

System.out.println("consumer:"+bread);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} /<bread>/<bread>/<code>

Client.java

<code>public class Client {  

/**
* @param args
*/
public static void main(String[] args) {
int capacity = 10;
ArrayBlockingQueue<bread> queue = new ArrayBlockingQueue<bread>(capacity);

new Thread(new Producer(queue)).start();
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}

} /<bread>/<bread>/<code>

JAVA進階架構程序員福利:我這裡還總結整理了比較全面的JAVA相關的面試資料,都已經整理成了

PDF版,這些都可以分享給大家,關注私信我:【806】,免費領取!


分享到:


相關文章: