一、前言
Go語言在設計上對同步(Synchronization,數據同步和線程同步)提供大量的支持,比如 goroutine和channel同步原語,庫層面有
1. sync:提供基本的同步原語(比如Mutex、RWMutex、Locker)和 工具類(Once、WaitGroup、Cond、Pool、Map)
2. sync/atomic:提供原子操作(基於硬件指令compare-and-swap)
引用自 。
上一期中,我們介紹瞭如何使用 sync.WaitGroup 提高程序的並行度。本期文章我們介紹 package sync 下的另一個工具類:sync.Cond。
sync.Cond 對標 同步原語“條件變量”,它可以阻塞一個,或同時阻塞多個線程,直到另一個線程 1) 修改了條件變量; 2)通知一個(或所有)等待的線程。
注:在使用層面上,Go語言裡沒有線程,只有更輕量級的協程。本文中,“線程”均代指“協程”(goroutine)。
相對於 sync.Once 和 sync.WaitGroup, sync.Cond 比較難以理解,使用門檻也很高,在 Google 上搜一下,排名前10結果中有這樣幾個:
非常神奇的是:一篇名為 “如何正確使用sync.Cond” 的帖子竟然有 16k 的瀏覽量!
究竟是條件變量這個概念難以理解,還是 sync.Cond 的設計太反人類,我們一探究竟。
二、sync.Cond 怎麼用
開篇我們就提到了條件變量的應用場景,我們回顧一下:
sync.Cond 對標 同步原語“條件變量”,它可以阻塞一個,或同時阻塞多個線程,直到另一個線程 1) 修改了條件變量; 2)通知一個(或所有)等待的線程。
首先,我們把概念搞清楚,條件變量的作用是控制多個線程對一個共享變量的讀寫。我們有三類主體:
- 共享變量:條件變量控制多個線程對該變量的讀寫;
- 等待線程:被條件變量阻塞的線程,有一個或多個;
- 更新線程:更新共享變量,並喚起一個或多個等待線程。
其次,我們看看 sync.Cond 的說明書:
// 創建一個 sync.Cond 對象func NewCond(l Locker) *Cond// 阻塞當前線程,並等待條件觸發func (c *Cond) Wait()// 喚醒所有等待線程func (c *Cond) Broadcast()// 喚起一個等待線程// 沒有等待線程也不會報錯func (c *Cond) Signal()
大家看完這段代碼,腦子裡第一個問題大概是:NewCond 要一把鎖是幹嘛用的?為了便於理解,我們以 kubernetes 源碼裡 FIFO 隊列為例,一步一步說 sync.Cond 的用法:
type FIFO struct { // lock 控制對象讀寫 lock sync.RWMutex // 阻塞Pop操作,Add成功後激活被阻塞線程 cond sync.Cond // items 存儲數據 items map[string]interface{} // queue 存儲key queue []string // keyFunc是hash函數 keyFunc KeyFunc // 維護items和queue同步 populated bool initialPopulationCount int // 隊列狀態:是否已經關閉 closed bool closedLock sync.Mutex}
首先,這是一個 FIFO 隊列,問題又來了:go 內置的 channel 不香嗎?還真的是不夠香。
FIFO 具備一些額外的特性:
- 支持自定義處理函數,並保障每個元素只被處理一次(exactly once);
- 支持元素去重,版本更新,並只處理最新版本,而不是每次更新都處理一次;
- 支持元素刪除,刪除的元素不進行處理;
- 支持 list 所有元素。
FIFO 的成員函數有:
// 從隊頭取一個元素,沒有則會被阻塞Pop(PopProcessFunc) (interface{}, error)// 向隊尾加一個元素,如果已經存在,則不做任何操作Add(obj interface{}) errorAddIfNotPresent(interface{}) error// 更新元素Update(obj interface{}) error// 刪除元素Delete(obj interface{}) error// 關閉隊列Close()// 讀取所有元素List() []interface{}// 讀取所有 keyListKeys() []string// 通過元素讀取元素(通過 keyFunc 映射到同樣的 key)Get(obj interface{}) (item interface{}, exists bool, err error)// 通過key讀取元素GetByKey(key string) (item interface{}, exists bool, err error)// 用傳入的數組替換隊列內容Replace([]interface{}, string) error// 同步items和queueResync() error// items和queue是否同步HasSynced() bool
回到本文的主題 sync.Cond, 在上面這個例子中
- 一個 FIFO 實例就是一個共享變量;
- 調用 Pop 的線程是等待線程;
- 調用 Add 的線程是更新線程;
lock sync.RWMutex 用於控制對共享變量的併發訪問,本質上是控制對 queue 和 items 兩個字段的併發訪問。由於條件變量 cond sync.Cond 在實現 Wait 時,把鎖操作也包含進去了,所以初始化時需要傳入一個鎖變量。在使用時,是這樣的:
// 初始化一個 FIFOfunc NewFIFO(keyFunc KeyFunc) *FIFO { // lock 和 cond 均是默認值 f := &FIFO{ items: map[string]interface{}{}, queue: []string{}, keyFunc: keyFunc, } // 將 lock 共享給 cond f.cond.L = &f.lock return f}// Pop 操作func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { // 鎖住共享變量 f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { // 隊列已關閉 if f.IsClosed() { return nil, ErrFIFOClosed } // 隊列為空,等待數據 f.cond.Wait() } // 此處省略一段代碼... // 從 items 和 queue 刪除元素 }}// Add 操作func (f *FIFO) Add(obj interface{}) error { id, err := f.keyFunc(obj) if err != nil { return KeyError{obj, err} } // 鎖住共享變量 f.lock.Lock() defer f.lock.Unlock() // 此處省略一段代碼 ... // 添加元素到 items 和 queue // 通知等待線程 f.cond.Broadcast() return nil}
上面的代碼中,等待線程做的是:
- 給共享變量加鎖
- 有數據,就返回數據;沒有數據就調用 Wait 等數據
更新線程做的是:
- 給共享變量加鎖
- 寫入數據,調用 Broadcast
看起來很簡單,Ok? 但是你品一品,你細品,發現事情沒那麼簡單。
等待線程 加鎖以後,更新線程 要更新共享變量,怎麼會取到鎖呢?
我們先看看官方文檔對 Wait 的解釋:
Wait atomically unlocks c.L and suspends execution of the calling goroutine. After later resuming execution, Wait locks c.L before returning.
https://golang.org/pkg/sync/#Cond.Wait
大概意思是: Wait 首先會解鎖 c.L,然後阻塞當前的協程;後續協程被 Broadcast/Signal 喚醒以後,在對 c.L 加鎖,然後 return。
所以,cond sync.Cond 的初始化需要一把鎖,並且和 FIFO 實例用同一把鎖。
三、sync.Cond 實現
如果不考慮 runtime 如何實現阻塞和激活,sync.Cond 本身的實現邏輯還是比較簡單的。我們看下源碼(刪減版):
type Cond struct { noCopy noCopy // 共享變量被訪問前,必須取到鎖 L L Locker notify notifyList checker copyChecker}// Waitfunc (c *Cond) Wait() { // 給當前協程分配一張船票 t := runtime_notifyListAdd(&c.notify) // 解鎖 c.L.Unlock() // 暫定當前協程的執行,等通知 runtime_notifyListWait(&c.notify, t) // 加鎖 c.L.Lock()}// Signal 喚醒被 c 阻塞的一個協程(如果有)func (c *Cond) Signal() { runtime_notifyListNotifyOne(&c.notify)}// Broadcast 喚醒所有被 c 阻塞的協程func (c *Cond) Broadcast() { runtime_notifyListNotifyAll(&c.notify)}
這裡著重說下 runtime_* 函數的功能:
- runtime_notifyListAdd 將當前線程添加到通知列表,以能夠接收通知;
- runtime_notifyListWait 將當前協程休眠,接收到通知以後才會被喚醒;
- runtime_notifyListNotifyOne 發送通知,喚醒 notify 列表裡一個協程
- runtime_notifyListNotifyAll 發送通知,喚醒 notify 列表裡所有協程
四、總結
sync.Cond 是Go語言對條件變量的一個實現方式,但不是唯一的方式。本質上,sync.Once 和 channel 也是條件變量的實現。
- sync.Once 裡鎖和原子操作用於控制共享變量的讀寫;
- channel 通過 close(ch) 可以通知其他協程讀取數據;
但 sync.Once 和 channel 有一個明顯的缺點是:它們都只能保證第一次滿足條件變量,而 sync.Cond 可以提供持續的保障。
由於 sync.Cond 的複雜性(我認為是 godoc 寫的太差了),且應用場景相對較少,其出現頻次低於 sync.Once 和 sync.WaitGroup。不過在合適的應用場景出現時,它就會展示出自己的不可替代性。
References
- C++ std::condition_variable https://en.cppreference.com/w/cpp/thread/condition_variable
- kubernetes FIFO queue https://github.com/kubernetes/client-go/blob/master/tools/cache/fifo.go
閱讀更多 趙帥虎 的文章