Go 每日一庫之 message-bus

熱烈歡迎你,相識是一種緣分,Echa 哥為了你的到來特意準備了一份驚喜,go學習資料《 》

簡介

在一個涉及多模塊交互的系統中,如果模塊的交互需要手動去調用對方的方法,那麼代碼的耦合度就太高了。所以產生了異步消息通信。實際上,各種各樣的消息隊列都是基於異步消息的。不過它們大部分都有著非常複雜的設計,很多被設計成一個獨立的軟件來使用。今天我們介紹一個非常小巧的異步消息通信庫[message-bus](https://github.com/vardius/message-bus),它只能在一個進程中使用。源代碼只有一個文件,我們也簡單看一下實現。

快速使用

安裝:

<code>$ go get github.com/vardius/message-bus
/<code>

使用:

<code>package main

import (
"fmt"
"sync"

messagebus "github.com/vardius/message-bus"
)

func main() {
queueSize := 100
bus := messagebus.New(queueSize)

var wg sync.WaitGroup
wg.Add(2)

_ = bus.Subscribe("topic", func(v bool) {
defer wg.Done()

fmt.Println(v)
})

_ = bus.Subscribe("topic", func(v bool) {
defer wg.Done()
fmt.Println(v)
})

bus.Publish("topic", true)
wg.Wait()
}
/<code>

這是官網提供的例子,message-bus承擔了模塊間消息分發的角色。模塊 A 和 模塊 B 先向message-bus訂閱主題(topic),即告訴message-bus對什麼樣的消息感興趣。其他模塊 C 產生某個主題的消息,通知message-bus,由message-bus分發到對此感興趣的模塊。這樣就實現了模塊之間的解耦,模塊 A、B 和 C 之間不需要知道彼此。

上面的例子中:

  • 首先,調用messagebuss.New()創建一個消息管理器;
  • 其次調用Subscribe()方法向管理器訂閱主題;
  • 調用Publish()向管理器發佈主題消息,這樣訂閱該主題的模塊就會收到通知。

更復雜的例子

其實很多人會對何時使用這個庫產生疑問,message-bus GitHub 倉庫中 issue 中至今還躺著這個問題,https://github.com/vardius/message-bus/issues/4。我是做遊戲後端開發的,在一個遊戲中會涉及各種各樣的功能模塊,它們需要了解其他模塊產生的事件。例如每日任務有玩家升多少級的任務、成就係統有等級的成就、其他系統還可能根據玩家等級執行其他操作…如果硬寫的話,最後可能是這樣:

<code>func (p *Player) LevelUp() {
// ...
p.DailyMission.OnPlayerLevelUp(oldLevel, newLevel)
p.Achievement.OnPlayerLevelUp(oldLevel, newLevel)
p.OtherSystem.OnPlayerLevelUp(oldLevel, newLevel)
}
/<code>

需求一直在新增和迭代,如果新增一個模塊,也需要在玩家升級時進行一些處理,除了實現模塊自身的OnPlayerLevelUp方法,還必須在玩家的LevelUp()方法調用。這樣玩家模塊必須清楚地知道其他模塊的情況。如果功能模塊再多一點,而且由不同的人開發的,那麼情況會更復雜。使用異步消息可有效解決這個問題:在升級時我們只需要向消息管理器發佈這個升級“消息”,由消息管理器通知訂閱該消息的模塊。

我們設計的目錄結構如下:

<code>game
├── achievement.go
├── daily_mission.go
├── main.go
├── manager.go
└── player.go
/<code>

其中manager.go負責message-bus的創建:

<code>package main

import (
messagebus "github.com/vardius/message-bus"
)

var bus = messagebus.New(10)
/<code>

player.go對應玩家結構(為了簡便起見,很多字段省略了):

<code>package main

type Player struct {
level uint32
}

func NewPlayer() *Player {
return &Player{}
}

func (p *Player) LevelUp() {
oldLevel := p.level
newLevel := p.level+1
p.level++

bus.Publish("UserLevelUp", oldLevel, newLevel)
}
/<code>

achievement.go和daily_mission.go分別是成就和每日任務(也是省略了很多無關細節):

<code>// achievement.go
package main

import "fmt"

type Achievement struct {
// ...
}

func NewAchievement() *Achievement {
a := &Achievement{}
bus.Subscribe("UserLevelUp", a.OnUserLevelUp)
return a
}

func (a *Achievement) OnUserLevelUp(oldLevel, newLevel uint32) {
fmt.Printf("daily mission old level:%d new level:%d\\n", oldLevel, newLevel)
}
/<code>
<code>// daily_mission.go
package main


import "fmt"

type DailyMission struct {
// ...
}

func NewDailyMission() *DailyMission {
d := &DailyMission{}
bus.Subscribe("UserLevelUp", d.OnUserLevelUp)
return d
}

func (d *DailyMission) OnUserLevelUp(oldLevel, newLevel uint32) {
fmt.Printf("daily mission old level:%d new level:%d\\n", oldLevel, newLevel)
}
/<code>

在創建這兩個功能的對象時,我們訂閱了UserLevelUp主題。玩家在升級時會發布這個主題。

最後main.go驅動整個程序:

<code>package main

import "time"

func main() {
p := NewPlayer()
NewDailyMission()
NewAchievement()

p.LevelUp()
p.LevelUp()
p.LevelUp()

time.Sleep(1000)
}
/<code>

注意,由於message-bus是異步通信,為了能看到結果我特意加了time.Sleep,實際開發中不太可能使用Sleep。

最後我們運行整個程序:

<code>$ go run .
/<code>

因為要運行的是一個多文件程序,不能使用go run main.go!

實際上,當年我因為苦於模塊之間調來調去太麻煩了,自己用 C++ 擼了一個event-manager,https://github.com/darjun/event-manager。思路是一樣的。

缺點

message-bus訂閱主題時傳入一個函數,函數的參數可任意設置,發佈時必須使用相同數量的參數,這個限制感覺有點勉強。如果我們傳入的參數個數不一致,程序就panic了。我認為可以只用一個參數interface{},傳入對象即可。例如,上面的升級事件可以使用EventUserLevelUp的對象:

<code>type EventUserLevelUp struct {
oldLevel uint32
newLevel uint32
}
/<code>

對應地修改一下Player的LevelUp方法:

<code>func (p *Player) LevelUp() {
event := &EventUserLevelUp {
oldLevel: p.level,
newLevel: p.level+1,
}
p.level++

bus.Publish("UserLevelUp", event)
}
/<code>

和處理方法:

<code>func (d *DailyMission) OnUserLevelUp(arg interface{}) {
event := arg.(*EventUserLevelUp)
fmt.Printf("daily mission old level:%d new level:%d\\n", event.oldLevel, event.newLevel)
}
/<code>

這樣一來,我們似乎用不上反射了,訂閱者都是func (interface{})類型的函數或方法。感興趣的可自己實現一下,我 fork 了message-bus,做了這個修改。改動在這裡:https://github.com/darjun/message-bus,message-bus有測試和性能用例,改完跑一下。

源碼分析

message-bus的源碼只有一個文件,加上註釋還不到 130 行,我們簡單來看一下。

MessageBus就是一個簡單的接口:

<code>type MessageBus interface {
Publish(topic string, args ...interface{})
Close(topic string)
Subscribe(topic string, fn interface{}) error
Unsubscribe(topic string, fn interface{}) error
}
/<code>

Publish和Subscribe都講過了,Unsubscribe表示對某個主題不感興趣了,取消訂閱,Close直接關閉某個主題的隊列,刪除所有訂閱者。

在message-bus內部,每個主題對應一組訂閱者。每個訂閱者使用handler結構存儲回調和參數通道:

<code>type handler struct {
callback reflect.Value
queue chan []reflect.Value
}

/<code>

所有訂閱者都存儲在一個 map 中:


<code>type handlersMap map[string][]*handler

type messageBus struct {
handlerQueueSize int
mtx sync.RWMutex
handlers handlersMap
}
/<code>

messageBus是MessageBus接口的實現。我們來看看各個方法是如何實現的。

<code>func (b *messageBus) Subscribe(topic string, fn interface{}) error {
h := &handler{
callback: reflect.ValueOf(fn),
queue: make(chan []reflect.Value, b.handlerQueueSize),
}

go func() {
for args := range h.queue {
h.callback.Call(args)
}
}()

b.handlers[topic] = append(b.handlers[topic], h)
return nil
}
/<code>

調用Subscribe時傳入一個函數,message-bus為每個訂閱者創建一個handler對象,在該對象中創建一個帶緩衝的參數通道,緩衝大小由message-bus創建時的參數指定。 同時啟動一個goroutine,監聽通道,每當有參數到來時就執行註冊的回調。

<code>func (b *messageBus) Publish(topic string, args ...interface{}) { 

rArgs := buildHandlerArgs(args)
if hs, ok := b.handlers[topic]; ok {
for _, h := range hs {
h.queue }
}
}
/<code>

Publish發佈主題,buildHandlerArgs將傳入的參數轉為[]reflect.Value,以便反射調用回調時傳入。發送參數到該主題下所有handler的通道中。由Subscribe時創建的goroutine讀取並觸發回調。

<code>func (b *messageBus) Unsubscribe(topic string, fn interface{}) error {
rv := reflect.ValueOf(fn)

if _, ok := b.handlers[topic]; ok {
for i, h := range b.handlers[topic] {
if h.callback == rv {
close(h.queue)

b.handlers[topic] = append(b.handlers[topic][:i], b.handlers[topic][i+1:]...)
}
}

return nil
}

return fmt.Errorf("topic %s doesn't exist", topic)
}
/<code>

Unsubscribe將某個訂閱者從message-bus中移除,移除時需要關閉通道,否則會造成訂閱者的 goroutine 洩露。

<code>func (b *messageBus) Close(topic string) {
if _, ok := b.handlers[topic]; ok {
for _, h := range b.handlers[topic] {
close(h.queue)
}

delete(b.handlers, topic)

return
}
}

/<code>

Close關閉某主題下所有的訂閱者參數通道,並刪除該主題。

注意,為了保證併發安全,每個方法都加了鎖,分析實現時先忽略鎖和錯誤處理。

為了更直觀的理解,我畫了一個message-bus內部結構圖:

Go 每日一庫之 message-bus

總結

message-bus是一個小巧的異步通信庫,實際使用可能不多,但卻是學習源碼的好資源。

大家如果發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue

參考

  1. message-bus GitHub:https://github.com/vardius/message-bus
  2. Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib


分享到:


相關文章: