面試必備主流的線程模型以及golang goroutine調度GPM

現在主流的線程模型分三種:

用戶級線程模型、內核級線程模型和兩級線程模型(也稱混合型線程模型).

用戶級線程:

UTL用戶線程和內核KSE(KernelSchedulingEntity)是多對一的映射關係,創建、銷燬和調度全部都由自己程序(線程庫)決定,相比內核級線程較輕量級,但是由於線程表在用戶空間,所以操作系統內核對用戶級線程無感知的,操作系統只知道進程的存在(進程表),如果全部是用戶級線程的話,可以認為操作系統內核調度的基本單位是進程,內核一次只為一個進程分配一個內核處理器,因此一個線程的阻塞將會導致整個進程的阻塞;由於進程內的用戶級多線程相對於內核線程KES來說都是一個個執行的,所以不能做到真正意義的併發.

內核級線程:

KLT用戶線程和內核線程KSE是一對一的關係,每個創建的線程都會和一個內核線程KSE進行靜態綁定,內核線程的線程表在內核空間內,所以系統內核可以感知線程的存在,有些地方把這些內核級線程也被稱為偽進程(輕進程),線程的創建、調度等完全交予操作系統內核來完成,可以做到真正意義的併發,但是由於多個線程之間的上下文切換需要進行用戶態/內核態的轉換,所以較耗費資源和性能.

兩級線程模型:

用戶級線程和內核級線程的混合模式,兩級線程模型中的一個進程可以與多個內核線程KSE關聯,於是進程內的多個線程可以綁定不同的KSE,其實現原理就是將用戶級線程和內核線程KSE進行動態綁定,內核線程KSE數量<=用戶級線程的數量,其實也可以理解為一種動態的映射,當某個KSE因為其綁定的線程的阻塞操作被內核調度出CPU時,其關聯的進程中其餘用戶線程可以重新與其他KSE綁定運行.

Go語言的goroutine用的是兩級線程模型,相比較傳統的OS線程獨佔固定的棧空間,一般2M,多線程上下文切換消耗較大;goroutine 用戶態線程,可以創建很多,每個goroutine棧初始默認佔用2kb內存空間,可以動態擴展;runtime.GOMAXPROCS()函數設置當前程序併發時啟用的P的數量和OS線程的數量,同時也決定了關聯內核線程KSE的數量,默認使用CPU邏輯核心數.

goroutine的調度——GPM模型

G:即協程goroutine,用戶創建的需要被執行的任務.

P:processor管理器,P裡面會存儲當前goroutine運行的上下文環境,負責關聯G和M,P的數量決定了系統內最大並行的G的數量,擁有一個單獨的goroutine隊列LRQ(LocalRunQueue),當自己隊列goroutine任務執行完畢後,自動往全局goroutine隊列GRQ(GlobalRunQueue)中取任務,如果全局的隊列中也沒有任務了,它將會去搶佔其他processor調度管理器隊列中的任務,一邊索取LPQ的1/2任務.

M:對內核級線程的封裝,M>=P,真正執行任務的線程,M並不保存G的狀態,與G本身並沒有關係,所以G可以在不同的M執行,系統中的每個M都會擁有一個特殊的goroutine,是系統在初始化M時候創建,他包含了各種調度、垃圾回收和棧管理等程序,其他的G被稱為用戶G,這個特殊的可以稱為系統G.

關係:P的隊列持有G,G只有綁定了P才能被執行,而P調度執行G,最終通過M真正執行G的任務;可以認為是P包含G綁定M.

系統調用阻塞:

當G被阻塞在某個系統調用上時,此時G會阻塞在_Gsyscall狀態,M也處於block on syscall狀態,此時的M可被搶佔調度,執行該G的M會與P解綁,而P則嘗試與其它idle的M綁定,繼續執行其它G,如果沒有其它idle的M,但P的Local隊列中仍然有G需要執行,則創建一個新的M;當系統調用完成後,G會重新嘗試獲取一個idle的P(因為G必須被P持有才會被M執行,)進入它的Local隊列恢復執行,如果沒有idle的P,G會被標記為runnable加入到Global隊列.

用戶態阻塞:

當goroutine因為channel操作或者network I/O而阻塞時,netpoller實現了goroutine網絡I/O阻塞不會導致M被阻塞而是阻塞阻塞G,對應的G會被放置到某個wait隊列(如channel的waitq),該G的狀態由_Gruning變為_Gwaitting,而M會跳過該G嘗試獲取並執行下一個G,如果此時沒有runnable的G供M運行,那麼M將解綁P,並進入sleep狀態;當阻塞的G被另一端的G2喚醒時(比如channel的可讀/寫通知),G被標記為runnable,嘗試加入G2所在P的runnext,然後再是P的Local隊列和Global隊列.

Seched

代表著一個調度器,它維護有存儲空閒的M隊列和空閒的P隊列,可運行的G隊列,自由的G隊列以及調度器的一些狀態信息等;調度器Seched生成一個M,然後M需要持有綁定一個P,接著M會啟動一個內核級線程,循環執行P隊列的G任務.

DEMO代碼

<code>func hi(s string)  {  
fmt.Println("hi",s)
}/<code>
  • 1 啟動一個goroutine
<code>fmt.Println("main start")
go hi("蕾蕾")//go關鍵字啟動一個協程去執行hi函數
fmt.Println("main end")

time.Sleep(time.Second)
//sleep等待goroutine執行完畢,協程有點兒類似於異步的守護進程,
//當主線程結束的時候,其守護進程也將同時結束,/<code>

上面的demo打印如下:

<code>main start
main end
hi 蕾蕾/<code>

如果去掉關鍵字go的話,那麼程序將會變成同步的,從上往下執行。

  • 2 啟動多個goroutine
<code>fmt.Println("main start")
go hi("協程1")//go啟動協程1
go hi("協程2")//go啟動協程2
go hi("協程3")//go啟動協程3
fmt.Println("main end")
time.Sleep(time.Second)/<code>

輸出如下:

<code>main start
main end
hi 協程1
hi 協程2
hi 協程3/<code>
  • 3 channel 多個goroutine間的通信,當通道為空/滿的時候,會阻塞接收數據/發送數據

格式:var chanName chan chanElementType

比如:var cha chan int

chan必須在初始化分配內存以後才可以使用

格式:make(chan chanElementType,[緩衝區大小])

比如:make(chan int,10) 如果未指定10的話,那麼cap就是0

注意:chan一般是用在goroutine之間,如果用在主線程上,很容易引發panic如下:

fatal error: all goroutines are asleep - deadlock!

  • 3.1 有緩衝通道
<code>ch1 := make(chan int,10)//初始化chan
fmt.Println(cap(ch1))
ch1 ch1 ////x := x,ok := fmt.Println(x,ok)
close(ch1) //關閉通道
x,ok = \t\t\t\t\t\t\t\t\t\t\t\t//通道中沒有數據的話不會阻塞,得到的是對應的零值;
\t\t\t\t\t\t\t\t\t\t\t //如果返回true表明取到了chan裡面的值,
\t\t\t\t\t\t\t\t\t\t\t\t//如果返回false,表明chan已關閉,得到的是x對應的零值.
fmt.Println(x,ok)/<code>
  • 3.2 無緩衝通道,又稱為同步通道也就是說需要有兩個goroutine參與,如果缺少一方,將導致阻塞,如果是主線程被阻塞而又沒有其他的活躍的goroutine將引發上面提到的panic(有緩衝通道容量滿的場景也適用).
<code>ch2 := make(chan int)
go func() {  
ch2  fmt.Println("1...發送")  
time.Sleep(time.Millisecond*100)  
ch2  fmt.Println("2...發送")
}()
fmt.Println("接收",fmt.Println("接收",time.Sleep(time.Second)/<code>

打印結果如下:

<code>1...發送
接收 1
2...發送
接收 2/<code>
  • 3.3 從通道循環取值for range
<code>ch3 := make(chan int)
go func() {  
for i:=0;i<10;i++  {    
ch3  }  
close(ch3)
}()
//方式一
go func() {  
//如果不close,接收方將一直接收數據,如果沒數據就阻塞  
//如果通道被close,在通道沒有數據的情況下,接收方就會自動退出for range
for v := range ch3{    
fmt.Println(v)  
}  
fmt.Println("finished...")
}()
//方式二
go func() {  
for{    
v,ok :=  if !ok {      
break    
}else {      

fmt.Println(v)    
}  
}  
fmt.Println("finished...")
}()
time.Sleep(time.Second)/<code>
  • 3.4 單向chan
<code>ch5 := make(chan int,10)
go func() {  
//輸入型通道,不允許取值  
func(cha chan  cha  }(ch5)  
//輸出型通道,不允許寫入數據  
func(cha  fmt.Println( }(ch5)
}()
time.Sleep(time.Second)/<code>
  • 3.5 多個goroutine同時接收通道數據
<code>ch6 := make(chan int,10)
//1.準備多個goroutine一直消費通道的數據,不會重複消費
for i := 0; i  go func(goindex int) {    
for v := range ch6 {      
fmt.Println(goindex,"開始執行任務",v)      
time.Sleep(time.Second)    
}  
}(i)
}
//2.往通道發送數據
go func() {  
for i := 0; i  ch6  }
}()
time.Sleep(time.Second*11)/<code>
  • 4 select語句,多路複用

在多個通道case中自動選擇可以操作的case,如果都沒有符合條件的會執行default邏輯,如果沒有default邏輯則被阻塞,如果多個case同時符合條件,則隨機選擇一個執行.

<code>ch7 := make(chan int)
ch8 := make(chan int,1)
go func() {  
for i := 0; i  select {    
case ch7  \tfmt.Println("ch7發送成功了")    
case ch8  \tfmt.Println("ch8發送成功了")    
case v :=  \tfmt.Println("ch8接收到了",v)    
default:      
\tfmt.Println("default")    
}    
fmt.Println("select over")  
}
}()
time.Sleep(time.Second*5)/<code>
  • 5 sync包(包中的大部分工具類都是值傳遞而非引用傳遞)
  • 5.1 WaitGroup類似於java中的CountDownLacth,多阻塞一,多用於判斷多個goroutine的任務是否都已完成
<code>var wg sync.WaitGroup
wg.Add(10)  //表明goroutine任務數
for i := 0; i  go func(goindex int) {    
time.Sleep(time.Second*time.Duration(rand.Intn(10)))    
fmt.Println(goindex,"over")    
wg.Done() //當前goroutine結束  
}(i)
}
fmt.Println("wait start")
wg.Wait() //等待其他的goroutine全部完成

fmt.Println("all over")/<code>
  • 5.2 Mutex互斥鎖

在這個示例中如果不用互斥鎖的話,最後得到的j很可能不是1000.

<code>var mutex sync.Mutex
var wg sync.WaitGroup
j := 0
wg.Add(1000)
for i:=0;i<1000 ;i++  {  
go func() {    
mutex.Lock()    
j++    
mutex.Unlock()    
wg.Done()  
}()
}
wg.Wait()
fmt.Println("over",j)/<code>
  • 5.3 RWMutex讀寫互斥鎖.

總結:讀鎖先,讀鎖是寫鎖否;寫鎖先,讀寫都否。

<code>var rwMutex sync.RWMutex
var wg sync.WaitGroup
wg.Add(2)
go func() {  
rwMutex.RLock()  
fmt.Println("獲取讀鎖")  
time.Sleep(time.Second*2)  
fmt.Println("釋放讀鎖")  
rwMutex.RUnlock()  
wg.Done()
}()
go func() {  
//time.Sleep(time.Millisecond*5)  
rwMutex.Lock()  
fmt.Println("獲得寫鎖")  

time.Sleep(time.Second*2)  
fmt.Println("釋放寫鎖")  
rwMutex.Unlock()  
wg.Done()
}()
wg.Wait()/<code>
  • 5.4 Once保證指定函數只被執行一次

這個函數是無參類型,所以想要給函數傳遞參數的話,需要用到閉包的寫,常用場景比如加載配置文件或者單例模式。

<code>var func_t = func() {  
fmt.Println("I am be called")
}
var once sync.Once
for i := 0; i  once.Do(func_t)
}/<code>
  • 5.5 線程安全的map
<code>syncMap := sync.Map{}
syncMap.Store("key1","value1")//增
v, ok :=syncMap.Load("key1") //查
fmt.Println(v,ok) //如果查詢到了返回true並且返回對應的值
v, ok = syncMap.LoadOrStore("key1","value2")
fmt.Println(v,ok) //value1 true
//如果沒有查詢到返回false並且存儲和返回給定的值
v, ok = syncMap.LoadOrStore("key2","value2")
fmt.Println(v,ok) //value2 false
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
syncMap.Range(func(key, value interface{}) bool {
fmt.Println("range",key,value)
return true
})
syncMap.Delete("key1") //刪

v, _ =syncMap.Load("key1")
fmt.Println(v)/<code>
  • 6 原子操作包atomic
<code>n1 := int64(100)atomic.AddInt64(&n1,200)fmt.Println(n1)/<code>

這個包裡面很多原子操作相關工具比如add、cas、swap等,有興趣可以研究下。

讓我們一起進步學習,共同成長;歡迎大家關注收藏點贊,關注微信公眾號:芝麻技術棧


分享到:


相關文章: