本篇測評由電子工程世界的優秀測評者“JerryZhen”提供。 本文將介紹基于米爾電子MYD-LT527開發板的網關方案測試。
一、系統概述 基于米爾-全志 T527設計一個簡易的物聯網網關,該網關能夠管理多臺MQTT設備,通過MQTT協議對設備進行讀寫操作,同時提供HTTP接口,允許用戶通過HTTP協議與網關進行交互,并對設備進行讀寫操作。 二、系統架構 三、組件設計 MQTT組件:
- 負責與MQTT broker建立連接。
- 訂閱設備主題,接收設備發送的消息。
發布消息到設備,實現遠程控制。
設備管理組件:
- 維護一個設備列表,記錄設備的唯一標識符(如設備ID)、MQTT主題、連接狀態等信息。
提供設備增刪改查的方法。
HTTP組件:
- 基于FastAPI定義HTTP接口。
- 接收用戶請求,調用MQTT組件和設備管理組件進行相應操作。
返回操作結果給用戶。
四、接口設計 五、數據結構設計 設備信息:
- 設備ID (device_id):唯一標識設備的字符串。
- MQTT主題 (mqtt_topic):設備在MQTT broker上的主題。
- 連接狀態 (connection_status):表示設備是否在線的布爾值。
其他設備屬性(如名稱、描述等)。
設備數據:
六、安全性考慮 七、部署與擴展 八、實現步驟 - 安裝所需的Python庫:fastapi, uvicorn, paho-mqtt等。
- 創建FastAPI應用并定義路由。
- 實現MQTT組件,包括與MQTT broker的連接、訂閱、發布等功能。
- 實現設備管理組件,維護設備列表并提供增刪改查的方法。
- 實現HTTP組件,調用MQTT組件和設備管理組件處理用戶請求。
- 編寫測試代碼,驗證網關的各項功能是否正常工作。
部署網關服務并監控其運行狀態。
該設計方案僅僅是概述,具體實現細節可能需要根據實際需求和項目環境進行調整和優化。在實際開發中,還需要考慮異常處理、日志記錄、性能優化等方面的問題。基于上述設計方案,以下是一個簡化版的參考代碼,展示了如何使用FastAPI和paho-mqtt庫來創建一個物聯網網關。需要注意,示例中不包含完整的錯誤處理、用戶認證和授權機制,這些在實際生產環境中都是必不可少的。依賴的主要庫版本: fastapi==0.108.0 paho-mqtt==1.6.1 網關模擬代碼gateway.py: - from fastapi import FastAPI, HTTPException, Body, status
- from paho.mqtt.client import Client as MQTTClient
- from typing import List, Dict, Any
- import asyncio
- import json
- app = FastAPI()
- mqtt_client = None
- device_data = {}
- subtopic="gateway/device/#"
- # MQTT回調函數
- def on_message(client, userdata, msg):
- payload = msg.payload.decode()
- topic = msg.topic
- device_id = topic.split('/')[-1]
- device_data[device_id] = payload
- print(f"Received message from {device_id}: {payload}")
-
- # MQTT連接和訂閱
- def mqtt_connect_and_subscribe(broker_url, broker_port):
- global mqtt_client
- mqtt_client = MQTTClient()
- mqtt_client.on_message = on_message
- mqtt_client.connect(broker_url, broker_port, 60)
- mqtt_client.subscribe(subtopic)
- mqtt_client.loop_start()
-
- # MQTT發布消息
- async def mqtt_publish(topic: str, message: str):
- if mqtt_client is not None and mqtt_client.is_connected():
- mqtt_client.publish(topic, message)
- else:
- print("MQTT client is not connected!")
-
- # 設備管理:添加設備
- @app.post("/devices/", status_code=status.HTTP_201_CREATED)
- async def add_device(device_id: str):
- device_data[device_id] = None
- return {"message": f"Device {device_id} added"}
-
- # 設備管理:獲取設備列表
- @app.get("/devices/")
- async def get_devices():
- return list(device_data.keys())
-
- # 設備管理:獲取設備數據
- @app.get("/devices/{device_id}/data")
- async def get_device_data(device_id: str):
- if device_id not in device_data:
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Device {device_id} not found")
- return device_data.get(device_id)
-
- # 設備管理:發送數據到設備
- @app.post("/devices/{device_id}/data")
- async def send_data_to_device(device_id: str, data: Dict[str, Any] = Body(...)):
- topic = f"devices/{device_id}"
- message = json.dumps(data)
- await mqtt_publish(topic, message)
- return {"message": f"Data sent to {device_id}"}
-
- # 設備控制:發送控制命令到設備
- @app.post("/devices/{device_id}/control")
- async def control_device(device_id: str, command: str):
- topic = f"devices/device/{device_id}"
- await mqtt_publish(topic, command)
- return {"message": f"Control command sent to {device_id}"}
-
- # FastAPI啟動事件
- @app.on_event("startup")
- async def startup_event():
- mqtt_connect_and_subscribe("127.0.0.1", 1883)
-
- # FastAPI關閉事件
- @app.on_event("shutdown")
- async def shutdown_event():
- if mqtt_client is not None:
- mqtt_client.loop_stop()
- mqtt_client.disconnect()
-
- # 運行FastAPI應用
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="127.0.0.1", port=8000)
復制代碼
設備1模擬代碼 dev1.py: - import paho.mqtt.client as mqtt
- # 連接成功回調
- def on_connect(client, userdata, flags, rc):
- print('Connected with result code '+str(rc))
- client.subscribe('devices/1')
- # 消息接收回調
- def on_message(client, userdata, msg):
- print(msg.topic+" "+str(msg.payload))
- client.publish('gateway/device/1',payload=f'echo {msg.payload}',qos=0)
-
- client = mqtt.Client()
- # 指定回調函數
- client.on_connect = on_connect
- client.on_message = on_message
- # 建立連接
- client.connect('127.0.0.1', 1883)
- # 發布消息
- client.publish('gateway/device/1',payload='Hello, I am device',qos=0)
- client.loop_forever()
復制代碼
設備2模擬代碼 dev2.py - import paho.mqtt.client as mqtt
- # 連接成功回調
- def on_connect(client, userdata, flags, rc):
- print('Connected with result code '+str(rc))
- client.subscribe('devices/2')
- # 消息接收回調
- def on_message(client, userdata, msg):
- print(msg.topic+" "+str(msg.payload))
- client.publish('gateway/device/2',payload=f'echo {msg.payload}',qos=0)
- client = mqtt.Client()
- # 指定回調函數
- client.on_connect = on_connect
- client.on_message = on_message
- # 建立連接
- client.connect('127.0.0.1', 1883)
- # 發布消息
- client.publish('gateway/device/2',payload='Hello, I am device',qos=0)
- client.loop_forever()
復制代碼
運行網關代碼,打開網頁得到api接口:
通過api分別添加設備1和設備2,
在另外兩個控制臺中分別運行模擬設備1和模擬設備2的代碼通過網頁API向設備1發送數據
通過網頁API獲得設備回復的數據,設備代碼中只是簡單的把網關發過來的數據進行回傳
我們在網關的后臺可以看到完整的數據流
至此一個簡易的網關已經實現了,接下來將會嘗試實現樓宇里的最常見的bacnet設備進行通訊管理。
|