深入golang之---goroutine併發控制與通信

開發 go 程序的時候,時常需要使用 goroutine 併發處理任務,有時候這些 goroutine 是相互獨立的,而有的時候,多個 goroutine 之間常常是需要同步與通信的。另一種情況,主 goroutine 需要控制它所屬的子 goroutine,總結起來,實現多個 goroutine 間的同步與通信大致有:

  • 全局共享變量
  • channel 通信(CSP 模型)
  • Context 包

本文章通過 goroutine 同步與通信的一個典型場景-通知子 goroutine 退出運行,來深入講解下 golang 的控制併發。

通知多個子 goroutine 退出運行

goroutine 作為 go 語言的併發利器,不僅性能強勁而且使用方便:只需要一個關鍵字 go 即可將普通函數併發執行,且 goroutine 佔用內存極小(一個 goroutine 只佔 2KB 的內存),所以開發 go 程序的時候很多開發者常常會使用這個併發工具,獨立的併發任務比較簡單,只需要用 go 關鍵字修飾函數就可以啟用一個 goroutine 直接運行;但是,實際的併發場景常常是需要進行協程間的同步與通信,以及精確控制子 goroutine 開始和結束,其中一個典型場景就是主進程通知名下所有子 goroutine 優雅退出運行。

由於 goroutine 的退出機制設計是,goroutine 退出只能由本身控制,不允許從外部強制結束該 goroutine。只有兩種情況例外,那就是 main 函數結束或者程序崩潰結束運行;所以,要實現主進程控制子 goroutine 的開始和結束,必須藉助其它工具來實現。

控制併發的方法

實現控制併發的方式,大致可分成以下三類:

  • 全局共享變量
  • channel 通信
  • Context 包

全局共享變量

這是最簡單的實現控制併發的方式,實現步驟是:

  1. 聲明一個全局變量;
  2. 所有子 goroutine 共享這個變量,並不斷輪詢這個變量檢查是否有更新;
  3. 在主進程中變更該全局變量;
  4. 子 goroutine 檢測到全局變量更新,執行相應的邏輯。

示例如下:

<code>package mainimport ("fmt""time")func main() {running := truef := func() {for running {fmt.Println("sub proc running...")time.Sleep(1 * time.Second)}fmt.Println("sub proc exit")}go f()go f()go f()time.Sleep(2 * time.Second)running = falsetime.Sleep(3 * time.Second)fmt.Println("main proc exit")}/<code>

全局變量的優勢是簡單方便,不需要過多繁雜的操作,通過一個變量就可以控制所有子 goroutine 的開始和結束;缺點是功能有限,由於架構所致,該全局變量只能是多讀一寫,否則會出現數據同步問題,當然也可以通過給全局變量加鎖來解決這個問題,但那就增加了複雜度,另外這種方式不適合用於子 goroutine 間的通信,因為全局變量可以傳遞的信息很小;還有就是主進程無法等待所有子 goroutine 退出,因為這種方式只能是單向通知,所以這種方法只適用於非常簡單的邏輯且併發量不太大的場景,一旦邏輯稍微複雜一點,這種方法就有點捉襟見肘。

channel 通信

另一種更為通用且靈活的實現控制併發的方式是使用 channel 進行通信。 首先,我們先來了解下什麼是 golang 中的 channel:Channel 是 Go 中的一個核心類型,你可以把它看成一個管道,通過它併發核心單元就可以發送或者接收數據進行通訊(communication)。 要想理解 channel 要先知道 CSP 模型:

CSP 是 Communicating Sequential Process 的簡稱,中文可以叫做通信順序進程,是一種併發編程模型,由 Tony Hoare 於 1977 年提出。簡單來說,CSP 模型由併發執行的實體(線程或者進程)所組成,實體之間通過發送消息進行通信,這裡發送消息時使用的就是通道,或者叫 channel。CSP 模型的關鍵是關注 channel,而不關注發送消息的實體。Go 語言實現了 CSP 部分理論,goroutine 對應 CSP 中併發執行的實體,channel 也就對應著 CSP 中的 channel。 也就是說,CSP 描述這樣一種併發模型:多個 Process 使用一個 Channel 進行通信, 這個 Channel 連結的 Process 通常是匿名的,消息傳遞通常是同步的(有別於 Actor Model)。

先來看示例代碼:

<code>package mainimport (    "fmt"    "os"    "os/signal"    "sync"    "syscall"    "time")func consumer(stop /<code>

這裡可以實現優雅等待所有子 goroutine 完全結束之後主進程才結束退出,藉助了標準庫 sync 裡的 Waitgroup,這是一種控制併發的方式,可以實現對多 goroutine 的等待,官方文檔是這樣描述的:

A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.

簡單來講,它的源碼裡實現了一個類似計數器的結構,記錄每一個在它那裡註冊過的協程,然後每一個協程完成任務之後需要到它那裡註銷,然後在主進程那裡可以等待直至所有協程完成任務退出。 使用步驟:

  1. 創建一個 Waitgroup 的實例 wg;
  2. 在每個 goroutine 啟動的時候,調用 wg.Add(1)註冊;
  3. 在每個 goroutine 完成任務後退出之前,調用 wg.Done()註銷。
  4. 在等待所有 goroutine 的地方調用 wg.Wait()阻塞進程,知道所有 goroutine 都完成任務調用 wg.Done()註銷之後,Wait()方法會返回。

該示例程序是一種 golang 的 select+channel 的典型用法,我們來稍微深入一點分析一下這種典型用法:

channel

首先了解下 channel,可以理解為管道,它的主要功能點是:

  1. 隊列存儲數據
  2. 阻塞和喚醒 goroutine

channel 實現集中在文件 runtime/chan.go 中,channel 底層數據結構是這樣的:

<code>type hchan struct {    qcount   uint           // 隊列中數據個數    dataqsiz uint           // channel 大小    buf      unsafe.Pointer // 存放數據的環形數組    elemsize uint16         // channel 中數據類型的大小    closed   uint32         // 表示 channel 是否關閉    elemtype *_type // 元素數據類型    sendx    uint   // send 的數組索引    recvx    uint   // recv 的數組索引    recvq    waitq  // 由 recv 行為(也就是 /<code>

從源碼可以看出它其實就是一個隊列加一個鎖(輕量),代碼本身不復雜,但涉及到上下文很多細節,故而不易通讀,有興趣的同學可以去看一下,我的建議是,從上面總結的兩個功能點出發,一個是 ring buffer,用於存數據; 一個是存放操作(讀寫)該 channel 的 goroutine 的隊列。

  • buf 是一個通用指針,用於存儲數據,看源碼時重點關注對這個變量的讀寫
  • recvq 是讀操作阻塞在 channel 的 goroutine 列表,sendq 是寫操作阻塞在 channel 的 goroutine 列表。列表的實現是 sudog,其實就是一個對 g 的結構的封裝,看源碼時重點關注,是怎樣通過這兩個變量阻塞和喚醒 goroutine 的

由於涉及源碼較多,這裡就不再深入。

select

然後是 select 機制,golang 的 select 機制可以理解為是在語言層面實現了和 select, poll, epoll 相似的功能:監聽多個描述符的讀/寫等事件,一旦某個描述符就緒(一般是讀或者寫事件發生了),就能夠將發生的事件通知給關心的應用程序去處理該事件。 golang 的 select 機制是,監聽多個 channel,每一個 case 是一個事件,可以是讀事件也可以是寫事件,隨機選擇一個執行,可以設置 default,它的作用是:當監聽的多個事件都阻塞住會執行 default 的邏輯。

select 的源碼在runtime/select.go ,看的時候建議是重點關注 pollorder 和 lockorder

  • pollorder 保存的是 scase 的序號,亂序是為了之後執行時的隨機性。
  • lockorder 保存了所有 case 中 channel 的地址,這裡按照地址大小堆排了一下 lockorder 對應的這片連續內存。對 chan 排序是為了去重,保證之後對所有 channel 上鎖時不會重複上鎖。

因為我對這部分源碼研究得也不是很深,故而點到為止即可,有興趣的可以去看看源碼啦!

具體到 demo 代碼:consumer 為協程的具體代碼,裡面是隻有一個不斷輪詢 channel 變量 stop 的循環,所以主進程是通過 stop 來通知子協程何時該結束運行的,在 main 方法中,close 掉 stop 之後,讀取已關閉的 channel 會立刻返回該 channel 數據類型的零值,因此子 goroutine 裡的

事實上,通過 channel 控制子 goroutine 的方法可以總結為:循環監聽一個 channel,一般來說是 for 循環裡放一個 select 監聽 channel 以達到通知子 goroutine 的效果。再借助 Waitgroup,主進程可以等待所有協程優雅退出後再結束自己的運行,這就通過 channel 實現了優雅控制 goroutine 併發的開始和結束。

channel 通信控制基於 CSP 模型,相比於傳統的線程與鎖併發模型,避免了大量的加鎖解鎖的性能消耗,而又比 Actor 模型更加靈活,使用 Actor 模型時,負責通訊的媒介與執行單元是緊耦合的–每個 Actor 都有一個信箱。而使用 CSP 模型,channel 是第一對象,可以被獨立地創建,寫入和讀出數據,更容易進行擴展。

殺器 Context

Context 通常被譯作上下文,它是一個比較抽象的概念。在討論鏈式調用技術時也經常會提到上下文。一般理解為程序單元的一個運行狀態、現場、快照,而翻譯中上下又很好地詮釋了其本質,上下則是存在上下層的傳遞,上會把內容傳遞給下。在 Go 語言中,程序單元也就指的是 Goroutine。

每個 Goroutine 在執行之前,都要先知道程序當前的執行狀態,通常將這些執行狀態封裝在一個 Context 變量中,傳遞給要執行的 Goroutine 中。上下文則幾乎已經成為傳遞與請求同生存週期變量的標準方法。在網絡編程下,當接收到一個網絡請求 Request,在處理這個 Request 的 goroutine 中,可能需要在當前 gorutine 繼續開啟多個新的 Goroutine 來獲取數據與邏輯處理(例如訪問數據庫、RPC 服務等),即一個請求 Request,會需要多個 Goroutine 中處理。而這些 Goroutine 可能需要共享 Request 的一些信息;同時當 Request 被取消或者超時的時候,所有從這個 Request 創建的所有 Goroutine 也應該被結束。

context 在 go1.7 之後被引入到標準庫中,1.7 之前的 go 版本使用 context 需要安裝 golang.org/x/net/context 包,關於 golang context 的更詳細說明,可參考官方文檔:context

Context 初試

Context 的創建和調用關係是層層遞進的,也就是我們通常所說的鏈式調用,類似數據結構裡的樹,從根節點開始,每一次調用就衍生一個葉子節點。首先,生成根節點,使用 context.Background 方法生成,而後可以進行鏈式調用使用 context 包裡的各類方法,context 包裡的所有方法:

  • func Background() Context
  • func TODO() Context
  • func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
  • func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
  • func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
  • func WithValue(parent Context, key, val interface{}) Context

這裡僅以 WithCancel 和 WithValue 方法為例來實現控制併發和通信: 話不多說,上碼:

<code>package mainimport ("context""crypto/md5""fmt""io/ioutil""net/http""sync""time")type favContextKey stringfunc main() {wg := &sync.WaitGroup{}values := []string{"https://www.baidu.com/", "https://www.zhihu.com/"}ctx, cancel := context.WithCancel(context.Background())for _, url := range values {wg.Add(1)subCtx := context.WithValue(ctx, favContextKey("url"), url)go reqURL(subCtx, wg)}go func() {time.Sleep(time.Second * 3)cancel()}()wg.Wait()fmt.Println("exit main goroutine")}func reqURL(ctx context.Context, wg *sync.WaitGroup) {defer wg.Done()url, _ := ctx.Value(favContextKey("url")).(string)for {select {case /<code>

前面我們說過 Context 就是設計用來解決那種多個 goroutine 處理一個 Request 且這多個 goroutine 需要共享 Request 的一些信息的場景,以上是一個簡單模擬上述過程的 demo。

首先調用 context.Background()生成根節點,然後調用 withCancel 方法,傳入根節點,得到新的子 Context 以及根節點的 cancel 方法(通知所有子節點結束運行),這裡要注意:該方法也返回了一個 Context,這是一個新的子節點,與初始傳入的根節點不是同一個實例了,但是每一個子節點裡會保存從最初的根節點到本節點的鏈路信息 ,才能實現鏈式。

程序的 reqURL 方法接收一個 url,然後通過 http 請求該 url 獲得 response,然後在當前 goroutine 裡再啟動一個子 groutine 把 response 打印出來,然後從 ReqURL 開始 Context 樹往下衍生葉子節點(每一個鏈式調用新產生的 ctx),中間每個 ctx 都可以通過 WithValue 方式傳值(實現通信),而每一個子 goroutine 都能通過 Value 方法從父 goroutine 取值,實現協程間的通信,每個子 ctx 可以調用 Done 方法檢測是否有父節點調用 cancel 方法通知子節點退出運行,根節點的 cancel 調用會沿著鏈路通知到每一個子節點,因此實現了強併發控制,流程如圖:

深入golang之---goroutine併發控制與通信

該 demo 結合前面說的 WaitGroup 實現了優雅併發控制和通信,關於 WaitGroup 的原理和使用前文已做解析,這裡便不再贅述,當然,實際的應用場景不會這麼簡單,處理 Request 的 goroutine 啟動多個子 goroutine 大多是處理 IO 密集的任務如讀寫數據庫或 rpc 調用,然後在主 goroutine 中繼續執行其他邏輯,這裡為了方便講解做了最簡單的處理。

Context 作為 golang 中併發控制和通信的大殺器,被廣泛應用,一些使用 go 開發 http 服務的同學如果閱讀過這些很多 Web framework 的源碼就知道,Context 在 Web framework 隨處可見,因為 http 請求處理就是一個典型的鏈式過程以及併發場景,所以很多 Web framework 都會藉助 Context 實現鏈式調用的邏輯。有興趣可以讀一下 context 包的源碼,會發現 Context 的實現其實是結合了 Mutex 鎖和 channel 而實現的,其實併發、同步的很多高級組件萬變不離其宗,都是通過最底層的數據結構組裝起來的,只要知曉了最基礎的概念,上游的架構也可以一目瞭然。

context 使用規範

最後,Context 雖然是神器,但開發者使用也要遵循基本法,以下是一些 Context 使用的規範:

  • Do not store Contexts inside a struct type; instead, pass a Context explicitly to each function that needs it. The Context should be the first parameter, typically named ctx;不要把 Context 存在一個結構體當中,顯式地傳入函數。Context 變量需要作為第一個參數使用,一般命名為 ctx;
  • Do not pass a nil Context, even if a function permits it. Pass context.TODO if you are unsure about which Context to use;即使方法允許,也不要傳入一個 nil 的 Context,如果你不確定你要用什麼 Context 的時候傳一個 context.TODO;
  • Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions;使用 context 的 Value 相關方法只應該用於在程序和接口中傳遞的和請求相關的元數據,不要用它來傳遞一些可選的參數;
  • The same Context may be passed to functions running in different goroutines; Contexts are safe for simultaneous use by multiple goroutines;同樣的 Context 可以用來傳遞到不同的 goroutine 中,Context 在多個 goroutine 中是安全的;

參考鏈接

  • [1] https://deepzz.com/post/golang-context-package-notes.html
  • [2] http://www.flysnow.org/2017/05/12/go-in-action-go-context.html
  • [3] https://golang.org/pkg/context/
  • [4] http://www.moye.me/2017/05/05/go-concurrency-patterns/


分享到:


相關文章: