隊列同步器(AQS)的實現原理

如下圖所示是隊列同步器的基本結構,通過一個int類型的state對象來表示同步狀態,對鎖的競爭就是通過修改這個同步狀態和對這個狀態的值來進行判斷實現,使用一個FIFO雙向隊列來管理競爭同步狀態的線程,把這些線程封裝成一個隊列節點加入到隊列中。

隊列同步器(AQS)的實現原理

嘗試獨佔式獲取鎖

<code>protected boolean tryAcquire(int arg) {
throw new Uns/<code>

以上是這個方法的定義,直接拋出一個異常,這個方法是需要自己去實現的,看一下ReentrantLock中的公平鎖是怎麼實現它的。 這個方法的實現要先獲取同步狀態,進行判斷是否是預期的狀態,然後使用CAS設置同步狀態。

<code>protected final boolean tryAcquire(int acquires) {
//1. 獲取當前線程
final Thread current = Thread.currentThread();
//2. 獲取當前的同步狀態,這個狀態是通過volatile修飾的。
int c = getState();
if (c == 0) {
//3. 判斷狀態是否是0,為0表示沒有線程獲取到同步狀態(鎖),CAS設置同步狀態
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//4.已經有線程獲取了同步狀態,判斷是不是當前線程
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}/<code>

獨佔式獲取鎖

這個方法用於獨佔式獲取同步狀態,如果當前線程獲取同步狀態成功,那麼返回,否則就進入同步隊列進行等待,這個方法會調用重寫的tryAcquire(int)。

<code>public final void acquire(int arg) {
/*1. 先嚐試獲取鎖,如果成功了直接返回*/
if (!tryAcquire(arg) &&
/*2.如果失敗了,那麼把當前的節點加入同步隊列*/
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

/**
* 這個方法將當前線程構建成一個隊列的節點,然後先試用CAS操作快速的插入到隊列中,
* 如果成功了就返回,如果不成功,那麼久執行enq方法試用死循環的方式CAS設置節點,直到設置成功為止
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* 一個死循環的方式進行節點的加入操作,如果隊列為空的,那麼會先初始化隊列,然後CAS操作將當前節點加入到隊列尾部

* 這裡的這種方式保證了插入到隊列的尾部是串行的,通過所有的線程都自旋然後CAS加入來保證。
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

/**
* 這個方法主要作用就是當前的這個節點表示的線程進行自旋,然後判斷是否能夠獲取到同步狀態
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//1. 獲取當前的節點的前驅節點,然後判斷前驅節點是否是head節點,如果是head節點,那麼嘗試去獲取同步狀態
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//2. 獲取同步狀態成功,當前節點設置為隊列的頭結點,所以隊列的頭節點是獲取到同步狀態的節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;

}
//3.獲取鎖失敗,判斷是否需要阻塞當前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}/<code>

以上的源碼就是獨佔式獲取同步狀態,總結下來如下:

  1. 同步隊列是通過使用一個雙向鏈表的FIFO隊列進行線程的同步狀態競爭的。
  2. 同步隊列中的head節點和tail節點,每個節點都有一個前驅和後繼節點。
  3. head表示的是當前持有同步狀態的線程的節點,獲取同步狀態失敗的節點會通過CAS的方式線程安全地加入到隊列的尾部,每個節點都會循環去獲取同步狀態。
  4. 只有當前持有同步狀態的頭節點釋放了同步狀態之後,後繼節點才會獲取到同步狀態,然後將自己設置為head節點。

流程圖如下:

隊列同步器(AQS)的實現原理

獨佔式釋放鎖

<code>public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒後繼節點
unparkSuccessor(h);
return true;
}
return false;
}/<code>

上面一段代碼是獨佔式釋放鎖的代碼,非常簡單,先嚐試釋放鎖,如果釋放成功那麼判斷隊列是否是空和等待狀態是否不為0,然後喚醒head的後繼節點。

共享式獲取鎖

共享式獲取鎖的時候如果一個線程獲取了同步狀態,那麼其他共享式獲取鎖的線程也能夠獲取到同步狀態,但是獨佔式獲取將會被拒絕。

<code>public final void acquireShared(int arg) {
//1. 嘗試共享式獲取鎖,如果獲取到就返回
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
//直接拋出了異常,需要同步組件自己去實現
throw new UnsupportedOperationException();
}複製代碼/<code>

以CountDownLatch為例分析tryAcquireShared的實現,CountDownLatch

<code>/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

//1. 初始化的時候指定有多少個線程可以同時獲取到同步狀態
Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

/**
* 2. 判斷如果同步狀態的值為0的時候,表示獲取成功,如果不為0,表示已經有線程獲取到同步狀態,獲取失敗
**/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

/**
* 嘗試釋放同步狀態,如果同步狀態為0表示已經都釋放了,返回false,如果不為0那麼CAS釋放同步狀態,對於CountDouwnLatch而言,
* 只有當所有線程都執行完成之後才能繼續,所以只有當前state=0的時候,才算釋放成功。
**/
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;

if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
/*獲取鎖,如果獲取不到那麼等待,直到獲取到為止,當state=0的時候獲取到鎖*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
//1. 獲取失敗的時候創建節點加入隊列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//2.獲取前驅節點
final Node p = node.predecessor();
if (p == head) {
//3. 如果是頭節點的後繼節點,那麼嘗試獲取共享同步狀態,如果返回值>0,那麼獲取成功,
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}

}/<code>

共享式獲取鎖和獨佔式獲取鎖的區別其實就在於嘗試共享式獲取鎖的時候返回值是否是>1的,關鍵就在於子類如何去實現了,其他實現和獨佔式獲取鎖差不多,都是節點自旋獲取鎖。

共享式釋放鎖

<code>/*先嚐試釋放鎖,如果成功那麼喚醒後繼節點*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}/<code>

總結如下:

共享式獲取鎖的關鍵要看子類同步狀態獲取成功失敗的定義,但是要遵循一個基本原則就是隻要同步狀態是>1的,那麼就表示獲取狀態是成功,通過這種方式來支持多個線程同時獲取到鎖。

《java併發編程的藝術》


分享到:


相關文章: