91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python MQTT客戶端怎么使用

發布時間:2021-11-25 13:52:45 來源:億速云 閱讀:158 作者:iii 欄目:互聯網科技

本篇內容介紹了“Python MQTT客戶端怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

paho-mqtt

paho-mqtt 可以說是 Python MQTT 開源客戶端庫中的佼佼者。它由 Eclipse 基金會主導開發,除了 Python 庫以外,同樣支持各大主流的編程語言,比如 C++、Java、JavaScript、Golang 等。目前 Python 版本已經實現了 3.1 和 3.1.1 MQTT 協議,在最新開發版中實現了 MQTT 5.0。

在基金會的支持下,以每年一個版本的速度更新,本文發布時的最新版本為 1.5.0(于 2019 年 8 月發布)。

在 GitHub 主頁上,它提供了從入門的快速實現到每一個函數的詳細解讀,涵蓋了從初學者到高級使用者需要了解的各個部分。即使遇到超出范圍的問題,在 Google 上搜索,可以得到近 20 萬個相關詞條,是目前最為流行的 MQTT 客戶端。

得到如此多的關注度,除了穩定的代碼外,還有其易用性。Paho 的接口使用非常簡單優雅,您只需要少量的代碼就能實現 MQTT 的訂閱及消息發布。

安裝

pip3 install paho-mqtt

或者

git clone https://github.com/eclipse/paho.mqtt.python
cd paho.mqtt.python
python3 setup.py install

訂閱者

import paho.mqtt.client as mqtt

# 連接的回調函數
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe("$SYS/#")
    
# 收到消息的回調函數
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("broker.emqx.io", 1883, 60)
client.loop_forever()

發布者

import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    
client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)
for i in range(3):
    client.publish('a/b', payload=i, qos=0, retain=False)
    print(f"send {i} to a/b")
    time.sleep(1)

client.loop_forever()

甚至,你可以通過一行代碼,實現訂閱、發布。

import paho.mqtt.subscribe as subscribe

# 當調用這個函數時,程序會堵塞在這里,直到有一條消息發送到 paho/test/simple 主題
msg = subscribe.simple("paho/test/simple", hostname="broker.emqx.io")
print(f"{msg.topic} {msg.payload}")
import paho.mqtt.publish as publish

# 發送一條消息
publish.single("a/b", "payload", hostname="broker.emqx.io")
# 或者一次發送多個消息
msgs = [{'topic':"a/b", 'payload':"multiple 1"}, ("a/b", "multiple 2", 0, False)]
publish.multiple(msgs, hostname="broker.emqx.io")

HBMQTT

HBMQTT 基于 Python asyncio 開發,僅支持 3.1.1 的 MQTT 協議。由于使用 asyncio 庫,開發者需要使用 3.4 以上的 Python 版本。

CPU 的速度遠遠快于磁盤、網絡等 IO 操作,而在一個線程中,無論 CPU 執行得再快,遇到 IO 操作時,都得停下來等待讀寫完成,這無疑浪費了許多時間。

為了解決這個問題,Python 加入了異步 IO 的特性。在 Python 3.4 中,正式將 asyncio 納入標準庫中,并在 Python 3.5 中,加入了 async/await 關鍵字。用戶可以很輕松的使用在函數前加入 async 關鍵字,使函數變成異步函數。

HBMQTT 便是建立在 asyncio 標準庫之上。它允許用戶顯示的設置異步斷點,通過異步 IO,MQTT 客戶端在收取消息或發送消息時,掛載當前的任務,繼續處理下一個。

不過 HBMQTT 的知名度卻小得多。在 Google 上搜索,關于 HBMQTT 僅有 6000 多個詞條,在 Stack Overflow 上只有 10 個提問數。這就意味著,如果選擇 HBMQTT 的話你需要很強的解決問題的能力。

有意思的是,HBMQTT 本身也是一個 MQTT 服務器。你可以通過 hbmqtt 命令一鍵開啟。

$ hbmqtt
[2020-08-28 09:35:56,608] :: INFO - Exited state new
[2020-08-28 09:35:56,608] :: INFO - Entered state starting
[2020-08-28 09:35:56,609] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connections=-1)

安裝

pip3 install hbmqtt

或者

git clone https://github.com/beerfactory/hbmqtt
cd hbmqtt
python3 setup.py install

訂閱者

import logging
import asyncio
from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1, QOS_2

async def uptime_coro():
    C = MQTTClient()
    await C.connect('mqtt://broker.emqx.io/')
    await C.subscribe([
            ('$SYS/broker/uptime', QOS_1),
            ('$SYS/broker/load/#', QOS_2),
         ])
    try:
        for i in range(1, 100):
            message = await C.deliver_message()
            packet = message.publish_packet
            print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")
        await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
        await C.disconnect()
    except ClientException as ce:
        logging.error("Client exception: %s" % ce)
        
if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    asyncio.get_event_loop().run_until_complete(uptime_coro())

發布者

import logging
import asyncio
import time
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2

async def test_coro():
    C = MQTTClient()
    await  C.connect('mqtt://broker.emqx.io/')
    tasks = [
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)),
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)),
    ]
    await asyncio.wait(tasks)
    logging.info("messages published")
    await C.disconnect()
    
if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    asyncio.get_event_loop().run_until_complete(test_coro())

更多使用細節情參考官方文檔:https://hbmqtt.readthedocs.io/en/latest/。

gmqtt

gmqtt 是由個人開發者開源的客戶端庫。默認支持 MQTT 5.0 協議,如果連接的 MQTT 代理不支持 5.0 協議,則會降級到 3.1 并重新進行連接。

相較于前兩者,gmqtt 還屬于初級開發階段,本文發布時的版本號是 0.6.7。但它是早期支持 MQTT 5.0 的 Python 庫之一,因此在網絡上知名度尚可。

同樣,它建立在 asyncio 庫上,因此需要使用 Python 3.4 以上的版本。

安裝

pip3 install gmqtt

或者

git clone https://github.com/wialon/gmqtt
cd gmqtt
python3 setup.py install

訂閱者

import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    print('Connected')
    
def on_message(client, topic, payload, qos, properties):
    print(f'RECV MSG: {topic} {payload}')
    
def on_subscribe(client, mid, qos, properties):
    print('SUBSCRIBED')
    
def on_disconnect(client, packet, exc=None):
    print('Disconnected')
    
def ask_exit(*args):
    STOP.set()

async def main(broker_host):
    client = MQTTClient("client-id")
    
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_subscribe = on_subscribe
    client.on_disconnect = on_disconnect
    
    # 連接 MQTT 代理
    await client.connect(broker_host)
    
    # 訂閱主題
    client.subscribe('TEST/#')
    
    # 發送測試數據
    client.publish("TEST/A", 'AAA')
    client.publish("TEST/B", 'BBB')
    
    await STOP.wait()
    await client.disconnect()
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    
    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)

    host = 'broker.emqx.io'
    loop.run_until_complete(main(host))

發布者

import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    print('Connected')
    client.subscribe('TEST/#', qos=0)
    
def on_message(client, topic, payload, qos, properties):
    print(f'RECV MSG: {topic}, {payload}')
    
def on_disconnect(client, packet, exc=None):
    print('Disconnected')
    
def ask_exit(*args):
    STOP.set()
    
async def main(broker_host):
    client = MQTTClient("client-id")
    
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    
    await client.connect(broker_host)
    
    client.publish('TEST/TIME', str(time.time()), qos=1)
    
    await STOP.wait()
    await client.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    
    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)
    
    host = 'broker.emqx.io'  
    loop.run_until_complete(main(host))

如何選擇

在介紹完這三款 Python MQTT 客戶端庫之后,我們再來看看如何為自己選擇合適的 MQTT 客戶端庫。這三個客戶端各有自己的優缺點:

paho-mqtt 有著最優秀的文檔,代碼風格易于理解,同時有著強大的基金會支持,但目前文檔的版本還不支持 MQTT 5.0。

HBMQTT 使用 asyncio 庫實現,可以優化網絡 I/O 帶來的延遲。但是代碼風格不友好,同樣不支持 MQTT 5.0。

gmqtt 同樣通過 asyncio 庫實現,相比 HBMQTT ,代碼風格友好,最重要的是,它支持 MQTT 5.0。但開發進程慢,未來前景不明。

因此,在選擇時,您可以參考一下的思路:

  • 如果您是正常開發,想要將其運用在生產環境中,paho-mqtt 無疑是最好的選擇,其穩定性和代碼易讀性遠遠超過其它兩個庫。在遇到問題時,優秀的文檔和互聯網上大量的詞條,也能幫您找到更多的解決方案。

  • 對于熟練使用 asyncio 庫的讀者,不妨嘗試一下 HBMQTT 和 gmqtt。

  • 如果您想要學習、參與開源項目或者使用 MQTT 5.0, 則不妨試用一下 gmqtt,并嘗試為其共享一份代碼吧。

“Python MQTT客戶端怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

成武县| 嘉义市| 平罗县| 贵南县| 泰顺县| 南京市| 海晏县| 沙雅县| 筠连县| 定兴县| 泾阳县| 翁源县| 喀喇| 谷城县| 南陵县| 太保市| 阳春市| 阿拉善右旗| 遂宁市| 库车县| 博野县| 磐石市| 西乌珠穆沁旗| 工布江达县| 舞钢市| 龙山县| 朝阳区| 土默特右旗| 瑞丽市| 中江县| 荥经县| 缙云县| 锡林郭勒盟| 武定县| 德昌县| 芦溪县| 杭锦旗| 师宗县| 聂拉木县| 上蔡县| 桓仁|