引言
AQS,即AbstractQueuedSynchronizer, 隊列同步器,它是Java併發用來構建鎖和其他同步組件的基礎框架。大多數開發者可能都不會直接使用AQS,標準同步器類的集合能夠滿足絕大多數情況的需求,但如果能瞭解標準同步器類的實現方式,那麼對於理解它們的工作原理是非常有幫助的。
AQS是一個抽象類,一般是同步組件的靜態內部類,通過組合的方式使用。AQS本身沒有實現任何同步接口的,它僅僅只是定義了同步狀態的獲取和釋放的方法來供自定義的同步組件的使用。
AQS原理簡介
AQS維護一個共享資源state,通過內置的FIFO來完成獲取資源線程的排隊工作。(這個內置的同步隊列稱為"CLH"隊列)。該隊列由一個一個的Node結點組成,每個Node結點維護一個prev引用和next引用,分別指向自己的前驅和後繼結點。AQS維護兩個指針,分別指向隊列頭部head和尾部tail。
從圖中可以看出它是一個雙向鏈表。當線程獲取資源失敗(比如tryAcquire時試圖設置state狀態失敗),會被構造成一個結點加入CLH隊列中,同時當前線程會被阻塞在隊列中(通過LockSupport.park實現,其實是等待態)。當持有同步狀態的線程釋放同步狀態時,會喚醒後繼結點,然後此結點線程繼續加入到對同步狀態的爭奪中。
首先來看AQS最主要的三個成員變量:
1 private transient volatile Node head;
2
3 private transient volatile Node tail;
4
5 private volatile int state;
int型的變量state用來表示同步狀態. head和tail分別是同步隊列的頭結點和尾結點。假設state=0表示同步狀態可用(如果用於鎖,則表示鎖可用),state=1表示同步狀態已被佔用(鎖被佔用),狀態信息通過procted類型的getState,setState,compareAndSetState進行操作.
AQS支持兩種同步方式:
- 獨佔式
- 共享式
這樣方便使用者實現不同類型的同步組件,獨佔式如ReentrantLock,共享式如Semaphore,CountDownLatch,組合式的如ReentrantReadWriteLock。總之,AQS為使用提供了底層支撐,如何組裝實現,使用者可以自由發揮。
同步器的設計是基於模板方法模式的,一般的使用方式是這樣:
- 使用者繼承AbstractQueuedSynchronizer並重寫指定的方法。
- 將AQS組合在自定義同步組件的實現中,並調用其模板方法,而這些模板方法會調用使用者重寫的方法。
AQS定義的以下可重寫的方法:
- protected boolean tryAcquire(int arg)
- 獨佔式獲取同步狀態,試著獲取,成功返回true,反之為false
- protected boolean tryRelease(int arg)
- 獨佔式釋放同步狀態,等待中的其他線程此時將有機會獲取到同步狀態;
- protected int tryAcquireShared(int arg)
- 共享式獲取同步狀態,返回值大於等於0,代表獲取成功;反之獲取失敗;
- protected boolean tryReleaseShared(int arg)
- 共享式釋放同步狀態,成功為true,失敗為false
- protected boolean isHeldExclusively()
- 是否在獨佔模式下被線程佔用。
自定義同步器
那麼我們如何使用AQS呢,首先,我們需要去繼承AbstractQueuedSynchronizer這個類,然後我們根據我們的需求去重寫相應的方法,比如要實現一個獨佔鎖,那就去重寫tryAcquire,tryRelease方法,要實現共享鎖,就去重寫tryAcquireShared,tryReleaseShared;最後,在我們的組件中調用AQS中的模板方法就可以了,而這些模板方法是會調用到我們之前重寫的那些方法的。也就是說,我們只需要很小的工作量就可以實現自己的同步組件,重寫的那些方法,僅僅是一些簡單的對於共享資源state的獲取和釋放操作,至於像是獲取資源失敗,線程需要阻塞之類的操作,自然是AQS幫我們完成了。
對於使用者來講,我們無需關心獲取資源失敗,線程排隊,線程阻塞/喚醒等一系列複雜的實現,這些都在AQS中為我們處理好了。我們只需要負責好自己的那個環節就好,也就是獲取/釋放共享資源state的姿勢T_T。很經典的模板方法設計模式的應用,AQS為我們定義好頂級邏輯的骨架,並提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列複雜邏輯的實現,將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現即可。
下面的例子是獨佔鎖的實現方式,看代碼:
1import java.util.concurrent.locks.AbstractQueuedSynchronizer;
2
3public class MyLock {
4
5
6 private final Sync sync = new Sync();
7
8 public void lock() {
9 sync.acquire(1);
10 }
11
12 public void unlock() {
13 sync.release(1);
14 }
15
16 public boolean isLocked() {
17 return sync.isHeldExclusively();
18 }
19
20
21 private static class Sync extends AbstractQueuedSynchronizer {
22 @Override
23 protected boolean tryAcquire(int arg) {
24
25 //首先判斷狀態是否等於=0,如果狀態==0,就將status設置為1
26 if (compareAndSetState(0,1)) {
27 //將當前線程賦值給獨佔模式的onwer
28 setExclusiveOwnerThread(Thread.currentThread());
29 return true;
30 }
31
32 return false;
33
34 }
35
36 @Override
37 protected boolean tryRelease(int arg) {
38 if (getState() == 0) {
39 throw new IllegalMonitorStateException();
40 }
41 setExclusiveOwnerThread(null);
42 setState(0);
43 return true;
44 }
45
46 @Override
47 protected boolean isHeldExclusively() {
48 return getState() == 1;
49 }
50 }
51
52
53}
我們測試下我們寫的自定義同步器,我們啟用30個線程,每個線程對i自加10000次,同步正常的話,最終結果應為300000;測試代碼如下:
1public class Demo {
2
3 private static CyclicBarrier barrier = new CyclicBarrier(31);
4
5 private static int count;
6
7 private static final MyLock myLock = new MyLock();
8
9
10 public static void main(String[] args) throws Exception{
11 //說明:我們啟用30個線程,每個線程對i自加10000次,同步正常的話,最終結果應為300000;
12 //未加鎖前
13 for(int i=0;i<30;i++){
14 Thread t = new Thread(new Runnable() {
15 @Override
16 public void run() {
17 for(int i=0;i<10000;i++){
18 increment1();//沒有同步措施的a++;
19 }
20 try {
21 barrier.await();//等30個線程累加完畢
22 } catch (Exception e) {
23 e.printStackTrace();
24 }
25 }
26 });
27 t.start();
28 }
29 barrier.await();
30 System.out.println("沒有加鎖,count="+count);
31 //加鎖後
32 barrier.reset();//重置CyclicBarrier
33 count=0;
34 for(int i=0;i<30;i++){
35 new Thread(new Runnable() {
36 @Override
37 public void run() {
38 for(int i=0;i<10000;i++){
39 increment2();//a++採用Mutex進行同步處理
40 }
41 try {
42 barrier.await();//等30個線程累加完畢
43 } catch (Exception e) {
44 e.printStackTrace();
45 }
46 }
47 }).start();
48 }
49 barrier.await();
50 System.out.println("加鎖後,count="+count);
51 }
52
53
54
55 /**
56 * 沒有同步措施的a++
57 * @return
58 */
59 public static void increment1(){
60 count++;
61 }
62 /**
63 * 使用自定義的Mutex進行同步處理的a++
64 */
65 public static void increment2(){
66 myLock.lock();
67 count++;
68 myLock.unlock();
69 }
70}
運行結果:
1沒有加鎖,count=260399
2加鎖後,count=300000
從運行結果可以看出,我們寫的自定義同步器的鎖生效了。
AQS源碼分析
AQS的實現依賴內部的同步隊列(FIFO雙向隊列),如果當前線程獲取同步狀態失敗,AQS會將該線程以及等待狀態等信息構造成一個Node,將其加入同步隊列的尾部,同時阻塞當前線程,當同步狀態釋放時,喚醒隊列的頭節點。
下面舉例說下獲取和釋放同步狀態的過程:
獲取同步狀態
假設線程A要獲取同步狀態(這裡想象成鎖,方便理解),初始狀態下state=0,所以線程A可以順利獲取鎖,A獲取鎖後將state置為1。在A沒有釋放鎖期間,線程B也來獲取鎖,此時因為state=1,表示鎖被佔用,所以將B的線程信息和等待狀態等信息構成出一個Node節點對象,放入同步隊列,head和tail分別指向隊列的頭部和尾部(此時隊列中有一個空的Node節點作為頭點,head指向這個空節點,空Node的後繼節點是B對應的Node節點,tail指向它),同時阻塞線程B(這裡的阻塞使用的是LockSupport.park()方法)。後續如果再有線程要獲取鎖,都會加入隊列尾部並阻塞。
釋放同步狀態
當線程A釋放鎖時,即將state置為0,此時A會喚醒頭節點的後繼節點(所謂喚醒,其實是調用LockSupport.unpark(B)方法),即B線程從LockSupport.park()方法返回,此時B發現state已經為0,所以B線程可以順利獲取鎖,B獲取鎖後B的Node節點隨之出隊。
Node 節點
Node結點是AbstractQueuedSynchronizer中的一個靜態內部類.
1static final class Node {
2 /** waitStatus值,表示線程已被取消(等待超時或者被中斷)*/
3 static final int CANCELLED = 1;
4 /** waitStatus值,表示後繼線程需要被喚醒(unpaking)*/
5 static final int SIGNAL = -1;
6 /**waitStatus值,表示結點線程等待在condition上,當被signal後,會從等待隊列轉移到同步到隊列中 */
7 /** waitStatus value to indicate thread is waiting on condition */
8 static final int CONDITION = -2;
9 /** waitStatus值,表示下一次共享式同步狀態會被無條件地傳播下去
10 static final int PROPAGATE = -3;
11 /** 等待狀態,初始為0 */
12 volatile int waitStatus;
13 /**當前結點的前驅結點 */
14 volatile Node prev;
15 /** 當前結點的後繼結點 */
16 volatile Node next;
17 /** 與當前結點關聯的排隊中的線程 */
18 volatile Thread thread;
19 /** ...... */
20 }
獨佔式
我們看下acquire方法,lock方法一般會直接代理到acquire上
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
這段代碼很短,看著不太容易理解,我們稍微做下轉化:
1 public final void acquire(int arg) {
2 boolean hasAcquired = tryAcquire(arg);
3 if (! hasAcquired){
4 Node currentThreadNode=addWaiter(Node.EXCLUSIVE);
5 boolean interrupted = acquireQueued(currentThreadNode, arg);
6
7if (interrupted) {
8 selfInterrupt();
9 }
10
11 }
12 }
1.tryAcquire方法嘗試獲取鎖,如果成功就返回,如果不成功,走到2.
2.把當前線程和等待狀態信息構造成一個Node節點,並將結點放入同步隊列的尾部
3.該Node結點在隊列中嘗試獲取同步狀態,若獲取不到,則阻塞結點線程,直到被前驅結點喚醒或者被中斷.
addWaiter
addWaiter方式主要是將同步狀態失敗的線程,構造成一個Node結點,添加到同步隊列尾部
1 private Node addWaiter(Node mode) {
2 Node node = new Node(Thread.currentThread(), mode);
3 Node pred = tail;
4 if (pred != null) {
5 node.prev = pred;
6 if (compareAndSetTail(pred, node)) {
7 pred.next = node;
8 return node;
9 }
10 }
11 enq(node);
12 return node;
13 }
創建一個Node對象,Node中包含了當前線程和Node模式(這時是排他模式)。tail是AQS的中表示同步隊列隊尾的屬性,剛開始為null,所以進行enq(node)方法,從字面可以看出這是一個入隊操作,來看下具體入隊細節:
1private Node enq(final Node node) {
2 for (;;) {
3 Node t = tail;
4 if (t == null) { // Must initialize
5 if (compareAndSetHead(new Node()))
6 tail = head;
7 } else {
8 node.prev = t;
9 if (compareAndSetTail(t, node)) {
10 t.next = node;
11 return t;
12 }
13 }
14 }
15 }
enq方法是一個死循環,本身沒有鎖,可以多個線程併發訪問,假如某個線程進入方法,此時head, tail都為null, 進入if(t==null)區域,從方法名可以看出這裡是用CAS的方式創建一個空的Node作為頭結點,因為此時隊列中只一個頭結點,所以tail也指向它,第一次循環執行結束。注意這裡使用CAS是防止多個線程併發執行到這兒時,只有一個線程能夠執行成功,防止創建多個同步隊列。
進行第二次循環時(或者是其他線程enq時),tail不為null,進入else區域。將當前線程的Node結點(簡稱CNode)的prev指向tail,然後使用CAS將tail指向當前節點。
通過上面分析可知,AQS的寫入是一種雙向鏈表的插入操作,至此addWaiter分析完畢。
acquireQueued
看下acquireQueued方法:
1final boolean acquireQueued(final Node node, int arg) {
2 boolean failed = true;
3 try {
4 boolean interrupted = false;
5 for (;;) {//死循環
6 final Node p = node.predecessor();//找到當前結點的前驅結點
7 if (p == head && tryAcquire(arg)) {//如果前驅結點是頭結點,才tryAcquire,其他結點是沒有機會tryAcquire的。
8 setHead(node);//獲取同步狀態成功,將當前結點設置為頭結點。
9 p.next = null; // 方便GC
10 failed = false;
11 return interrupted;
12 }
13 // 如果沒有獲取到同步狀態,通過shouldParkAfterFailedAcquire判斷是否應該阻塞,parkAndCheckInterrupt用來阻塞線程
14 if (shouldParkAfterFailedAcquire(p, node) &&
15 parkAndCheckInterrupt())
16 interrupted = true;
17 }
18 } finally {
19 if (failed)
20 cancelAcquire(node);
21 }
22 }
可以看到,acquireQueued方法也是一個死循環,直到進入 if (p == head && tryAcquire(arg))條件方法塊。還是接著剛才的操作來分析。acquireQueued接收的參數是addWaiter方法的返回值。node.predecessor()返回當前節點的前置節點,在這裡也就是head節點,所以p==head成立,進而進行tryAcquire操作,即爭用鎖, 如果獲取成功,則進入if方法體,看下接下來的操作:
1) 將當前節點設置為頭節點。
2) 將當前節點的前置節點設置的next設置為null。
上面操作即完成了FIFO的出隊操作。
從上面的分析可以看出,只有隊列的第二個節點可以有機會爭用鎖,如果成功獲取鎖,則此節點晉升為頭節點。對於第三個及以後的節點,if (p == head)條件不成立,首先進行shouldParkAfterFailedAcquire(p, node)操作(爭用鎖失敗的第二個節點也如此).
shouldParkAfterFailedAcquire
shouldParkAfterFailedAcquire方法是判斷一個爭用鎖的線程是否應該被阻塞
1private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
2 //獲取前驅結點的wait值
3 int ws = pred.waitStatus;
4 if (ws == Node.SIGNAL)//若前驅結點的狀態是SIGNAL,意味著當前結點可以被安全地park
5 return true;
6 if (ws > 0) {
7 // ws>0,只有CANCEL狀態ws才大於0。若前驅結點處於CANCEL狀態,也就是此結點線程已經無效,從後往前遍歷,找到一個非CANCEL狀態的結點,將自己設置為它的後繼結點
8 do {
9 node.prev = pred = pred.prev;
10 } while (pred.waitStatus > 0);
11 pred.next = node;
12 } else {
13 // 若前驅結點為其他狀態,將其設置為SIGNAL狀態
14 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
15 }
16 return false;
17 }
shouldParkAfterFailedAcquire方法是判斷一個爭用鎖的線程是否應該被阻塞。它首先判斷一個節點的前置節點的狀態是否為Node.SIGNAL,如果是,是說明此節點已經將狀態設置如果鎖釋放,則應當通知它,所以它可以安全的阻塞了,返回true。
如果前節點的狀態大於0,即為CANCELLED狀態時,則會從前節點開始逐步循環找到一個沒有被“CANCELLED”節點設置為當前節點的前節點,返回false。在下次循環執行shouldParkAfterFailedAcquire時,返回true。這個操作實際是把隊列中CANCELLED的節點剔除掉。
前節點狀態小於0的情況是對應ReentrantLock的Condition條件等待的,這裡不進行展開。
如果shouldParkAfterFailedAcquire返回了true,則會執行:“parkAndCheckInterrupt()”方法,它是通過LockSupport.park(this)將當前線程掛起到WATING狀態,它需要等待一箇中斷、unpark方法來喚醒它,通過這樣一種FIFO的機制的等待,來實現了Lock的操作。
release
當前線程執行完自己的邏輯之後,需要釋放同步狀態,來看看release方法的邏輯:
1public final boolean release(int arg) {
2 if (tryRelease(arg)) {//調用使用者重寫的tryRelease方法,若成功,喚醒其後繼結點,失敗則返回false
3 Node h = head;
4 if (h != null && h.waitStatus != 0)
5 unparkSuccessor(h);//喚醒後繼結點
6 return true;
7 }
8 return false;
9 }
在方法unparkSuccessor(Node)中,就意味著真正要釋放鎖了,它傳入的是head節點(head節點是佔用鎖的節點),看下源碼:
1private void unparkSuccessor(Node node) {
2 //獲取wait狀態
3 int ws = node.waitStatus;
4 if (ws < 0)
5 compareAndSetWaitStatus(node, ws, 0);// 將等待狀態waitStatus設置為初始值0
6 Node s = node.next;//後繼結點
7 if (s == null || s.waitStatus > 0) {//若後繼結點為空,或狀態為CANCEL(已失效),則從後尾部往前遍歷找到一個處於正常阻塞狀態的結點 進行喚醒
8 s = null;
9 for (Node t = tail; t != null && t != node; t = t.prev)
10 if (t.waitStatus <= 0)
11 s = t;
12 }
13 if (s != null)
14 LockSupport.unpark(s.thread);//使用LockSupprot喚醒結點對應的線程
15 }
內部首先會發生的動作是獲取head節點的next節點,如果獲取到的節點不為空,則直接通過:“LockSupport.unpark()”方法來釋放對應的被掛起的線程,這樣一來將會有一個節點喚醒後繼續進入循環進一步嘗試tryAcquire()方法來獲取鎖。
共享式源碼簡單分析
共享式:共享式地獲取同步狀態。對於獨佔式同步組件來講,同一時刻只有一個線程能獲取到同步狀態,其他線程都得去排隊等待,其待重寫的嘗試獲取同步狀態的方法tryAcquire返回值為boolean,這很容易理解;對於共享式同步組件來講,同一時刻可以有多個線程同時獲取到同步狀態,這也是“共享”的意義所在。其待重寫的嘗試獲取同步狀態的方法tryAcquireShared返回值為int。
1 protected int tryAcquireShared(int arg) {
2 throw new UnsupportedOperationException();
3 }
1.當返回值大於0時,表示獲取同步狀態成功,同時還有剩餘同步狀態可供其他線程獲取;
2.當返回值等於0時,表示獲取同步狀態成功,但沒有可用同步狀態了;
3.當返回值小於0時,表示獲取同步狀態失敗。
acquireShared
1public final void acquireShared(int arg) {
2 if (tryAcquireShared(arg) < 0)//返回值小於0,獲取同步狀態失敗,排隊去;獲取同步狀態成功,直接返回去幹自己的事兒。
3 doAcquireShared(arg);
4 }
doAcquireShared
1private void doAcquireShared(int arg) {
2 final Node node = addWaiter(Node.SHARED);//構造一個共享結點,添加到同步隊列尾部。若隊列初始為空,先添加一個無意義的傀儡結點,再將新節點添加到隊列尾部。
3 boolean failed = true;//是否獲取成功
4 try {
5 boolean interrupted = false;//線程parking過程中是否被中斷過
6 for (;;) {//死循環
7 final Node p = node.predecessor();//找到前驅結點
8 if (p == head) {//頭結點持有同步狀態,只有前驅是頭結點,才有機會嘗試獲取同步狀態
9 int r = tryAcquireShared(arg);//嘗試獲取同步裝填
10 if (r >= 0) {//r>=0,獲取成功
11 setHeadAndPropagate(node, r);//獲取成功就將當前結點設置為頭結點,若還有可用資源,傳播下去,也就是繼續喚醒後繼結點
12 p.next = null; // 方便GC
13 if (interrupted)
14 selfInterrupt();
15 failed = false;
16 return;
17 }
18 }
19 if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心進入parking狀態
20 parkAndCheckInterrupt())//阻塞線程
21 interrupted = true;
22 }
23 } finally {
24 if (failed)
25 cancelAcquire(node);
26 }
27 }
大體邏輯與獨佔式的acquireQueued差距不大,只不過由於是共享式,會有多個線程同時獲取到線程,也可能同時釋放線程,空出很多同步狀態,所以當排隊中的老二獲取到同步狀態,如果還有可用資源,會繼續傳播下去。
1private void setHeadAndPropagate(Node node, int propagate) {
2 Node h = head; // Record old head for check below
3 setHead(node);
4 if (propagate > 0 || h == null || h.waitStatus < 0) {
5 Node s = node.next;
6 if (s == null || s.isShared())
7 doReleaseShared();
8 }
9 }
releaseShared
releaseShared為釋放同步狀態
1public final boolean releaseShared(int arg) {
2 if (tryReleaseShared(arg)) {
3 doReleaseShared();//釋放同步狀態
4 return true;
5 }
6 return false;
7 }
doReleaseShared
1private void doReleaseShared() {
2 for (;;) {//死循環,共享模式,持有同步狀態的線程可能有多個,採用循環CAS保證線程安全
3 Node h = head;
4 if (h != null && h != tail) {
5 int ws = h.waitStatus;
6 if (ws == Node.SIGNAL) {
7 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
8 continue;
9 unparkSuccessor(h);//喚醒後繼結點
10 }
11 else if (ws == 0 &&
12 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
13 continue;
14 }
15 if (h == head)
16 break;
17 }
18 }
代碼邏輯比較容易理解,需要注意的是,共享模式,釋放同步狀態也是多線程的,此處採用了CAS自旋來保證。
總結
AQS是JUC中很多同步組件的構建基礎,簡單來講,它內部實現主要是狀態變量state和一個FIFO隊列來完成,同步隊列的頭結點是當前獲取到同步狀態的結點,獲取同步狀態state失敗的線程,會被構造成一個結點(或共享式或獨佔式)加入到同步隊列尾部(採用自旋CAS來保證此操作的線程安全),隨後線程會阻塞;釋放時喚醒頭結點的後繼結點,使其加入對同步狀態的爭奪中。
閱讀更多 科技伍小黑 的文章