乾貨:多線程-ConcurrentHashMap源碼詳解

概述:

HashMap是集合中最常用的數據結構之一,由於HashMap非線程安全,因此不能用於併發訪問場景。在jdk1.5之前,通常使用HashTable作為HashMap的線程安全版本。HashMap對讀寫操作進行全局加鎖,在高併發的條件下會造成嚴重的鎖競爭和等待,極大地降低了系統的吞吐量。

乾貨:多線程-ConcurrentHashMap源碼詳解

優點:

相比於HashTable以及Collections.synchronizedMap(),ConcurrentHashMap在線程安全的的基礎上提供了更好的寫併發能力,並且讀操作(get)通常不會阻塞,使得讀寫操作可併發執行,支持客戶端修改ConcurrenHashMap的併發訪問度,迭代期間也不會拋出ConcurrentModificationException等等。

缺點:

一致性問題:這是當前所有分佈式系統都面臨的問題。

注意:

ConcurrentHashMap中key和value值都不能為null,HashMap中key可以為null,HashTable中key不能為null。

ConcurrentHashMap是線程安全的類並不能保證使用ConcurrentHashMap的操作都是線程安全的。

返回目錄

ConcurrentHashMap實現原理:

ConcurrentHashMap的基本策略是將table細分為多個Segment保存在數組segments中,每個Segment本身又是一個可併發的哈希表,同時每個Segment都是一把ReentrantLock鎖,只有在同一個Segment內才存在競爭關係,不同的Segment之間沒有鎖競爭,這就是分段鎖機制。Segment內部擁有一個HashEntry數組,數組中的每個元素又是一個鏈表。

為了減少佔用空間,除了第一個Segment之外,剩餘的Segment採用的是延遲初始化的機制,僅在第一次需要時才會創建(ensureSegment實現) 為了保證延遲初始化存在的可見性,訪問segments數組以及table數組的元素,均通過volatile訪問,主要藉助於Unsafe中原子操作getObjectVolatile來實現,此外,segments中segment的寫入以及table中元素和next域的寫入均使用UNSAFE.putOrderedObject來完成。這些操作提供了AtomicReferenceArrays的功能。

源碼解析:

繼承關係:

public class ConcurrentHashMap
extends AbstractMap //集合一些基本功能的實現
implements ConcurrentMap, //需要實現幾個刪除添加操作
Serializable { //序列化

返回目錄

成員變量:

 static final int DEFAULT_INITIAL_CAPACITY = 16; //默認容量
static final float DEFAULT_LOAD_FACTOR = 0.75f; //加載因子
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//默認併發度,該參數影響segments數組的長度
static final int MAXIMUM_CAPACITY = 1 << 30; //最大容量
//最大容量,構造ConcurrentHashMap時指定的大小超過該值,則會使用該值替換
//ConcurrentHashMap的大小必須是2的冪,且小於等於1<<30 ,以確保不超過int的範圍來索引條目
// 注意,這是集合元素最大容量,而不是線程最大個數
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//table數組的最小長度,必須是2的冪,至少為2,以免延遲構造後立即調整大小
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

//允許的最大segment數量,用於限定構造函數參數concurrent_level的邊界
//也就是最大允許的線程數量。
static final int RETRIES_BEFORE_LOCK = 2;
//非鎖定情況下調用size和containsValue方法的重試次數,
//避免由於table連續修改導致無限重試,次數超過則對全局加鎖。
private transient final int hashSeed = randomHashSeed(this);
//和當前相關聯,用於keyhash的隨機值,用來減少hash衝突
private static int randomHashSeed(ConcurrentHashMap instance) {
if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {
return sun.misc.Hashing.randomHashSeed(instance);
}
return 0;
}
final int segmentMask;
//用於索引segment的掩碼值(只留下高位),key高位hash碼用來選擇segment
final int segmentShift; //用來索引segment偏移值
final Segment[] segments; //數組+數組+鏈表 //segments創建後其容量不可變
transient Set keySet;
transient Set<map.entry>> entrySet;
transient Collection values;
/<map.entry>

返回目錄

構造函數:

initialCapacity: 創建ConcurrentHashMap對象的初始容量,即HashEntity的總數量,創建時未指定initialCapacity默認16 ,最大容量為MAXIMUM_CAPACITY.

LoadFactor: 負載因子,用於計算Segment的threshlod域

concurrencyLevel: 即ConcurrentHashMap的併發度,支持同時更新ConccurentHashMap且不發生鎖競爭的最大線程數。 但是其並不代表實際併發度,因為會使用大於等於該值的2的冪指數的最小值作為實際併發度,實際併發度即為segments數組的長度。如未指定則默認為16 ;

注意:併發度對ConccurentHashMap性能具有舉足輕重的作用,如果併發度設置過小,會帶來嚴重的鎖競爭問題;如果併發度設置過大,原本位於同一個Segment內的訪問會擴散到不同的Segment中,CPU cache命中率會下降,從而引起程序性能下降。

 @SuppressWarnings("unchecked")
//有參構造
public ConcurrentHashMap(int initialCapacity, //指定容量
float loadFactor, int concurrencyLevel) {
//加載因子和最大線程容量
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
//給定線程容量大於默認最大容量,採用默認最大容量

concurrencyLevel = MAX_SEGMENTS;
/*
* 尋找與給定參數concurrencylevel匹配的最佳數組ssize,
* 必須是2的冪,如果concurrencylevel是2的冪,那麼ssize就是
* concurrencyevel,否則concurrencylevel為ssize大於concurrency最小2的冪
*例 : concurrencyLevel為7,則ssize為2^3=8;

*/

// Find power-of-two sizes best matching arguments
int sshift = 0; //記錄左移次數,用來計算segment最大偏移值
int ssize = 1; //segment數組長度,也是最大線程數量
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1; //採用位運算而不是直接使用concurrenylevel,
//因為此值可能不一定是2的冪
}
this.segmentShift = 32 - sshift; //索引偏移量
this.segmentMask = ssize - 1; //-1是為了將低位二進制全部變1,達到掩碼目的
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//給定容量大於默認最大容量,採用默認最大容量
// 防止索引條目超出int值範圍
int c = initialCapacity / ssize; //每個table數組的最大容量
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY; //table數組最小長度
while (cap < c) //因為必須是2的冪,所以採用此方式,保證cap最終是比c的2的冪函數
cap <<= 1;
// create segments and segments[0]
Segment s0 = //創建segments和第一個segment
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize]; //segments
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

//指定最大容量和加載因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity) { //指定最大容量
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap() { //默認構造
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//由一個集合構建
public ConcurrentHashMap(Map extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
//集合元素數量處以加載因子就是當前集合容量
DEFAULT_INITIAL_CAPACITY), //默認容量,兩個去最大值
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); //默認加載因子,默認併發度
putAll(m); //將集合中元素添加到當前集合中
}

返回目錄

原子方法:

ConcurrentHashMap主要使用下面幾個方法對segments數組和table數組進行讀寫,並且保證線程安全性。

其主要使用了UNSAFE.getObjectVolatile提供的volatile讀語義,UNSAFE.getObjectVolatile提供了Volatile寫語義。

使用其好處為:

UNSAFE.getObjectVolatile使得非volatile聲明的對象具有volatile讀的語義

要使非volatile聲明的對象具有volatile寫語義則需要藉助操作UNSAFE.putObjectvolatile

UNSAFE.putOrderedObject操作的含義和作用是什麼:

為了控制特定條件下的指令重排序和內存可見性問題,java編輯器使用了內存屏障的CPU指令來禁止指令重排序。java中volatile寫入使用了內存的屏障中的LoadStore屏障規則,對於 Load1->LoadStore->Store2, 在Store2以及後續寫入操作被刷出之前,要保證Load1要讀取的數據被讀取完畢。

volatile的寫所插入的storeLoad是一個耗時的操作,因此出現了一個對volatile寫的升級版本,利用lazySet方法對性能進行優化,在實現上對volatile的寫只會在之前插入storestore屏障,對於這樣的Store1 ;StoreStore;Store2,在store2及後續寫入操作執行前,保證store1的寫入對其它處理器是可見,也就是按順序寫入。 UNSAFE.putOrderedObject正是提供了這樣的語義,避免了寫寫指定重排序,但是不保證內存可見性,因此需要藉助volatile讀來保證可見性。

ConcurrentHashMap正是利用了這些高性能的原子讀寫來避免加鎖帶來的開銷。

// 獲取給定table的第i個元素,使用volatile讀語義。

 @SuppressWarnings("unchecked")
static final HashEntry
entryAt(HashEntry[] tab, int i) {
return (tab == null) ? null :
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
/**
* Sets the ith element of given table, with volatile write
* semantics. (See above about use of putOrderedObject.)
*/
//設置給定table的第i個元素,使用volatile寫入語義
static final void setEntryAt(HashEntry[] tab, int i,
HashEntry e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
/*
* 通過Unsafe提供的具有volatile元素訪問語義的操作獲取Segment數組的第j個元素(如果ss為空)
* 注意:因為Segment數組的每個元素只能設置一次(使用完全有序的寫入)
* 所以,一些性能敏感的方法只能依靠此方法作為對空讀取的重新檢查。
*/
@SuppressWarnings("unchecked")
static final Segment segmentAt(Segment[] ss, int j) {
long u = (j << SSHIFT) + SBASE;
return ss == null ? null :
(Segment) UNSAFE.getObjectVolatile(ss, u);
}
/*
* 根據給定的Hash獲取segment
*/

@SuppressWarnings("unchecked")

private Segment segmentForHash(int h) {
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
return (Segment) UNSAFE.getObjectVolatile(segments, u);
}
/*
* 根據給定的segment和hash獲取table entry 一條鏈表
*/
@SuppressWarnings("unchecked")
static final HashEntry entryForHash(Segment seg, int h) {
HashEntry[] tab;
return (seg == null || (tab = seg.table) == null) ? null :
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
}

返回目錄

HashEntry:內部類,用來存儲key-value的數據結構

注意:value和next聲明為volatile,是為了保證內存的可見性,也就是保證讀取的值都是最新的值,而不會從緩存讀取。 寫入next域使用volatile寫入是為了保證原子性。寫入使用原子性操作,讀取使用volatile,保證多線程訪問的安全性。

 //存儲key-value的數據結構,
static final class HashEntry {
final int hash; //hash值
final K key;
volatile V value; //全局可見,用來實現containsValue方法
volatile HashEntry next; //全局可見,netxt節點
//初始化一個節點
HashEntry(int hash, K key, V value, HashEntry next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
設置下一節點
final void setNext(HashEntry n) { //原子性設置下next節點
UNSAFE.putOrderedObject(this, nextOffset, n);
}
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

返回目錄

Segment:內部類,用來實行併發操作

segment為ConcurrentHashMap的專用數據結構,同時擴展了ReentrantLock,使得Segment本身就是一把重入鎖,方便執行鎖定。Segment內部持有一個始終處於一致狀態的entry列表,使得讀取狀態無需加鎖(通過volatile讀table數組)。調整table大小期間通過複製節點實現,使舊版本的table仍然可以進行遍歷。

Segment僅定義需要加鎖的可變方法,針對ConcurrentHashMap中相應方法的調用都會被代理到Segment中的方法。這些可變方法使用scanAndLock和scanAndLockForPut在競爭中使用受控旋轉(自旋次數受限制的自旋鎖) 由於線程的阻塞與喚醒通常伴隨著上下文切換,CPU搶佔等,都是開銷比較大的操作。使用自旋次數受限制的自旋鎖,可以提高獲取鎖的概率,降低線程阻塞的概率,這樣可極大提升性能。為什麼受限自旋呢?(自旋會不斷消耗CPU的時間片,無限制自旋會導致開銷增加)所以自旋鎖適合多核CPU下,同時線程等待所的時間非常短,若等待時間較長,應該儘早進入阻塞。

成員變量和繼承關係:

 static final class Segment 
extends ReentrantLock //繼承了重入鎖
implements Serializable {

private static final long serialVersionUID = 2249069246763182397L;
//對segment加鎖時,在阻塞之前進行的最大自旋次數,
//在多處理器上,使用有限數量的重試來維護在定位節點時獲取的高速緩存

//最多自旋64
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
//每個segment的table數組,訪問數組中元素通過entryAt/setEntryAt提供的
//volatile語義來完成。
transient volatile HashEntry[] table;
//元素數量,只能在鎖中或其它volatile讀保證可見性之間進行訪問。
transient int count;
//當前segment中可變操作發生的次數,put,remove等,可能會溢出32位,
//它為isEmpry()和size()方法中的穩定性檢查提供了足夠的準確性。
//只能在鎖中或者其它volatile讀保證可見性之間進行訪問。
transient int modCount;
//table大小超過閾值對table進行擴容
transient int threshold; //閾值
final float loadFactor; //負載因子
//構造函數
Segment(float lf, int threshold, HashEntry[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}

返回目錄

scanAndLockForPut:

while每循環一次,都會嘗試獲取鎖,成功則返回, retries初始值設為-1是為了遍歷當前hash對應的桶的鏈表,找到則停止遍歷,未找到則會預創建一個節點;同時,如果頭節點發生變化,則會重新進行遍歷,直到自旋次數大於MAX_SCAN_RETRIES,使用lock進行加鎖,如果失敗則會進入等待隊列。

為何要遍歷一次鏈表:

scanAndLockForPut使用自旋次數受限制的自旋鎖進行優化加鎖的方式,此外遍歷鏈表也是一種優化方法,主要是儘可能使當前鏈表中的節點進入CPU高速緩存,提高緩存命中率,以便獲取鎖定後的遍歷速度更快。 實際上加鎖後並沒有使用已經找到的節點,因為它們必須在鎖定下重新獲取,以確保更新的順序一致性,但是遍歷一次後可以更快的進行定位 ,這是一種預熱優化方法。 scanAndLock中也使用了該優化方式。

scanAndLock內部實現方式與scanAndLockForPut更簡單,scanAndLock不需要預創建節點。因此主要用於remove和replace操作。

 //自旋獲取鎖
private HashEntry scanAndLockForPut(K key, int hash, V value) {
HashEntry first = entryForHash(this, hash); //根據key的hash值找到頭節點
HashEntry e = first;
HashEntry node = null;
int retries = -1; // negative while locating node
while (!tryLock()) { //嘗試獲取鎖,成功返回,不成功開始自旋
HashEntry f; // 用於後續重新檢查頭結點
if (retries < 0) { //第一次自旋
if (e == null) { //結束遍歷節點
if (node == null) //創建節點
node = new HashEntry
(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key)) //找到節點,結束遍歷
retries = 0;
else
e = e.next; //下一節點
//如果鏈表有節點,那麼如果沒有找到,
//那麼就會一致遍歷(retries=-1)的狀態下
//直到節點到達末尾
}
else if (++retries > MAX_SCAN_RETRIES) { //達到最大嘗試次數,
lock(); //進入加鎖方法,失敗則會進入排隊,阻塞當前線程
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
//頭節點發生變化需要重新遍歷,說明油新結點加入或者被移除
e = first = f; // re-traverse if entry changed
retries = -1; //自旋次數歸0,重新自旋
}
}
return node;
}

返回目錄

put操作:

流程:

先對Segment加鎖,然後根據(tab.length-1)&hash找到對應的slot

然後根據slot遍歷對應的鏈表,

如果key對應的entry存在(根據onlyIfAbsent)決定是否替換新值

如果key對應的entry不存在,創建新節點頭插法插入

若容量超出限制,則判斷是否進行rehash;

幾個優化:

在 scanAndLockForPut()中:

如果鎖能夠很快的獲取到,有限次數的自旋可防止線程進入阻塞,有助於提升性能。

在自旋期間會遍歷鏈表,希望遍歷後的鏈表被 cache緩存,為實際put操作過程中的鏈表遍歷操作提供性能(預熱優化:遍歷一次後可以更快進行定位)

並且還會預創建節點;

HashTable[] tab =table 好處:為什麼不直接用table操作

table被聲明為volatile,為了保證內存的可見性,table上的修改都必須立即更新到主內存,

volatile寫實際是具有一定開銷的。 由於put是中代碼是加鎖執行的,鎖是既能保證可見性,也能保證原子性的,因此不需要在對table進行volatile寫,將其賦給一個局部變量實現編譯,運行時優化。

node.setNext(first)也是同樣的道理,next同樣是被聲明為volatile,因此也是使用優化的方式UNSAFE.putOrderedObject進行volatile寫入操作。

put已經加鎖,為何訪問tab元素不直接通過數組索引,而用entryAt(tab,index):

加鎖保證了volatile同步語義,但是對table數組中元素的寫入使用UNSAFE.putOrderedObject進行順序寫,該操作只是禁止寫寫重排序指令,不能保證寫入後內存的可見性 所以必須使用使用entryAt(tab,index)提供的volatile讀獲取最新的數據

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry node = tryLock() ? null : //獲得鎖成功,
scanAndLockForPut(key, hash, value); //未成功獲取鎖,則自旋獲取鎖,
//如果超過自旋次數,則阻塞

V oldValue;
try {
HashEntry[] tab = table;
int index = (tab.length - 1) & hash; // 得到桶位置
HashEntry first = entryAt(tab, index);
//volatile讀語義獲取index的頭節點

for (HashEntry e = first;;) { //遍歷鏈表

if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) { //找到節點
oldValue = e.value;
if (!onlyIfAbsent) { //是否替換舊值
e.value = value;
++modCount;//被修改次數
}
break;
}
e = e.next; //不是,繼續遍歷
}
else { //e==null //沒有找到key值
if (node != null) //scanAndLockForPut只有找不到節點才會不返回null
node.setNext(first);
//將node設置為頭節點,此處可以看出其為頭插法鏈表插入元素

else //處理tryLock成功返回null值,沒有找到節點的情況
node = new HashEntry(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY) //當前table中元素數量大於閾值,
//需要重新進行rehash
rehash(node); //重新hash
else //沒有大於閾值
setEntryAt(tab, index, node);//將node頭節點插入table中
++modCount; //被修改次數
count = c; //元素個數
oldValue = null; //舊址=null
break;
}
}
} finally {
unlock();
}
return oldValue;
}

返回目錄

rehash操作:

rehash主要的作用是擴容,將擴容前table中的節點重新分配到新table中。由於table的capacity都是2的冪,按照2的冪擴容為原來的一倍,擴容前在slot i 中的元素,擴容後要麼在slot i中或者 i+擴容前table的capacity的solt中,這樣使得只需要移動原來桶中的部分元素即可將所有節點分配到新table中。

為了提高效率,rehash首先找到第一個後續所有節點在擴容後index都保持不變的結點,將這個結點加入擴容後的table的index對應的slot中,然後將節點之前的所有節點重排即可。

	 //重新進行擴容hash,這個操作是已經在put加鎖的
@SuppressWarnings("unchecked")
private void rehash(HashEntry node) {
HashEntry[] oldTable = table; //舊的table數組
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1; //二倍擴容
threshold = (int)(newCapacity * loadFactor); //新閾值
HashEntry[] newTable = //新數組長度
(HashEntry[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;

//將舊數組中所有節點複製到新數組中,對舊數組中鏈表最後同index的進行復用(提高效率)
for (int i = 0; i < oldCapacity ; i++) {//遍歷舊數組
HashEntry e = oldTable[i];
if (e != null) { //當前數組節點不為空
HashEntry next = e.next; //當前節點next節點
int idx = e.hash & sizeMask;//新數組角標
if (next == null) // Single node on list
newTable[idx] = e; //舊table數組此角標只有一個節點
else { //當前角標鏈表不止一個節點
// Reuse consecutive sequence at same slot
HashEntry lastRun = e; //鏈表頭節點
int lastIdx = idx;
for (HashEntry last = next; //對鏈表進行遍歷
//找到後續節點新index不變的節點
last != null;
last = last.next) {
int k = last.hash & sizeMask; //當前節點重新hash後的角標
if (k != lastIdx) { //當前節點k與lastIdx不同則進行替換
//目的是找到該鏈表最後相同新角標的節點,這樣就可以
//最後一段鏈表一次性加入到新index
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
//後續節點新index不變節點前的所有節點均需要重新創建分配

for (HashEntry p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry n = newTable[k]; //當前角標頭節點
newTable[k] = new HashEntry(h, p.key, v, n); //頭插法
}
}
}
}
//找到當前要插入節點對應的角標
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]); //頭插法插入
newTable[nodeIndex] = node; //設置為新的頭節點
table = newTable;
}

返回目錄

scanAndLock操作:自旋並獲取鎖

 private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry first = entryForHash(this, hash);
//通過volatile讀獲取指定座標的鏈表
HashEntry e = first;
int retries = -1;

while (!tryLock()) { //自旋嘗試獲取鎖
HashEntry f;
if (retries < 0) {
if (e == null || key.equals(e.key)) //鏈表為空||找到key對應節點
retries = 0;
else
e = e.next; //遍歷鏈表
}
else if (++retries > MAX_SCAN_RETRIES) { //大於最大自旋次數,阻塞線程
lock();
break;
}
else if ((retries & 1) == 0 && //頭結點發生變化,重新自旋並且遍歷鏈表
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}

返回目錄

remove操作:

final V remove(Object key, int hash, Object value) {
if (!tryLock()) //未成功獲取鎖,自旋獲取
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry[] tab = table; //已經加鎖,不需要volatile寫
int index = (tab.length - 1) & hash;
HashEntry e = entryAt(tab, index);
//volatile讀,獲取index角標的鏈表
HashEntry pred = null; //前驅

while (e != null) {
K k;
HashEntry next = e.next;
if ((k = e.key) == key || //找到要刪除節點
(e.hash == hash && key.equals(k))) {
V v = e.value;
//
if (value == null || value == v || value.equals(v)) {
if (pred == null) //要刪除的是頭節點
setEntryAt(tab, index, next);
else //非頭節點
pred.setNext(next);
++modCount; //修改次數+1
--count; //元素數量--
oldValue = v; //舊值
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}

返回目錄

replace操作:

boolean replace(K key, int hash, V oldValue, V newValue):根據舊值替換新值,若舊值發生變化,則返回false

 final boolean replace(K key, int hash, V oldValue, V newValue) {
if (!tryLock()) //嘗試獲取鎖,不成功則自旋獲取
scanAndLock(key, hash);
boolean replaced = false;

try {
HashEntry e;
//遍歷給定角標鏈表
for (e = entryForHash(this, hash); e != null; e = e.next) {
K k;
if ((k = e.key) == key || //找到key值
(e.hash == hash && key.equals(k))) {
//典型CAS操作
if (oldValue.equals(e.value)) { //舊址相同則替換為新值
e.value = newValue;
++modCount;
replaced = true;
}
break; //說明oldvValue已經被別的線程修改
}
}
} finally {
unlock();
}
return replaced;
}

final V replace(K key, int hash, V value) :直接進行替換,並返回舊值

final V replace(K key, int hash, V value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry e;
//遍歷鏈表
for (e = entryForHash(this, hash); e != null; e = e.next) {
K k;
//找到直接進行替換
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
e.value = value;
++modCount;
break;

}
}
} finally {
unlock();
}
return oldValue;
}

幾個常用操作源碼解析:

返回目錄

get操作: 不需要進行加鎖,只關心一個segment;非線程安全,得到的數據可能是過時數據。

public V get(Object key) {
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
int h = hash(key); //對key進行hash
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; //得到對應segment
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) { //table數據存在
//遍歷數組
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
//找到指定key
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value; //返回對應value值
}
}
return null;
}

返回目錄

boolean isEmpty():判斷集合是否為空
public boolean isEmpty() {

long sum = 0L;
final Segment[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) { //遍歷sgment數組
Segment seg = segmentAt(segments, j); //得到對應table
if (seg != null) {
if (seg.count != 0)
return false;
sum += seg.modCount; //可變操作次數相加
}
}
//有過修改痕跡,再次遍歷
if (sum != 0L) { // recheck unless no modifications
for (int j = 0; j < segments.length; ++j) {
Segment seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum -= seg.modCount;
}
}
if (sum != 0L) //一加一減不為0 說明這期間有被修改過,所以不為null
return false;
}
return true;

返回目錄

int size() 得到集合元素個數:

size和containsValue與put和get最大的區別在於,都需要遍歷所有的Segment才能得到結果。

這兩個源碼實現都是先給三次機會。不lock所有的segment,比較相鄰兩次的modCount和,如果相同則說明在這之間整個集合是沒有進行更新操作的。得到的size是正確的。如果三次循環之後仍然沒有得到正確答案。那麼就對所有的segment進行加鎖。計算完畢後在進行解鎖。

public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
//為true表示size溢出32
long sum; // sum of modCounts //modCount和
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
//第一次迭代不計入重試,所以會重試三次
try {
for (;;) {
//前三次(-1,0,1) 進行不加鎖統計size,如果得不到準確值則第四次加鎖統計
if (retries++ == RETRIES_BEFORE_LOCK) { //默認table大小為2
//sgments全部加鎖
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
//遍歷segment
for (int j = 0; j < segments.length; ++j) {
Segment
seg = segmentAt(segments, j);
if (seg != null) { //當前table不為空
sum += seg.modCount;
int c = seg.count; //當前segment元素個數
if (c < 0 || (size += c) < 0)
overflow = true; //size溢出
}
}
if (sum == last) //兩次統計的可變操作修改次數相同,說明全部加鎖成功
//並且獲得準確size
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
// 三次以後才進行加鎖,因此此時需要解鎖
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size; //溢出返回最大值,否則返回size
}

返回目錄

boolean containsValue(Object value) /boolean contains(Object value) :集合中是否包含此值
public boolean containsValue(Object value) {
// Same idea as size()
if (value == null) //hashMap不允許value為null
throw new NullPointerException();
final Segment[] segments = this.segments;
boolean found = false; //是否包含
long last = 0;
int retries = -1;
try {
outer: for (;;) {
//三次不加鎖嘗試尋找,如果未成功則進行加鎖尋找

if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
long hashSum = 0L;
int sum = 0;
//遍歷segments
for (int j = 0; j < segments.length; ++j) {
HashEntry[] tab;
Segment seg = segmentAt(segments, j);

if (seg != null && (tab = seg.table) != null) {
//遍歷table
for (int i = 0 ; i < tab.length; i++) {
HashEntry e;
//遍歷鏈表
for (e = entryAt(tab, i); e != null; e = e.next) {
V v = e.value;
if (v != null && value.equals(v)) { //找到value
found = true;
break outer; //跳出死循環
}
}
}
sum += seg.modCount; //可變操作次數相加
}
}
if (retries > 0 && sum == last) //兩次操作中間沒有發生可變操作次數變化
//說明value值不存在
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) { //大於2 需要解鎖
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return found;
}

返回目錄

弱一致性:

ConcurrentHashMap是弱一致性的,它的get/containsKey/clear/iterator都是弱一致性的。

get和containsKey都是無鎖操作,均通過getObjectVolatile()提供的原子讀來獲得Segment以及對應的鏈表,然後遍歷鏈表。由於遍歷期間其它線程可能對鏈表結構做了調整 ,所以返回的可能是過時數據。


分享到:


相關文章: