本帖最后由 JerryZhen 于 2025-4-15 20:57 编辑
一、设计背景与目标
本文提出基于Python异步编程模型的设备管理方案,面向以下核心需求:
- 支持多设备的并发管理
- 毫秒级响应延迟要求
- 多协议统一接入规范
二、架构设计思想
2.1 分层架构
2.2 核心设计原则
- 单一职责原则:每个协议适配器仅处理特定协议通信
- 开闭原则:通过继承ProtocolHandler扩展新协议
- 接口隔离:设备管理不依赖具体协议实现
- 异步优先:全链路非阻塞设计
三、关键技术实现
3.1 异步协议适配器
class AsyncProtocolHandler(ABC):
    """Abstract interface that every asynchronous protocol adapter implements.

    Subclasses must override all three coroutines below before they can be
    instantiated.
    """

    @abstractmethod
    async def connect(self, **config):
        """Establish a long-lived connection and maintain the connection pool."""

    @abstractmethod
    async def keepalive(self):
        """Heartbeat / keep-alive mechanism."""

    @abstractmethod
    async def batch_read(self, devices: List[str]):
        """Batch-read optimisation: read the listed devices in one call."""
3.2 连接池管理
四、核心代码解析
4.1 设备管理器实现
class AsyncIoTDeviceManager:
    """Route device reads and writes to the protocol handler of each device.

    Two registries are kept: ``protocols`` maps a protocol name to its
    handler, and ``devices`` maps a device id to a record holding the
    protocol name, the per-device config and a status string.
    """

    def __init__(self):
        self.protocols: Dict[str, "AsyncProtocolHandler"] = {}
        self.devices: Dict[str, Dict] = {}
        # Guards the two registries only. It is never held while awaiting a
        # handler, so slow I/O on one device cannot stall the others.
        self._lock = asyncio.Lock()

    async def register_protocol(
        self, name: str, handler: "AsyncProtocolHandler", **config
    ):
        """Register *handler* under *name* and connect it.

        An existing registration with the same name is overwritten after a
        warning. Connection errors are logged, not raised; the handler stays
        registered either way.

        Args:
            name: Key used later by ``add_device`` to select this handler.
            handler: Object providing ``connect``, ``read`` and ``write``.
            **config: Keyword arguments forwarded to ``handler.connect``.
        """
        async with self._lock:
            if name in self.protocols:
                logging.warning(f"Protocol {name} already registered. Overwriting.")
            self.protocols[name] = handler
        # Connect outside the lock so several protocols can connect in parallel.
        try:
            await handler.connect(**config)
        except Exception as e:
            logging.error(f"Failed to connect protocol {name}: {e}")

    async def add_device(self, device_id: str, protocol: str, config: Dict):
        """Add (or replace) a device bound to an already registered protocol.

        Raises:
            ValueError: If *protocol* has not been registered.
        """
        async with self._lock:
            if protocol not in self.protocols:
                raise ValueError(f"Protocol {protocol} not registered")
            self.devices[device_id] = {
                "protocol": protocol,
                "config": config,
                "status": "active",
            }
        logging.info(f"Device {device_id} added with protocol {protocol}")

    async def _resolve(self, device_id: str):
        """Return ``(handler, config)`` for *device_id*; KeyError if unknown."""
        async with self._lock:
            device = self.devices.get(device_id)
            if not device:
                raise KeyError(f"Device {device_id} not found")
            return self.protocols[device["protocol"]], device["config"]

    async def read_device(self, device_id: str) -> Any:
        """Read one device through its handler.

        Returns:
            Whatever the handler's ``read`` returns, or ``None`` when the
            handler raises (the error is logged).

        Raises:
            KeyError: If *device_id* is not registered.
        """
        handler, config = await self._resolve(device_id)
        try:
            return await handler.read(device_id, **config)
        except Exception as e:
            logging.error(f"Failed to read device {device_id}: {e}")
            return None

    async def write_device(self, device_id: str, data: Any) -> bool:
        """Write *data* to one device through its handler.

        Returns:
            The handler's ``write`` result, or ``False`` when the handler
            raises (the error is logged).

        Raises:
            KeyError: If *device_id* is not registered.
        """
        handler, config = await self._resolve(device_id)
        try:
            return await handler.write(device_id, data, **config)
        except Exception as e:
            logging.error(f"Failed to write device {device_id}: {e}")
            return False
4.2 MQTT协议实现示例
class AsyncMqttHandler(AsyncProtocolHandler):
    """Placeholder MQTT adapter.

    No broker traffic is generated: every operation emulates latency with
    ``asyncio.sleep`` and tracks the link state in ``self.connected``.
    """

    def __init__(self):
        self.connected = False

    async def connect(self, **config):
        """Mark the handler connected; ``config['broker']`` is only logged."""
        logging.info(f"AsyncMqttHandler connecting to {config.get('broker', 'default_broker')}...")
        try:
            await asyncio.sleep(0.5)
            self.connected = True
            logging.info("MQTT connection established")
        except Exception as e:
            logging.error(f"MQTT connection failed: {e}")
            self.connected = False

    async def disconnect(self):
        """Mark the handler disconnected; failures are logged, not raised."""
        try:
            await asyncio.sleep(0.1)
            self.connected = False
            logging.info("MQTT connection closed")
        except Exception as e:
            logging.error(f"MQTT disconnection failed: {e}")

    async def read(self, device_id: str, **config):
        """Return a canned payload for ``config['topic']``, or None if offline."""
        # Trace through logging rather than print so it obeys the log level.
        logging.debug(f"AsyncMqttHandler read= {config}")
        if not self.connected:
            logging.warning("MQTT connection not established, cannot read")
            return None
        await asyncio.sleep(0.1)
        return f"Async MQTT data from {config.get('topic', 'default_topic')}"

    async def write(self, device_id: str, data: Any, **config) -> bool:
        """Pretend to publish *data* to ``config['topic']``; False if offline."""
        if not self.connected:
            logging.warning("MQTT connection not established, cannot write")
            return False
        logging.info(f"Async writing {data} to {config.get('topic', 'default_topic')}")
        await asyncio.sleep(0.2)
        return True
五、完整示例代码以及运行结果
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, Any
import logging
# Configure the root logger once at import time: INFO and above, with a
# timestamp and level prefix on every record.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
class AsyncProtocolHandler(ABC):
    """Abstract interface that every asynchronous protocol adapter implements.

    A concrete adapter must override all four coroutines below before it can
    be instantiated.
    """

    @abstractmethod
    async def connect(self, **config):
        """Open the connection described by the keyword arguments in *config*."""

    @abstractmethod
    async def disconnect(self):
        """Close the connection."""

    @abstractmethod
    async def read(self, device_id: str, **config) -> Any:
        """Read and return the current value of *device_id*."""

    @abstractmethod
    async def write(self, device_id: str, data: Any, **config) -> bool:
        """Write *data* to *device_id* and report whether it succeeded."""
class AsyncMqttHandler(AsyncProtocolHandler):
    """Placeholder MQTT adapter.

    No broker traffic is generated: every operation emulates latency with
    ``asyncio.sleep`` and tracks the link state in ``self.connected``.
    """

    def __init__(self):
        self.connected = False

    async def connect(self, **config):
        """Mark the handler connected; ``config['broker']`` is only logged."""
        logging.info(f"AsyncMqttHandler connecting to {config.get('broker', 'default_broker')}...")
        try:
            await asyncio.sleep(0.5)
            self.connected = True
            logging.info("MQTT connection established")
        except Exception as e:
            logging.error(f"MQTT connection failed: {e}")
            self.connected = False

    async def disconnect(self):
        """Mark the handler disconnected; failures are logged, not raised."""
        try:
            await asyncio.sleep(0.1)
            self.connected = False
            logging.info("MQTT connection closed")
        except Exception as e:
            logging.error(f"MQTT disconnection failed: {e}")

    async def read(self, device_id: str, **config):
        """Return a canned payload for ``config['topic']``, or None if offline."""
        # Trace through logging rather than print so it obeys the log level.
        logging.debug(f"AsyncMqttHandler read= {config}")
        if not self.connected:
            logging.warning("MQTT connection not established, cannot read")
            return None
        await asyncio.sleep(0.1)
        return f"Async MQTT data from {config.get('topic', 'default_topic')}"

    async def write(self, device_id: str, data: Any, **config) -> bool:
        """Pretend to publish *data* to ``config['topic']``; False if offline."""
        if not self.connected:
            logging.warning("MQTT connection not established, cannot write")
            return False
        logging.info(f"Async writing {data} to {config.get('topic', 'default_topic')}")
        await asyncio.sleep(0.2)
        return True
class AsyncModbusHandler(AsyncProtocolHandler):
    """Placeholder Modbus adapter.

    No bus traffic is generated: every operation emulates latency with
    ``asyncio.sleep`` and tracks the link state in ``self.connected``.
    """

    def __init__(self):
        self.connected = False

    async def connect(self, **config):
        """Mark the handler connected; ``config['host']`` is only logged."""
        logging.info(f"Async Modbus connecting {config.get('host', 'default_host')}...")
        try:
            await asyncio.sleep(0.3)
            self.connected = True
            logging.info("Modbus connection ready")
        except Exception as e:
            logging.error(f"Modbus connection failed: {e}")
            self.connected = False

    async def disconnect(self):
        """Mark the handler disconnected; failures are logged, not raised."""
        try:
            await asyncio.sleep(0.1)
            self.connected = False
            logging.info("Modbus connection closed")
        except Exception as e:
            logging.error(f"Modbus disconnection failed: {e}")

    async def read(self, device_id: str, **config):
        """Return a canned value for ``config['register']``, or None if offline."""
        if not self.connected:
            logging.warning("Modbus connection not established, cannot read")
            return None
        await asyncio.sleep(0.15)
        return f"Async Modbus register {config.get('register', 'default_register')} value"

    async def write(self, device_id: str, data: Any, **config) -> bool:
        """Pretend to write *data* to a register; False if offline."""
        if not self.connected:
            logging.warning("Modbus connection not established, cannot write")
            return False
        logging.info(f"Async writing {data} to Modbus register")
        await asyncio.sleep(0.25)
        return True
class AsyncIoTDeviceManager:
    """Route device reads and writes to the protocol handler of each device.

    Two registries are kept: ``protocols`` maps a protocol name to its
    handler, and ``devices`` maps a device id to a record holding the
    protocol name, the per-device config and a status string.
    """

    def __init__(self):
        self.protocols: Dict[str, "AsyncProtocolHandler"] = {}
        self.devices: Dict[str, Dict] = {}
        # Guards the two registries only. It is never held while awaiting a
        # handler, so slow I/O on one device cannot stall the others.
        self._lock = asyncio.Lock()

    async def register_protocol(
        self, name: str, handler: "AsyncProtocolHandler", **config
    ):
        """Register *handler* under *name* and connect it.

        An existing registration with the same name is overwritten after a
        warning. Connection errors are logged, not raised; the handler stays
        registered either way.

        Args:
            name: Key used later by ``add_device`` to select this handler.
            handler: Object providing ``connect``, ``read`` and ``write``.
            **config: Keyword arguments forwarded to ``handler.connect``.
        """
        async with self._lock:
            if name in self.protocols:
                logging.warning(f"Protocol {name} already registered. Overwriting.")
            self.protocols[name] = handler
        # Connect outside the lock so several protocols can connect in parallel.
        try:
            await handler.connect(**config)
        except Exception as e:
            logging.error(f"Failed to connect protocol {name}: {e}")

    async def add_device(self, device_id: str, protocol: str, config: Dict):
        """Add (or replace) a device bound to an already registered protocol.

        Raises:
            ValueError: If *protocol* has not been registered.
        """
        async with self._lock:
            if protocol not in self.protocols:
                raise ValueError(f"Protocol {protocol} not registered")
            self.devices[device_id] = {
                "protocol": protocol,
                "config": config,
                "status": "active",
            }
        logging.info(f"Device {device_id} added with protocol {protocol}")

    async def _resolve(self, device_id: str):
        """Return ``(handler, config)`` for *device_id*; KeyError if unknown."""
        async with self._lock:
            device = self.devices.get(device_id)
            if not device:
                raise KeyError(f"Device {device_id} not found")
            return self.protocols[device["protocol"]], device["config"]

    async def read_device(self, device_id: str) -> Any:
        """Read one device through its handler.

        Returns:
            Whatever the handler's ``read`` returns, or ``None`` when the
            handler raises (the error is logged).

        Raises:
            KeyError: If *device_id* is not registered.
        """
        handler, config = await self._resolve(device_id)
        try:
            return await handler.read(device_id, **config)
        except Exception as e:
            logging.error(f"Failed to read device {device_id}: {e}")
            return None

    async def write_device(self, device_id: str, data: Any) -> bool:
        """Write *data* to one device through its handler.

        Returns:
            The handler's ``write`` result, or ``False`` when the handler
            raises (the error is logged).

        Raises:
            KeyError: If *device_id* is not registered.
        """
        handler, config = await self._resolve(device_id)
        try:
            return await handler.write(device_id, data, **config)
        except Exception as e:
            logging.error(f"Failed to write device {device_id}: {e}")
            return False
async def main():
    """Run the demo: register two protocols, add three devices, do I/O.

    Both protocol registrations are awaited together, then one read and two
    writes are issued together. ``return_exceptions=True`` means a failing
    operation shows up as an exception object in ``results`` instead of
    aborting the others.
    """
    manager = AsyncIoTDeviceManager()
    await asyncio.gather(
        manager.register_protocol("mqtt", AsyncMqttHandler()),
        manager.register_protocol("modbus", AsyncModbusHandler()),
    )
    await manager.add_device("sensor1", "mqtt", {"topic": "sensors/temp", "broker": "mqtt.example.com"})
    await manager.add_device("plc1", "modbus", {"host": "192.168.1.10", "register": 40001})
    await manager.add_device("plc2", "modbus", {"host": "192.168.1.20", "register": 40001})
    results = await asyncio.gather(
        manager.read_device("sensor1"),
        manager.write_device("plc1", 42),
        manager.write_device("plc2", 43),
        return_exceptions=True,
    )
    print(f"Read result: {results[0]}")
    print(f"Write status: {results[1]}")
    # The plc2 write was previously awaited but its outcome never reported.
    print(f"Write status (plc2): {results[2]}")


if __name__ == "__main__":
    asyncio.run(main())
六、总结与展望
本文提出的异步架构具有以下特点:
- 协议无关性:统一接入层设计
- 弹性扩展:动态加载协议驱动
- 资源高效:单节点高并发支撑