您好,登錄后才能下訂單哦!
這篇文章主要介紹了在Python中應該如何使用MQT,具有一定借鑒價值,需要的朋友可以參考下。希望大家閱讀完這篇文章后大有收獲。下面讓小編帶著大家一起了解一下。
Python 是一種廣泛使用的解釋型、高級編程、通用型編程語言。Python 的設計哲學強調代碼的可讀性和簡潔的語法(尤其是使用空格縮進劃分代碼塊,而非使用大括號或者關鍵詞)。Python 讓開發者能夠用更少的代碼表達想法,不管是小型還是大型程序,該語言都試圖讓程序的結構清晰明了。
MQTT 是一種基于發布/訂閱模式的 輕量級物聯網消息傳輸協議 ,可以用極少的代碼和帶寬為聯網設備提供實時可靠的消息服務,它廣泛應用于物聯網、移動互聯網、智能硬件、車聯網、電力能源等行業。
本文主要介紹如何在 Python 項目中使用 paho-mqtt 客戶端庫 ,實現客戶端與 MQTT 服務器的連接、訂閱、取消訂閱、收發消息等功能。
本項目使用 Python 3.6 進行開發測試,讀者可用如下命令確認 Python 的版本。
➜ ~ python3 --version Python 3.6.7
paho-mqtt 是目前 Python 中使用較多的 MQTT 客戶端庫,它在 Python 2.7 或 3.x 上為客戶端類提供了對 MQTT v3.1 和 v3.1.1 的支持。它還提供了一些幫助程序功能,使將消息發布到 MQTT 服務器變得非常簡單。
Pip 是 Python 包管理工具,該工具提供了對 Python 包的查找、下載、安裝、卸載的功能。
pip3 install -i https://pypi.doubanio.com/simple paho-mqtt
本文將使用 EMQ X 提供的 免費公共 MQTT 服務器 ,該服務基于 EMQ X 的 MQTT 物聯網云平臺 創建。服務器接入信息如下:
from paho.mqtt import client as mqtt_client
設置 MQTT Broker 連接地址,端口以及 topic,同時我們調用 Python random.randint 函數隨機生成 MQTT 客戶端 id。
broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" client_id = f'python-mqtt-{random.randint(0, 1000)}'
編寫連接回調函數 on_connect ,該函數將在客戶端連接后被調用,在該函數中可以依據 rc 來判斷客戶端是否連接成功。通常同時我們將創建一個 MQTT 客戶端,該客戶端將連接到 broker.emqx.io 。
def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) # Set Connecting Client ID client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client
首先定義一個 while 循環語句,在循環中我們將設置每秒調用 MQTT 客戶端 publish 函數向 /python/mqtt 主題發送消息。
def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic, msg) # result: [0, 1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1
編寫消息回調函數 on_message ,該函數將在客戶端從 MQTT Broker 收到消息后被調用,在該函數中我們將打印出訂閱的 topic 名稱以及接收到的消息內容。
def subscribe(client: mqtt_client): def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) client.on_message = on_message
# python 3.6 import random import time from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 1000)}' def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic, msg) # result: [0, 1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == '__main__': run()
# python 3.6 import random import time from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 1000)}' def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic, msg) # result: [0, 1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == '__main__': run() 消息訂閱代碼 # python3.6 import random from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 100)}' def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def subscribe(client: mqtt_client): def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_forever() if __name__ == '__main__': run()
運行 MQTT 消息發布代碼,我們將看到客戶端連接成功,并且成功將消息發布。
python3 pub.py
運行 MQTT 消息訂閱代碼,我們將看到客戶端連接成功,并且成功接收到發布的消息。
python3 sub.py
至此,我們完成了使用 paho-mqtt 客戶端連接到 公共 MQTT 服務器 ,并實現了測試客戶端與 MQTT 服務器的連接、消息發布和訂閱。
與 C ++ 或 Java 之類的高級語言不同,Python 比較適合設備側的業務邏輯實現,使用 Python 您可以減少代碼上的邏輯復雜度,降低與設備的交互成本。我們相信在物聯網領域 Python 將會有更廣泛的應用。
接下來我們將會陸續發布更多關于物聯網開發及 Python 的相關文章,敬請關注。
感謝你能夠認真閱讀完這篇文章,希望小編分享在Python中應該如何使用MQT內容對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,遇到問題就找億速云,詳細的解決方法等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。