本帖最后由 JerryZhen 于 2025-4-15 20:57 编辑

一、设计背景与目标

本文提出基于Python异步编程模型的设备管理方案,面向以下核心需求:

  • 支持多设备的并发管理
  • 毫秒级响应延迟要求
  • 多协议统一接入规范

image.png

二、架构设计思想

2.1 分层架构

image.png

2.2 核心设计原则

  • 单一职责原则:每个协议适配器仅处理特定协议通信
  • 开闭原则:通过继承ProtocolHandler扩展新协议
  • 接口隔离:设备管理不依赖具体协议实现
  • 异步优先:全链路非阻塞设计

三、关键技术实现

3.1 异步协议适配器

class AsyncProtocolHandler(ABC):
  •     @abstractmethod
  •     async def connect(self, **config):
  •         """建立长连接并维护连接池"""
  •         
  •     @abstractmethod
  •     async def keepalive(self):
  •         """心跳保活机制"""
  •         
  •     @abstractmethod
  •     async def batch_read(self, devices: List[str]):
  •         """批量读取优化"""
  • 复制代码

    3.2 连接池管理

    image.png

    四、核心代码解析

    4.1 设备管理器实现

    # 异步设备管理器
  • class AsyncIoTDeviceManager:
  •     def __init__(self):
  •         self.protocols: Dict[str, AsyncProtocolHandler] = {}
  •         self.devices: Dict[str, Dict] = {}
  •         self._lock = asyncio.Lock()

  •     async def register_protocol(self, name: str, handler: AsyncProtocolHandler):
  •         async with self._lock:
  •             if name in self.protocols:
  •                 logging.warning(f"Protocol {name} already registered. Overwriting.")
  •             self.protocols[name] = handler
  •             try:
  •                 await handler.connect()
  •             except Exception as e:
  •                 logging.error(f"Failed to connect protocol {name}: {e}")

  •     async def add_device(self, device_id: str, protocol: str, config: Dict):
  •         async with self._lock:
  •             if protocol not in self.protocols:
  •                 raise ValueError(f"Protocol {protocol} not registered")

  •             self.devices[device_id] = {
  •                 "protocol": protocol,
  •                 "config": config,
  •                 "status": "active"
  •             }
  •             logging.info(f"Device {device_id} added with protocol {protocol}")

  •     async def read_device(self, device_id: str) -> Any:
  •         async with self._lock:
  •             device = self.devices.get(device_id)
  •             if not device:
  •                 raise KeyError(f"Device {device_id} not found")

  •             handler = self.protocols[device["protocol"]]
  •             try:
  •                 return await handler.read(device_id, **device["config"])
  •             except Exception as e:
  •                 logging.error(f"Failed to read device {device_id}: {e}")
  •                 return None

  •     async def write_device(self, device_id: str, data: Any) -> bool:
  •         async with self._lock:
  •             device = self.devices.get(device_id)
  •             if not device:
  •                 raise KeyError(f"Device {device_id} not found")

  •             handler = self.protocols[device["protocol"]]
  •             try:
  •                 return await handler.write(device_id, data, **device["config"])
  •             except Exception as e:
  •                 logging.error(f"Failed to write device {device_id}: {e}")
  •                 return False
  • 复制代码

    4.2 MQTT协议实现示例

    # 异步MQTT实现
  • class AsyncMqttHandler(AsyncProtocolHandler):
  •     def __init__(self):
  •         self.connected = False

  •     async def connect(self, **config):
  •         logging.info(f"AsyncMqttHandler connecting to {config.get('broker', 'default_broker')}...")
  •         try:
  •             await asyncio.sleep(0.5)  # 模拟连接延迟
  •             self.connected = True
  •             logging.info("MQTT connection established")
  •         except Exception as e:
  •             logging.error(f"MQTT connection failed: {e}")
  •             self.connected = False

  •     async def disconnect(self):
  •         try:
  •             await asyncio.sleep(0.1)
  •             self.connected = False
  •             logging.info("MQTT connection closed")
  •         except Exception as e:
  •             logging.error(f"MQTT disconnection failed: {e}")

  •     async def read(self, device_id: str, **config):
  •         print("AsyncMqttHandler read=", config)
  •         if not self.connected:
  •             logging.warning("MQTT connection not established, cannot read")
  •             return None
  •         await asyncio.sleep(0.1)  # 模拟网络延迟
  •         return f"Async MQTT data from {config.get('topic', 'default_topic')}"

  •     async def write(self, device_id: str, data: Any, **config) -> bool:
  •         if not self.connected:
  •             logging.warning("MQTT connection not established, cannot write")
  •             return False
  •         logging.info(f"Async writing {data} to {config.get('topic', 'default_topic')}")
  •         await asyncio.sleep(0.2)
  •         return True
  • 复制代码

    五、完整示例代码以及运行结果

    import asyncio
  • from abc import ABC, abstractmethod
  • from typing import Dict, Any
  • import logging

  • # 配置日志
  • logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


  • # 异步协议接口
  • class AsyncProtocolHandler(ABC):
  •     @abstractmethod
  •     async def connect(self, **config):
  •         pass

  •     @abstractmethod
  •     async def disconnect(self):
  •         pass

  •     @abstractmethod
  •     async def read(self, device_id: str, **config) -> Any:
  •         pass

  •     @abstractmethod
  •     async def write(self, device_id: str, data: Any, **config) -> bool:
  •         pass


  • # 异步MQTT实现
  • class AsyncMqttHandler(AsyncProtocolHandler):
  •     def __init__(self):
  •         self.connected = False

  •     async def connect(self, **config):
  •         logging.info(f"AsyncMqttHandler connecting to {config.get('broker', 'default_broker')}...")
  •         try:
  •             await asyncio.sleep(0.5)  # 模拟连接延迟
  •             self.connected = True
  •             logging.info("MQTT connection established")
  •         except Exception as e:
  •             logging.error(f"MQTT connection failed: {e}")
  •             self.connected = False

  •     async def disconnect(self):
  •         try:
  •             await asyncio.sleep(0.1)
  •             self.connected = False
  •             logging.info("MQTT connection closed")
  •         except Exception as e:
  •             logging.error(f"MQTT disconnection failed: {e}")

  •     async def read(self, device_id: str, **config):
  •         print("AsyncMqttHandler read=", config)
  •         if not self.connected:
  •             logging.warning("MQTT connection not established, cannot read")
  •             return None
  •         await asyncio.sleep(0.1)  # 模拟网络延迟
  •         return f"Async MQTT data from {config.get('topic', 'default_topic')}"

  •     async def write(self, device_id: str, data: Any, **config) -> bool:
  •         if not self.connected:
  •             logging.warning("MQTT connection not established, cannot write")
  •             return False
  •         logging.info(f"Async writing {data} to {config.get('topic', 'default_topic')}")
  •         await asyncio.sleep(0.2)
  •         return True


  • # 异步Modbus实现
  • class AsyncModbusHandler(AsyncProtocolHandler):
  •     def __init__(self):
  •         self.connected = False

  •     async def connect(self, **config):
  •         logging.info(f"Async Modbus connecting {config.get('host', 'default_host')}...")
  •         try:
  •             await asyncio.sleep(0.3)
  •             self.connected = True
  •             logging.info("Modbus connection ready")
  •         except Exception as e:
  •             logging.error(f"Modbus connection failed: {e}")
  •             self.connected = False

  •     async def disconnect(self):
  •         try:
  •             await asyncio.sleep(0.1)
  •             self.connected = False
  •             logging.info("Modbus connection closed")
  •         except Exception as e:
  •             logging.error(f"Modbus disconnection failed: {e}")

  •     async def read(self, device_id: str, **config):
  •         if not self.connected:
  •             logging.warning("Modbus connection not established, cannot read")
  •             return None
  •         await asyncio.sleep(0.15)
  •         return f"Async Modbus register {config.get('register', 'default_register')} value"

  •     async def write(self, device_id: str, data: Any, **config) -> bool:
  •         if not self.connected:
  •             logging.warning("Modbus connection not established, cannot write")
  •             return False
  •         logging.info(f"Async writing {data} to Modbus register")
  •         await asyncio.sleep(0.25)
  •         return True


  • # 异步设备管理器
  • class AsyncIoTDeviceManager:
  •     def __init__(self):
  •         self.protocols: Dict[str, AsyncProtocolHandler] = {}
  •         self.devices: Dict[str, Dict] = {}
  •         self._lock = asyncio.Lock()

  •     async def register_protocol(self, name: str, handler: AsyncProtocolHandler):
  •         async with self._lock:
  •             if name in self.protocols:
  •                 logging.warning(f"Protocol {name} already registered. Overwriting.")
  •             self.protocols[name] = handler
  •             try:
  •                 await handler.connect()
  •             except Exception as e:
  •                 logging.error(f"Failed to connect protocol {name}: {e}")

  •     async def add_device(self, device_id: str, protocol: str, config: Dict):
  •         async with self._lock:
  •             if protocol not in self.protocols:
  •                 raise ValueError(f"Protocol {protocol} not registered")

  •             self.devices[device_id] = {
  •                 "protocol": protocol,
  •                 "config": config,
  •                 "status": "active"
  •             }
  •             logging.info(f"Device {device_id} added with protocol {protocol}")

  •     async def read_device(self, device_id: str) -> Any:
  •         async with self._lock:
  •             device = self.devices.get(device_id)
  •             if not device:
  •                 raise KeyError(f"Device {device_id} not found")

  •             handler = self.protocols[device["protocol"]]
  •             try:
  •                 return await handler.read(device_id, **device["config"])
  •             except Exception as e:
  •                 logging.error(f"Failed to read device {device_id}: {e}")
  •                 return None

  •     async def write_device(self, device_id: str, data: Any) -> bool:
  •         async with self._lock:
  •             device = self.devices.get(device_id)
  •             if not device:
  •                 raise KeyError(f"Device {device_id} not found")

  •             handler = self.protocols[device["protocol"]]
  •             try:
  •                 return await handler.write(device_id, data, **device["config"])
  •             except Exception as e:
  •                 logging.error(f"Failed to write device {device_id}: {e}")
  •                 return False


  • # 使用示例
  • async def main():
  •     manager = AsyncIoTDeviceManager()

  •     # 注册协议(并行初始化)
  •     await asyncio.gather(
  •     manager.register_protocol("mqtt", AsyncMqttHandler()),
  •     manager.register_protocol("modbus", AsyncModbusHandler())
  •     )

  •     # 添加设备(顺序执行)
  •     await manager.add_device("sensor1", "mqtt", {"topic": "sensors/temp", "broker": "mqtt.example.com"})
  •     await manager.add_device("plc1", "modbus", {"host": "192.168.1.10", "register": 40001})
  •     await manager.add_device("plc2", "modbus", {"host": "192.168.1.20", "register": 40001})

  •     # 并行操作设备
  •     results = await asyncio.gather(
  •         manager.read_device("sensor1"),
  •         manager.write_device("plc1", 42),
  •          manager.write_device("plc2", 43),
  •         return_exceptions=True
  •     )

  •     print(f"Read result: {results[0]}")
  •     print(f"Write status: {results[1]}")


  • if __name__ == "__main__":
  •     asyncio.run(main())
  • 复制代码

    image.png


    六、总结与展望

    本文提出的异步架构在具有以下特点:

    • 协议无关性:统一接入层设计
    • 弹性扩展:动态加载协议驱动
    • 资源高效:单节点高并发支撑