互聯網JAVA面試常問問題-帶你走入AQS同步器源碼

互聯網JAVA面試常問問題-帶你走入AQS同步器源碼

AbstractQueuedSynchronizer

隊列式的同步器,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock/Semaphore/CountDownLatch。AQS的源碼如下:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
protected AbstractQueuedSynchronizer() { }
static final class Node {
........
}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

從源碼可以看出,AQS內部使用一個int成員變量表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作,多線程爭用資源被阻塞時會進入此隊列,其中volatile修飾了以下變量,來保證了多線程之間的可見:

1.內部狀態state

2.等待隊列的頭節點head

3.等待隊列的尾節點head。

state的訪問方式有三種:

1.getState()

2.setState()

3.compareAndSetState()

其中FIFO隊列的實現如下,每個節點中,除了存儲了當前線程,前後節點的引用以外,還有一個waitStatus變量,用於描述節點當前的狀態(共四種狀態:1.CANCELLED 取消狀態;2.SIGNAL 等待觸發狀態;3.CONDITION 等待條件狀態;4.PROPAGATE 狀態需要向後傳播 )

static final class Node { 
static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile AbstractQueuedSynchronizer.Node prev;
volatile AbstractQueuedSynchronizer.Node next;
volatile Thread thread;
AbstractQueuedSynchronizer.Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
AbstractQueuedSynchronizer.Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}

AQS的不同子類爭用共享資源的方式也不同。AQS子類中,只需要實現共享資源state的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:

isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才需要去實現它。
tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。

在AQS中,有兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。接下來主要介紹Exclusive下是如何獲取和釋放鎖的:

鎖獲取acquire(int)

 /**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();

}

所以在獲取鎖的過程中,我們看到了如下步驟

1.tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;

2.addWaiter()將該線程加入等待隊列的尾部,並標記為獨佔模式;

3.acquireQueued()使線程在等待隊列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。

4.如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

鎖釋放release(int)

 /**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

此方法是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列裡的其他線程來獲取資源。它調用tryRelease()來釋放資源。

小結

最後再回顧一下AQS的應用,以及ReentrantLock是如何實現的

ReentrantLock中state初始化為0,表示未鎖定狀態。當一個線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其他線程再tryAcquire()時就會失敗,直到此線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖。當然,釋放鎖之前,這個線程自己是可以重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。

一般來說,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。

更多幹貨內容,轉發+關注。私信我“資料”即可獲取。


分享到:


相關文章: