MQTT-Reactive的目的是提供一种用C编写的可移植且无阻塞的MQTT客户端,以便在反应式嵌入式系统中使用。首先,本文说明什么是反应系统。然后,它描述了如何为这种系统设计合适的软件结构。最后,本文展示了如何通过使用状态机和事件驱动范例在反应系统中使用MQTT-Reactive库。为此,本文使用一个真实的IoT设备作为演示示例,通过使用状态机,交互和结构等UML图,从中解释其软件结构和基于状态的行为。本文还提供了使用C语言实现IoT设备的MQTT-Reactive客户端的指南。
许多嵌入式系统是反应性的,即它们对内部或外部事件有反应。这些反应完成后,软件将返回以等待下一个事件。这就是为什么将事件驱动系统称为反应系统的原因。
事件驱动编程或反应式编程是为反应式系统实现灵活,可预测和可维护的软件的最合适的编程范例之一。在这种范例中,程序的流程由事件决定。通常,反应式软件的结构由几个并发单元(称为活动对象)组成,这些单元等待并处理各种事件。每个活动对象都拥有一个控制线程和一个事件队列,通过它可以处理其传入事件。在反应系统中,活动对象通常具有在状态图中定义的基于状态的行为。
为了探索如何在具有多个并发任务的反应性系统中使用MQTT-Reactive库,并同时使用状态机和事件驱动范例,我们以IoT设备为例。
使用MQTT协议的想法是在为铁路公司开发IoT设备时诞生的。该设备是一个清晰的反应系统,能够:
- 检测并存储几个数字输入的变化
- 采集,滤波和存储多个模拟信号
- 定期将存储的信息发送到远程服务器
- 通过GSM网络上的MQTT协议发送和接收信息
选择MQTT是因为它是一种基于发布者-订阅者的轻量级消息传递协议,通常用于需要高延迟和低数据速率链接的IoT和网络应用程序中,例如GSM网络。
通过使用LiamBindle的MQTT-C的修改版,可以实现上述IoT设备的MQTT功能。由于该设备的软件被设计为响应软件,因此必须对MQTT-C进行修改,以通过交换异步事件将其与系统的其余部分进行通信。这些事件用于通过网络接收和发送流量,以及将敏感信息连接和发布到服务器。产生的软件库称为MQTT-Reactive。
状态机
如图1所示,通过状态机使用了MQTT-Reactive,该状态机为MQTT-Reactive客户端的基本行为建模。它是一个称为MqttMgr(MQTT管理器)的活动对象。图1中的状态机操作演示了如何从状态机中使用MQTT-Reactive库。即使在图1中将C语言用作操作语言,也可以使用任何计算机或形式语言。
图1. MQTT-Reactive客户端的状态机
图1中的状态机以WaitingForNetConnection状态启动。与服务器建立网络连接后,WaitingForNetConnection接收到Activate事件,然后状态机转换为WaitingForSync状态。只有在此状态下,状态机才能将MQTT消息传递给代理,例如CONNECT或PUBLISH分别通过Connect和Publish事件。Sync状态使用UML的特殊机制来推迟Publish事件,该事件由Sync状态的内部隔离专区中的defer关键字指定。如果在Sync为当前状态时发生Publish事件,它将被保存(延迟)以备将来处理,直到SM进入Publish事件不在其延迟事件列表中的状态(如WaitingForSync或WaitingForNetConnection)。进入此类状态后,状态机将自动调用任何已保存的Publish事件,然后根据转换目标状态使用或丢弃此事件。
每隔SyncTime毫秒,状态机就会转换到Sync复合状态,该状态通过将Receive和Send事件发布到网络管理器来实际从网络发送和接收流量。它是处理网络问题的并发实体。
即使引入的MqttMgr仅支持CONNECT和PUBLISH数据包,也可以通过简单的更改来支持SUBSCRIBE数据包。
状态机使用params关键字来操作对消费事件的参数的访问。例如,在以下转换中,Connect事件包含两个参数clientId和keepAlive,它们的值用于更新相应的MqttMgr对象的属性:
在此示例中,Connect(clientId,keepAlive)事件是转换的触发器,而mqtt_connect()调用是由此执行的操作的一部分。换句话说,当MqttMgr对象收到参数为'publishing_client'和'400'的Connect(clientId,keepAlive)事件,Connect(“ publishing_client”,400)时,MqttMgr的clientId和keepAlive属性将更新为值'因此,“ publishing_client”和“ 400”。
为了创建和发送事件,状态机的动作使用GEN()宏。例如,以下语句将Receive事件发送到Collector对象,Collector指针将其作为MqttMgr对象的属性来引用:
GEN()语句的第一个参数是接收事件的对象,而第二个参数是要发送的事件,包括事件参数(如果有)。参数必须与事件参数一致。例如,以下语句生成一个ConnRefused(code)事件,并将其作为事件参数传递给Broker返回,并将其发送到Collector对象:
使用params关键字访问消耗的事件的参数,并使用GEN()宏从操作生成事件的想法是从Rational Rhapsody Developer的代码生成器中采纳的,仅用于说明目的。
图1中状态机的默认操作设置了从代理接收到连接接受时由MQTT-Reactive调用的回调。此回调应在MqttMgr代码中实现。此回调必须生成ConnAccepted或ConnRefused(code)事件以发送到Collector对象,如下所示。
模型实施
通过使用您喜欢的软件工具或仅使用自己的状态机实现,可以用C或C ++实现图1中的模型。Internet上有许多不同的工具可以做到这一点,例如RKH框架,QP框架,Yakindu Statechart工具或Rational Rhapsody Developer等等。它们都支持Statecharts和C / C ++语言。而且,其中一些工具包括绘制Statechart图并从中生成代码的工具。
此状态机是从称为MqttMgr(MQTT管理器)的活动对象执行的,该对象提供了MQTT-Reactive代码的严格封装,并且它是唯一允许调用任何MQTT-Reactive函数或访问MQTT-Reactive数据的实体。通过与MqttMgr交换事件,系统中的其他并发实体以及所有ISR只能间接使用MQTT-Reactive。使用此机制同步并发实体并在它们之间共享数据避免了处理传统阻塞机制(如信号量,互斥量,延迟或事件标志)的危险。这些机制可能导致难以诊断和修复的繁琐的意外故障。
MqttMgr活动对象将其属性封装为一组数据项。数据项使用名称和类型来指定变量,其中该类型实际上是数据类型。MqttMgr对象的数据项被映射到对象结构的成员。成员的名称和类型与对象数据的名称和类型相同。例如,MqttMgr对象类型的client属性通过值作为数据成员嵌入在MqttMgr结构内:
MqttMgr对象的数据可以直接访问和修改,而无需使用访问器或更改器操作。例如,可通过指向MqttMgr实例的me指针访问client和localRecv。
MqttMgr的属性列表如表1所示。
表1. MqttMgr属性
图2中的结构有助于牢记相关参与者之间的关系。它们是:收集器对象,它想将信息发送给代理;NetMgr对象,用于处理网络;和MqttMgr对象。
图2.物联网系统结构草案
图3中的序列图显示了当需要打开与MQTT服务器的会话时MqttMgr对象如何与系统其余部分进行交互。在此图中,MqttMgr状态和交换的异步消息在Collector,MqttMgr和NetMgr actor之间显示。
图3.连接到MQTT代理
NetMgr对象与代理建立网络连接后,从MqttMgr发送到MQTT服务器的第一个数据包必须是CONNECT数据包。因此,收集器参与者会向MqttMgr参与者发送Connect(clientId,keepAlive)事件。此事件必须带有客户端标识符和保持活动时间间隔。如果服务器接受连接请求,则MqttMgr actor将ConnAccepted事件发送给Collector actor通知这种情况。从那时起,收集器参与者可以将信息消息发布到该代理。
如果服务器拒绝连接请求,则MqttMgr actor将ConnRefused事件发送给Collector actor。此事件带有一个代码,该代码通知拒绝原因,如图4所示。请参见MQTT v3.1.1第3.2.2.3节。
图4.代理拒绝连接请求
图5显示了发布消息时的交互流程。为此,收集器参与者发送一个Publish(data,size,topic,qos)事件,该事件包含要发布的信息(数据),信息的长度(以字节为单位)(size),主题名称,信息将被发布(主题)和传递此消息的保证级别(质量保证)。在前面提到的IoT设备中,使用JSON规范对发布的信息进行了格式化。这是一种开放的标准格式,其中包含具有人类可读文本中的属性值对的数据对象。这种格式是使用jWrite完成的,它是一个用C编写的简单轻量级的库。
图5.将数据发布到代理
图6显示了一个场景,其中MQTT消息的接收和向网络的发送失败。如果网络管理员无法从网络接收流量,它将向MqttMgr actor发送ReceiveFail。同样,如果网络管理器无法将数据发送到网络,它将发送SendFail到MqttMgr actor。
图6.网络故障
表2总结了所示方案中涉及的事件。
表2.事件
结论
通过避免传统的阻塞机制(如信号量,互斥量,延迟或事件标志)的危险,MQTT-Reactive库,状态机和本文提出的软件体系结构允许反应性嵌入式系统以新颖的方式实现MQTT客户端。道路。这是通过将MQTT-Reactive代码封装在并发单元(称为活动对象)中来实现的,该并发单元中的基于状态的行为在建议的状态机中定义。该活动对象通过交换所使用的异步事件与系统的其余部分进行通信:不仅用于通过网络接收和发送流量,还用于将信息连接和发布到物联网应用程序的服务器。