原创 物联网核心之MQTT移植

2017-9-12 08:18 2784 16 2 分类: MCU/ 嵌入式

在上一篇文章中,只是讲了MQTT的主要内容,至于怎么移植到STM32上,怎么使用才是最重要的关键。这里使用的平台是RT8711的WIFI SOC,使用的LWIP跟FreeRTOS,移植使用跟STM32+LWIP是没什么区别的。

   先在Github上找到Eclipse的开源MQTT客户端程序https://github.com/eclipse/paho.mqtt.embedded-c.git,并把源码下载起来

   解压源码,再进入MQTTPacket文件夹,里面有三个文件夹

   把src里面的所有文件和samples下的transport.c、transport.h两个文件复制到工程目录下。这里我们主要的移植工作就在transport里面。打开transport.c文件,这个是MQTT连接,发送,接收的接口,源码是Linux跟Windows平台,用的标准的Socket接口函数,我们这里的移植工作量很小,因为LWIP也是支持标准的Socket接口函数,只不过里面有些函数接口是LWIP不支持的,主要就是transport_open这个连接函数有区别。把原来的transport_open函数注释掉,重新写一个。

[C] 纯文本查看
复制代码

?

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
int
transport_open(char* addr, int
port)
{
        int* sock = &mysock;
        struct
hostent *server;
    struct
sockaddr_in serv_addr;
        static
struct
timeval tv;
        int
timeout = 1000;
        fd_set readset;
        fd_set writeset;
 
    *sock = socket(AF_INET, SOCK_STREAM, 0);
    if(*sock < 0)
        DiagPrintf("[ERROR] Create socket failed\n");
     
    server = gethostbyname(addr);
    if(server == NULL)
        DiagPrintf("[ERROR] Get host ip failed\n");
     
    memset(&serv_addr,0,sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(port);
    memcpy(&serv_addr.sin_addr.s_addr,server->h_addr,server->h_length);
     
    if
(connect(*sock,(struct
sockaddr *)&serv_addr,sizeof(serv_addr)) < 0){
        DiagPrintf("[ERROR] connect failed\n");
        return
-1;
        }
        tv.tv_sec = 10;  /* 1 second Timeout */
        tv.tv_usec = 0;  
        setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char
*)&timeout,sizeof(timeout));
        return
mysock;
}

   到这里其实移植工作就已经完成了,就是这么的简单,剩下就是怎么使用MQTT。直接上代码,这里用FreeRTOS新建一个MQTT的任务。

[C] 纯文本查看
复制代码

?

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#define HOST_NAME "m2m.eclipse.org"#define HOST_PORT 1883
 
void
mqtt_thread( void
*arg)
{
        MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
        MQTTString receivedTopic;
        int
rc = 0;
        char
buf[200];
        int
buflen = sizeof(buf);
        int
mysock = 0;
        MQTTString topicString = MQTTString_initializer;
        int
payloadlen_in;
        unsigned char* payload_in;
        unsigned short
msgid = 1;
        int
subcount;
        int
granted_qos =0;
        unsigned char
sessionPresent, connack_rc;
        unsigned short
submsgid;
        int
len = 0;
        int
req_qos = 1;
        unsigned char
dup;
        int
qos;
        unsigned char
retained;
 
        char
*host = "m2m.eclipse.org";
        int
port = 1883;
        uint8_t  msgtypes = CONNECT;
        uint32_t curtick = xTaskGetTickCount();
        log_info("socket connect to server");
        mysock = transport_open(host,port);
        if(mysock < 0)
                return
mysock;
        log_notice("Sending to hostname %s port %d\n", host, port);
        data.clientID.cstring = "me";
        data.keepAliveInterval = 50;
        data.cleansession = 1;
        data.username.cstring = "";
        data.password.cstring = "";
        data.MQTTVersion = 4;
        while(1)
        {
                if((xTaskGetTickCount() - curtick) >(data.keepAliveInterval/2*1000))
                {
                        if(msgtypes == 0)
                        {
                                curtick = xTaskGetTickCount();
                                msgtypes = PINGREQ;
                        }
                }
 
                switch(msgtypes)
                {
                        case
CONNECT:        len = MQTTSerialize_connect(buf, buflen, &data);        
                                                        rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                        if
(rc == len)
                                                                log_info("send CONNECT Successfully");
                                                        else
                                                                log_debug("send CONNECT failed");                
                                                        log_info("MQTT concet to server!");
                                                        msgtypes = 0;
                                                        break;
                        case
CONNACK:         if
(MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
                                                        {
                                                                log_info("Unable to connect, return code %d\n", connack_rc);
                                                        }
                                                        else
log_info("MQTT is concet OK!");
                                                        msgtypes = SUBSCRIBE;
                                                        break;
 
                        case
SUBSCRIBE:        topicString.cstring = "ledtest";
                                                        len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
                                                        rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                        if
(rc == len)
                                                                log_info("send SUBSCRIBE Successfully\n");
                                                        else
                                                                log_debug("send SUBSCRIBE failed\n");        
                                                        log_info("client subscribe:[%s]",topicString.cstring);
                                                        msgtypes = 0;
                                                        break;
                        case
SUBACK:        rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);                                                        
                                                        log_info("granted qos is %d\n", granted_qos);                                                                
                                                        msgtypes = 0;
                                                        break;
                        case
PUBLISH:        rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,        &payload_in, &payloadlen_in, buf, buflen);
                                                        log_info("message arrived : %s\n", payload_in);
                                                        if(strstr(payload_in,"on"))
                                                        {
                                                                log_notice("LED on!!");
                                                        }
                                                        else
if(strstr(payload_in,"off"))
                                                        {
                                                                log_notice("LED off!!");
                                                        }
                                                        if(qos == 1)
                                                        {
                                                                log_info("publish qos is 1,send publish ack.");
                                                                memset(buf,0,buflen);
                                                                len = MQTTSerialize_ack(buf,buflen,PUBACK,dup,msgid);   //publish ack                        
                                                                rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                                if
(rc == len)
                                                                        log_info("send PUBACK Successfully");
                                                                else
                                                                        log_debug("send PUBACK failed");                                        
                                                        }
                                                        msgtypes = 0;
                                                        break;
 
                        case
PUBACK:        log_info("PUBACK!");
                                                        msgtypes = 0;
                                                        break;
                        case
PUBREC:        log_info("PUBREC!");     //just for qos2
                                                        break;
                        case
PUBREL:        log_info("PUBREL!");        //just for qos2
                                                        break;
                        case
PUBCOMP:        log_info("PUBCOMP!");        //just for qos2
                                                        break;
                        case
PINGREQ:        len = MQTTSerialize_pingreq(buf, buflen);
                                                        rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                        if
(rc == len)
                                                                log_info("send PINGREQ Successfully\n");
                                                        else
                                                                log_debug("send PINGREQ failed\n");        
                                                        log_info("time to ping mqtt server to take alive!");
                                                        msgtypes = 0;
                                                        break;
                        case
PINGRESP:        log_info("mqtt server Pong");                                                        
                                                        msgtypes = 0;
                                                        break;
                }
                                memset(buf,0,buflen);
                                rc=MQTTPacket_read(buf, buflen, transport_getdata);        
                                if(rc >0)
                                {
                                        msgtypes = rc;
                                        log_info("MQTT is get recv:");
                                }
                                gpio_write(&gpio_led, !gpio_read(&gpio_led));
        }
 
exit:
        transport_close(mysock);
    log_info("mqtt thread exit.");
    vTaskDelete(NULL);
}

       MQTT服务器仍然是上篇文章用的m2m.eclipse.org,一开始就是调用 前面移植的transport_open(host,port)去连接MQTT服务器,并返回套接字。然后就是前面说的登录、订阅、发布、心跳等操作,这里在While里用状态来实现整个连接。

首先就是CONNECT登录,MQTTPacket_connectData data = MQTTPacket_connectData_initializer初始化登录数据结构体,然后再对data进行初始化,data.clientID.cstring = "me";data.keepAliveInterval = 50;data.cleansession = 1;

data.username.cstring = "";data.password.cstring = "";data.MQTTVersion = 4;这里表示cilentID为 me,心跳时间为50,用户名跟密码都为空,结构初始化后,就是要对数据进去打包,调用MQTTSerialize_connect函数。打包后就是调用transport_sendPacketBuffer发送数据包。数据发送完后是调用MQTTPacket_read去接收服务器返回来的数据,并得到返回的数据包类型,根据数据包类型进行不同的逻辑处理。发送CONNECT后服务器对相应的返回CONNACK数据包表示登录成功。

登录成功后,开始订阅我们想要Topics,这里订阅一个”ledtest”的Tipics。数据包的初始化、打包、发送跟登录是相似的,就不详述,直接看代码就可以了,服务器会相应的返回SUBACK。

    接下来是心跳,通过获取系统的Tick来判断是否要发PINGREQ,如果的话,让状态机状态为PINGREQ后去发送心跳包,相应服务器会返回PINGRESP。

    最后是PUBLISH,这是MQTT的最主要的通信协议,这里我只实现的客户端接收PUBLISH,其它的使用,其实都可以在源码的MQTTPacket里面sample可以找到例程。PUBLISH实现也很简单,定阅了Topics之后,只要其它客户端向这个Topics推送数据,服务器就会转发到订阅者上。MQTTPacket_read接收到服务器的推送,解析成PUBLISH数据包,然后状态机改变状态到PUBLISH再调用MQTTDeserialize_publish进行解包,得到推送的内容,最后根据推送的内容执行相应的动作,就实现远程控制。

     整个代码的效果如下:

    可以看到MQTT的登录,订阅,还有在PC上通过MQTT.fx推送信息给ledtest,RT8711上收到推送的内容,并执行打开LED的动作。

    也许有人会问,如果STM32或者其它单片机是用WIFI模块或者GPRS模块,没有LWIP的怎么办。其实只要理解的MQTT的源码,就不难用GPRS或者WiFi模块去实现。MQTT的源码里都是对协议包进行打包解包,数据传输都是在tranport.c里面,我们完全不用transport,可以自己写通信接口,然后把打包的数据包通过模块发出去,写接收接口,把模块接收到服务器数据调用MQTT解包接口解析就可以了。

PARTNER CONTENT

文章评论1条评论)

登录后参与讨论

curton 2019-5-4 16:48

好资源 学习了

ev711 2017-11-6 08:29

欢迎加入面包板博主群,任何关于博客的问题,都能在该群得到最快的回应。并能与面包板众多博主交流切磋。

您可以通过 站内私信本账号 或发邮件到 evan.li@aspencore.com联系我,注明“加入面包板博主群”,即可。谢谢!

相关推荐阅读
mzwhhwj 2017-12-07 08:49
物联网前端后台1——一条硬件狗的学习历程
        距上次写MQTT的文章,不知不觉已经过了三个月了,本来打算写自己搭建MQTT Borker的,但这方面的资料只要一百度都有,难度也不大,又加上项目比较忙,一直没时间写文章。从上个月开始...
mzwhhwj 2017-09-12 08:15
打造实用个性的Log输出宏
 我们在调试嵌入式程序的时候,用IDE+调试器是最直接的方式,但是很多时候我们还需要用串口输出来打印一些Debug的信息,来帮助我们调试。串口调试最简单的方式的话,最简单就是用printf来打印。但是...
mzwhhwj 2017-08-24 08:39
物联网核心之MQTT(一)
       MQTT,目前物联网的最主要的协议,基本所有收费的云平台都是基于MQTT协议,比如机智云,和所有的开放云平台比如中国移动的oneNet、百度的云平台也都支持MQTT的接入。虽然MQTT很...
mzwhhwj 2016-03-18 18:47
硬件狗的浪漫
硬件平台:STM32F4-discovery开发平台:VS2013+VisualGDB5.12   ...
mzwhhwj 2015-11-06 09:24
评论:@sunyzz 博客中提到的“【博客大赛】“虚短”“虚断”两板斧,搞定运算放大器!”
学习了...
EE直播间
更多
我要评论
1
16
关闭 站长推荐上一条 /3 下一条