AQS實現探究——Java 鎖的基架 AQS(二)

AQS實現探究——Java 鎖的基架 AQS(二)

鍵盤很好看

上篇介紹了AQS的概念,AQS 所支持的兩種模式:獨佔模式和共享模式,如何去使用AQS 實現獨佔鎖。這篇就介紹下AQS 是怎麼是將線程鎖住的?以及多個線程爭奪時AQS 做了哪些事情?

知識點簡介

volatile

volatile 是Java 中的關鍵字,主要有兩個特性:可見性和防止重排序。在AQS主要用到第一個特性就是可見性;什麼是可見性呢?學過操作系統的都知道,主內存是存放程序運行時數據的地方,但是為了速度每個CPU 都有自己的高速緩存,緩存自己從主內存加載後的數據,一個線程是運行在CPU上的,當這個線程想加載某個數據時優先從自己的高速緩存裡找,如果沒有找到再從主內存里加載,操作完就放到高速緩存中,別的線程工作時也是優先從自己的高速緩存裡讀,這樣就會讀到舊的數據,最後導致計算出錯,為了避免這種問題,保證多個CPU 的高速緩存是一致的,OS 內部有個緩存一致性協議(eg:MESI), 而volatile 就是使用這種協議來使修飾的變量在多個線程間可見。

CAS

CAS 全稱compareAndSwap,在Java中就是一個函數, 它有三個參數:修改值的內存地址,舊値,期望修改後的値;該函數大致流程如下:

  • 按照內存地址取出舊的値,
  • 與傳進來的舊値比較是否相同:不相同則失敗,否則執行步驟3
  • 將內存地址的値設置為期望值

上述三個動作是具有原子性的,即不可拆分的,對應著處理器的一個原子指令(CMPXCHG),處理器實現原子操作有兩種第一種就是通過對內存總線加鎖,第二種就緩存鎖定只對某個緩存行進行鎖定,第二種對資源的消耗低。

CLH 介紹

官方解釋

CLH鎖是Craig, Landin和Hagersten (CLH)鎖,CLH鎖是旋轉鎖,可以確保沒有飢餓,提供公平先到先得的服務。

CLH鎖是一種可伸縮的、高性能的、公平的自旋鎖,基於列表,應用程序線程僅在本地變量上自旋,它不斷輪詢前驅狀態,如果發現前驅狀態釋放鎖則結束自旋。

簡述某個線程獲取鎖的過程

先將自己加入到隊列中,並將自己的狀態(true/false)設置為true,然後循環判斷隊列中前一個節點(線程)中的狀態是否為false ,這裡的循環即模擬阻塞,直到隊列的前一個節點釋放鎖,將它的狀態設置為false ,則當前節點才獲取鎖。

實操(實現CLH)

簡介:和AQS相同只有一個狀態表示鎖是否空閒,而不是隊列中的每個節點都有一個狀態,此狀態有兩個值1或者0: 1表示鎖已經被搶佔,0表示鎖空閒。

原生的CLH 鎖 存在一個弊端:當前節點的線程會不斷輪詢前一個節點的狀態,它會造成CPU 使用率100%。對該弊端AQS採用阻塞和通知的手段,如果發現線程獲取鎖失敗則將當前線程阻塞住,等前一個線程釋放鎖時,將它的後繼線程阻塞狀態解除即可。思路:

AQS實現探究——Java 鎖的基架 AQS(二)

代碼如下:

**
* CLH鎖:一個種自旋鎖,通過先進先出確保無飢餓的和公平的鎖
*/
public class ClhLock {
// java 的一個不安全的幫助類,支持CAS 操作
private static Unsafe unsafe = UnsafeUtil.getUnsafe().orElse(null);
/**
* 0表示未鎖住
* 1表示鎖住
*/
private volatile int state = 0;
/**
* 獨佔鎖擁有者線程
*/
private Thread exclusiveOwnerThread;
/**
* 獲得鎖的節點
* 開始為空
*/
private transient volatile Node head;
/**
* 尾部節點
* 開始為空
*/
private transient volatile Node tail;

/**
* 為了CAS 操作而設置的變量.
* 用下面這幾個參數獲取對應實體上對應字段的地址,充當CAS 第一個參數
*/
private final static long headOffset;
private final static long tailOffset;
private final static long stateOffset;

static {
try {
if (Objects.isNull(unsafe)) {
throw new IllegalStateException("Unsafe instance has not initialized");
}
headOffset = unsafe.objectFieldOffset(ClhLock.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(ClhLock.class.getDeclaredField("tail"));
stateOffset = unsafe.objectFieldOffset(ClhLock.class.getDeclaredField("state"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
public ClhLock() {
}
/**
* 嘗試獲取鎖,如果沒有獲取則加入等待隊列
*/
public void lock() {
acquire(1);
}
private void acquire(int arg) {
//加入等待隊列並返回當前節點
Node node = addWaiter(Thread.currentThread());
//輪詢
for (; ; ) {
Node h = head;
//判斷自己的前驅是否是佔有鎖的節點,若是則嘗試獲取鎖
if (node.prev == h && tryAcquire(arg)) {
System.out.println("acquire lock thread:" + Thread.currentThread().getName());
// 獲取鎖成功將自己的節點設置為head
setHead(node);
return;
}
//若獲取鎖失敗則阻塞當前線程
LockSupport.park(node.thread);
}
}
private void setHead(Node node) {
Node h = head;
// 通過cas 函數將 node 設置為head
if (compareAndSetHeadOrTail(headOffset, h, node)) {

//為了gc
node.prev = null;
node.thread = null;
}
}
/**
* 將線程入隊
* @param currentThread 將要入隊的線程
* @return
*/
private Node addWaiter(Thread currentThread) {
//新建一個節點代表當前線程
Node node = new Node(currentThread);
Node t = tail;
//判斷尾部節點是否為空,開始時尾部節點為空
if (t != null) {
//尾部節點不為空則將尾部節點賦給當前節點的前驅
node.prev = t;
//將自己設置為尾部節點,可能不成功,會被其它線程先一步設置,若設置不了則會進入下面的enq
if (compareAndSetHeadOrTail(tailOffset, t, node)) {
t.next = node;
return node;
}
}
//若尾部節點為空(第一個線程進來),或者將當前節點設置為尾部節點失敗
return enq(node);
}
protected boolean tryAcquire(int arg) {
assert arg == 1;
int tempState = getState();
if (tempState == 0 && compareAndSetState(tempState, arg)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean compareAndSetState(int oldValue, int expectValue) {
return unsafe.compareAndSwapInt(this, stateOffset, oldValue, expectValue);
}
private boolean compareAndSetHeadOrTail(long offset, Node oldValue, Node expectValue) {
return unsafe.compareAndSwapObject(this, offset, oldValue, expectValue);
}
/**
* 釋放鎖
*/
public void unLock() {
release();
}
protected void release() {
//嘗試釋放鎖
if (tryRelease(1)) {
Node h = head;
if (h != null) {
//喚醒後繼線程
unParkSuccessor();
}
}
}
private void unParkSuccessor() {
//喚醒後繼線程
Node n = head.next;
//下面邏輯暫時不管,不會出現這種情況
if (n == null) {
for (Node prev = tail.prev; prev != null; prev = prev.prev) {
if (prev != head) {
n = prev;
}
}
}

if (Objects.nonNull(n)) {
LockSupport.unpark(n.thread);
}
}
private boolean tryRelease(int arg) {
assert arg == 1;
int tempState = getState();
if (tempState == 1 && compareAndSetState(tempState, 0)) {
setExclusiveOwnerThread(null);
return true;
}
return false;
}

/**
* 隊列的一個元素
*/
class Node {
/**
* 包含哪個線程,創建時實例化
*/
private Thread thread;
/**
* 前驅
*/
private Node prev;
/**
* 後繼
*/
private Node next;
public Node(Thread thread) {
this.thread = thread;
}
public Thread getThread() {
return thread;
}
public Node getPrev() {
return prev;
}
public void setPrev(Node prev) {
this.prev = prev;
}
public Node() {
}
}
public int getState() {
return state;
}
public Node getHead() {
return head;
}
public Node getTail() {
return tail;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}

個人建議:大家在看的時候,在草稿紙上比劃下。因為鏈表比數組要抽象的。

public class ClhLockTest {
private static int i = 1;
public static void incr() {
i++;
}
@Test
public void testLock() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(100);
ClhLock clhLock = new ClhLock();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
clhLock.lock();
incr();
clhLock.unLock();
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
Assertions.assertEquals(101,i);
}
}

AQS 中的CLH

簡述

AQS使用了CLH 先入先出的思想,但是並不是公平的,而是搶佔式的,但這取決於子類的實現方式。上文也說到AQS 只維護一個狀態state,而不像原生CLH 那樣每個節點都有自己的狀態。

Node 類的 構造

Node 類表示隊列中的某個元素

屬性

AQS實現探究——Java 鎖的基架 AQS(二)

Node 類waitStatus 的幾種狀態介紹

AQS實現探究——Java 鎖的基架 AQS(二)

獲取獨佔鎖過程

源碼分析 註釋

  • 嘗試獲取鎖,由子類實現
public final void acquire(int arg) {
//調用tryAcquire函數,嘗試修改state,該函數有子類實現,返回true 或者false
if (!tryAcquire(arg) &&
//若獲取失敗 則開始入隊等待獲取鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// acquireQueued 會判斷該線程在獲取完鎖的時候是否被中斷,
//若中斷則繼續獲取鎖,獲取完再中斷自己
selfInterrupt();
}
  • 添加到隊列尾部
/**
* 為當前線程和給定的模式(獨佔模式或者共享模式)創建節點並將節點入隊
* @param mode Node.EXCLUSIVE 或者 Node.SHARED ——獨佔或者共享
* @return
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);

// 嘗試最快的方式入隊,失敗就就進入enq,不斷地輪詢直到成功入隊
//入隊就是設置tail 節點,因為會存在競爭的情況所以會設置失敗
Node pred = tail;
if (pred != null) {
//將當前節點的前驅設置為尾部節點,然後嘗試將當前節點設置為尾部節點
node.prev = pred;
if (compareAndSetTail(pred, node)) {
//入隊成功,將其前驅的後繼設置為當前節點
pred.next = node;
return node;
}
}
//以輪詢的方式入隊
enq(node);
// 入隊成功返回當前節點
return node;
}
/**
* 將節點插入到隊列中,如果需要的話初始化tail節點和head 節點
* @param node 要插入到隊列中的節點
* @return 節點的後繼
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//如果尾部節點為空,則需要將head 和tail 初始化,延遲加載有不有
if (compareAndSetHead(new Node()))
tail = head;
} else {
//若尾部節點不為空則

// 將當前節點的前驅設置為尾部節點,
// 然後嘗試將當前節點設置為尾部節點
node.prev = t;
if (compareAndSetTail(t, node)) {
//入隊成功,將其前驅的後繼設置為當前節點
t.next = node;
return t;
}
}
}
}
  • 已在隊列中的線程以獨佔模式獲取鎖
 /**
*
*已在隊列中的線程以獨佔模式獲取鎖並且不可中斷(不是重點)
* @param node 當前線程對應的節點
* @param arg 獲取參數 與子類定義有關不用管
* @return 如果在等待的時候被中斷則返回true
*/
final boolean acquireQueued(final Node node, int arg) {
//表示獲取鎖是否失敗,若最後還是未成功則取消該節點
boolean failed = true;
try {
//標識當前線程(節點)是否中斷
boolean interrupted = false;
//輪訓
for (;;) {
//獲取前驅

final Node p = node.predecessor();
//判斷前驅是否是head 節點,若是則嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
//若獲取鎖成功,則將當前節點設置為head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果獲取鎖失敗,則設置前驅的waitStatus為SIGNAL :下一個循環若還是失敗則阻塞當前線程
if (shouldParkAfterFailedAcquire(p, node) &&
//若前驅的waitStatus 為SIGNAL 則將當前線程阻塞,被喚醒的時候返回該線程是否被中斷
parkAndCheckInterrupt())
// 若被中斷
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

流程

  1. 調用tryAcquire方法嘗試獲取鎖,成功就代表已獲取鎖
  2. 第一步失敗則入隊
  3. 入隊後,然後開始輪詢嘗試獲取鎖
  4. 第三步輪詢兩次失敗就將自己阻塞住,等待前驅釋放時解除自己的阻塞狀態

釋放獨佔鎖過程

源碼分析 註釋

 public final boolean release(int arg) {
//嘗試釋放鎖由子類實現,正常返回true
if (tryRelease(arg)) {
Node h = head;
//如果head 不為空,且head的waitStatus 不為0(其後繼等待時將其設置為SIGNAL)
if (h != null && h.waitStatus != 0)
//喚醒後繼
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 喚醒給定節點的後繼
* @param node 給定的節點
*/
private void unparkSuccessor(Node node) {
/**
* 若果waitStatus 小於零則需要被喚醒
* 這裡只考慮為SIGNAL
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 喚醒後繼
* 通常其後繼就是下一個,但是因為可能後繼取消或者為空
* 需要從尾部向前遍歷尋找一個距離head最近的不為空且沒有被取消的節點

*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//喚醒後繼線程
LockSupport.unpark(s.thread);
}

流程

  1. 調用tryRelease 釋放鎖
  2. 若head 節點不為空且waitStatus 不等於0,則喚醒後繼線程

獲取共享鎖過程

源碼分析 註釋

 public final void acquireShared(int arg) {
/**
* 嘗試獲取共享鎖
* tryAcquired 返回 一個整數,為負數則獲取失敗,不同的子類實現方式不一樣
* Semaphore 類返回的是剩餘多少許可證
*/
if (tryAcquireShared(arg) < 0)
//獲取失敗 則入隊等待獲取

doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 以共享模式入隊,如何入隊上文已介紹
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//判斷當前節點的前驅是否是頭節點
if (p == head) {
//若是頭節點,則嘗試獲取鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
//獲取鎖成功
/**
* 將當前節點設置為頭
* 判斷其後繼節點是否為shared
* 若是則釋放自己,因為共享模式表示多個線程都可以同時佔有鎖
* 注意第二個參數 是tryAcquireShared 的返回值,方法裡會用到
*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//和獨佔鎖相同 具體可以上面
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);

}
}
/**
* 設置隊列的頭,
* 並檢查如果後繼以共享模式等待且propagate(tryAcquireShared 返回值) >0
* 或者waiStatus 被設置為PROPAGATE
* @param node 當前節點
* @param propagate 一個tryAcquireShared 的返回值
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 當propagate 大於 零
* 或者h.waitStatus 小於0
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//釋放共享鎖,下面詳解
doReleaseShared();
}
}

流程

  1. 嘗試獲取共享鎖成功即返回
  2. 第一步嘗試獲取鎖失敗,添加到隊列尾部,若需要初始化則先初始化head和tail 節點
  3. 然後再次嘗試獲取鎖,若失敗則將waitStatus 設置為SIGNAL,下一個循環獲取鎖失敗,則掛起當前線程。

釋放共享鎖過程

源碼分析 註釋

 /**
* 釋放共享鎖,喚醒它的後繼並保證傳播
*/
private void doReleaseShared() {
/*
* 保證一個釋放會傳播下去,即使有正在獲取或釋放。
* 如果需要,以一個通常的方式去喚醒head的後繼,
* 如果不需要,就把head 的status 設置為PROPAGATE,
* 保證後面的釋放傳播可以繼續
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果h 的waitStatus 是SIGNAL,則先嚐試將其設置為0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//喚醒後繼
unparkSuccessor(h);
}
//若waitStatus 為0,則將其設置為PROPAGATE,為後續的釋放保證傳播這個行為
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed

break;
}
}

流程

  1. 解除對共享鎖的佔用
  2. 喚醒head的後繼並保證傳播

後記

AQS 作為Java 鎖的基架,其重要性不言而喻。本文只是從獲取鎖釋放鎖的角度去分析,沒有涉及到waitStatus 為條件的情況以及取消狀態進行分析,下一節將從這個角度去分析下AQS,大家有啥疑問以及以及指正咱們評論區見。


分享到:


相關文章: