您好,登錄后才能下訂單哦!
這篇文章主要講解了“EMQ X+TDengine怎么搭建MQTT物聯網可視化平臺”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“EMQ X+TDengine怎么搭建MQTT物聯網可視化平臺”吧!
EMQ X 是基于高并發的 Erlang/OTP 語言平臺開發,支持百萬級連接和分布式集群架構,發布訂閱模式的開源 MQTT 消息服務器。EMQ X 內置了大量開箱即用的功能,其 開源版 EMQ X Broker 及 企業版 EMQ X Enterprise 均支持通過規則引擎將設備消息存儲到 TDengine。
TDengine 是濤思數據專為物聯網、車聯網、工業互聯網、IT 運維等設計和優化的大數據平臺。除核心的快 10 倍以上的時序數據庫功能外,還提供緩存、數據訂閱、流式計算等功能,最大程度減少研發和運維的復雜度,且核心代碼,包括集群功能全部開源。
TDengine 提供社區版、企業版和云服務版,安裝/使用教程詳見 TDengine 使用文檔。
Grafana 是一個跨平臺、開源的度量分析和可視化工具,可以查詢處理各類數據源中的數據,進行可視化的展示。它可以快速靈活創建的客戶端圖表,面板插件有許多不同方式的可視化指標和日志,官方庫中具有豐富的儀表盤插件,比如熱圖、折線圖、圖表等多種展示方式;支持 Graphite,TDengine、InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch和 KairosDB 等數據源,支持數據項獨立/混合查詢展示;可以創建自定義告警規則并通知到其他消息處理服務或組件中。
本文模擬物聯網環境數據采集場景,假設現有一定數據的環境數據采集點,所有采集點數據均通過 MQTT 協議 傳輸至采集平臺(MQTT Publish),主題設計如下:
sensor/data
傳感器發送的數據格式為 JSON,數據包括傳感器采集的溫度、濕度、噪聲音量、PM10、PM2.5、二氧化硫、二氧化氮、一氧化碳、傳感器 ID、區域、采集時間等數據。
{ "temperature": 30, "humidity" : 20, "volume": 44.5, "PM10": 23, "pm25": 61, "SO2": 14, "NO2": 4, "CO": 5, "id": "10-c6-1f-1a-1f-47", "area": 1, "ts": 1596157444170 }
現在需要實時存儲以便在后續任意時間查看數據,提出以下的需求:
每個設備按照每 5 秒鐘一次的頻率進行數據上報,數據庫需存儲每條數據以供后續回溯分析;
通過可視化系統查看 任意區域、任意時間區間內 的指標數據,如平均值、最大值、最小值。
本文所用各個組件均有 Docker 鏡像,除 EMQ X 需要修改少數配置為了便于操作使用下載安裝外,TDengine 與 Grafana 均使用 Docker 搭建。
安裝包資源與使用教程參照各自官網:
EMQ X:EMQ 官網 https://www.emqx.io/cn/
TDengine:濤思數據官網 https://www.taosdata.com/cn/
Grafana:Grafana 官網 https://grafana.com/
如果您是 EMQ X 新手用戶,推薦通過 EMQ X 文檔 快速上手
訪問 EMQ X 下載 頁面下載適合您操作系統的安裝包,本文截稿時 EMQ X 開源版最新版本為 v4.1.1,下載 zip 包的啟動步驟如下 :
## 解壓下載好的安裝包 unzip emqx-macosx-v4.1.1.zip cd emqx ## 以 console 模式啟動 EMQ X 方便調試 ./bin/emqx console
啟動成功后瀏覽器訪問 http://127.0.0.1:18083 訪問 EMQ X 管理控制臺 Dashboard,使用 admin
public
默認用戶名密碼完成初次登錄。
EMQ X 企業版 4.1.2 提供了原生 TDengine 寫入插件,性能更好、使用更方便,請移步規則引擎-寫入數據到 TDengine查看
為了方便測試使用通過 Docker 進行安裝(需映射網絡端口),也可以使用安裝包的方式進行安裝:
## 拉取并啟動容器 docker run -d --name tdengine -p 6030-6041:6030-6041 tdengine/tdengine:latest ## 啟動后檢查容器運行狀態 docker ps -a
使用以下命令通過 Docker 安裝并啟動 Grafana:
docker run -d --name=grafana -p 3000:3000 grafana/grafana
啟動成功后瀏覽器訪問 http://127.0.0.1:3000 訪問 Grafana 可視化面板,使用 admin
admin
默認用戶名密碼完成初次登錄,登錄后按照提示修改密碼使用新密碼登錄進入主界面:
進入TDengine Docker 容器:
docker exec -it tdengine bash
創建 test
數據庫:
taos create database test;
創建 sensor_data 表,關于 TDengine 數據結構以及 SQL 命令參見 TAOS SQL:
use test; CREATE TABLE sensor_data ( ts timestamp, temperature float, humidity float, volume float, PM10 float, pm25 float, SO2 float, NO2 float, CO float, sensor_id NCHAR(255), area TINYINT, coll_time timestamp );
打開 EMQ X Dashboared,進入 規則引擎 -> 規則 頁面,點擊 創建 按鈕進入創建頁面。
規則 SQL 用于 EMQ X 消息以及事件篩選,以下 SQL 表示從 sensor/data
主題篩選出 payload 數據:
SELECT payload FROM "sensor/data"
使用 SQL 測試功能 ,輸入測試數據進行篩選結果測試,測試有結果且輸出內容如下,標明 SQL 編寫正確:
{ "payload": "{\"temperature\":30,\"humidity\":20,\"volume\":44.5,\"PM10\":23,\"pm2.5\":61,\"SO2\":14,\"NO2\":4,\"CO\":5,\"id\":\"10-c6-1f-1a-1f-47\",\"area\":1,\"ts\":1596157444170}" }
為支持各種不同類型平臺的開發,TDengine 提供符合 REST 設計標準的 API。通過 RESTful Connector 提供了最簡單的連接方式,即使用 HTTP 請求攜帶認證信息與要執行的 SQL 操作 TDengine。
使用 EMQ X 開源版中的 發送到 Web 服務 即可通過 RESTful Connector 寫入數據到 TDengine。即將到來的 EMQ X 企業版 4.1.1 版本將提供原生更高性能的寫入 Connector。
發送到 Web 服務需要兩個數據,一個是關聯資源,另一個是消息內容模板。
關聯資源:HTTP 服務器配置信息,此處為 TDengine 的 RESTful Connector
消息內容模板:此處為攜帶數據的 INSERT SQL,注意我們應當在 SQL 中指定數據庫名,字符類型也要用單引號括起來, 消息內容模板為:
INSERT INTO test.sensor_data VALUES( now, ${payload.temperature}, ${payload.humidity}, ${payload.volume}, ${payload.PM10}, ${payload.pm25}, ${payload.SO2}, ${payload.NO2}, ${payload.CO}, '${payload.id}', ${payload.area}, ${payload.ts} )
點擊響應動作下的 添加 按鈕,在彈出框內選擇 發送數據到 Web 服務,點擊 新建資源 新建一個 WebHook 資源。
資源類型選擇 Webhook,請求 URL 填寫 http://127.0.0.1:6041/rest/sql,請求方法選擇 POST, 還需添加 Authorization 請求頭作為認證信息 。
Authorization 的值為 Basic + TDengine 的 {username}:{password}
經過 Base64 編碼之后的字符串, 例如 root:taosdata
編碼后實際填入的值為:Basic cm9vdDp0YW9zZGF0YQ==
在響應動作創建頁面選擇新建的資源,并填入消息模板內容即可。
以下腳本模擬了 10000 個設備在過去 24 小時內、每隔 5 秒鐘上報一條模擬數據并發送到 EMQ X 的場景。
總數據量: 24 * 3600 / 5 * 100 = 172 萬條
消息 TPS: 20
讀者安裝 Node.js ,按需修改配置參數后可以通過以下命令啟動:
npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org node mock.js
附:模擬生成數據并發送到 EMQ X 代碼,請根據集群性能調整相關參數
// mock.js const mqtt = require('mqtt') const Mock = require('mockjs') const EMQX_SERVER = 'mqtt://localhost:1883' const CLIENT_NUM = 100 const STEP = 5000 // 模擬采集時間間隔 ms const AWAIT = 5000 // 每次發送完后休眠時間,防止消息速率過快 ms const CLIENT_POOL = [] startMock() function sleep(timer = 100) { return new Promise(resolve => { setTimeout(resolve, timer) }) } async function startMock() { const now = Date.now() for (let i = 0; i < CLIENT_NUM; i++) { const client = await createClient(`mock_client_${i}`) CLIENT_POOL.push(client) } // last 24h every 5s const last = 24 * 3600 * 1000 for (let ts = now - last; ts <= now; ts += STEP) { for (const client of CLIENT_POOL) { const mockData = generateMockData() const data = { ...mockData, id: client.options.clientId, ts, } client.publish('sensor/data', JSON.stringify(data)) } const dateStr = new Date(ts).toLocaleTimeString() console.log(`${dateStr} send success.`) await sleep(AWAIT) } console.log(`Done, use ${(Date.now() - now) / 1000}s`) } /** * Init a virtual mqtt client * @param {string} clientId ClientID */ function createClient(clientId) { return new Promise((resolve, reject) => { const client = mqtt.connect(EMQX_SERVER, { clientId, }) client.on('connect', () => { console.log(`client ${clientId} connected`) resolve(client) }) client.on('reconnect', () => { console.log('reconnect') }) client.on('error', (e) => { console.error(e) reject(e) }) }) } /** * Generate mock data */ function generateMockData() { return { "temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)), "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)), "volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)), "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)), "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)), "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "area": Mock.Random.integer(0, 20), "ts": 1596157444170, } }
組件安裝完成,模擬數據寫入成功后,按照 Grafana 可視化界面的操作指引,完成業務所需數據可視化配置。
Grafana 的 TDengine 數據源需要手動安裝插件,具體安裝范式以 TDengine 文檔為準。
添加數據源,即顯示的數據源信息。選取 TDengine 類型數據源,輸入連接參數進行配置,默認情況下,關鍵配置信息如下:
在 EMQ X Sample 倉庫獲取Grafana 儀表盤導出文件導入即可查看圖表示例。
添加好數據源后,添加需要顯示的數據儀表盤信息。儀表盤為多個可視化面板的集合,點擊 New Dashboard 后,選擇 + Query 通過查詢來添加數據面板。
創建面板需要四個步驟,分別是 Queries(查詢) 、 Visualization(可視化) 、 General(圖表配置) 、 Alert(告警) ,創建時間
使用 Grafana 的可視化查詢構建工具,查詢出所有設備的平均值。
以下 SQL 按照指定時間段($form $to)、指定時間間隔($interval),查詢出數據中關鍵指標的平均值:
select avg(temperature), avg(humidity), avg(volume), avg(PM10), avg(pm25), avg(SO2), avg(NO2), avg(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)
Visualization 默認不做更改, General 里面修改面板名稱為 歷史平均值,如果需要對業務進行監控告警,可以在 Alert 里編排告警規則,此處僅做可視化展示,不使用此功能。
完成創建后,點擊左上角返回按鈕,該 Dashboard 里成功添加一個數據面板。點擊頂部導航欄 保存 圖標,輸入 Dashboard 名稱完成 Dashboard 的創建。
繼續點擊 Dashboard 的 Add panel 按鈕,添加最大值、最小值圖表。操作步驟同添加平均值,僅對查詢中 SELECT 統計方法字段做出調整,調整為 AVG 函數為 MAX 與 MIN:
select max(temperature), max(humidity), max(volume), max(PM10), max(pm25), max(SO2), max(NO2), max(CO), min(temperature), min(humidity), min(volume), min(PM10), min(pm25), min(SO2), min(NO2), min(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)
保存儀表盤,拖拽調整每個數據面板大小、位置,最終得到一個視覺效果較好的數據儀表盤。儀表盤右上角可以選擇時間區間、自動刷新時間,此時設備持續發送數據采集數據,儀表盤數據值會有所變動,實現了比較好的可視化效果。
Q: 為什么 Grafana 中沒有圖標數據?
請拖動時間范圍,檢查、確保所選時段內有數據
Q: EMQ X 開源版和 EMQ X 企業版寫入 TDengine 功能上有什么區別?
開源版使用 Webhook + TDengine RESTful Connector,兩邊都有一定的性能損耗,最大寫入速度約為 700 條/秒
企業版使用 EMQ X 原生插件,能夠做到 20,000 條/秒寫入
Q: 規則執行了,但是寫入不了數據?
請檢查認證信息是否配置正確,請求頭、連接地址、端口等信息是否匹配 TDengin 版本
至此我們借助 EMQ X + TDengine 完成了物聯網數據傳輸、存儲、展現整個流程的系統搭建,讀者可以了解到 EMQ X 豐富的拓展能力與 TDengine 完備的大數據平臺特性在物聯網數據采集中的應用。深入學習掌握 Grafana 的其他功能后,用戶可以定制出更完善的數據可視化乃至監控告警系統。
感謝各位的閱讀,以上就是“EMQ X+TDengine怎么搭建MQTT物聯網可視化平臺”的內容了,經過本文的學習后,相信大家對EMQ X+TDengine怎么搭建MQTT物聯網可視化平臺這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。