Golang package sync 剖析(三):sync.Cond


Golang package sync 剖析(三):sync.Cond

一、前言

Go語言在設計上對同步(Synchronization,數據同步和線程同步)提供大量的支持,比如 goroutine和channel同步原語,庫層面有

1. sync:提供基本的同步原語(比如Mutex、RWMutex、Locker)和 工具類(Once、WaitGroup、Cond、Pool、Map)

2. sync/atomic:提供原子操作(基於硬件指令compare-and-swap)

引用自 。

上一期中,我們介紹瞭如何使用 sync.WaitGroup 提高程序的並行度。本期文章我們介紹 package sync 下的另一個工具類:sync.Cond。

sync.Cond 對標 同步原語“條件變量”,它可以阻塞一個,或同時阻塞多個線程,直到另一個線程 1) 修改了條件變量; 2)通知一個(或所有)等待的線程。

注:在使用層面上,Go語言裡沒有線程,只有更輕量級的協程。本文中,“線程”均代指“協程”(goroutine)。

相對於 sync.Once 和 sync.WaitGroup, sync.Cond 比較難以理解,使用門檻也很高,在 Google 上搜一下,排名前10結果中有這樣幾個:

Golang package sync 剖析(三):sync.Cond

關鍵詞“golang sync.Cond”部分Google搜索結果

非常神奇的是:一篇名為 “如何正確使用sync.Cond” 的帖子竟然有 16k 的瀏覽量!

Golang package sync 剖析(三):sync.Cond

如何正確使用sync.Cond

究竟是條件變量這個概念難以理解,還是 sync.Cond 的設計太反人類,我們一探究竟。

二、sync.Cond 怎麼用

開篇我們就提到了條件變量的應用場景,我們回顧一下:

sync.Cond 對標 同步原語“條件變量”,它可以阻塞一個,或同時阻塞多個線程,直到另一個線程 1) 修改了條件變量; 2)通知一個(或所有)等待的線程。

首先,我們把概念搞清楚,條件變量的作用是控制多個線程對一個共享變量的讀寫。我們有三類主體:

  1. 共享變量:條件變量控制多個線程對該變量的讀寫;
  2. 等待線程:被條件變量阻塞的線程,有一個或多個;
  3. 更新線程:更新共享變量,並喚起一個或多個等待線程。

其次,我們看看 sync.Cond 的說明書:

// 創建一個 sync.Cond 對象func NewCond(l Locker) *Cond// 阻塞當前線程,並等待條件觸發func (c *Cond) Wait()// 喚醒所有等待線程func (c *Cond) Broadcast()// 喚起一個等待線程// 沒有等待線程也不會報錯func (c *Cond) Signal()

大家看完這段代碼,腦子裡第一個問題大概是:NewCond 要一把鎖是幹嘛用的?為了便於理解,我們以 kubernetes 源碼裡 FIFO 隊列為例,一步一步說 sync.Cond 的用法:

type FIFO struct {  // lock 控制對象讀寫  lock sync.RWMutex  // 阻塞Pop操作,Add成功後激活被阻塞線程  cond sync.Cond  // items 存儲數據  items map[string]interface{}  // queue 存儲key  queue []string  // keyFunc是hash函數  keyFunc KeyFunc  // 維護items和queue同步  populated bool  initialPopulationCount int  // 隊列狀態:是否已經關閉  closed     bool  closedLock sync.Mutex}

首先,這是一個 FIFO 隊列,問題又來了:go 內置的 channel 不香嗎?還真的是不夠香。

Golang package sync 剖析(三):sync.Cond

真香警告

FIFO 具備一些額外的特性:

  1. 支持自定義處理函數,並保障每個元素只被處理一次(exactly once);
  2. 支持元素去重,版本更新,並只處理最新版本,而不是每次更新都處理一次;
  3. 支持元素刪除,刪除的元素不進行處理;
  4. 支持 list 所有元素。

FIFO 的成員函數有:

// 從隊頭取一個元素,沒有則會被阻塞Pop(PopProcessFunc) (interface{}, error)// 向隊尾加一個元素,如果已經存在,則不做任何操作Add(obj interface{}) errorAddIfNotPresent(interface{}) error// 更新元素Update(obj interface{}) error// 刪除元素Delete(obj interface{}) error// 關閉隊列Close()// 讀取所有元素List() []interface{}// 讀取所有 keyListKeys() []string// 通過元素讀取元素(通過 keyFunc 映射到同樣的 key)Get(obj interface{}) (item interface{}, exists bool, err error)// 通過key讀取元素GetByKey(key string) (item interface{}, exists bool, err error)// 用傳入的數組替換隊列內容Replace([]interface{}, string) error// 同步items和queueResync() error// items和queue是否同步HasSynced() bool

回到本文的主題 sync.Cond, 在上面這個例子中

  • 一個 FIFO 實例就是一個共享變量
  • 調用 Pop 的線程是等待線程
  • 調用 Add 的線程是更新線程

lock sync.RWMutex 用於控制對共享變量的併發訪問,本質上是控制對 queue 和 items 兩個字段的併發訪問。由於條件變量 cond sync.Cond 在實現 Wait 時,把鎖操作也包含進去了,所以初始化時需要傳入一個鎖變量。在使用時,是這樣的:

// 初始化一個 FIFOfunc NewFIFO(keyFunc KeyFunc) *FIFO {  // lock 和 cond 均是默認值  f := &FIFO{    items:   map[string]interface{}{},    queue:   []string{},    keyFunc: keyFunc,  }  // 將 lock 共享給 cond  f.cond.L = &f.lock  return f}// Pop 操作func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {  // 鎖住共享變量  f.lock.Lock()  defer f.lock.Unlock()  for {    for len(f.queue) == 0 {      // 隊列已關閉      if f.IsClosed() {        return nil, ErrFIFOClosed      }      // 隊列為空,等待數據      f.cond.Wait()    }        // 此處省略一段代碼...    // 從 items 和 queue 刪除元素  }}// Add 操作func (f *FIFO) Add(obj interface{}) error {  id, err := f.keyFunc(obj)  if err != nil {    return KeyError{obj, err}  }    // 鎖住共享變量  f.lock.Lock()  defer f.lock.Unlock()    // 此處省略一段代碼 ...  // 添加元素到 items 和 queue  // 通知等待線程  f.cond.Broadcast()  return nil}

上面的代碼中,等待線程做的是:

  1. 給共享變量加鎖
  2. 有數據,就返回數據;沒有數據就調用 Wait 等數據

更新線程做的是:

  1. 給共享變量加鎖
  2. 寫入數據,調用 Broadcast

看起來很簡單,Ok? 但是你品一品,你細品,發現事情沒那麼簡單。

Golang package sync 剖析(三):sync.Cond

周冬雨的凝視

等待線程 加鎖以後,更新線程 要更新共享變量,怎麼會取到鎖呢?

我們先看看官方文檔對 Wait 的解釋:

Wait atomically unlocks c.L and suspends execution of the calling goroutine. After later resuming execution, Wait locks c.L before returning.

https://golang.org/pkg/sync/#Cond.Wait

大概意思是: Wait 首先會解鎖 c.L,然後阻塞當前的協程;後續協程被 Broadcast/Signal 喚醒以後,在對 c.L 加鎖,然後 return。

所以,cond sync.Cond 的初始化需要一把鎖,並且和 FIFO 實例用同一把鎖。

三、sync.Cond 實現

如果不考慮 runtime 如何實現阻塞和激活,sync.Cond 本身的實現邏輯還是比較簡單的。我們看下源碼(刪減版):

type Cond struct {  noCopy noCopy  // 共享變量被訪問前,必須取到鎖 L  L Locker  notify  notifyList  checker copyChecker}// Waitfunc (c *Cond) Wait() {  // 給當前協程分配一張船票  t := runtime_notifyListAdd(&c.notify)  // 解鎖  c.L.Unlock()  // 暫定當前協程的執行,等通知  runtime_notifyListWait(&c.notify, t)  // 加鎖  c.L.Lock()}// Signal 喚醒被 c 阻塞的一個協程(如果有)func (c *Cond) Signal() {  runtime_notifyListNotifyOne(&c.notify)}// Broadcast 喚醒所有被 c 阻塞的協程func (c *Cond) Broadcast() {  runtime_notifyListNotifyAll(&c.notify)}

這裡著重說下 runtime_* 函數的功能:

  • runtime_notifyListAdd 將當前線程添加到通知列表,以能夠接收通知;
  • runtime_notifyListWait 將當前協程休眠,接收到通知以後才會被喚醒;
  • runtime_notifyListNotifyOne 發送通知,喚醒 notify 列表裡一個協程
  • runtime_notifyListNotifyAll 發送通知,喚醒 notify 列表裡所有協程

四、總結

sync.Cond 是Go語言對條件變量的一個實現方式,但不是唯一的方式。本質上,sync.Once 和 channel 也是條件變量的實現。

  1. sync.Once 裡鎖和原子操作用於控制共享變量的讀寫;
  2. channel 通過 close(ch) 可以通知其他協程讀取數據;

但 sync.Once 和 channel 有一個明顯的缺點是:它們都只能保證第一次滿足條件變量,而 sync.Cond 可以提供持續的保障。

由於 sync.Cond 的複雜性(我認為是 godoc 寫的太差了),且應用場景相對較少,其出現頻次低於 sync.Once 和 sync.WaitGroup。不過在合適的應用場景出現時,它就會展示出自己的不可替代性。

References

  1. C++ std::condition_variable https://en.cppreference.com/w/cpp/thread/condition_variable
  2. kubernetes FIFO queue https://github.com/kubernetes/client-go/blob/master/tools/cache/fifo.go


分享到:


相關文章: