Coolpy7內核功能擴展(新版)

針對CP7的內核功能擴展需要使用此功能進行相關開發,如用戶登入Coolpy7 mqtt borker的身份驗證,擴展消息記錄到數據庫等功能時。微服務作為一種更適合Coolpy7的技術架構,經過我時間的技術選型和性能測試,最終選定了CoAP作為擴展服務協議。

概述

Coolpy7從版本號:V7.2.1.1開始使用本新方式進行相關內核功能擴展。選用CoAP協議進行相關的服務端,CoAP協議使用UDP底層通信,沒有頭阻塞問題,性能更優於http等 基於TCP的其他協議。CoAP協議類似簡化版的http協議,在消息體積方面更有優勢,更節省帶寬。

業務說明示意圖


Coolpy7內核功能擴展(新版)


當相關事件發生時內核觸發Coolpy7 Extend Service相對應相關的事件處理函數。

技術架構說明示意圖


Coolpy7內核功能擴展(新版)

Coolpy7 Extend Service支持服務多個Coolpy7,更適合以微服務形式進行部署。

實例演練

本演練說明如果操作構建一個Coolpy7 core對一個擴展服務


Coolpy7內核功能擴展(新版)

假設一個Coolpy7 Core服務位於192.168.101.1,一個擴展服務位於192.168.101.4。可以通過以下操作過程進行聯調開發。

1.Coolpy7 Extend Service (此操作前請先本當前系統安裝好Golang環境)

<code># 進入golang環境目錄
cd $GOPATH/src

# 下載服務器端
git clone https://github.com/Coolpy7/coolpy7_extend_service.git && cd coolpy7_extend_service

# 安裝依賴包
go get

# 啟動coolpy7_extend_service 啟動參數
# l 當前服務Host地址 (默認為:5683即本地5683端口,此參數一般默認即可,無需配置, 使用UDP連接請開啟相關防火牆設置)
# ht 內核擴展功能服務token,(必須與客戶端配置一致)
go run coolpy7_extend_service.go -ht=coolpy7

# 啟動成功後會打印如下信息,即說明服務端已正常啟動,host於5683端口,請確保相關防火牆配置可用
2020/01/15 10:55:33 coolpy7 extend server on udp port :5683/<code>

2.Coolpy7 Core

<code># 下載服務器端
git clone https://github.com/Coolpy7/Coolpy7/releases/download/7.2.1.1/go_build_Coolpy7_go_linux.zip
# 解壓文件
unzip go_build_Coolpy7_go_linux.zip
# 提權
chmod -R 777 go_build_Coolpy7_go_linux
# 啟動Coolpy7 啟動參數
# as參數,啟動擴展服務功能,(關閉默認的禁用連接身份認證功能)

# ha參數,擴展服務器所在地址,本例為192.168.101.4:5683
# ht參數,服務器連接驗證密鑰,必須與擴展服務啟動參數中的ht參數一致,否則無法與之通信
./go_build_Coolpy7_go_linux -as=false -ha=192.168.101.4:5683 -ht=coolpy7

# 啟動時測試連接擴展服務器結果顯示
2020/01/15 11:30:04 connected to coolpy7 extend service 192.168.101.4:5683
# 啟動成功後會打印如下信息,即說明服務端已正常啟動,host於1883端口,請確保相關防火牆配置可用
2020/01/15 11:30:04 Coolpy7 v7.2.1.1 tcp [::]:1883 plugin build golang v1.13.1/<code>

擴展服務綁定端口後,內核的ha參數可用多種方式進行連接,如:通過域名連接[xxxx.com:5683],通過內外網IP[xxx.xxx.xxx.xxx:5683] 等方式。

擴展服務源代碼

<code>package main

import (
\t"bytes"
\t"encoding/json"
\t"flag"
\t//"github.com/dgrijalva/jwt-go"
\t"github.com/jacoblai/go-coap"
\t"log"
\t"net"
\t"os"
\t"os/signal"
\t"syscall"
)

var ctoken []byte

//jwt key
//var secretKey = "Coolpy2020"

func main() {

\tvar (
\t\taddr = flag.String("l", ":5683", "綁定Host地址")
\t\ttoken = flag.String("ht", "coolpy7", "內核擴展功能服務token,(必須與客戶端配置一致)")
\t)

\tflag.Parse()

\tctoken = []byte(*token)

\tmux := coap.NewServeMux()
\t//身份驗證
\tmux.Handle("/auth", tokenAuth(coap.FuncHandler(handleAuth)))
\t//訂閱
\tmux.Handle("/sub", tokenAuth(coap.FuncHandler(handleSub)))
\t//取消訂閱
\tmux.Handle("/unsub", tokenAuth(coap.FuncHandler(handleUnSub)))
\t//消息
\tmux.Handle("/pub", tokenAuth(coap.FuncHandler(handlePub)))
\t//客戶端離線
\tmux.Handle("/term", tokenAuth(coap.FuncHandler(handleTerm)))

\tgo func() {
\t\tif err := coap.ListenAndServe("udp", *addr, mux); err != nil {
\t\t\tlog.Fatal(err)
\t\t}
\t}()
\tlog.Println("coolpy7 extend server on udp port", *addr)

\tsignalChan := make(chan os.Signal, 1)
\tcleanupDone := make(chan bool)
\tsignal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
\tgo func() {
\t\tfor range signalChan {
\t\t\tlog.Println("safe quit")
\t\t\tcleanupDone \t\t}
\t}()
\t}

func response(m *coap.Message, payload []byte) *coap.Message {
\tres := &coap.Message{
\t\tType: coap.Acknowledgement,
\t\tCode: coap.Content,
\t\tMessageID: m.MessageID,
\t\tToken: m.Token,
\t\tPayload: payload,
\t}

\tres.SetOption(coap.ContentFormat, coap.AppJSON)
\treturn res
}

//token難中間件
func tokenAuth(next coap.Handler) coap.Handler {
\treturn coap.FuncHandler(func(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
\t\t//判斷token是否合法, != 0即為非法
\t\tif bytes.Compare(m.Token, ctoken) != 0 {
\t\t\tmsg := make(map[string]interface{})
\t\t\tmsg["ok"] = false
\t\t\tmsg["err"] = "token error"
\t\t\tpayload, _ := json.Marshal(&msg)
\t\t\tres := &coap.Message{
\t\t\t\tType: coap.Acknowledgement,
\t\t\t\tCode: coap.Content,
\t\t\t\tMessageID: m.MessageID,
\t\t\t\tToken: m.Token,
\t\t\t\tPayload: payload,
\t\t\t}
\t\t\tres.SetOption(coap.ContentFormat, coap.AppJSON)
\t\t\treturn nil
\t\t}
\t\t//通過後執行進行服務下一層中間件
\t\treturn next.ServeCOAP(l, a, m)
\t})
}

//用戶身份驗證處理函數
func handleAuth(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
\tvar msg map[string]interface{}
\terr := json.Unmarshal(m.Payload, &msg)
\tif err != nil {
\t\tlog.Println(err)
\t\treturn nil
\t}
\tif !msg["ok"].(bool) {
\t\t//錯誤通知
\t\tlog.Println(msg)
\t} else {
\t\t//請求消息
\t\tif m.IsConfirmable() {
\t\t\tmsg := make(map[string]interface{})

\t\t\t////固定值判斷認證登陸信息合法性

\t\t\t//if msg["cid"].(string) == "system" && msg["username"].(string) == "premissid" && msg["password"].(string) == "testpremissid" {
\t\t\t//\tmsg["ok"] = true
\t\t\t//}

\t\t\t////jwt token
\t\t\t//token, err := jwt.Parse(msg["password"].(string), func(token *jwt.Token) (interface{}, error) {
\t\t\t//\treturn []byte(secretKey), nil
\t\t\t//})
\t\t\t//if err != nil {
\t\t\t//\tmsg["ok"] = false
\t\t\t//}
\t\t\t//if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
\t\t\t//\tlog.Println(claims)
\t\t\t//\tmsg["ok"] = true
\t\t\t//} else {
\t\t\t//\tmsg["ok"] = false
\t\t\t//}

\t\t\t//默認允許所有請求直接允許登陸
\t\t\t//允許登陸設置為true,反之設置為false
\t\t\tmsg["ok"] = true
\t\t\tpayload, _ := json.Marshal(&msg)
\t\t\t//回覆內核
\t\t\treturn response(m, payload)
\t\t}
\t}
\treturn nil
}

//訂閱主題處理函數
//每個用戶消息推送都會觸發此事件
//cid:客戶端身份標識clientid, topic:主題,qos: 消息質量
//返回值ok:當為true時,允許此操作,false為禁止此次訂閱並強制斷開客戶端連接
func handleSub(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
\tvar msg map[string]interface{}
\terr := json.Unmarshal(m.Payload, &msg)
\tif err != nil {
\t\tlog.Println(err)
\t\treturn nil
\t}
\tif !msg["ok"].(bool) {

\t\t//錯誤通知
\t\tlog.Println(msg)
\t} else {
\t\t//訂閱通知消息
\t\tif m.IsConfirmable() {
\t\t\tmsg := make(map[string]interface{})
\t\t\t//允許訂閱設置為true,反之設置為false
\t\t\tmsg["ok"] = true
\t\t\tpayload, _ := json.Marshal(&msg)
\t\t\t//回覆內核
\t\t\treturn response(m, payload)
\t\t}
\t}
\treturn nil
}

//每個用戶消息推送都會觸發此事件
//cid:客戶端身份標識clientid, topic:主題,qos: 消息質量
//返回值:無返回指令
func handleUnSub(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
\tvar msg map[string]interface{}
\terr := json.Unmarshal(m.Payload, &msg)
\tif err != nil {
\t\tlog.Println(err)
\t\treturn nil
\t}
\tif !msg["ok"].(bool) {
\t\t//錯誤通知
\t\tlog.Println(msg)
\t} else {
\t\t//取消訂閱通知消息
\t\tlog.Println(msg)
\t}
\treturn nil
}

//消息推送處理函數
//每個用戶消息推送都會觸發此事件
//cid:客戶端身份標識clientid, topic:主題,qos: 消息質量, payload:消息內容

//返回值ok:當為true時告知內核有消息處理返回
//返回值rep:告知內核是否替換原消息內容發送,當為true時,內核會取消息中的payload替換原消息進和地發送
//返回值topic:替換原topic,當rep為true時生效
//返回值payload:替換原payload,當rep為true時生效
func handlePub(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
\tvar msg map[string]interface{}
\terr := json.Unmarshal(m.Payload, &msg)
\tif err != nil {
\t\tlog.Println(err)
\t\treturn nil
\t}
\tif !msg["ok"].(bool) {
\t\t//錯誤通知
\t\tlog.Println(msg)
\t} else {
\t\t//請求消息
\t\tif m.IsConfirmable() {
\t\t\tmsg := make(map[string]interface{})
\t\t\t//允許登陸設置為true,反之設置為false
\t\t\tmsg["ok"] = true
\t\t\t//要求內核替換消息內容設置為true
\t\t\tmsg["rep"] = false
\t\t\t////設置需要替換的消息內容
\t\t\t//msg["topic"] = msg["topic"]
\t\t\t//msg["payload"] = `{"test":"value"}`
\t\t\tpayload, _ := json.Marshal(&msg)
\t\t\t//回覆內核
\t\t\treturn response(m, payload)
\t\t}
\t}
\treturn nil
}

//用戶斷開連接或意外離線處理函數
//cid:客戶端身份標識clientid, err:退出原因

func handleTerm(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
\tvar msg map[string]interface{}
\terr := json.Unmarshal(m.Payload, &msg)
\tif err != nil {
\t\tlog.Println(err)
\t\treturn nil
\t}
\tlog.Println(msg)
\treturn nil
}/<code>

擴展服務開源地址:https://github.com/Coolpy7/coolpy7_extend_service

想了解更新Coolpy7請登陸官方網站: http://coolpy.net

Coolpy7單機千萬級MQTT服務器。可用於開始即時通信系統,物聯網平臺等。


分享到:


相關文章: