走進C#併發隊列ConcurrentQueue的內部世界—NET Core篇

經過拋磚引玉,得到了一眾大佬的指點,找到了.NET Core版本下的ConcurrentQueue源碼,位於以下地址:

  • https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs
  • https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs

我大致看了一下,雖然兩者的實現有不少相似的地方,不過在細節上新增了許多有意思的東西,還是覺得要單獨拉出來說一下。畫外音:誰叫我上篇立了flag,現在跪著也要寫完。。

必須要吐糟的是,代碼中ConcurrentQueue類明明是包含在System.Collections.Concurrent命名空間下,但是源碼結構中的文件卻放在System.Private.CoreLib目錄中,這是鬧哪出~


存儲結構

從上面給出的源碼地址可以猜測出整個結構依然是Segment+Queue的組合,通過一個Segment鏈表實現了Queue結構,但實際上內部又加了新的設計。拋去Queue先不看的話,Segment本身就是一個實現了多生產者多消費者的線程安全集合,甚至可以直接拿它當一個固定容量的線程安全隊列使用,這點與之前Framework中差別很大。如果結合Queue整體來看,Segment不再是固定容量,而是可以由Queue來控制每個Segment的容量大小(最小是32,上限是1024 * 1024)。

在Framework中,隊列會給每個Segment分配一個索引,雖然這個索引是long類型的,但理論上說隊列容量還是存在上限。在Core中就不一樣了,它取消了這個索引,真正實現了一個無邊界(unbounded)隊列。

我猜測的原因是,在Framework中由於每個Segment是固定大小的,維護一個索引可以很方便的計算隊列裡的元素數量,但是Core中的Segment大小不是固定的,使用索引並不能加快計算速度,使得這個索引不再有意義,這也意味著計算元素數量變得非常複雜。

一張圖看清它的真實面目,這裡繼續沿用上一篇的結構圖稍作修改:

走進C#併發隊列ConcurrentQueue的內部世界—NET Core篇

從圖中可以看到,整體結構上基本一致,核心改動就是Segment中增加了Slot(槽)的概念,這是真正存儲數據的地方,同時有一個序列號與之對應。

從代碼來看一下Segment的核心定義:

<code>internal sealed class ConcurrentQueueSegment
{
//存放數據的容器
\tinternal readonly Slot[] _slots;

\t//這個mask用來計算槽點,可以防止查找越界
\tinternal readonly int _slotsMask;

\t//首尾位置指針
\tinternal PaddedHeadAndTail _headAndTail;

\t//觀察保留標記,表示當前段在出隊時能否刪除數據
\tinternal bool _preservedForObservation;

\t//標記當前段是否被鎖住
\tinternal bool _frozenForEnqueues;

\t//下一段的指針
\tinternal ConcurrentQueueSegment? _nextSegment;
}
/<code>

其中_preservedForObservation和_frozenForEnqueues會比較難理解,後面再詳細介紹。

再看一下隊列的核心定義:

<code>public class ConcurrentQueue : IProducerConsumerCollection, IReadOnlyCollection
{
//每一段的初始化長度,也是最小長度
\tprivate const int InitialSegmentLength = 32;

//每一段的最大長度
\tprivate const int MaxSegmentLength = 1024 * 1024;

//操作多個段時的鎖對象
\tprivate readonly object _crossSegmentLock;

//尾段指針
\tprivate volatile ConcurrentQueueSegment _tail;

//首段指針
\tprivate volatile ConcurrentQueueSegment _head;
}
/<code>


常規操作

還是按上一篇的套路為主線循序漸進。

創建實例

ConcurrentQueue依然提供了2個構造函數,分別可以創建一個空隊列和指定數據集的隊列。

<code>/// <summary>
/// Initializes a new instance of the class.
/// /<summary>
public ConcurrentQueue()
{
_crossSegmentLock = new object();
_tail = _head = new ConcurrentQueueSegment(InitialSegmentLength);
}
/<code>

還是熟悉的操作,創建了一個長度是32的Segment並把隊列的首尾指針都指向它,同時創建了鎖對象實例,僅此而已。進一步看看Segment是怎麼創建的:

<code>internal ConcurrentQueueSegment(int boundedLength)
{
//這裡驗證了長度不能小於2並且必須是2的N次冪
Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}");
Debug.Assert((boundedLength & (boundedLength - 1)) == 0, $"Must be a power of 2, got {boundedLength}");

_slots = new Slot[boundedLength];
//這個mask的作用就是用來計算數組索引的防止越界,可以用`& _slotsMask`取代`% _slots.Length`
_slotsMask = boundedLength - 1;

//設置初始序列號
for (int i = 0; i < _slots.Length; i++)
{
_slots[i].SequenceNumber = i;
}
}

internal struct Slot
{
[AllowNull, MaybeNull] public T Item;

public int SequenceNumber;
}

/<code>

再看看怎麼用集合初始化隊列,這個過程稍微麻煩點,但是很有意思:

<code>public ConcurrentQueue(IEnumerable collection)
{
if (collection == null)
{
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.collection);
}

_crossSegmentLock = new object();

//計算得到第一段的長度
int length = InitialSegmentLength;
if (collection is ICollection c)
{
int count = c.Count;
if (count > length)
{
length = Math.Min(ConcurrentQueueSegment.RoundUpToPowerOf2(count), MaxSegmentLength);
}
}

//根據前面計算出來的長度創建一個Segment,再把數據依次入隊
_tail = _head = new ConcurrentQueueSegment(length);
foreach (T item in collection)
{
Enqueue(item);
}
}
/<code>

可以看到,第一段的大小是根據初始集合的大小確定的,如果集合大小count大於32就對count進行

向上取2的N次冪(RoundUpToPowerOf2)得到實際大小(但是不能超過最大值),否則就按默認值32來初始化。

向上取2的N次冪到底是啥意思??例如count是5,那得到的結果就是8(2×2×2);如果count是9,那結果就是16(2×2×2×2);如果剛好count是8那結果就是8(2×2×2),具體算法是通過位運算實現的很有意思。至於為什麼一定要是2的N次冪,中間的玄機我也沒搞明白。。

順藤摸瓜,再看看進隊操作如何實現。

元素進隊

<code>/// <summary>在隊尾追加一個元素/<summary>
public void Enqueue(T item)
{
// 先嚐試在尾段插入一個元素
if (!_tail.TryEnqueue(item))
{
// 如果插入失敗,就意味著尾段已經填滿,需要往後擴容
EnqueueSlow(item);
}
}

private void EnqueueSlow(T item)
{
while (true)
{
ConcurrentQueueSegment tail = _tail;

// 先嚐試再隊尾插入元素,如果擴容完成了就會成功
if (tail.TryEnqueue(item))

{
return;
}
// 獲得一把鎖,避免多個線程同時進行擴容
lock (_crossSegmentLock)
{
//檢查是否擴容過了
if (tail == _tail)
{
// 尾段凍結
tail.EnsureFrozenForEnqueues();
// 計算下一段的長度
int nextSize = tail._preservedForObservation ? InitialSegmentLength : Math.Min(tail.Capacity * 2, MaxSegmentLength);
var newTail = new ConcurrentQueueSegment(nextSize);

// 改變隊尾指向
tail._nextSegment = newTail;
// 指針交換
_tail = newTail;
}
}
}
}
/<code>

從以上流程可以看到,擴容的主動權不再由Segment去控制,而是交給了隊列。正因為如此,所以在跨段操作時要先加鎖,在Framework版本中是在原子操作獲得指針後進行的擴容所以不會有這個問題,後面的出隊操作也是一樣的道理。擴容過程中有兩個細節需要重點關注,那就是SegmentFrozen和下一段的長度計算。從前面Segment的定義中我們看到它維護了一個_frozenForEnqueues標記字段,表示當前段是否被凍結鎖定,在被鎖住的情況下會讓其他入隊操作失敗,看一下實現過程:

<code>// must only be called while queue's segment lock is held
internal void EnsureFrozenForEnqueues()
{
// flag used to ensure we don't increase the Tail more than once if frozen more than once
if (!_frozenForEnqueues)
{
_frozenForEnqueues = true;
Interlocked.Add(ref _headAndTail.Tail, FreezeOffset);
}
}
/<code>

首先判斷當前凍結狀態,然後把它設置為true,再使用原子操作把尾指針增加了2倍段長的偏移量,這個尾指針才是真正限制當前段不可新增元素的關鍵點,後面講段的元素追加再關聯起來詳細介紹。而為什麼要指定2倍段長這麼一個特殊值呢,目的是為了把尾指針和mask做運算後落在同一個slot上,也就是說雖然兩個指針位置不一樣但是都指向的是同一個槽。

再說說下一段長度的計算問題,它主要是受_preservedForObservation這個字段影響,正常情況下一段的長度是尾段的2倍,但如果尾段正好被標記為觀察保留(類似於上一篇的截取快照),那麼下一段的長度依然是初始值32,原作者認為入隊操作不是很頻繁,這樣做主要是為了避免浪費空間。

接著是重頭戲,看一下如何給段追加元素:

<code>public bool TryEnqueue(T item)
{
Slot[] slots = _slots;

// 如果發生競爭就自旋等待
SpinWait spinner = default;
while (true)
{
// 獲取當前段的尾指針
int currentTail = Volatile.Read(ref _headAndTail.Tail);
// 計算槽點
int slotsIndex = currentTail & _slotsMask;
// 讀取對應槽的序列號
int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

// 判斷槽點序列號和指針是否匹配
int diff = sequenceNumber - currentTail;
if (diff == 0)
{
// 通過原子操作比較交換,保證了只有一個入隊者獲得可用空間
if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail)
{
// 把數據存入對應的槽點,以及更新序列號
slots[slotsIndex].Item = item;
Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentTail + 1);
return true;
}
}
else if (diff < 0)
{
// 序列號小於指針就說明該段已經裝滿了,直接返回false
return false;
}

// 這次競爭失敗了,只好等下去
spinner.SpinOnce(sleep1Threshold: -1);
}

}
/<code>

整個流程的核心就是藉助槽點序列號和尾指針的匹配情況判斷是否有可用空間,因為在初始化的時候序列號是從0遞增,正常情況下尾指針和序列號肯定是匹配的,只有在整個段被裝滿時尾指針才會大於序列號,因為前面的凍結操作會給尾指針追加2倍段長的偏移量。要重點提出的是,只有在數據被寫入並且序列號更新完成後才表示整個位置的元素有效,才能有出隊的機會,在Framework是通過維護一個狀態位來實現這個功能。整個設計很有意思,要慢慢品。

這裡我們可以總結一下序列號的核心作用:假設一個槽點N,對應序列號是Q,它能允許入隊的必要條件之一就是N==Q,由於入隊操作把位置N的序列號修改成N+1,那麼可以猜測出在出隊時的必要條件之一就是滿足Q==N+1。

代碼中的CompareExchange在上一篇中有介紹,這裡不再重複。另外關於Volatile相關的稍微提一下,它的核心作用是避免內存與CPU之間的高速緩存帶來的數據不一致問題,告訴編譯器直接讀寫原始數據,有興趣的可以找資料瞭解,限於篇幅不過多介紹。

元素出隊

可以猜測到,入隊的時候要根據容量大小進行擴容,那麼與之對應的,出隊的時候就需要對它進行壓縮,也就是丟棄沒有數據的段。

<code>/// <summary>從隊首移除一個元素/<summary>
public bool TryDequeue([MaybeNullWhen(false)] out T result) =>
_head.TryDequeue(out result) ||
TryDequeueSlow(out result);

private bool TryDequeueSlow([MaybeNullWhen(false)] out T item)
{
// 不斷循環嘗試出隊,直到成功或失敗為止
while (true)
{
ConcurrentQueueSegment head = _head;

// 嘗試從隊首移除,如果成功就直接返回了
if (head.TryDequeue(out item))
{
return true;
}

// 如果首段為空並且沒有下一段了,則說明整個隊列都沒有數據了,返回失敗
if (head._nextSegment == null)
{
item = default!;
return false;
}

// 既然下一段不為空,那就再次確認本段是否還能出隊成功,否則就要把它給移除了,等待下次循環從下一段出隊
if (head.TryDequeue(out item))
{

return true;
}

// 首段指針要往後移動,表示當前首段已丟棄,跨段操作要先加鎖
lock (_crossSegmentLock)
{
if (head == _head)
{
_head = head._nextSegment;
}
}
}
}
/<code>

整體流程基本和入隊一樣,外層通過一個死循環不斷嘗試操作,直到出隊成功或者隊列為空返回失敗為止。釋放空間的操作也從Segment轉移到隊列上,所以要加鎖保證線程安全。這一步我在代碼註釋中寫的很詳細就不多解釋了,再看一下核心操作Segment是如何移除元素的:

<code>public bool TryDequeue([MaybeNullWhen(false)] out T item)
{
Slot[] slots = _slots;

// 遇到競爭時自旋等待
SpinWait spinner = default;
while (true)
{
// 獲取頭指針地址
int currentHead = Volatile.Read(ref _headAndTail.Head);
// 計算槽點
int slotsIndex = currentHead & _slotsMask;

// 獲取槽點對應的序列號
int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);


// 比較序列號是否和期望值一樣,為什麼要加1的原因前面入隊時說過
int diff = sequenceNumber - (currentHead + 1);
if (diff == 0)
{
// 通過原子操作比較交換得到可以出隊的槽點,並把頭指針往後移動一位
if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead)
{
// 取出數據
item = slots[slotsIndex].Item!;
// 此時如果該段沒有被標記觀察保護,要把這個槽點的數據清空
if (!Volatile.Read(ref _preservedForObservation))
{
slots[slotsIndex].Item = default;
Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentHead + slots.Length);
}
return true;
}
}
else if (diff < 0)
{
// 這種情況說明該段已經沒有有效數據了,直接返回失敗。
bool frozen = _frozenForEnqueues;
int currentTail = Volatile.Read(ref _headAndTail.Tail);
if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
{
item = default!;
return false;
}
}

// 競爭失敗進入下一輪等待
spinner.SpinOnce(sleep1Threshold: -1);
}
}
/<code>

流程和追加元素類似,大部分都寫在備註裡面了,這裡只額外提一下為空的情況。Segment為空只有一種情況,那就是頭尾指針落在了同一個槽點,但這是會出現兩種可能性:

  • 第一種是都落在了非最後一個槽點,意味著該段沒有被裝滿,拿首尾指針相減即可判斷。
  • 第二種是都落在了最後一個槽點,意味著該段已經被裝滿了,如果此時正在進行擴容(frozen),那麼必須要在尾指針的基礎上減去FreezeOffset再去和頭指針判斷,原因前面有說過;

是不是感覺環環相扣、相輔相成、如膠似漆、balabala.....

統計元素數量

前面也預告過,因為隊列不再維護段索引,這樣會導致計算元素數量變得非常複雜,複雜到我都不想說這一部分了。簡單描述一下就跳過了:核心思路就是一段一段來遍歷,然後計算出每段的大小最後把結果累加,如果涉及多個段還得加鎖,具體到段內部就要根據首尾指針計算槽點得出實際數量等等等等,代碼很長就不貼出來了。

這裡也嚴重提醒一句,非必要情況下不要調用Count不要調用Count不要調用Count。

接下來重點說一下隊列的IsEmpty。由於Segment不再維護IsEmpty信息,所以實現方式就有點曲線救國了,通過嘗試能否從隊首位置獲取一個元素來判斷是否隊列為空,也就是常說的TryPeek操作,但細節上稍有不同。

<code>/// <summary>
/// 判斷隊列是否為空,千萬不要使用Count==0來判斷,也不要直接TryPeek
/// /<summary>
public bool IsEmpty => !TryPeek(out _, resultUsed: false);

private bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed)
{
ConcurrentQueueSegment s = _head;
while (true)
{
ConcurrentQueueSegment? next = Volatile.Read(ref s._nextSegment);

// 從首段中獲取頭部元素,成功的話直接返回true,獲取失敗就意味著首段為空了
if (s.TryPeek(out result, resultUsed))
{
return true;
}

// 如果下一段不為空那就再嘗試從下一段重新獲取
if (next != null)
{
s = next;
}
//如果下一段為空就說明整個隊列為空,跳出循環直接返回false了
else if (Volatile.Read(ref s._nextSegment) == null)
{
break;
}
}
result = default!;
return false;
}
/<code>

上面的代碼可以看到有一個特殊的參數resultUsed,它具體會有什麼影響呢,那就得看看Segment是如何peek的:

<code>public bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed)
{
// 實際上隊列的TryPeek是一個觀察保護操作,這時resultUsed會標記成true,如果是IsEmpty操作的話就為false,因為並不關心這個元素是否被釋放了
if (resultUsed)
{
_preservedForObservation = true;
Interlocked.MemoryBarrier();
}

Slot[] slots = _slots;

SpinWait spinner = default;
while (true)
{
int currentHead = Volatile.Read(ref _headAndTail.Head);
int slotsIndex = currentHead & _slotsMask;

int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

int diff = sequenceNumber - (currentHead + 1);
if (diff == 0)
{
result = resultUsed ? slots[slotsIndex].Item! : default!;
return true;
}
else if (diff < 0)
{
bool frozen = _frozenForEnqueues;
int currentTail = Volatile.Read(ref _headAndTail.Tail);
if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
{
result = default!;
return false;
}
}
spinner.SpinOnce(sleep1Threshold: -1);
}
}
/<code>

除了最開始的resultUsed判斷,其他的基本和出隊的邏輯一致,前面說的很詳細,這裡不多介紹了。

枚舉轉換數據

前面反覆的提到觀察保護,這究竟是個啥意思??為什麼要有這個操作??

其實看過上一篇文章的話就比較好理解一點,這裡稍微回顧一下方便對比。在Framework中會有截取快照的操作,也就是類似ToArray\\ToList\\GetEnumerator這種要做數據迭代,它是通過原子操作維護一個m_numSnapshotTakers字段來實現對數據的保護,目的是為了告訴其他出隊的線程我正在遍歷數據,你們執行出隊的時候不要把數據給刪了我要用的。在Core中也是為了實現同樣的功能才引入了觀察保護的概念,換了一種實現方式而已。

那麼就以ToArray為例是怎麼和其他操作交互的:

<code>public T[] ToArray()
{
// 這一步可以理解為保護現場
SnapForObservation(out ConcurrentQueueSegment head, out int headHead, out ConcurrentQueueSegment tail, out int tailTail);

// 計算隊列長度,這也是要返回的數組大小
long count = GetCount(head, headHead, tail, tailTail);
T[] arr = new T[count];

// 開始迭代數據塞到目標數組中
using (IEnumerator e = Enumerate(head, headHead, tail, tailTail))
{

int i = 0;
while (e.MoveNext())
{
arr[i++] = e.Current;
}
Debug.Assert(count == i);
}
return arr;
}
/<code>

上面的代碼中,有一次獲取隊列長度的操作,還有一次獲取迭代數據的操作,這兩步邏輯比較相似都是對整個隊列進行遍歷,所以做一次數據轉換的開銷非常非常大,使用的時候一定要謹慎。別的不多說,重點介紹一下如何實現保護現場的過程:

<code>private void SnapForObservation(out ConcurrentQueueSegment head, out int headHead, out ConcurrentQueueSegment tail, out int tailTail)
{
// 要保護現場肯定要先來一把鎖
lock (_crossSegmentLock)
{
head = _head;
tail = _tail;

// 一段一段進行遍歷
for (ConcurrentQueueSegment s = head; ; s = s._nextSegment!)
{
// 把每一段的觀察保護標記設置成true
s._preservedForObservation = true;
// 遍歷到最後一段了就結束
if (s == tail) break;
}

// 尾段凍結,這樣就不能新增元素
tail.EnsureFrozenForEnqueues();

// 返回兩個指針地址用來對每一個元素進行遍歷
headHead = Volatile.Read(ref head._headAndTail.Head);
tailTail = Volatile.Read(ref tail._headAndTail.Tail);
}
}
/<code>

可以看到上來就是一把鎖,如果此時正在進行擴容或者收容的操作會直接阻塞掉,運氣好沒有阻塞的話你也不能有新元素入隊了,因為尾段已經凍結鎖死只能自旋等待,而出隊也不能釋放空間了。原話是:

At this point, any dequeues from any segment won't overwrite the value, and none of the existing segments can have new items enqueued.

有人就要問,這裡把尾段鎖死那等ToArray()完成後豈不是也不能有新元素入隊了?不用擔心,前面入隊邏輯提到過如果該段被鎖住隊列會新創建一個段然後再嘗試入隊,這樣就能成功了。但是問題又來了,假如前面的段還有很多空位,那豈不是有浪費空間的嫌疑?我們知道沒有觀察保護的時候每段會以2倍長度遞增,這樣的話空間浪費率還是挺高的。帶著疑問提了個Issue問一下:https://github.com/dotnet/runtime/issues/35094

到這裡就基本把.NET Core ConcurrentQueue說完了。


總結

對比Framework下的併發隊列,Core裡面的改動還是不小的,儘管保留了SpinWait和Interlocked相關操作,但是也加入了lock,邏輯上也複雜了很多,我一步步分析和寫文章搞了好幾天。

至於性能對比,我找到一個官方給出的測試結果,有興趣的可以看看:

https://github.com/dotnet/runtime/issues/27458#issuecomment-423964046


原文地址:https://www.cnblogs.com/hohoa/p/12685237.html


分享到:


相關文章: