LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

1. LiteOS MQTT組件


概述

MQTT AL用來解耦基於MQTT的業務和MQTT的具體實現,具體來說以後的MQTT業務層應該有且只能使用MQTT AL提供的相關功能(API 數據結構 流程等)。MQTT AL定義MQTT的標準,用來屏蔽各個MQTT協議實現的差異(如軟件庫 或者硬件),讓上層業務無需關心MQTT的實現部分。

MQTT AL的api接口聲明在 中,使用相關的接口需要包含該頭文件,關於函數的詳細參數請參考該頭文件的聲明。


配置並連接

對接服務器的所有信息保存在結構體mqtt_al_conpara_t中,其定義在mqtt_al.h中,如下:

<code>/** @brief defines the paramter for the mqtt connect */typedef struct{    mqtt_al_string_t               serveraddr;   ///< mqtt server:support domain name and dot format    int                            serverport;   ///< mqtt server port    mqtt_al_security_para_t       *security;     ///< if NULL,will use en_mqtt_security_none    en_mqtt_al_verison             version;      ///< mqtt version will be used    mqtt_al_string_t               clientid;     ///< mqtt connect client identifier    mqtt_al_string_t               user;         ///< mqtt connect user    mqtt_al_string_t               passwd;       ///< mqtt connect passwd    int                            cleansession; ///< 1 clean the session while 0 not    mqtt_al_willmsg_t             *willmsg;      ///< mqtt connect will message    unsigned short                 keepalivetime;///< keep alive time    char                           conret;       ///< mqtt connect code, return by server    int                            timeout;      ///< how much time will be blocked}mqtt_al_conpara_t;/<code>

其中的一些參數值已經使用枚舉給出:

  • security:安全連接參數(使用此需要確保mbedtls組件開啟)

枚舉值如下:

<code>/** @brief  this enum all the transport encode we support now*/typedef enum{    en_mqtt_al_security_none = 0,    ///< no encode    en_mqtt_al_security_psk,         ///< use the psk mode in transport layer    en_mqtt_al_security_cas,         ///< use the ca mode in transport layer,only check the server    en_mqtt_al_security_cacs,         ///< use the ca mode in transport layer,both check the server and client    en_mqtt_al_security_end,         ///< the end for the mqtt}en_mqtt_al_security_t;/<code>


  • version:使用的MQTT協議版本

枚舉值如下:

<code>/** @brief enum the mqtt version*/typedef enum{    en_mqtt_al_version_3_1_0 = 0,    en_mqtt_al_version_3_1_1,}en_mqtt_al_verison;/<code>


另外,在複製的時候還需要注意,很多字符串參數都是使用mqttalstring_t類型,其定義如下:

<code>/** brief defines for all the ascii or data used in the mqtt engine */typedef struct{    char *data;      ///< buffer to storage the data    int   len;       ///< buffer data length}mqtt_al_string_t;   //used to represent any type string (maybe not ascii)/<code>

在配置結構體完成之後,調用配置函數進行配置並連接,API如下:

<code>/** *@brief: you could use this function to connect to the mqtt server * *@param[in] conparam  the parameter we will use in connect, refer to the data mqtt_al_conpara_t *@ *@return: first you should check the return value then the return code in conparam * *@retval NULL which means you could not get the connect to the server,maybe network reason *@retval handle, which means you get the context, please check the conparam for more */void * mqtt_al_connect( mqtt_al_conpara_t *conparam);/<code>

連接之後,首先應該檢查返回的handle指針是否為空,其次應該檢查mqttalconpara_t結構體中conret的值,有以下枚舉值:

<code>/** @brief defines for the mqtt connect code returned by the server */#define cn_mqtt_al_con_code_ok                0   ///< has been accepted by the server#define cn_mqtt_al_con_code_err_version       1   ///< server not support the version#define cn_mqtt_al_con_code_err_clientID      2   ///< client identifier is error#define cn_mqtt_al_con_code_err_netrefuse     3   ///< server service not ready yet#define cn_mqtt_al_con_code_err_u_p           4   ///< bad user name or password#define cn_mqtt_al_con_code_err_auth          5   ///< the client is not authorized#define cn_mqtt_al_con_code_err_unkown        -1  ///< unknown reason#define cn_mqtt_al_con_code_err_network      0x80 ///< network reason,you could try once more/<code>


訂閱消息


EMQ-X服務器有心跳機制,實際應用中訂閱之前應該先檢查連接狀態,本實驗中暫不檢查。

連接成功後,首先訂閱消息,設置回調函數,方便接收下發的命令。

訂閱消息的API如下:

<code>/** * @brief you could use this function subscribe a topic from the server * * @param[in] handle the handle we get from mqtt_al_connect * * @param[in] subpara  refer to the data mqtt_al_subpara_t * * @return 0 success  -1  failed * */int mqtt_al_subscribe(void *handle, mqtt_al_subpara_t *subpara);/<code>

兩個參數中,handle參數是之前使用mqttalconnect時返回的指針,直接傳入即可,subpara參數需要重點講述。

mqttalsubpara_t的定義如下:

<code>/** @brief defines the mqtt subscribe parameter*/typedef struct{    mqtt_al_string_t       topic;     ///< topic will be subscribe    en_mqtt_al_qos_t       qos;       ///< qos requested    fn_mqtt_al_msg_dealer  dealer;    ///< message dealer:used to deal the received message    void                  *arg;       ///< used for the message dealer    char                   subret;    ///< subscribe result code    int                    timeout;   ///< how much time will be blocked}mqtt_al_subpara_t;/<code>

其中訂閱消息質量qos的枚舉值如下:

<code>/** @brief enum all the qos supported for the application */typedef enum{    en_mqtt_al_qos_0 = 0,     ///< mqtt QOS 0    en_mqtt_al_qos_1,         ///< mqtt QOS 1    en_mqtt_al_qos_2,         ///< mqtt QOS 2    en_mqtt_al_qos_err}en_mqtt_al_qos_t;/<code>

dealer是一個函數指針,接收到下發命令之後會被回調,arg是回調函數參數,其定義如下:

<code>/** @brief  defines the mqtt received message dealer, called by mqtt engine*/typedef void (*fn_mqtt_al_msg_dealer)(void *arg,mqtt_al_msgrcv_t *msg);/<code>

訂閱之後,可以通過mqttalsubpara_t結構體中的subret值查看是否訂閱成功。

發佈消息

發佈消息的API如下:

<code>/** * @brief you could use this function to publish a message to the server * * @param[in] handle the handle we get from mqtt_al_connect * * @param[in] msg  the message we will publish, see the data mqtt_al_pubpara_t * * @return 0 success  -1  failed * */int    mqtt_al_publish(void *handle, mqtt_al_pubpara_t *pubpara);/<code>

兩個參數中,handle參數是之前使用mqttalconnect時返回的指針,直接傳入即可,pubpara參數需要重點講述。

mqttalpubpara_t的定義如下:

<code>/** @brief defines for the mqtt publish */typedef struct{    mqtt_al_string_t    topic;    ///< selected publish topic    mqtt_al_string_t    msg;      ///< message to be published    en_mqtt_al_qos_t    qos;      ///< message qos    int                 retain;   ///< message retain :1 retain while 0 not    int                 timeout;  ///< how much time will blocked}mqtt_al_pubpara_t;/<code>


MQTT組件自動初始化

MQTT在配置之後,會自動初始化。

在SDK目錄中的IoT_LINK_1.0.0iot_linklink_main.c文件中可以看到:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

2. 配置準備


Makefile配置

因為本次實驗用到的組件較多:

  • AT框架
  • ESP8266設備驅動
  • 串口驅動框架
  • cJSON組件
  • SAL組件
  • MQTT組件

這些實驗代碼全部編譯下來,有350KB,而小熊派開發板所使用的主控芯片STM32L431RCT6的 Flash 僅有256KB,會導致編譯器無法鏈接出可執行文件,所以要在makefile中修改優化選項,修改為-Os參數,即最大限度的優化代碼尺寸,並去掉-g參數,即代碼只能下載運行,無法調試,如圖:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

ESP8266設備配置

在工程目錄中的OS_CONFIG/iot_link_config.h文件中,配置ESP8266設備的波特率和設備名稱:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

WIFI對接信息配置


SDK:C:UsersAdministrator.icodesdkIoT_LINK_1.0.0(其中Administrator是實驗電腦的用戶名)。

在SDK目錄中的iot_linknetworktcpipesp8266_socketesp8266_socket_imp.c文件中,配置連接信息:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

之後修改同路徑下的esp8266_socket_imp.mk文件,如圖,將 TOPDIR 改為 SDKDIR :

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

修改paho_mqtt文件路徑

在SDK目錄中的iot_linknetworkmqttpaho_mqttpaho_mqtt.mk文件中,如圖,將 TOPDIR 改為 SDKDIR :

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

3. 使用mqtt.fx對接EMQ-X


配置

對接信息配置如下:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

其中ClientID隨機生成一個即可。

訂閱主題

使用mqtt.fx連接客戶端,訂閱本次實驗中的兩個主題:

  • 主題led_cmd:用於發佈控制命令
  • 主題lightness:用於上報亮度
LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

4. 上雲實驗


編寫實驗文件

在 Demo 文件夾下創建cloud_test_demo文件夾,在其中創建emqx_mqtt_demo.c文件。

編寫代碼:

<code>#include <osal.h>#include <mqtt>#include <string.h>#define DEFAULT_LIFETIME            60#define DEFAULT_SERVER_IPV4         "122.51.89.94"#define DEFAULT_SERVER_PORT         1883#define CN_MQTT_EP_CLIENTID         "emqx-test-001"#define CN_MQTT_EP_USERNAME         "mculover666"#define CN_MQTT_EP_PASSWD           "123456789"#define CN_MQTT_EP_SUB_TOPIC1       "led_cmd"#define CN_MQTT_EP_PUB_TOPIC1       "lightness"#define recv_buf_len 100static char recv_buffer[recv_buf_len];   //下發數據接收緩衝區static int  recv_datalen;                //表示接收數據長度osal_semp_t recv_sync;  //命令接收回調函數和處理函數之間的信號量char lightness_buf[10];static void mqtt_al_msg_dealer(void *arg,mqtt_al_msgrcv_t *msg){    if((msg->msg.len) < recv_buf_len)    {        //保存數據        memcpy(recv_buffer,msg->msg.data,msg->msg.len );        recv_buffer[msg->msg.len] = '\\0';        recv_datalen = msg->msg.len;        printf("recv buf: %s.\\r\\n", recv_buffer);        //釋放信號量,交由數據處理線程進行處理        osal_semp_post(recv_sync);    }    else    {        printf("recv buf is too small, len = %d.\\r\\n", msg->msg.len);    }}static int task_recv_cmd_entry(void *args){    while(1)    {        /* 阻塞等待信號量 */        osal_semp_pend(recv_sync,cn_osal_timeout_forever);        if(strstr(recv_buffer, "on"))        {                printf("-----------------LED ON !!! --------------------\\r\\n");        }        else if(strstr(recv_buffer, "off"))        {                printf("-----------------LED OFF !!! --------------------\\r\\n");        }    }    return 0;}static int task_report_msg_entry(void *args){    int ret = -1;    void *handle = NULL;    mqtt_al_conpara_t config;    mqtt_al_string_t str_temp;    mqtt_al_subpara_t subpara_led_cmd;    mqtt_al_pubpara_t pubpara_lightness;    int lightness_value = 0;    /* 配置結構體 */    str_temp.data = DEFAULT_SERVER_IPV4;    str_temp.len  = sizeof(DEFAULT_SERVER_IPV4);    config.serveraddr = str_temp;    config.serverport = DEFAULT_SERVER_PORT;    config.security   = en_mqtt_al_security_none;    config.version    = en_mqtt_al_version_3_1_0;    str_temp.data = CN_MQTT_EP_CLIENTID;    str_temp.len  = sizeof(CN_MQTT_EP_CLIENTID);    config.clientid   = str_temp;    str_temp.data = CN_MQTT_EP_USERNAME;    str_temp.len  = sizeof(CN_MQTT_EP_USERNAME);    config.user       = str_temp;    str_temp.data = CN_MQTT_EP_PASSWD;    str_temp.len  = sizeof(CN_MQTT_EP_PASSWD);    config.passwd     = str_temp;    config.cleansession = 1;    config.willmsg    = NULL;    config.keepalivetime = DEFAULT_LIFETIME;    config.timeout    = 30;    /* 配置並連接服務器 */    handle = mqtt_al_connect(&config);    if(handle == NULL)    {        /* 連接出錯 */        printf("config error.\\r\\n");        return -1;    }    else    {        /* 進一步檢查服務器返回值 */        if(config.conret != cn_mqtt_al_con_code_ok)        {            /* 服務器返回值出錯 */            printf("server return error, conret = %d.\\r\\n", config.conret);            return -1;        }        else        {            printf("connect to server success.\\r\\n");        }    }    /* 連接成功後,訂閱led_cmd主題消息 */    str_temp.data = CN_MQTT_EP_SUB_TOPIC1;    str_temp.len  = sizeof(CN_MQTT_EP_SUB_TOPIC1);    subpara_led_cmd.topic = str_temp;    subpara_led_cmd.qos = en_mqtt_al_qos_0;    subpara_led_cmd.dealer = mqtt_al_msg_dealer;    subpara_led_cmd.arg = NULL;    subpara_led_cmd.timeout = 60;    ret =  mqtt_al_subscribe(handle, &subpara_led_cmd);    if(ret < 0)    {        printf("sub topic %s fail.\\r\\n", subpara_led_cmd.topic.data);        return -1;    }    else    {        /* 進一步判斷是否訂閱成功 */        if(cn_mqtt_al_con_code_ok != subpara_led_cmd.subret)        {            printf("sub topic %s fail, subret = %d.\\r\\n", subpara_led_cmd.topic.data, subpara_led_cmd.subret);            return -1;        }        else        {            printf("sub topic %s success.\\r\\n", subpara_led_cmd.topic.data);        }    }        /* 每隔10s上報一次數據 */    str_temp.data = CN_MQTT_EP_PUB_TOPIC1;    str_temp.len  = sizeof(CN_MQTT_EP_PUB_TOPIC1);    pubpara_lightness.topic = str_temp;    pubpara_lightness.qos = en_mqtt_al_qos_0;    pubpara_lightness.retain = 0;    pubpara_lightness.timeout = 30;    while(1)    {        sprintf(lightness_buf, "%d", lightness_value);        str_temp.data = lightness_buf;        str_temp.len  = strlen(lightness_buf);        pubpara_lightness.msg = str_temp;        ret = mqtt_al_publish(handle, &pubpara_lightness);        if(ret < 0)        {            printf("publish topic %s fail.\\r\\n", pubpara_lightness.topic.data);            return -1;        }        else        {            printf("publish topic %s success. payload = %s, lightness = %d.\\r\\n", pubpara_lightness.topic.data, pubpara_lightness.msg.data, lightness_value);        }        lightness_value  ;        osal_task_sleep(10*1000);    }}int standard_app_demo_main(){     /* 創建信號量 */    osal_semp_create(&recv_sync,1,0);    /* 創建任務 */    osal_task_create("task_reportmsg",task_report_msg_entry,NULL,0x800,NULL,8);    osal_task_create("task_recv_cmd",task_recv_cmd_entry,NULL,0x400,NULL,8);    return 0;}/<string.h>/<mqtt>/<osal.h>/<code>


添加路徑

在user_demo.mk中添加如下:

<code>

添加位置如下:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

配置.sdkconfig

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

特別說明:實驗時需要關閉shell組件,否則會因動態內存分配失敗而無法連接。

數據上報實驗結果

編譯下載之後,可以在串口助手中看到輸出信息:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

在訂閱了該主題的客戶端也可以看到上報數據:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

命令下發實驗結果

在mqtt.fx中下發一條開啟命令:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

可以看到設備後作出回應:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

再下發一條關閉命令:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器

可以看到設備後作出回應:

LiteOS雲端對接教程03-LiteOS基於MQTT對接EMQ-X服務器


分享到:


相關文章: