併發——抽象隊列同步器AQS的實現原理

一、前言

這段時間在研究 Java 併發相關的內容,一段時間下來算是小有收穫了。 ReentrantLock 是 Java 併發中的重要部分,所以也是我的首要研究對象,在學習它的過程中,我發現它是基於

抽象隊列同步器AQS 實現的,所以我花了點時間學習了一下 AQS 的實現原理。這篇博客就來說一說 AQS 的作用,以及它是如何實現的。


併發——抽象隊列同步器AQS的實現原理


二、正文

2.1 什麼是AQS

AQS 全稱抽象隊列同步器(AbstractQuenedSynchronizer),它是一個可以用來實現線程同步的基礎框架。當然,它不是我們理解的 Spring 這種框架,它是一個類,類名就是 AbstractQuenedSynchronizer ,如果我們想要實現一個能夠完成線程同步的鎖或者類似的同步組件,就可以在使用 AQS 來實現,因為它封裝了線程同步的方式,我們在自己的類中使用它,就可以很方便的實現一個我們自己的鎖。

2.2 如何使用AQS

AQS 封裝了很多方法,如獲取獨佔鎖,釋放獨佔鎖,獲取共享鎖,釋放共享鎖......我們可以通過在自己的實現的同步組件中調用 AQS 的這些方法來實現一個線程同步的功能。但是,根據 AQS 的名稱也能夠想到,我們不能直接創建 AQS 的對象,調用這些方法,因為 AQS 是一個抽象類,我們需要繼承 AQS ,創建它的子類對象來使用它。在實際使用中,一般是在我們自己的類中,以內部類的方式繼承 AQS ,然後在內部創建一個對象,在這個類內部使用,比如 ReentrantLock 中就是定義了一個抽象內部類 Sync ,繼承 AQS ,然後定義了一個 NonfairSync 類,繼承 Sync , NonfairSync 是一個

非公平鎖 ;同時又定義了一個 FairSync 類繼承 Sync , FairSync 是一個 公平鎖

公平鎖:多個線程按照申請鎖的順序去獲得鎖,後申請鎖的線程需要排隊,等它之前的線程獲得鎖並釋放後,它才能獲得鎖;

非公平鎖:線程獲得鎖的順序於申請鎖的順序無關,申請鎖的線程可以直接嘗試獲得鎖,誰搶到就是誰的;

我們繼承了 AQS ,就可以直接調用它的方法了嗎?當然不是。 Java 中提供的抽象組件,都是幫我們寫好了通用的部分,但是一些具體的部分,還需要我們自己實現。舉個比較簡單的例子, Java 中對自定義類型數組的排序,可以直接調用工具類的 sort 方法, sort 方法已經實現了排序的算法,但是其中的比較過程是抽象的,需要我們自己實現,所以我們一般需要提供一個比較器(Comparator),或者讓自定義類實現 Comparable 接口。這就是 模板方法 設計模式。

模板方法:在一個方法中實現了一個算法的流程,但是其中的一些步驟是抽象的,需要在子類中實現,或者具體使用時實現。模板方法可以提高算法的複用性,提供了算法的彈性,對於不同的需求,可以通用同一份代碼。

而 AQS 的實現就是封裝了一系列的模板方法,包括獲取鎖、釋放鎖等,這些都是模板方法。這些方法中調用的一些方法並沒有具體實現,需要使用者根據自己的需求,在子類中進行實現。下面我們就來看看 AQS 中的這些方法。

2.3 AQS中的方法

AQS底層維護一個int類型的變量state來表示當前的同步狀態,根據當前state的值,來判斷當前釋放處於鎖定狀態,或者是其他狀態。而 state 的每一個值具體是什麼含義,是由我們自己實現的。我們繼承 AQS 時,根據自己的需求,實現一些方法,其中就是通過修改 state 的值來維持同步狀態。而關於 state ,主要有以下三個方法:

  • **int getState() **:獲取當前同步狀態 state 的值;
  • **void setState(int newState) **:設置當前同步狀態 state 的值;
  • **boolean compareAndSetState(int expect, int update) **:使用 CAS 設置當前同步狀態的值,方法能夠保證設置同步狀態時的原子性;參數 expect 為 state 的預期舊值,而 update 是需要修改的新值,若設置成功,方法返回 true ,否則 false ;

CAS是一種樂觀鎖,若不瞭解,可以看看這篇博客: 併發——詳細介紹CAS機制

接下來我們再看一看在繼承 AQS 時,我們可以重寫的方法:

併發——抽象隊列同步器AQS的實現原理

併發——抽象隊列同步器AQS的實現原理

以上這些方法將會在 AQS 的模板方法中被調用,我們根據自己的需求,重寫上述方法,控制同步狀態 state 的值,即可控制線程同步的方式。下面再來看看 AQS 提供的模板方法:

併發——抽象隊列同步器AQS的實現原理

AQS 提供的模板方法主要分為三類:

<code>AQS
/<code>

下面我們就來具體說一說 AQS 是如何實現線程同步的。

2.4 AQS如何實現線程同步

前面提過, AQS 通過一個 int 類型的變量 state 來記錄當前的同步狀態,也可以理解為鎖的狀態,根據 state 的值的不同,可以判斷當前鎖是否已經被獲取。就拿獨佔鎖來說,若我們要實現的是一個獨佔鎖,則鎖被獲取後,其他線程將無法獲取鎖,需要進入阻塞狀態,等待鎖被釋放。而線程獲取鎖就是通過修改 state 的值來實現的,一個線程修改 state 成功,則表示它成功獲得了鎖;若失敗,則表示已經有其他線程獲得了鎖,則它需要進入阻塞狀態。下面我們就來聊一聊 AQS 如何實現維持多個線程等待的。

首先說明結論: AQS通過一個同步隊列來維護當前獲取鎖失敗,進入阻塞狀態的線程 。這個同步隊列是一個雙向鏈表,獲取鎖失敗的線程會被封裝成一個鏈表節點,加入鏈表的尾部排隊,而 AQS 保存了鏈表的頭節點的引用 head 以及鏈表的尾節點引用 tail 。這個同步隊列如下所示:

併發——抽象隊列同步器AQS的實現原理

在這個同步隊列中,每個節點對應一個線程,每個節點都有一個 next 指針指向它的下一個節點,以及一個 prev 指針指向它的上一個節點。隊列中的頭節點 head 就是當前已經獲取了鎖,正在執行的線程對應的節點;而之後的這些節點,則對應著獲取鎖失敗,正在排隊的線程。

當一個線程獲取鎖失敗,它會被封裝成一個 Node ,加入同步隊列的尾部排隊,同時線程會進入阻塞狀態。也就是說,在同步隊列中,除了頭節點對應的線程是運行狀態,其餘的線程都是等待睡眠狀態。而當頭節點對應的線程釋放鎖時,它會喚醒它的下一個節點(也就是上圖中的第二個節點),被喚醒的節點對應的線程開始嘗試獲取鎖,若獲取成功,它就會將自己置為 head ,然後將原來的 head 移出隊列。接下來我們就通過源碼,具體分析一下 AQS 的實現過程。

2.5 獨佔鎖的獲取與釋放過程

(1)獲取鎖的實現

AQS 的鎖功能齊全,它既可以用來實現獨佔鎖,也可以用來實現共享鎖。

獨佔鎖:也叫排他鎖,即鎖只能由一個線程獲取,若一個線程獲取了鎖,則其他想要獲取鎖的線程只能等待,直到鎖被釋放。比如說寫鎖,對於寫操作,每次只能由一個線程進行,若多個線程同時進行寫操作,將很可能出現線程安全問題;

共享鎖:鎖可以由多個線程同時獲取,鎖被獲取一次,則鎖的計數器+1。比較典型的就是讀鎖,讀操作並不會產生副作用,所以可以允許多個線程同時對數據進行讀操作,而不會有線程安全問題,當然,前提是這個過程中沒有線程在進行寫操作;

我們首先分析一下獨佔鎖。在 AQS 中,通過方法 acquire 來獲取獨佔鎖, acquire 方法的代碼如下:

<code>public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}/<code>

上面的方法執行流程如下:

  1. 首先調用 tryAcquire 嘗試獲取一次鎖,若返回 true ,表示獲取成功,則 acquire 方法將直接返回;若返回 false ,則會繼續向後執行 acquireQueued 方法;
  2. tryAcquire 返回 false 後,將執行 acquireQueued ,但是這個方法傳入的參數調用了 addWaiter 方法;
  3. addWaiter 方法的作用是將當前線封裝成同步隊列的節點,然後加入到同步隊列的尾部進行排隊,並返回此節點;
  4. addWaiter 方法執行完成後,將它的返回值作為參數,調用 acquireQueued 方法。 acquireQueued 方法的作用是讓當前線程在同步隊列中阻塞,然後在被其他線程喚醒時去獲取鎖;
  5. 若線程被喚醒併成功獲取鎖後,將從 acquireQueued 方法中退出,同時返回一個 boolean 值表示當前線程是否被中斷,若被中斷,則會執行下面的 selfInterrupt 方法,響應中斷;下面我們就來具體分析這個方法中調用的幾個方法的執行流程。首先第一個 tryAcquire 方法:
<code>protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}/<code>

可以看到,這個方法的實現僅僅只是拋出了一個異常。我們之前提過, AQS 是基於模板方法設計模式實現的,在其中定義了許多模板方法,在模板方法中會調用一些沒有實現的方法,這些方法需要使用者根據自己的需求實現。而 acquire 方法就是一個模板方法,其中調用的 tryAcquire 方法就是需要我們自己實現的方法。 tryAcquire 的作用就是嘗試修改 state 值,也就是獲取鎖,若修改成功,則返回 true ,否則返回 false 。它的實現需要根據 AQS 的子類具體分析,比如 ReentrantLock 中的 Sync ,這裡我就不詳細敘述了,後面寫一篇專門講 ReentrantLock 的博客。下面來看看 addWaiter 的源碼:

<code>// 將線程封裝成一個節點,放入同步隊列的尾部
private Node addWaiter(Node mode) {
// 當前線程封裝成同步隊列的一個節點Node
Node node = new Node(Thread.currentThread(), mode);
// 這個節點需要插入到原尾節點的後面,所以我們在這裡先記下原來的尾節點
Node pred = tail;
// 判斷尾節點是否為空,若為空表示隊列中還沒有節點,則不執行以下步驟
if (pred != null) {
// 記錄新節點的前一個節點為原尾節點
node.prev = pred;
// 將新節點設置為新尾節點,使用CAS操作保證了原子性
if (compareAndSetTail(pred, node)) {
// 若設置成功,則讓原來的尾節點的next指向新尾節點
pred.next = node;
return node;
}
}
// 若以上操作失敗,則調用enq方法繼續嘗試(enq方法見下面)
enq(node);
return node;
}

private Node enq(final Node node) {
// 使用死循環不斷嘗試
for (;;) {
// 記錄原尾節點
Node t = tail;
// 若原尾節點為空,則必須先初始化同步隊列,初始化之後,下一次循環會將新節點加入隊列

if (t == null) {
// 使用CAS設置創建一個默認的節點作為首屆點
if (compareAndSetHead(new Node()))
// 首尾指向同一個節點
tail = head;
} else {
// 以下操作與addWaiter方法中的if語句塊內一致
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}/<code>

以上就是 addWaiter 方法的實現過程,我在代碼中使用註釋對每一步進行了詳細的解析,它的執行過程大致可以總結為: 將新線程封裝成一個節點,加入到同步隊列的尾部,若同步隊列為空,則先在其中加入一個默認的節點,再進行加入;若加入失敗,則使用死循環(也叫自旋)不斷嘗試,直到成功為止 。這個過程中使用 CAS 保證了添加節點的原子性。下面看看 acquireQueued 方法的源碼:

<code>/**
* 讓線程不間斷地獲取鎖,若線程對應的節點不是頭節點的下一個節點,則會進入等待狀態
* @param node the node
*/
final boolean acquireQueued(final Node node, int arg) {
// 記錄失敗標誌
boolean failed = true;
try {

// 記錄中斷標誌,初始為true
boolean interrupted = false;
// 循環執行,因為線程在被喚醒後,可能再次獲取鎖失敗,需要重寫進入等待
for (;;) {
// 獲取當前線程節點的前一個節點
final Node p = node.predecessor();
// 若前一個節點是頭節點,則tryAcquire嘗試獲取鎖,若獲取成功,則執行if中的代碼
if (p == head && tryAcquire(arg)) {
// 將當前節點設置為頭節點
setHead(node);
// 將原來的頭節點移出同步隊列
p.next = null; // help GC
// 失敗標誌置為false
failed = false;
// 返回中斷標誌,acquire方法可以根據返回的中斷標誌,判斷當前線程是否被中斷
return interrupted;
}
// shouldParkAfterFailedAcquire方法判斷當前線程是否能夠進入等待狀態,
// 若當前線程的節點不是頭節點的下一個節點,則需要進入等待狀態,
// 在此方法內部,當前線程會找到它的前驅節點中,第一個還在正常等待或執行的節點,
// 讓其作為自己的直接前驅,然後在需要時將自己喚醒(因為其中有些線程可能被中斷),
// 若找到,則返回true,表示自己可以進入等待狀態了;

// 則繼續調用parkAndCheckInterrupt方法,當前線程在這個方法中等待,
// 直到被其他線程喚醒,或者被中斷後返回,返回時將返回一個boolean值,
// 表示這個線程是否被中斷,若為true,則將執行下面一行代碼,將中斷標誌置為true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 上面代碼中只有一個return語句,且return的前一句就是failed = false;
// 所以只有當異常發生時,failed才會保持true的狀態運行到此處;
// 異常可能是線程被中斷,也可能是其他方法中的異常,
// 比如我們自己實現的tryAcquire方法
// 此時將取消線程獲取鎖的動作,將它從同步隊列中移除
if (failed)
cancelAcquire(node);
}
}/<code>

以上就是 acquireQueued 方法的源碼分析。這個方法的作用可以概括為: 讓線程在同步隊列中阻塞,直到它成為頭節點的下一個節點,被頭節點對應的線程喚醒,然後開始獲取鎖,若獲取成功才會從方法中返回 。這個方法會返回一個 boolean 值,表示這個正在同步隊列中的線程是否被中斷。

到此,獲取獨佔鎖的實現就分析完畢了。需要注意的是,這些過程中使用的 compareAndSetXXX 這種形式的方法,都是基於 CAS 機制實現的,保證了這些操作的原子性。

(2)釋放鎖的實現

分析完獲取獨佔鎖的代碼後,我們再來看看釋放鎖的實現。釋放獨佔鎖是通過 release 方法實現的:

<code>public final boolean release(int arg) {
// 調用tryRelease嘗試修改state釋放鎖,若成功,將返回true,否則false
if (tryRelease(arg)) {
// 若修改state成功,則表示釋放鎖成功,需要將當前線程移出同步隊列
// 當前線程在同步隊列中的節點就是head,所以此處記錄head
Node h = head;
// 若head不是null,且waitStatus不為0,表示它是一個裝有線程的正常節點,
// 在之前提到的addWaiter方法中,若同步隊列為空,則會創建一個默認的節點放入head
// 這個默認的節點不包含線程,它的waitStatus就是0,所以不能釋放鎖
if (h != null && h.waitStatus != 0)
// 若head是一個正常的節點,則調用unparkSuccessor喚醒它的下一個節點所對應的線程
unparkSuccessor(h);
// 釋放成功
return true;
}

// 釋放鎖失敗
return false;
}/<code>

以上就是同步隊列中頭節點對應的線程釋放鎖的過程。 release 也是一個模板方法,其中通過調用 tryRelease 嘗試釋放鎖,而 tryRelease 也需要使用者自己實現。在之前也說過,頭節點釋放鎖時,需要喚醒它的下一個節點對應的線程,讓這個線程不再等待,去獲取鎖,而這個過程就是通過 unparkSuccessor 方法實現的。

2.6 共享鎖的獲取與釋放過程

前面提到過, AQS 不僅僅可以用來實現獨佔鎖,還可以用來實現共享鎖,下面我們就來看看 AQS 中,有關共享鎖的模板方法的實現。首先是獲取共享鎖的實現,在 AQS 中,定義了 acquireShared 方法用來獲取共享鎖:

<code>public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}/<code>

可以看到,這個方法比較簡短。首先調用 tryAcquireShared 方法嘗試獲取一次共享鎖,即修改 state 的值,若返回值 >=0 ,則表示獲取成功,線程不受影響,繼續向下執行;若返回值小於 0 ,表示獲取共享鎖失敗,則線程需要進入到同步隊列中等待,調用 doAcquireShared 方法。 acquireShared 方法也是 AQS 的一個模板方法,而其中的 tryAcquireShared 方法就是需要使用者自己實現的方法。下面我們來看看 doAcquireShared 方法的實現:

<code>/**
* 不間斷地獲取共享鎖,若線程對應的節點不是頭節點的下一個節點,將進入等待狀態
* 實現與acquireQueued非常類似
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 往同步隊列的尾部添加一個默認節點,Node.SHARED是一個Node常量,
// 它的值就是一個不帶任何參數的Node對象,也就是new Node();
final Node node = addWaiter(Node.SHARED);
// 失敗標誌,默認為true
boolean failed = true;
try {
// 中斷標誌,用來判斷線程在等待的過程中釋放被中斷
boolean interrupted = false;
// 死循環不斷嘗試獲取共享鎖
for (;;) {
// 獲取默認節點的前一個節點
final Node p = node.predecessor();
// 判斷當前節點的前一個節點是否為head節點
if (p == head) {
// 嘗試獲取共享鎖
int r = tryAcquireShared(arg);
// 若r>0,表示獲取成功
if (r >= 0) {
// 當前線程獲取鎖成功後,調用setHeadAndPropagate方法將當前線程設置為head
// 同時,若共享鎖還能被其他線程獲取,則在這個方法中也會向後傳遞,喚醒後面的線程

setHeadAndPropagate(node, r);
// 將原來的head的next置為null
p.next = null; // help GC
// 判斷當前線程是否中斷,若被中斷,則調用selfInterrupt方法響應中斷
if (interrupted)
selfInterrupt();
// 失敗標誌置為false
failed = false;
return;
}
}
// 以下代碼和獲取獨佔鎖的acquireQueued方法相同,即讓當前線程進入等待狀態
// 具體解析可以看上面acquireQueued方法的解析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}/<code>

doAcquireShared 方法的實現和獲取獨佔鎖中的 acquireQueued 方法很類似,但是主要有一點不同,那就是 線程在被喚醒後,若成功獲取到了共享鎖,還需要判斷共享鎖是否還能被其他線程獲取,若可以,則繼續向後喚醒它的下一個節點對應的線程 。下面再看看釋放共享鎖的代碼,釋放共享鎖時通過方法 releaseShared :

<code>public final boolean releaseShared(int arg) {
// 嘗試修改state的值釋放鎖
if (tryReleaseShared(arg)) {
// 若成功,則調用以下方法喚醒後繼節點中的線程

doReleaseShared();
return true;
}
return false;
}/<code>

releaseShared 也是一個模板方法,它通過調用使用者自己實現的 tryReleaseShared 方法嘗試釋放鎖,修改 state 的值,若返回 true ,表示修改成功,則繼續向下調用 doReleaseShared 喚醒 head 的下一個節點對應的線程,讓它開始嘗試獲取鎖;若修改 state 失敗,則返回 false 。

2.7 使用AQS實現一個鎖

介紹完上面的內容,下面我們就來基於 AQS 實現一個自己的同步器,或者說鎖。我們需要實現的鎖要求如下:

實現一個鎖,它是一個共享鎖,但是每次至多支持兩個線程同時獲取鎖,若當前已經有兩個線程獲取了鎖,則其他獲取鎖的線程需要等待。

實現代碼如下:

<code>/**
* 抽象隊列同步器(AQS)使用:
* 實現一個同一時刻至多隻支持兩個線程同時執行的同步器
*/

// 讓當前類繼承Lock接口
public class TwinLock implements Lock {

// 定義鎖允許的最大線程數

private final static int DEFAULT_SYNC_COUNT = 2;
// 創建一個鎖對象,用以進行線程同步,Sync繼承自AQS
private final Sync sync = new Sync(DEFAULT_SYNC_COUNT);

// 以內部類的形式實現一個同步器類,也就是鎖,這個鎖繼承自AQS
private static final class Sync extends AbstractQueuedSynchronizer {

// 構造方法中指定鎖支持的線程數量
Sync(int count) {
// 若count小於0,則默認為2
if (count <= 0) {
count = DEFAULT_SYNC_COUNT;
}
// 設置初始同步狀態
setState(count);
}

/**
* 重寫tryAcquireShared方法,這個方法用來修改同步狀態state,也就是獲取鎖
*/
@Override
protected int tryAcquireShared(int arg) {
// 循環嘗試
for (; ; ) {
// 獲取當前的同步狀態
int nowState = getState();
// 計算當前線程獲取鎖後,新的同步狀態
// 注意這裡使用了減法,因為此時的state表示的是還能支持多少個線程
// 而當前線程如果獲得了鎖,則state就要減小
int newState = nowState - arg;

// 如果newState小於0,表示當前已經沒有剩餘的資源了

// 則當前線程不能獲取鎖,此時將直接返回小於0的newState;
// 或者newState>0,就會執行compareAndSetState方法修改state的值,
// 若修改成功將,將返回大於0的newState;
// 若修改失敗,則表示有其他線程也在嘗試修改state,此時循環一次後,再次嘗試
if (newState < 0 || compareAndSetState(nowState, newState)) {
return newState;
}
}
}

/**
* 嘗試釋放同步狀態
*/
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
// 獲取當前同步狀態
int nowState = getState();
// 計算釋放後的新同步狀態,這裡使用加法,
// 表示有線程釋放鎖後,當前鎖可以支持的線程數量增加了
int newState = nowState + arg;
// 使用CAS修改同步狀態,若成功則返回true,否則自旋
if (compareAndSetState(nowState, newState)) {
return true;
}
}
}

}


/**
* 獲取鎖的方法
*/

@Override
public void lock() {
// 這裡調用的是AQS的模板方法acquireShared,
// 在acquireShared中將調用我們重寫的tryAcquireShared方法
// 傳入參數為1表示當前線程,當前線程獲取鎖後,state將-1
sync.acquireShared(1);
}

/**
* 解鎖
*/
@Override
public void unlock() {
// 這裡調用的是AQS的模板方法releaseShared,
// 在acquireShared中將調用我們重寫的tryReleaseShared方法
// 傳入參數為1表示當前線程,當前線程釋放鎖後,state將+1
sync.releaseShared(1);
}

/*******************其他需要實現的方法省略***************************/

}/<code>

以上就實現了一個支持兩個線程同時允許的共享鎖,下面我們通過一個測試代碼來測試效果:

<code>public static void main(String[] args) throws InterruptedException {
\t// 創建一個我們自定義的鎖對象
Lock lock = new TwinLock();

// 啟動10個線程去嘗試獲取鎖
for (int i = 0; i < 10; i++) {
Thread t = new Thread(()->{
// 循環執行

while (true) {
// 獲取鎖
lock.lock();
try {
// 休眠1秒
Thread.sleep(1000);
// 輸出線程名稱
System.out.println(Thread.currentThread().getName());
// 再次休眠一秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 釋放鎖
lock.unlock();
}
}
});
\t\t// 將線程設置為守護線程,主線程結束後,收穫線程自動結束
t.setDaemon(true);
t.start();
}

\t// 主線程每隔1秒輸出一個分割行
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
System.out.println("********************************");
}
}/<code>

以上測試代碼運行後,在每兩個分割行之間,最多不會輸出超過兩個線程的名稱,線程名稱的輸出將會以兩個一隊出現。我的輸出結果如下:

<code>********************************
Thread-1
Thread-0
********************************
********************************
Thread-2

Thread-1
********************************
********************************
Thread-2
Thread-1
********************************
********************************
Thread-2
Thread-3
********************************
********************************
Thread-3
Thread-4
********************************/<code>

2.8 AQS如何實現線程等待

在研究 AQS 的過程中,我一直有這個疑惑—— AQS 如何讓線程阻塞,直到最後才知道有一個叫 LockSupport 的工具類。這個工具類定義了很多靜態方法,當需要讓一個阻塞,或者喚醒一個線程時,就可以調用這個類中的方法,它的底層實現是通過一個 sun.misc.Unsafe 類的對象, unsafe 類的方法都是本地方法,由其他語言實現,這個類是給不支持地址操作的 Java ,提供的一個操作內存地址的後門。

AQS 中通過以下兩個方法來阻塞和喚醒線程:

  • LockSupport.park() :阻塞當前線程;
  • LockSupport.unpark(Thread thread) :將參數中傳入的線程喚醒;

前面講解 AQS 的代碼中,用到了方法 unparkSuccessor ,它的主要作用就是喚醒當前節點的下一個節點對應的線程,我們可以看看它的部分實現:

<code>private void unparkSuccessor(Node node) {

// ...........省略其他代碼............

// 以下代碼即為喚醒當前節點的下一個節點對應的線程
Node s = node.next;
if (s != null)
LockSupport.unpark(s.thread);\t// 使用LockSupport
}/<code>

三、總結

其實 AQS 還支持一些其他的方法,比如說在獲取鎖時設置超時時間等,這些方法的實現與上面介紹的幾種大同小異,限於篇幅,這裡就不進行敘述了。以上內容對 AQS 的實現原理以及主要方法的實現做了一個比較細緻的介紹,相信看完之後會對 AQS 有一個比較深入的理解,但是想要理解以上內容,需要具備併發的一些基礎知識,比如說線程的狀態, CAS 機制等。最後希望這篇博客對需要的人有所幫助吧。


分享到:


相關文章: