高性能解決線程飢餓的利器 StampedLock

概覽

在 JDK 1.8 引入 StampedLock,可以理解為對 ReentrantReadWriteLock 在某些方面的增強,在原先讀寫鎖的基礎上新增了一種叫樂觀讀(Optimistic Reading)的模式。該模式並不會加鎖,所以不會阻塞線程,會有更高的吞吐量和更高的性能。

跟著“碼哥字節”帶著問題一起來看StampedLock給我們帶來了什麼…

  • 有了ReentrantReadWriteLock,為何還要引入StampedLock?
  • 什麼是樂觀讀?
  • 在讀多寫少的併發場景下,StampedLock如何解決寫線程難以獲取鎖的線程“飢餓”問題?
  • 什麼樣的場景使用?
  • 實現原理分析,是通過 AQS 實現還是其他的?

特性

它的設計初衷是作為一個內部工具類,用於開發其他線程安全的組件,提升系統性能,並且編程模型也比ReentrantReadWriteLock 複雜,所以用不好就很容易出現死鎖或者線程安全等莫名其妙的問題。

三種訪問數據模式

  • Writing(獨佔寫鎖):writeLock 方法會使線程阻塞等待獨佔訪問,可類比ReentrantReadWriteLock 的寫鎖模式,同一時刻有且只有一個寫線程獲取鎖資源;
  • Reading(悲觀讀鎖):readLock方法,允許多個線程同時獲取悲觀讀鎖,悲觀讀鎖與獨佔寫鎖互斥,與樂觀讀共享。
  • Optimistic Reading(樂觀讀):這裡需要注意了,是樂觀讀,並沒有加鎖。也就是不會有 CAS 機制並且沒有阻塞線程。僅噹噹前未處於 Writing 模式 tryOptimisticRead才會返回非 0 的郵戳(Stamp),如果在獲取樂觀讀之後沒有出現寫模式線程獲取鎖,則在方法validate返回 true ,允許多個線程獲取樂觀讀以及讀鎖。同時允許一個寫線程獲取寫鎖

支持讀寫鎖相互轉換

ReentrantReadWriteLock 當線程獲取寫鎖後可以降級成讀鎖,但是反過來則不行。

StampedLock提供了讀鎖和寫鎖相互轉換的功能,使得該類支持更多的應用場景。

注意事項

  1. StampedLock是不可重入鎖,如果當前線程已經獲取了寫鎖,再次重複獲取的話就會死鎖;
  2. 都不支持 Conditon 條件將線程等待;
  3. StampedLock 的寫鎖和悲觀讀鎖加鎖成功之後,都會返回一個 stamp;然後解鎖的時候,需要傳入這個 stamp。

詳解樂觀讀帶來的性能提升

那為何 StampedLock 性能比 ReentrantReadWriteLock 好?

關鍵在於StampedLock 提供的樂觀讀,我們知道ReentrantReadWriteLock 支持多個線程同時獲取讀鎖,但是當多個線程同時讀的時候,所有的寫線程都是阻塞的。

StampedLock 的樂觀讀允許一個寫線程獲取寫鎖,所以不會導致所有寫線程阻塞,也就是當讀多寫少的時候,寫線程有機會獲取寫鎖,減少了線程飢餓的問題,吞吐量大大提高。

這裡可能你就會有疑問,竟然同時允許多個樂觀讀和一個先線程同時進入臨界資源操作,那讀取的數據可能是錯的怎麼辦?

是的,樂觀讀不能保證讀取到的數據是最新的,所以將數據讀取到局部變量的時候需要通過 lock.validate(stamp) 校驗是否被寫線程修改過,若是修改過則需要上悲觀讀鎖,再重新讀取數據到局部變量。

同時由於樂觀讀並不是鎖,所以沒有線程喚醒與阻塞導致的上下文切換,性能更好。

其實跟數據庫的“樂觀鎖”有異曲同工之妙,它的實現思想很簡單。我們舉個數據庫的例子。

在生產訂單的表 product_doc 裡增加了一個數值型版本號字段 version,每次更新 product_doc 這個表的時候,都將 version 字段加 1。

<code>

select

id

,... ,

version

from

product_doc

where

id

=

123

/<code>

在更新的時候匹配 version 才執行更新。

<code>

update

product_doc

set

version

=

version

+

1

,...

where

id

=

123

and

version

=

5

/<code>

數據庫的樂觀鎖就是查詢的時候將 version 查出來,更新的時候利用 version 字段驗證,若是相等說明數據沒有被修改,讀取的數據是安全的。

這裡的 version 就類似於 StampedLock 的 Stamp。

使用示例

模仿寫一個將用戶 id 與用戶名數據保存在 共享變量 idMap 中,並且提供 put 方法添加數據、get 方法獲取數據、以及 putIfNotExist 先從 map 中獲取數據,若沒有則模擬從數據庫查詢數據並放到 map 中。

<code>

public

 

class

 

CacheStampedLock

 {          

private

 final Map idMap = 

new

 HashMap<>();     

private

 final StampedLock 

lock

 = 

new

 StampedLock();          

public

 

void

 

put

(

Integer key, String 

value

)

 {         

long

 stamp = 

lock

.writeLock();         

try

 {             idMap.put(key, 

value

);         } 

finally

 {             

lock

.unlockWrite(stamp);         }     }          

public

 String 

get

(

Integer key

)

 {                  

long

 stamp = 

lock

.tryOptimisticRead();                  String currentValue = idMap.

get

(key);                  

if

 (!

lock

.validate(stamp)) {                          stamp = 

lock

.readLock();             

try

 {                 currentValue = idMap.

get

(key);             } 

finally

 {                 

lock

.unlockRead(stamp);             }         }                  

return

 currentValue;     }          

public

 String 

putIfNotExist

(

Integer key, String 

value

)

 {                  

long

 stamp = 

lock

.readLock();         String currentValue = idMap.

get

(key);                  

try

 {             

while

 (Objects.isNull(currentValue)) {                                  

long

 wl = 

lock

.tryConvertToWriteLock(stamp);                                  

if

 (wl != 

0L

) {                                          stamp = wl;                     currentValue = 

value

;                     idMap.put(key, currentValue);                     

break

;                 } 

else

 {                                          

lock

.unlockRead(stamp);                     stamp = 

lock

.writeLock();                 }             }         } 

finally

 {                          

lock

.unlock(stamp);         }         

return

 currentValue;     } } /<code>

上面的使用例子中,需要引起注意的是 get()和 putIfNotExist() 方法,第一個使用了樂觀讀,使得讀寫可以併發執行,第二個則是使用了讀鎖轉換成寫鎖的編程模型,先查詢緩存,當不存在的時候從數據庫讀取數據並添加到緩存中。

在使用樂觀讀的時候一定要按照固定模板編寫,否則很容易出 bug,我們總結下樂觀讀編程模型的模板:

<code>

public

 

void

 

optimisticRead

(

)

 {          

long

 stamp = 

lock

.tryOptimisticRead();          copyVaraibale2ThreadMemory();          

if

 (!

lock

.validate(stamp)) {                  stamp = 

lock

.readLock();         

try

 {                          copyVaraibale2ThreadMemory();         } 

finally

 {                          

lock

.unlockRead(stamp);         }     }          useThreadMemoryVarables(); } /<code>

使用場景和注意事項

對於讀多寫少的高併發場景 StampedLock的性能很好,通過樂觀讀模式很好的解決了寫線程“飢餓”的問題,我們可以使用StampedLock 來代替ReentrantReadWriteLock ,但是需要注意的是 StampedLock 的功能僅僅是 ReadWriteLock 的子集,在使用的時候,還是有幾個地方需要注意一下。

  1. StampedLock是不可重入鎖,使用過程中一定要注意;
  2. 悲觀讀、寫鎖都不支持條件變量 Conditon ,當需要這個特性的時候需要注意;
  3. 如果線程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上時,此時調用該阻塞線程的 interrupt() 方法,會導致 CPU 飆升。所以,使用 StampedLock 一定不要調用中斷操作,如果需要支持中斷功能,一定使用可中斷的悲觀讀鎖 readLockInterruptibly() 和寫鎖 writeLockInterruptibly()。這個規則一定要記清楚。

原理分析

高性能解決線程飢餓的利器 StampedLock

我們發現它並不像其他鎖一樣通過定義內部類繼承
AbstractQueuedSynchronizer抽象類然後子類實現模板方法實現同步邏輯。但是實現思路還是有類似,依然使用了 CLH 隊列來管理線程,通過同步狀態值 state 來標識鎖的狀態。

其內部定義了很多變量,這些變量的目的還是跟 ReentrantReadWriteLock 一樣,將狀態為按位切分,通過位運算對 state 變量操作用來區分同步狀態。

比如寫鎖使用的是第八位為 1 則表示寫鎖,讀鎖使用 0-7 位,所以一般情況下獲取讀鎖的線程數量為 1-126,超過以後,會使用 readerOverflow int 變量保存超出的線程數。

自旋優化

對多核 CPU 也進行一定優化,NCPU 獲取核數,當核數目超過 1 的時候,線程獲取鎖的重試、入隊錢的重試都有自旋操作。主要就是通過內部定義的一些變量來判斷,如圖所示。

等待隊列

隊列的節點通過 WNode 定義,如上圖所示。等待隊列的節點相比 AQS 更簡單,只有三種狀態分別是:

  • 0:初始狀態;
  • -1:等待中;
  • 取消;

另外還有一個字段 cowait ,通過該字段指向一個棧,保存讀線程。結構如圖所示

高性能解決線程飢餓的利器 StampedLock

同時定義了兩個變量分別指向頭結點與尾節點。

<code> 

private

 transient 

volatile

 WNode whead;

private

 transient 

volatile

 WNode wtail; /<code>

另外有一個需要注意點就是 cowait, 保存所有的讀節點數據,使用的是頭插法。

當讀寫線程競爭形成等待隊列的數據如下圖所示:

高性能解決線程飢餓的利器 StampedLock

獲取寫鎖

<code>

public

 

long

 

writeLock

()

 

{     

long

 s, next;       

return

 ((((s = state) & ABITS) == 

0L

 &&              U.compareAndSwapLong(

this

, STATE, s, next = s + WBIT)) ?             next : acquireWrite(

false

0L

)); } /<code>

獲取寫鎖,如果獲取失敗則構建節點放入隊列,同時阻塞線程,需要注意的時候該方法不響應中斷,如需中斷需要調用 writeLockInterruptibly()。否則會造成高 CPU 佔用的問題。

(s = state) & ABITS 標識讀鎖和寫鎖未被使用,那麼直接執行 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) CAS 操作將第八位設置 1,標識寫鎖佔用成功。CAS 失敗的話則調用 acquireWrite(false, 0L)加入等待隊列,同時將線程阻塞。

另外acquireWrite(false, 0L) 方法很複雜,運用大量自旋操作,比如自旋入隊列。

獲取讀鎖

<code>

public

 

long

 

readLock

()

 

{     

long

 s = state, next;       

return

 ((whead == wtail && (s & ABITS) this, STATE, s, next = s + RUNIT)) ?             next : acquireRead(

false

0L

)); } /<code>

獲取讀鎖關鍵步驟

(whead == wtail && (s & ABITS) < RFULL如果隊列為空並且讀鎖線程數未超過限制,則通過 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))CAS 方式修改 state 標識獲取讀鎖成功。

否則調用 acquireRead(false, 0L) 嘗試使用自旋獲取讀鎖,獲取不到則進入等待隊列。

acquireRead

當 A 線程獲取了寫鎖,B 線程去獲取讀鎖的時候,調用 acquireRead 方法,則會加入阻塞隊列,並阻塞 B 線程。方法內部依然很複雜,大致流程梳理後如下:

  1. 如果寫鎖未被佔用,則立即嘗試獲取讀鎖,通過 CAS 修改狀態為標誌成功則直接返回。
  2. 如果寫鎖被佔用,則將當前線程包裝成 WNode 讀節點,並插入等待隊列。如果是寫線程節點則直接放入隊尾,否則放入隊尾專門存放讀線程的 WNode cowait 指向的棧。棧結構是頭插法的方式插入數據,最終喚醒讀節點,從棧頂開始。

釋放鎖

無論是 unlockRead 釋放讀鎖還是 unlockWrite釋放寫鎖,總體流程基本都是通過 CAS 操作,修改 state 成功後調用 release 方法喚醒等待隊列的頭結點的後繼節點線程。

  1. 想將頭結點等待狀態設置為 0 ,標識即將喚醒後繼節點。
  2. 喚醒後繼節點通過 CAS 方式獲取鎖,如果是讀節點則會喚醒 cowait 鎖指向的棧所有讀節點。

釋放讀鎖

unlockRead(long stamp) 如果傳入的 stamp 與鎖持有的 stamp 一致,則釋放非排它鎖,內部主要是通過自旋 + CAS 修改 state 成功,在修改 state 之前做了判斷是否超過讀線程數限制,若是小於限制才通過 CAS 修改 state 同步狀態,接著調用 release 方法喚醒 whead 的後繼節點。

釋放寫鎖

unlockWrite(long stamp) 如果傳入的 stamp 與鎖持有的 stamp 一致,則釋放寫鎖,whead 不為空,且當前節點狀態 status != 0 則調用 release 方法喚醒頭結點的後繼節點線程。

總結

StampedLock 並不能完全代替ReentrantReadWriteLock ,在讀多寫少的場景下因為樂觀讀的模式,允許一個寫線程獲取寫鎖,解決了寫線程飢餓問題,大大提高吞吐量。

在使用樂觀讀的時候需要注意按照編程模型模板方式去編寫,否則很容易造成死鎖或者意想不到的線程安全問題。

它不是可重入鎖,且不支持條件變量 Conditon。並且線程阻塞在 readLock() 或者 writeLock() 上時,此時調用該阻塞線程的 interrupt() 方法,會導致 CPU 飆升。如果需要中斷線程的場景,一定要注意調用悲觀讀鎖 readLockInterruptibly() 和寫鎖 writeLockInterruptibly()

另外喚醒線程的規則和 AQS 類似,先喚醒頭結點,不同的是 StampedLock 喚醒的節點是讀節點的時候,會喚醒此讀節點的 cowait 鎖指向的棧的所有讀節點,但是喚醒與插入的順序相反。


分享到:


相關文章: