深入解析 ConcurrentHashMap 實現內幕,吊打面試官?沒問題?



在開發中,我們經常使用 HashMap 容器來存儲 K-V 鍵值對,但是在併發多線程的情況下,HashMap 容器又是不安全的,因為在 put 元素的時候,如果觸發擴容操作,也就是 rehash ,就會將原數組的內容重新 hash 到新的擴容數組中,但是在擴容這個過程中,其他線程也在進行 put 操作,如果這兩個元素 hash 值相同,可能出現同時在同一數組下用鏈表表示,造成閉環,導致在get時會出現死循環,所以HashMap是線程不安全的

那有沒有安全的 Map 容器呢?有的,目前 JDK 中提供了三種安全的 Map 容器:

  • HashTable
  • Collections.SynchronizedMap(同步包裝器提供的方法)
  • ConcurrentHashMap

先來看看前兩種容器,它們都是利用非常粗粒度的同步方式,在高併發情況下,性能比較低下。Hashtable 是在 put、get、size 等各種方法加上“synchronized” 鎖來保證安全,這就導致了所有併發操作都要競爭同一把鎖,一個線程在進行同步操作時,其他線程只能等待,大大降低了併發操作的效率。

再來看看 Collections 提供的同步包裝器 SynchronizedMap ,我們可以先來看看 SynchronizedMap 的源代碼:

private static class SynchronizedMap
implements Map, Serializable {
private static final long serialVersionUID = 1978198479659022715L;

private final Map m; // Backing Map
final Object mutex; // Object on which to synchronize

SynchronizedMap(Map m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}

SynchronizedMap(Map m, Object mutex) {
this.m = m;
this.mutex = mutex;
}
public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public boolean containsValue(Object value) {
synchronized (mutex) {return m.containsValue(value);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}

public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}
public void putAll(Map extends K, ? extends V> map) {
synchronized (mutex) {m.putAll(map);}

}
public void clear() {
synchronized (mutex) {m.clear();}
}
......
}

從源碼中,我們可以看出 SynchronizedMap 雖然方法沒有加 synchronized 鎖,但是利用了“this”作為互斥的 mutex,所以在嚴格意義上 SynchronizedMap 跟 HashTable 一樣,並沒有實際的改進。

第三個 ConcurrentHashMap 也是這篇文章的主角,它相對前兩種安全的 Map 容器來說,在設計和思想上有較大的變化,也極大的提高了 Map 的併發效率。就 ConcurrentHashMap 容器本身的實現來說,版本之間就會產生較大的差異,典型的就是 JDK1.7 和 JDK1.8 這兩個版本,可以說是發生了翻天覆地的變化,在本文中也會介紹這兩個版本的 ConcurrentHashMap 實現,主要的重點放在 JDK 1.8 版本上,我個人覺得 JDK 1.7 已經成為了過去式,沒必要深入研究。

ConcurrentHashMap 在 JDK 1.7 中的實現

在 JDK 1.7 版本及之前的版本中,ConcurrentHashMap 為了解決 HashTable 會鎖住整個 hash 表的問題,提出了分段鎖的解決方案,分段鎖就是將一個大的 hash 表分解成若干份小的 hash 表,需要加鎖時就針對小的 hash 表進行加鎖,從而來提升 hash 表的性能。JDK1.7 中的 ConcurrentHashMap 引入了 Segment 對象,將整個 hash 表分解成一個一個的 Segment 對象,每個 Segment 對象呢可以看作是一個細粒度的 HashMap。

Segment 對象繼承了 ReentrantLock 類,因為 Segment 對象它就變成了一把鎖,這樣就可以保證數據的安全。 在 Segment 對象中通過 HashEntry 數組來維護其內部的 hash 表。每個 HashEntry 就代表了 map 中的一個 K-V,如果發生 hash 衝突時,在該位置就會形成鏈表。

JDK1.7 中,ConcurrentHashMap 的整體結構可以描述為下圖的樣子:

深入解析 ConcurrentHashMap 實現內幕,吊打面試官?沒問題?

我們對 ConcurrentHashMap 最關心的地方莫過於如何解決 HashMap put 時候擴容引起的不安全問題?一起來看看 JDK1.7 中 ConcurrentHashMap 是如何解決這個問題的,我們先從 put 方法開始:

 public V put(K key, V value) {
Segment s;
if (value == null)
throw new NullPointerException();
// 二次哈希,以保證數據的分散性,避免哈希衝突
int hash = hash(key.hashCode());
int j = (hash >>> segmentShift) & segmentMask;
// Unsafe 調用方式,直接獲取相應的 Segment
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

在 put 方法中,首先是通過二次哈希減小哈希衝突的可能行,根據 hash 值以 Unsafe 調用方式,直接獲取相應的 Segment,最終將數據添加到容器中是由 segment對象的 put 方法來完成。Segment對象的 put 方法源代碼如下:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 無論如何,確保獲取鎖 scanAndLockForPut會去查找是否有key相同Node
ConcurrentHashMap.HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
ConcurrentHashMap.HashEntry
[] tab = table;
int index = (tab.length - 1) & hash;
ConcurrentHashMap.HashEntry first = entryAt(tab, index);
for (ConcurrentHashMap.HashEntry e = first;;) {
// 更新已存在的key
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new ConcurrentHashMap.HashEntry(hash, key, value, first);
int c = count + 1;
// 判斷是否需要擴容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

由於 Segment 對象本身就是一把鎖,所以在新增數據的時候,相應的 Segment對象塊是被鎖住的,其他線程並不能操作這個 Segment 對象,這樣就保證了數據的安全性,在擴容時也是這樣的,在 JDK1.7 中的 ConcurrentHashMap擴容只是針對 Segment 對象中的 HashEntry 數組進行擴容,還是因為 Segment 對象是一把鎖,所以在 rehash 的過程中,其他線程無法對 segment 的 hash 表做操作,這就解決了 HashMap 中 put 數據引起的閉環問題

關於 JDK1.7 中的 ConcurrentHashMap 就聊這麼多,我們只需要直到在 JDK1.7 中 ConcurrentHashMap 採用分段鎖的方式來解決 HashMap 不安全問題。

ConcurrentHashMap 在 JDK1.8 中的實現

在 JDK1.8 中 ConcurrentHashMap 又發生了翻天覆地的變化,從實現的代碼量上就可以看出來,在 1.7 中不到 2000行代碼,而在 1.8 中已經 6000多行代碼了 。廢話不多說,我們來看看有那些變化。

先從容器安全說起,在容器安全上,1.8 中的 ConcurrentHashMap 放棄了 JDK1.7 中的分段技術,而是採用了 CAS 機制 + synchronized 來保證併發安全性,但是在 ConcurrentHashMap 實現裡保留了 Segment 定義,這僅僅是為了保證序列化時的兼容性而已,並沒有任何結構上的用處。 這裡插播個 CAS 機制的知識點:

CAS 機制

CAS 典型的應用莫過於 AtomicInteger 了,CAS 屬於原子操作的一種,能夠保證一次讀寫操作是原子的。CAS 通過將內存中的值與期望值進行比較,只有在兩者相等時才會對內存中的值進行修改,CAS 是在保證性能的同時提供併發場景下的線程安全性。在 Java 中 CAS 實現位於 sun.misc.Unsafe 類中,該類中定義了大量的 native 方法,CAS 的實現有以下幾個方法:

public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);

我們只能看到定義,並不能看到具體的實現,具體的實現依賴於操作系統,我們就不去管這些了,簡單瞭解方法裡面的參數是啥意思就行了:

  • o :目標操作對象
  • offset :目標操作數內存偏移地址
  • expected :期望值
  • x :更新值

CAS 機制雖然無需加鎖、安全且高效,但也存在一些缺點,概括如下:

  • 循環檢查的時間可能較長,不過可以限制循環檢查的次數
  • 只能對一個共享變量執行原子操作
  • 存在 ABA 問題(ABA 問題是指在 CAS 兩次檢查操作期間,目標變量的值由 A 變為 B,又變回 A,但是 CAS 看不到這中間的變換,對它來說目標變量的值並沒有發生變化,一直是 A,所以 CAS 操作會繼續更新目標變量的值。)

在存儲結構上,JDK1.8 中 ConcurrentHashMap 放棄了 HashEntry 結構而是採用了跟 HashMap 結構非常相似,採用 Node 數組加鏈表(鏈表長度大於8時轉成紅黑樹)的形式,Node 節點設計如下:

static class Node implements Map.Entry {
final int hash;
final K key;
volatile V val;
volatile Node next;
...省略...
}

跟 HashMap 一樣 Key 字段被 final 修飾,說明在生命週期內,key 是不可變的, val 字段被 volatile 修飾了,這就保證了 val 字段的可見性。

JDK1.8 中的 ConcurrentHashMap 結構如下圖所示:

深入解析 ConcurrentHashMap 實現內幕,吊打面試官?沒問題?

在這裡我提一下 ConcurrentHashMap 默認構造函數,我覺得這個地方比較有意思,ConcurrentHashMap 的默認構造函數如下:

public ConcurrentHashMap() {
}

發現沒這個構造函數啥事沒幹,為啥要這樣設計?這樣做的好處是實現了懶加載(lazy-load 形式),有效避免了初始化的開銷,這也是 JDK1.7 中ConcurrentHashMap 被很多人抱怨的地方。

結構上的變化就聊上面的兩點,跟上面一樣,我們還是來看看我們關心的問題,如何解決 HashMap 擴容時不安全的問題,帶著這個問題來閱讀 ConcurrentHashMap 的源代碼,關於 ConcurrentHashMap 的源代碼,在本文中主要聊新增(putVal )和擴容(transfer )這兩個方法,其他方法就不在一一介紹了。

putVal 方法

ConcurrentHashMap 新增元素並不是直接調用 putVal 方法,而是使用 put 方法

public V put(K key, V value) {
return putVal(key, value, false);
}

但是 put 方法調用了 putVal 方法,換一句話來說就是 putVal 是具體的新增方法,是 put 方法的具體實現,在 putVal 方法源碼加上了註釋,具體代碼如下:

 final V putVal(K key, V value, boolean onlyIfAbsent) {
// 如果 key 為空,直接返回
if (key == null || value == null) throw new NullPointerException();
// 兩次 hash ,減少碰撞次數
int hash = spread(key.hashCode());
// 記錄鏈表節點得個數
int binCount = 0;
// 無條件得循環遍歷整個 node 數組,直到成功
for (ConcurrentHashMap.Node[] tab = table;;) {
ConcurrentHashMap.Node f; int n, i, fh;
// lazy-load 懶加載的方式,如果當前 tab 容器為空,則初始化 tab 容器
if (tab == null || (n = tab.length) == 0)
tab = initTable();

// 通過Unsafe.getObjectVolatile()的方式獲取數組對應index上的元素,如果元素為空,則直接無所插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//// 利用CAS去進行無鎖線程安全操作
if (casTabAt(tab, i, null,
new ConcurrentHashMap.Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果 fh == -1 ,說明正在擴容,那麼該線程也去幫擴容
else if ((fh = f.hash) == MOVED)
// 協作擴容操作
tab = helpTransfer(tab, f);
else {
// 如果上面都不滿足,說明存在 hash 衝突,則使用 synchronized 加鎖。鎖住鏈表或者紅黑樹的頭結點,來保證操作安全
V oldVal = null;
synchronized (f) {

if (tabAt(tab, i) == f) {

if (fh >= 0) {// 表示該節點是鏈表
binCount = 1;
// 遍歷該節點上的鏈表
for (ConcurrentHashMap.Node e = f;; ++binCount) {
K ek;
//這裡涉及到相同的key進行put就會覆蓋原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
ConcurrentHashMap.Node pred = e;
if ((e = e.next) == null) {//插入鏈表尾部
pred.next = new ConcurrentHashMap.Node(hash, key,
value, null);
break;
}
}
}
else if (f instanceof ConcurrentHashMap.TreeBin) {// 該節點是紅黑樹節點
ConcurrentHashMap.Node p;
binCount = 2;
if ((p = ((ConcurrentHashMap.TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 插入完之後,判斷鏈表長度是否大於8,大於8就需要轉換為紅黑樹
if (binCount != 0) {

if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果存在相同的key ,返回原來的值
if (oldVal != null)
return oldVal;
break;
}
}
}
//統計 size,並且檢測是否需要擴容
addCount(1L, binCount);
return null;
}

在源碼中有比較詳細的註釋,如果你想了解詳細的實現,可以逐行讀源碼,在這裡我們來對 putVal 方法做一個總結,putVal 方法主要做了以下幾件事:

  • 第一步、在 ConcurrentHashMap 中不允許 key val 字段為空,所以第一步先校驗key value 值,key、val 兩個字段都不能是 null 才繼續往下走,否則直接返回一個 NullPointerException 錯誤,這點跟 HashMap 有區別,HashMap 是可以允許為空的。
  • 第二步、判斷容器是否初始化,如果容器沒有初始化,則調用 initTable 方法初始化,initTable 方法如下:
 /**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node[] initTable() {
Node[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 負數表示正在初始化或擴容,等待
if ((sc = sizeCtl) < 0)
// 自旋等待
Thread.yield(); // lost initialization race; just spin
// 執行 CAS 操作,期望將 sizeCtl 設置為 -1,-1 是正在初始化的標識
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// CAS 搶到了鎖
try {
// 對 table 進行初始化,初始化長度為指定值,或者默認值 16
if ((tab = table) == null || tab.length == 0) {
// sc 在初始化的時候用戶可能會自定義,如果沒有自定義,則是默認的
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 創建數組
Node[] nt = (Node[])new Node,?>[n];
table = tab = nt;
// 指定下次擴容的大小,相當於 0.75 × n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

Table 本質上就是一個 Node 數組,其初始化過程也就是對 Node 數組的初始化過程,方法中使用了 CAS 策略執行初始化操作。初始化流程為:

1、判斷 sizeCtl 值是否小於 0,如果小於 0 則表示 ConcurrentHashMap 正在執行初始化操作,所以需要先等待一會,如果其它線程初始化失敗還可以頂替上去 2、如果 sizeCtl 值大於等於 0,則基於 CAS 策略搶佔標記 sizeCtl 為 -1,表示 ConcurrentHashMap 正在執行初始化,然後構造 table,並更新 sizeCtl 的值

  • 第三步、根據雙哈希之後的 hash 值找到數組對應的下標位置,如果該位置未存放節點,也就是說不存在 hash 衝突,則使用 CAS 無鎖的方式將數據添加到容器中,並且結束循環。
  • 第四步、如果並未滿足第三步,則會判斷容器是否正在被其他線程進行擴容操作,如果正在被其他線程擴容,則放棄添加操作,加入到擴容大軍中(ConcurrentHashMap 擴容操作採用的是多線程的方式,後面我們會講到),擴容時並未跳出死循環,這一點就保證了容器在擴容時並不會有其他線程進行數據添加操作,這也保證了容器的安全性
  • 第五步、如果 hash 衝突,則進行鏈表操作或者紅黑樹操作(如果鏈表樹超過8,則修改鏈表為紅黑樹),在進行鏈表或者紅黑樹操作時,會使用 synchronized 鎖把頭節點被鎖住了,保證了同時只有一個線程修改鏈表,防止出現鏈表成環
  • 第六步、進行 addCount(1L, binCount) 操作,該操作會更新 size 大小,判斷是否需要擴容,addCount 方法源碼如下:
 // X傳入的是1,check 傳入的是 putVal 方法裡的 binCount,沒有hash衝突的話為0,衝突就會大於1
private final void addCount(long x, int check) {
ConcurrentHashMap.CounterCell[] as; long b, s;
// 統計ConcurrentHashMap裡面節點個數
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
ConcurrentHashMap.CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// check就是binCount,binCount 最小都為0,所以這個條件一定會為true

if (check >= 0) {
ConcurrentHashMap.Node[] tab, nt; int n, sc;
// 這兒是自旋,需同時滿足下面的條件
// 1. 第一個條件是map.size 大於 sizeCtl,也就是說需要擴容
// 2. 第二個條件是`table`不為null
// 3. 第三個條件是`table`的長度不能超過最大容量
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// 該判斷表示已經有線程在進行擴容操作了
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果可以幫助擴容,那麼將 sc 加 1. 表示多了一個線程在幫助擴容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 如果不在擴容,將 sc 更新:標識符左移 16 位 然後 + 2. 也就是變成一個負數。高 16 位是標識符,低 16 位初始是 2
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}

addCount 方法做了兩個工作: 1、對 map 的 size 加一 2、檢查是否需要擴容,或者是否正在擴容。如果需要擴容,就調用擴容方法,如果正在擴容,就幫助其擴容。

擴容 transfer 方法

擴容 transfer 方法是一個非常牛逼的方法,在看具體的 transfer 源碼之前,我們先來了解一下什麼時候會觸發擴容操作,不出意外的話,以下兩種情況下可能觸發擴容操作

  • 調用 put 方法新增元素之後,會調用 addCount 方法來更新 size 大小,並檢查是否需要進行擴容,當數組元素個數達到閾值時,會觸發transfer方法
  • 觸發了 tryPresize 操作, tryPresize 操作會觸發擴容操作,有兩種情況會觸發 tryPresize 操作: 第一種情況:當某節點的鏈表元素個數達到閾值 8 時,這時候需要將鏈表轉成紅黑樹,在結構轉換之前會,會先判斷數組長度 n 是否小於閾值MIN_TREEIFY_CAPACITY,默認是64,如果小於則會調用tryPresize方法把數組長度擴大到原來的兩倍,並觸發transfer方法,重新調整節點的位置。 第二種情況:在 putAll 操作時會先觸發 tryPresize 操作。

tryPresize 方法源碼如下:

深入解析 ConcurrentHashMap 實現內幕,吊打面試官?沒問題?

好了,知道什麼時候會觸發擴容後,我們來看看 擴容 transfer 方法的源碼,這也是一塊硬骨頭,非常難啃,希望我可以儘量的把它講清楚,transfer 方法源碼如下:

 private final void transfer(ConcurrentHashMap.Node[] tab, ConcurrentHashMap.Node[] nextTab) {
int n = tab.length, stride;
// 多線程擴容,每核處理的量小於16,則強制賦值16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// nextTab 為空,先實例化一個新的數組
if (nextTab == null) { // initiating

try {
@SuppressWarnings("unchecked")
// 新數組的大小是原來的兩倍
ConcurrentHashMap.Node[] nt = (ConcurrentHashMap.Node[])new ConcurrentHashMap.Node,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 更新成員變量
nextTable = nextTab;
// 更新轉移下標,就是 老的 tab 的 length
transferIndex = n;
}
// bound :該線程此次可以處理的區間的最小下標,超過這個下標,就需要重新領取區間或者結束擴容
// advance: 該參數
int nextn = nextTab.length;
// 創建一個 fwd 節點,用於佔位。當別的線程發現這個槽位中是 fwd 類型的節點,則跳過這個節點。
ConcurrentHashMap.ForwardingNode fwd = new ConcurrentHashMap.ForwardingNode(nextTab);
// advance 變量指的是是否繼續遞減轉移下一個桶,如果為 true,表示可以繼續向後推進,反之,說明還沒有處理好當前桶,不能推進
boolean advance = true;
// 完成狀態,如果是 true,表示擴容結束
boolean finishing = false; // to ensure sweep before committing nextTab
// 死循環,i 表示下標,bound 表示當前線程可以處理的當前桶區間最小下標
for (int i = 0, bound = 0;;) {
ConcurrentHashMap.Node
f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
// 這兒多判斷一次,是否為了防止可能出現的remove()操作
if (tabAt(tab, i) == f) {
// 舊鏈表上該節點的數據,會被分成低位和高位,低位就是在新鏈表上的位置跟舊鏈表上一樣,
// 高位就是在新鏈表的位置是舊鏈表位置加上舊鏈表的長度

ConcurrentHashMap.Node ln, hn;
if (fh >= 0) {
int runBit = fh & n;
ConcurrentHashMap.Node lastRun = f;
for (ConcurrentHashMap.Node p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (ConcurrentHashMap.Node p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 該節點哈希值與舊鏈表長度與運算,結果為0,則在低位節點上,反之,在高位節點上
if ((ph & n) == 0)
ln = new ConcurrentHashMap.Node(ph, pk, pv, ln);
else
hn = new ConcurrentHashMap.Node(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
// 在nextTable i + n 位置處插上鍊表
setTabAt(nextTab, i + n, hn);
// 在table i 位置處插上ForwardingNode 表示該節點已經處理過了
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof ConcurrentHashMap.TreeBin) {
// 如果是TreeBin,則按照紅黑樹進行處理,處理邏輯與上面一致

// 紅黑樹的邏輯跟節點一模一樣,最後也會分高位和低位
ConcurrentHashMap.TreeBin t = (ConcurrentHashMap.TreeBin)f;
ConcurrentHashMap.TreeNode lo = null, loTail = null;
ConcurrentHashMap.TreeNode hi = null, hiTail = null;
int lc = 0, hc = 0;
for (ConcurrentHashMap.Node e = t.first; e != null; e = e.next) {
int h = e.hash;
ConcurrentHashMap.TreeNode p = new ConcurrentHashMap.TreeNode
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果樹的節點數小於等於 6,那麼轉成鏈表,反之,創建一個新的樹
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new ConcurrentHashMap.TreeBin(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new ConcurrentHashMap.TreeBin(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}

}
}
}
}
}

想知道具體的實現細節,請逐行讀源碼,如果遇到不懂得,歡迎留言交流,跟 putVal 方法一樣,我們同樣來對 transfer 方法進行總結,transfer 大致做了以下幾件事件:

  • 第一步:計算出每個線程每次可以處理的個數,根據 Map 的長度,計算出每個線程(CPU)需要處理的桶(table數組的個數),默認每個線程每次處理 16 個桶,如果小於 16 個,則強制變成 16 個桶。
  • 第二步:對 nextTab 初始化,如果傳入的新 table nextTab 為空,則對 nextTab 初始化,默認是原 table 的兩倍
  • 第三步:引入 ForwardingNode、advance、finishing 變量來輔助擴容,ForwardingNode 表示該節點已經處理過,不需要在處理,advance 表示該線程是否可以下移到下一個桶(true:表示可以下移),finishing 表示是否結束擴容(true:結束擴容,false:未結束擴容) ,具體的邏輯就不說了
  • 第四步:跳過一些其他細節,直接到數據遷移這一塊,在數據轉移的過程中會加 synchronized 鎖,鎖住頭節點,同步化操作,防止 putVal 的時候向鏈表插入數據
  • 第五步:進行數據遷移,如果這個桶上的節點是鏈表或者紅黑樹,則會將節點數據分為低位和高位,計算的規則是通過該節點的 hash 值跟為擴容之前的 table 容器長度進行位運算(&),如果結果為 0 ,則將數據放在新表的低位(當前 table 中為 第 i 個位置,在新表中還是第 i 個位置),結果不為 0 ,則放在新表的高位(當前 table 中為第 i 個位置,在新表中的位置為 i + 當前 table 容器的長度)
  • 第六步:如果桶掛載的是紅黑樹,不僅需要分離出低位節點和高位節點,還需要判斷低位和高位節點在新表以鏈表還是紅黑樹的形式存放。

還是那句話擴容 transfer 方法很難,希望我把它講清楚了,關於其他 ConcurrentHashMap 的方法在這裡就不聊了,ConcurrentHashMap 博大精深,要把它研究透還需要時日,好了,這就是我對 ConcurrentHashMap 實現內幕的分享,希望本文的內容對大家的學習或者工作能帶來一定的幫助,感謝大家的支持。


作者:平頭哥的技術博文
原文鏈接:https://juejin.im/post/5dee05ac6fb9a0163f77beb6


分享到:


相關文章: