您好,登錄后才能下訂單哦!
本篇內容介紹了“Python MQTT客戶端怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
paho-mqtt 可以說是 Python MQTT 開源客戶端庫中的佼佼者。它由 Eclipse 基金會主導開發,除了 Python 庫以外,同樣支持各大主流的編程語言,比如 C++、Java、JavaScript、Golang 等。目前 Python 版本已經實現了 3.1 和 3.1.1 MQTT 協議,在最新開發版中實現了 MQTT 5.0。
在基金會的支持下,以每年一個版本的速度更新,本文發布時的最新版本為 1.5.0(于 2019 年 8 月發布)。
在 GitHub 主頁上,它提供了從入門的快速實現到每一個函數的詳細解讀,涵蓋了從初學者到高級使用者需要了解的各個部分。即使遇到超出范圍的問題,在 Google 上搜索,可以得到近 20 萬個相關詞條,是目前最為流行的 MQTT 客戶端。
得到如此多的關注度,除了穩定的代碼外,還有其易用性。Paho 的接口使用非常簡單優雅,您只需要少量的代碼就能實現 MQTT 的訂閱及消息發布。
pip3 install paho-mqtt
或者
git clone https://github.com/eclipse/paho.mqtt.python cd paho.mqtt.python python3 setup.py install
import paho.mqtt.client as mqtt # 連接的回調函數 def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") client.subscribe("$SYS/#") # 收到消息的回調函數 def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect("broker.emqx.io", 1883, 60) client.loop_forever()
import paho.mqtt.client as mqtt import time def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") client = mqtt.Client() client.on_connect = on_connect client.connect("broker.emqx.io", 1883, 60) for i in range(3): client.publish('a/b', payload=i, qos=0, retain=False) print(f"send {i} to a/b") time.sleep(1) client.loop_forever()
甚至,你可以通過一行代碼,實現訂閱、發布。
import paho.mqtt.subscribe as subscribe # 當調用這個函數時,程序會堵塞在這里,直到有一條消息發送到 paho/test/simple 主題 msg = subscribe.simple("paho/test/simple", hostname="broker.emqx.io") print(f"{msg.topic} {msg.payload}")
import paho.mqtt.publish as publish # 發送一條消息 publish.single("a/b", "payload", hostname="broker.emqx.io") # 或者一次發送多個消息 msgs = [{'topic':"a/b", 'payload':"multiple 1"}, ("a/b", "multiple 2", 0, False)] publish.multiple(msgs, hostname="broker.emqx.io")
HBMQTT 基于 Python asyncio 開發,僅支持 3.1.1 的 MQTT 協議。由于使用 asyncio 庫,開發者需要使用 3.4 以上的 Python 版本。
CPU 的速度遠遠快于磁盤、網絡等 IO 操作,而在一個線程中,無論 CPU 執行得再快,遇到 IO 操作時,都得停下來等待讀寫完成,這無疑浪費了許多時間。
為了解決這個問題,Python 加入了異步 IO 的特性。在 Python 3.4 中,正式將 asyncio 納入標準庫中,并在 Python 3.5 中,加入了 async/await 關鍵字。用戶可以很輕松的使用在函數前加入 async 關鍵字,使函數變成異步函數。
HBMQTT 便是建立在 asyncio 標準庫之上。它允許用戶顯示的設置異步斷點,通過異步 IO,MQTT 客戶端在收取消息或發送消息時,掛載當前的任務,繼續處理下一個。
不過 HBMQTT 的知名度卻小得多。在 Google 上搜索,關于 HBMQTT 僅有 6000 多個詞條,在 Stack Overflow 上只有 10 個提問數。這就意味著,如果選擇 HBMQTT 的話你需要很強的解決問題的能力。
有意思的是,HBMQTT 本身也是一個 MQTT 服務器。你可以通過 hbmqtt 命令一鍵開啟。
$ hbmqtt [2020-08-28 09:35:56,608] :: INFO - Exited state new [2020-08-28 09:35:56,608] :: INFO - Entered state starting [2020-08-28 09:35:56,609] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connections=-1)
pip3 install hbmqtt
或者
git clone https://github.com/beerfactory/hbmqtt cd hbmqtt python3 setup.py install
import logging import asyncio from hbmqtt.client import MQTTClient, ClientException from hbmqtt.mqtt.constants import QOS_1, QOS_2 async def uptime_coro(): C = MQTTClient() await C.connect('mqtt://broker.emqx.io/') await C.subscribe([ ('$SYS/broker/uptime', QOS_1), ('$SYS/broker/load/#', QOS_2), ]) try: for i in range(1, 100): message = await C.deliver_message() packet = message.publish_packet print(f"{i}: {packet.variable_header.topic_name} => {packet.payload.data}") await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#']) await C.disconnect() except ClientException as ce: logging.error("Client exception: %s" % ce) if __name__ == '__main__': formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) asyncio.get_event_loop().run_until_complete(uptime_coro())
import logging import asyncio import time from hbmqtt.client import MQTTClient from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 async def test_coro(): C = MQTTClient() await C.connect('mqtt://broker.emqx.io/') tasks = [ asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)), asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)), asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)), ] await asyncio.wait(tasks) logging.info("messages published") await C.disconnect() if __name__ == '__main__': formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) asyncio.get_event_loop().run_until_complete(test_coro())
更多使用細節情參考官方文檔:https://hbmqtt.readthedocs.io/en/latest/。
gmqtt 是由個人開發者開源的客戶端庫。默認支持 MQTT 5.0 協議,如果連接的 MQTT 代理不支持 5.0 協議,則會降級到 3.1 并重新進行連接。
相較于前兩者,gmqtt 還屬于初級開發階段,本文發布時的版本號是 0.6.7。但它是早期支持 MQTT 5.0 的 Python 庫之一,因此在網絡上知名度尚可。
同樣,它建立在 asyncio 庫上,因此需要使用 Python 3.4 以上的版本。
pip3 install gmqtt
或者
git clone https://github.com/wialon/gmqtt cd gmqtt python3 setup.py install
import asyncio import os import signal import time from gmqtt import Client as MQTTClient STOP = asyncio.Event() def on_connect(client, flags, rc, properties): print('Connected') def on_message(client, topic, payload, qos, properties): print(f'RECV MSG: {topic} {payload}') def on_subscribe(client, mid, qos, properties): print('SUBSCRIBED') def on_disconnect(client, packet, exc=None): print('Disconnected') def ask_exit(*args): STOP.set() async def main(broker_host): client = MQTTClient("client-id") client.on_connect = on_connect client.on_message = on_message client.on_subscribe = on_subscribe client.on_disconnect = on_disconnect # 連接 MQTT 代理 await client.connect(broker_host) # 訂閱主題 client.subscribe('TEST/#') # 發送測試數據 client.publish("TEST/A", 'AAA') client.publish("TEST/B", 'BBB') await STOP.wait() await client.disconnect() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, ask_exit) loop.add_signal_handler(signal.SIGTERM, ask_exit) host = 'broker.emqx.io' loop.run_until_complete(main(host))
import asyncio import os import signal import time from gmqtt import Client as MQTTClient STOP = asyncio.Event() def on_connect(client, flags, rc, properties): print('Connected') client.subscribe('TEST/#', qos=0) def on_message(client, topic, payload, qos, properties): print(f'RECV MSG: {topic}, {payload}') def on_disconnect(client, packet, exc=None): print('Disconnected') def ask_exit(*args): STOP.set() async def main(broker_host): client = MQTTClient("client-id") client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect await client.connect(broker_host) client.publish('TEST/TIME', str(time.time()), qos=1) await STOP.wait() await client.disconnect() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, ask_exit) loop.add_signal_handler(signal.SIGTERM, ask_exit) host = 'broker.emqx.io' loop.run_until_complete(main(host))
在介紹完這三款 Python MQTT 客戶端庫之后,我們再來看看如何為自己選擇合適的 MQTT 客戶端庫。這三個客戶端各有自己的優缺點:
paho-mqtt 有著最優秀的文檔,代碼風格易于理解,同時有著強大的基金會支持,但目前文檔的版本還不支持 MQTT 5.0。
HBMQTT 使用 asyncio 庫實現,可以優化網絡 I/O 帶來的延遲。但是代碼風格不友好,同樣不支持 MQTT 5.0。
gmqtt 同樣通過 asyncio 庫實現,相比 HBMQTT ,代碼風格友好,最重要的是,它支持 MQTT 5.0。但開發進程慢,未來前景不明。
因此,在選擇時,您可以參考一下的思路:
如果您是正常開發,想要將其運用在生產環境中,paho-mqtt 無疑是最好的選擇,其穩定性和代碼易讀性遠遠超過其它兩個庫。在遇到問題時,優秀的文檔和互聯網上大量的詞條,也能幫您找到更多的解決方案。
對于熟練使用 asyncio 庫的讀者,不妨嘗試一下 HBMQTT 和 gmqtt。
如果您想要學習、參與開源項目或者使用 MQTT 5.0, 則不妨試用一下 gmqtt,并嘗試為其共享一份代碼吧。
“Python MQTT客戶端怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。