NSQ使用GoLang分發消息傳遞

關於如何在Golang中使用NSQ與生產者和消費者的一個小例子。

目前,隨著微服務的爆炸式增長,使用消息隊列以異步方式通信服務是很常見的。作為RabbitMQ或Kafka的大玩家已經通過互聯網獲得了很多教程,但最近我開始研究Go並且我想嘗試一些消息隊列,只是為了進行一些測試然後我找到了NSQ並且它自己在其網站上引入:“NSQ是一個實時分佈式消息傳遞平臺”

NSQ有兩個主要組件,“NSQLOOKUPD”和“NSQD”

  • NSQD是接收,排隊和向客戶端傳遞消息的守護程序。
  • NSQLOOKUPD是管理拓撲信息的守護程序。客戶端查詢nsqlookupd以發現特定主題的nsqd生成器,nsqd節點廣播主題和通道信息。
NSQ使用GoLang分發消息傳遞

還有一個名為“ NSQADMIN ” 的第三個組件,它是管理頁面的UI。

對於我們的第一個測試,我們需要一個nsqd和一個nsqlookupd運行

對於我們的第一個測試,我們需要一個nsqd和一個nsqlookupd運行,

用docker 運行:

# docker-compose.yml
---
version: '2'
services:
nsqlookupd:
image: nsqio/nsq
command: >
/nsqlookupd
-broadcast-address localhost:4160
ports:
- "4160:4160"
- "4161:4161"
nsqd:
image: nsqio/nsq
command: >
/nsqd
-broadcast-address localhost
-lookupd-tcp-address nsqlookupd:4160
ports:
- "4150:4150"
- "4151:4151"

我們可以使用以下命:

docker-compose up -d

使用NSQD和NSQLOOKUPD運行,讓我們在Go中編寫一個生產者和一個使用者,我將使用“ nsq-event-bus ”包,它是“ go-nsq包的一個小包裝”,生產者代碼:

\tpackage main
\t
\timport (
\t\t"github.com/rafaeljesus/nsq-event-bus"
\t\t"log"
\t)
\t
\ttype event struct{ Body string }
\t
\tfunc main() {
\t\ttopic := "events"
\t
\t\temitter, err := bus.NewEmitter(bus.EmitterConfig{})
\t\t
\t\tif err != nil {
\t\t\tlog.Fatal("[ERRO]", err)
\t\t}
\t\t
\t\tmessage := "[Emitter 1] sending message"
\t\te := event{message}
\t\t
\t\tif err = emitter.Emit(topic, &e); err != nil {
\t\t\tlog.Println("error while was emitting message", err)
\t\t}
\t\t
\t\tlog.Println("[Message emitted]", message)
\t}

消費者代碼:

package main
\t
\timport (
\t\t"github.com/rafaeljesus/nsq-event-bus"
\t\t"log"
\t\t"sync"

\t)
\t
\ttype event struct{ Body string }
\t
\tvar wg sync.WaitGroup
\t
\tfunc main() {
\t\twg.Add(1) // just to test purposes, the program will await for one message
\t
\t\tif err := bus.On(bus.ListenerConfig{
\t\t\tLookup: []string{"localhost:4161"},
\t\t\tTopic: "events",
\t\t\tChannel: "consumer1",
\t\t\tHandlerFunc: handler,
\t\t}); err != nil {
\t\t\t// handle failure to listen a message
\t\t\tlog.Println("Error while consuming message", err)
\t\t}
\t
\t\twg.Wait()
\t}
\t
\tfunc handler(message *bus.Message) (reply interface{}, err error) {
\t\te := event{}
\t\tif err = message.DecodePayload(&e); err != nil {
\t\t\t// handle failure to decode a message
\t\t\tlog.Println("Error while consuming message", err)
\t\t\tmessage.Finish()
\t\t\twg.Done()
\t\t\treturn
\t\t}
\t\t
\t\tlog.Println("[Consumer 1] Consuming message", e)
\t\tmessage.Finish()
\t\twg.Done()
\t\treturn
\t}

請注意,在消費者代碼中,我使用了WaitGroup,因為在查找響應到達之前可能需要一段時間。要看到兩個工作運行emitter.go或consumer.go(請記住,消費者將阻止終端,直到收到一條消息,然後你需要在其他終端運行發射器)

$ go run emitter.go  

2017/03/01 01:46:31 INF 1 (localhost:4150) connecting to nsqd
2017/03/01 01:46:31 [Message emitted] [Emitter 1] sending message
$ go run consumer.go
2017/03/01 01:46:25 INF 1 [events/consumer1] querying nsqlookupd http://localhost:4161/lookup?topic=events
2017/03/01 01:46:25 INF 1 [events/consumer1] (localhost:4150) connecting to nsqd
2017/03/01 01:46:44 [Consumer 1] Consuming message {[Emitter 1] sending message}


分享到:


相關文章: