chinese直男口爆体育生外卖, 99久久er热在这里只有精品99, 又色又爽又黄18禁美女裸身无遮挡, gogogo高清免费观看日本电视,私密按摩师高清版在线,人妻视频毛茸茸,91论坛 兴趣闲谈,欧美 亚洲 精品 8区,国产精品久久久久精品免费

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

如何在Python中使用MQTT

瑞科慧聯(lián)(RAK) ? 2022-12-22 10:41 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

Python 是一種跨平臺(tái)的計(jì)算機(jī)程序設(shè)計(jì)語言,是ABC 語言的替代品,屬于面向?qū)ο蟮膭?dòng)態(tài)類型語言。它最初被設(shè)計(jì)用于編寫自動(dòng)化腳本,隨著版本的不斷更新和語言新功能的添加,越來越多被用于獨(dú)立的、大型項(xiàng)目的開發(fā)。

MQTT 是一個(gè)物聯(lián)網(wǎng)傳輸協(xié)議,用于輕量級(jí)的發(fā)布/訂閱式消息傳輸,旨在為低帶寬和不穩(wěn)定的網(wǎng)絡(luò)環(huán)境中的物聯(lián)網(wǎng)設(shè)備提供可靠的網(wǎng)絡(luò)服務(wù)。其輕量、簡(jiǎn)單、開放和易于實(shí)現(xiàn)等特點(diǎn),使得它適用范圍更加廣泛。

本文主要介紹如何在 Python 項(xiàng)目中使用paho-mqtt客戶端庫(kù) ,實(shí)現(xiàn)客戶端與MQTT服務(wù)器的連接、訂閱、取消訂閱、收發(fā)消息等功能。

一、項(xiàng)目準(zhǔn)備

本項(xiàng)目使用 Python 3.10進(jìn)行開發(fā)測(cè)試。

用戶可用以下命令來確認(rèn) Python的版本:

python3 --version

Python 3.10.9

測(cè)試設(shè)備:

瑞科慧聯(lián)(RAK)網(wǎng)關(guān)RAK7268 V2、帶溫濕度傳感器的數(shù)據(jù)采集器Sensor Hub

二、選擇 MQTT 客戶端庫(kù)

paho-mqtt是目前 Python 中使用較多的 MQTT 客戶端庫(kù)。它為 Python 2.7 或 3.x 版本以上的客戶端類提供了對(duì) MQTT v3.1 和 v3.1.1 的支持,還提供了一些幫助程序功能。這使得消息發(fā)布到 MQTT 服務(wù)器變得更簡(jiǎn)單。

三、Pip 安裝 Paho MQTT 客戶端

Pip 是 Python 包管理工具。該工具提供了對(duì) Python 包的查找、下載、安裝、卸載的功能。

pip3install paho.mqtt

四、Python MQTT 使用

1、連接 MQTT 服務(wù)器

本文將使用瑞科慧聯(lián)LoRaWAN?網(wǎng)關(guān)提供的內(nèi)置 MQTT服務(wù),該服務(wù)基于 Mosquitto的開源消息代理。服務(wù)器接入信息如下:

  • Broker:192.168.230.1
  • TCP Port:1883

2、導(dǎo)入 Paho MQTT客戶端

from paho.mqtt import client as mqtt

3、設(shè)置 MQTT Broker 連接參數(shù)

設(shè)置 MQTT Broker 連接地址,端口以及 topic,同時(shí)調(diào)用 Pythonrandom.randint函數(shù)隨機(jī)生成 MQTT 客戶端 id。

MQTT_SERVER_IP ="192.168.230.1"

MQTT_PORT =1883

4、編寫 MQTT 連接函數(shù)

編寫連接回調(diào)函數(shù) on_connect,該函數(shù)將在客戶端連接后會(huì)被調(diào)用。在該函數(shù)中可以依據(jù)rc來判斷客戶端是否連接成功。同時(shí)可創(chuàng)建一個(gè) MQTT 客戶端連接到broker.emqx.io。

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務(wù)器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)

    mqttClient.on_message=on_message # 返回訂閱消息回調(diào)函數(shù)

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務(wù)器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務(wù)器賬號(hào)密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

5、發(fā)布消息

定義一個(gè) while 循環(huán)語句,在循環(huán)中設(shè)置每秒調(diào)用 MQTT 客戶端publish函數(shù)向/python/mqtt主題發(fā)送消息。

ddefon_publish():

    # 發(fā)布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/1/device/0000000000000444/tx'# 發(fā)布的主題,訂閱時(shí)需要使用這個(gè)主題才能訂閱此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}條消息發(fā)送成功'.format(msg_count))

        else:

            print('第{}條消息發(fā)送失敗'.format(msg_count))

        msg_count+=1

6、訂閱消息

編寫消息回調(diào)函數(shù) on_message,函數(shù)將在客戶端從 MQTT Broker 收到消息后被調(diào)用,并打印出訂閱的 topic 名稱以及接收到的消息內(nèi)容。

defon_subscribe():

    """訂閱主題:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        time.sleep(1)

7、完整代碼

消息訂閱代碼

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

網(wǎng)關(guān)通過mqtt發(fā)出數(shù)據(jù)

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦連接成功, 回調(diào)此方法"""

    rc_status= ["連接成功","協(xié)議版本錯(cuò)誤","無效的客戶端標(biāo)識(shí)","服務(wù)器無法使用","用戶名或密碼錯(cuò)誤","無授權(quán)"]

    print("connect:",rc_status[rc])

defon_message(client,userdata,msg):

    """一旦訂閱到消息, 回調(diào)此方法"""

    print("主題"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))

    print("主題"+msg.topic +" 消息"+str(msg.payload.decode()))

    try:

        temp=json.loads(msg.payload.decode())

        # client.disconnect()

        deveui=temp['devEUI']

        print("devEUI: ",deveui)

        data=temp['data']

        print("解碼前的data為: ",data)

        data_decode=base64.b64decode(data).hex()

        print("解碼后的data為: ",data_decode)

        str1=data_decode[4:]

        ifstr1[0:4]=="0167":

            a=int(str1[4:8],16)*0.1 

            print("溫度:",a,"℃")

            ifstr1[8:12]=="0268":

               b=int(str1[12:16],16)

            print("濕度:",b,"%RH")

        elifstr1[0:4]=="0268":

            c=int(str1[4:8],16)

            print("濕度:",c,"%RH")                       

    exceptExceptionase:

        print(e)

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務(wù)器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)

    mqttClient.on_message=on_message # 返回訂閱消息回調(diào)函數(shù)

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務(wù)器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務(wù)器賬號(hào)密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

defon_subscribe():

    """訂閱主題:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        # allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")

        # mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)

        time.sleep(1)

if__name__=='__main__':

    on_subscribe()

消息發(fā)布代碼

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

網(wǎng)關(guān)通過mqtt發(fā)出數(shù)據(jù)

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦連接成功, 回調(diào)此方法"""

    rc_status= ["連接成功","協(xié)議版本錯(cuò)誤","無效的客戶端標(biāo)識(shí)","服務(wù)器無法使用","用戶名或密碼錯(cuò)誤","無授權(quán)"]

    print("connect:",rc_status[rc])

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務(wù)器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務(wù)器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務(wù)器賬號(hào)密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

defon_publish():

    # 發(fā)布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/x/device/x/tx'# 發(fā)布的主題,訂閱時(shí)需要使用這個(gè)主題才能訂閱此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要發(fā)布的消息內(nèi)容

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}條消息發(fā)送成功'.format(msg_count))

        else:

            print('第{}條消息發(fā)送失敗'.format(msg_count))

        msg_count+=1

if__name__=='__main__':

    on_publish()

測(cè)試

消息發(fā)布

運(yùn)行 MQTT消息發(fā)布代碼,將看到客戶端連接成功,并且成功將消息發(fā)布。

pYYBAGOjwVmAR1KUAAApM_Y0F48108.png

消息訂閱

通過瑞科慧聯(lián)帶溫濕度傳感器的 Sensor hub進(jìn)行數(shù)據(jù)傳輸,訂閱并解析數(shù)據(jù)結(jié)果如下:

poYBAGOjwVmAdS2hAABgCqVnG0E194.png

五、總結(jié)

至此,我們完成了使用paho-mqtt客戶端連接到LoRaWAN?網(wǎng)關(guān)內(nèi)置 MQTT服務(wù)器,并實(shí)現(xiàn)了測(cè)試客戶端與 MQTT 服務(wù)器的連接、消息發(fā)布和訂閱并解析。

與 C ++ 或 Java 之類的高級(jí)語言不同,Python 比較適合設(shè)備側(cè)的業(yè)務(wù)邏輯實(shí)現(xiàn)。使用 Python 可以減少代碼上的邏輯復(fù)雜度,降低與設(shè)備的交互成本。未來,我們相信在物聯(lián)網(wǎng)領(lǐng)域 Python 將會(huì)有更廣泛的應(yīng)用!

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 物聯(lián)網(wǎng)
    +關(guān)注

    關(guān)注

    2939

    文章

    47265

    瀏覽量

    407020
  • python
    +關(guān)注

    關(guān)注

    57

    文章

    4855

    瀏覽量

    89517
  • MQTT
    +關(guān)注

    關(guān)注

    5

    文章

    717

    瀏覽量

    24755
收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評(píng)論

    相關(guān)推薦
    熱點(diǎn)推薦

    何在AMD Vitis Unified IDE中使用系統(tǒng)設(shè)備樹

    您將在這篇博客中了解系統(tǒng)設(shè)備樹 (SDT) 以及如何在 AMD Vitis Unified IDE 中使用 SDT 維護(hù)來自 XSA 的硬件元數(shù)據(jù)。本文還講述了如何對(duì) SDT 進(jìn)行操作,以便在 Vitis Unified IDE 中實(shí)現(xiàn)更靈活的使用場(chǎng)景。
    的頭像 發(fā)表于 11-18 11:13 ?2771次閱讀
    如<b class='flag-5'>何在</b>AMD Vitis Unified IDE<b class='flag-5'>中使</b>用系統(tǒng)設(shè)備樹

    物聯(lián)網(wǎng)MQTT網(wǎng)關(guān)是什么

    物聯(lián)網(wǎng)MQTT網(wǎng)關(guān)是一種采用MQTT物聯(lián)網(wǎng)協(xié)議的智能設(shè)備或軟件組件,其核心功能是連接不同通信協(xié)議的物聯(lián)網(wǎng)設(shè)備與消息代理服務(wù)器,實(shí)現(xiàn)設(shè)備間的數(shù)據(jù)交換與集中管理,同時(shí)支持邊緣計(jì)算、安全防護(hù)和協(xié)議轉(zhuǎn)換
    的頭像 發(fā)表于 08-29 15:24 ?614次閱讀

    請(qǐng)問如何在 Keil μVision 或 IAR EWARM 中使用觀察點(diǎn)進(jìn)行調(diào)試?

    何在 Keil μVision 或 IAR EWARM 中使用觀察點(diǎn)進(jìn)行調(diào)試?
    發(fā)表于 08-20 06:29

    第二十三章 W55MH32 MQTT_OneNET示例

    本文講解了如何在 W55MH32?芯片上實(shí)現(xiàn) MQTT?協(xié)議并連接 OneNET?平臺(tái),通過實(shí)戰(zhàn)例程展示了從準(zhǔn)備工作、連接配置到消息訂閱、發(fā)布及接收處理的完整過程。文章詳細(xì)介紹了 MQTT?協(xié)議
    的頭像 發(fā)表于 07-24 14:59 ?668次閱讀
    第二十三章 W55MH32 <b class='flag-5'>MQTT</b>_OneNET示例

    精通 MQTT:消息隊(duì)列遙測(cè)傳輸指南!

    ,解釋了其關(guān)鍵組件,并演示了如何使用Python實(shí)現(xiàn)MQTT客戶端。MQTT代理MQTT系統(tǒng)的核心是代理,它負(fù)責(zé)管理客戶端之間的消息交換。MQTT
    的頭像 發(fā)表于 06-16 16:56 ?814次閱讀
    精通 <b class='flag-5'>MQTT</b>:消息隊(duì)列遙測(cè)傳輸指南!

    何在MQTT中發(fā)布和訂閱實(shí)體

    MQTT中發(fā)布和訂閱實(shí)體(主題)是MQTT通信的核心操作,下面將詳細(xì)介紹其原理、步驟以及示例代碼,幫助你全面理解這一過程。 一、MQTT發(fā)布與訂閱的基本概念 發(fā)布(Publish):客戶端將
    的頭像 發(fā)表于 05-20 17:21 ?965次閱讀

    ?如何在虛擬環(huán)境中使Python,提升你的開發(fā)體驗(yàn)~

    RaspberryPiOS預(yù)裝了Python,你需要使用其虛擬環(huán)境來安裝包。今天出版的最新一期《TheMagPi》雜志刊登了我們文檔負(fù)責(zé)人NateContino撰寫的一篇實(shí)用教程,幫助你入門
    的頭像 發(fā)表于 03-25 09:34 ?618次閱讀
    ?如<b class='flag-5'>何在</b>虛擬環(huán)境<b class='flag-5'>中使</b>用 <b class='flag-5'>Python</b>,提升你的開發(fā)體驗(yàn)~

    零基礎(chǔ)入門:如何在樹莓派上編寫和運(yùn)行Python程序?

    在這篇文章中,我將為你簡(jiǎn)要介紹Python程序是什么、Python程序可以用來做什么,以及如何在RaspberryPi上編寫和運(yùn)行一個(gè)簡(jiǎn)單的Python程序。什么是
    的頭像 發(fā)表于 03-25 09:27 ?1498次閱讀
    零基礎(chǔ)入門:如<b class='flag-5'>何在</b>樹莓派上編寫和運(yùn)行<b class='flag-5'>Python</b>程序?

    MQTT物聯(lián)網(wǎng)平臺(tái)有哪些?有哪些功能?

    MQTT(Message Queuing Telemetry Transport)是一種基于客戶端-服務(wù)器架構(gòu)的發(fā)布/訂閱模式的消息傳輸協(xié)議,它廣泛應(yīng)用于機(jī)器與機(jī)器的通信(M2M)以及物聯(lián)網(wǎng)環(huán)境
    的頭像 發(fā)表于 03-15 14:23 ?1195次閱讀
    <b class='flag-5'>MQTT</b>物聯(lián)網(wǎng)平臺(tái)有哪些?有哪些功能?

    創(chuàng)建了用于OpenVINO?推理的自定義C++和Python代碼,從C++代碼中獲得的結(jié)果與Python代碼不同是為什么?

    創(chuàng)建了用于OpenVINO?推理的自定義 C++ 和 Python* 代碼。 在兩個(gè)推理過程中使用相同的圖像和模型。 從 C++ 代碼中獲得的結(jié)果與 Python* 代碼不同。
    發(fā)表于 03-06 06:22

    何在USB視頻類(UVC)框架中使用EZ-USB?FX3實(shí)現(xiàn)圖像傳感器接口USB視頻類(UVC)

    電子發(fā)燒友網(wǎng)站提供《如何在USB視頻類(UVC)框架中使用EZ-USB?FX3實(shí)現(xiàn)圖像傳感器接口USB視頻類(UVC).pdf》資料免費(fèi)下載
    發(fā)表于 02-28 17:36 ?2次下載

    何在MATLAB中使用DeepSeek模型

    在 DeepSeek-R1(https://github.com/deepseek-ai/DeepSeek-R1) AI 模型橫空出世后,人們幾乎就立馬開始詢問如何在 MATLAB 中使用這些模型
    的頭像 發(fā)表于 02-13 09:20 ?4024次閱讀
    如<b class='flag-5'>何在</b>MATLAB<b class='flag-5'>中使</b>用DeepSeek模型

    MQTT測(cè)試程序上機(jī)實(shí)驗(yàn)

    mqtt_test、paho.mqtt.c.tar.bz2放到Ubuntu上同一個(gè)目錄下。
    的頭像 發(fā)表于 02-11 13:35 ?1074次閱讀
    <b class='flag-5'>MQTT</b>測(cè)試程序上機(jī)實(shí)驗(yàn)

    使用Python實(shí)現(xiàn)xgboost教程

    使用Python實(shí)現(xiàn)XGBoost模型通常涉及以下幾個(gè)步驟:數(shù)據(jù)準(zhǔn)備、模型訓(xùn)練、模型評(píng)估和模型預(yù)測(cè)。以下是一個(gè)詳細(xì)的教程,指導(dǎo)你如何在Python中使用XGBoost。 1. 安裝XG
    的頭像 發(fā)表于 01-19 11:21 ?2185次閱讀

    何在Windows中使用MTP協(xié)議

    、圖片等)的通信協(xié)議,它被廣泛用于Android設(shè)備。以下是如何在Windows中使用MTP協(xié)議的詳細(xì)步驟: 1. 確保設(shè)備支持MTP 首先,你需要確認(rèn)你的設(shè)備支持MTP協(xié)議。大多數(shù)現(xiàn)代Android
    的頭像 發(fā)表于 01-03 10:26 ?4354次閱讀