国产精品免费无遮挡无码永久视频-国产高潮视频在线观看-精品久久国产字幕高潮-国产精品99精品无码视亚

電子工程網(wǎng)

標(biāo)題: EMQX + IoTDB:存儲(chǔ) MQTT 消息到時(shí)序數(shù)據(jù)庫(kù) [打印本頁(yè)]

作者: 洞察基    時(shí)間: 2022-7-22 15:16
標(biāo)題: EMQX + IoTDB:存儲(chǔ) MQTT 消息到時(shí)序數(shù)據(jù)庫(kù)


IoTDB 是最早由清華大學(xué)發(fā)起的開(kāi)源時(shí)序數(shù)據(jù)庫(kù)項(xiàng)目,現(xiàn)已經(jīng)是 Apache 的頂級(jí)項(xiàng)目。IoTDB 可以為用戶提供數(shù)據(jù)收集、存儲(chǔ)和分析等服務(wù)。由于其輕量級(jí)架構(gòu)、高性能和高可用的特性,以及與 Hadoop 和 Spark 生態(tài)的無(wú)縫集成,滿足了工業(yè) IoT 領(lǐng)域中海量數(shù)據(jù)存儲(chǔ)、高吞吐量數(shù)據(jù)寫(xiě)入和復(fù)雜數(shù)據(jù)查詢分析的需求。
EMQX 是一個(gè)大規(guī)模擴(kuò)展、可彈性伸縮的開(kāi)源云原生分布式物聯(lián)網(wǎng)消息中間件,由開(kāi)源物聯(lián)網(wǎng)數(shù)據(jù)基礎(chǔ)設(shè)施軟件供應(yīng)商 EMQ 映云科技 發(fā)布。EMQX 可以高效可靠地處理海量物聯(lián)網(wǎng)設(shè)備的并發(fā)連接,并且內(nèi)置了強(qiáng)大的規(guī)則引擎功能,用以對(duì)事件和消息流數(shù)據(jù)進(jìn)行高性能地實(shí)時(shí)處理。規(guī)則引擎通過(guò) SQL 語(yǔ)句提供了靈活的 "配置式" 的業(yè)務(wù)集成方案,簡(jiǎn)化了業(yè)務(wù)開(kāi)發(fā)流程,提升了易用性,降低了用戶的業(yè)務(wù)邏輯與 EMQX 的耦合度。
本文將介紹如何使用 EMQX 規(guī)則引擎的 MQTT 數(shù)據(jù)橋接功能,接收 MQTT 客戶端發(fā)送的數(shù)據(jù),并實(shí)時(shí)插入到時(shí)序數(shù)據(jù)庫(kù) IoTDB。
準(zhǔn)備工作
本文示例中用到的軟件和環(huán)境:
IoTDB 安裝
首先我們需要從 IoTDB 官方頁(yè)面下載 IoTDB Server(單機(jī)版)的二進(jìn)制包。
下載完成之后解壓,進(jìn)入解壓后的目錄:
% lsLICENSE         README.md       RELEASE_NOTES.md data             ext             licenses         sbinNOTICE           README_ZH.md     conf             docs             lib             logs             tools

要啟用 IoTDB 的 MQTT 協(xié)議支持,需要改動(dòng) IoTDB 的配置文件 conf/iotdb-engine.properties:
*后續(xù)建模使用了一個(gè)存儲(chǔ)組 root.sg,為了增加寫(xiě)入并行度,需要同時(shí)將 iotdb-engine.properties 中的 virtual_storage_group_num 設(shè)置為機(jī)器核數(shù)。
####################### MQTT Broker Configuration##################### whether to enable the mqtt service.enable_mqtt_service=true# the mqtt service binding host.mqtt_host=0.0.0.0# the mqtt service binding port.mqtt_port=2883# the handler pool size for handing the mqtt messages.mqtt_handler_pool_size=1# the mqtt message payload formatter.mqtt_payload_formatter=json# max length of mqtt message in bytemqtt_max_message_size=1048576

其中 enable_mqtt_service 默認(rèn)為 false,需要改成 true。mqtt_port 默認(rèn)值是 1883,為了避免與 emqx 的端口號(hào)沖突,需要改為 2883。
然后使用 ./sbin/start-server.sh 啟動(dòng) IoTDB 服務(wù)端:
% ./sbin/start-server.sh---------------------Starting IoTDB---------------------Maximum memory allocation pool = 2048MB, initial memory allocation pool = 512MBIf you want to change this configuration, please check conf/iotdb-env.sh(Unix or OS X, if you use Windows, check conf/iotdb-env.bat).2022-01-10 14:15:31,914 [main] INFO o.a.i.d.c.IoTDBDescriptor:121 - Start to read config file file:./sbin/../conf/iotdb-engine.properties...2022-01-10 14:14:28,690 [main] INFO o.a.i.d.s.UpgradeSevice:73 - Upgrade service stopped2022-01-10 14:14:28,690 [main] INFO o.a.i.db.service.IoTDB:153 - Congratulation, IoTDB is set up successfully. Now, enjoy yourself!2022-01-10 14:14:28,690 [main] INFO o.a.i.db.service.IoTDB:101 - IoTDB has started

我們保持這個(gè)終端窗口不動(dòng),另外打開(kāi)一個(gè)新的命令行終端窗口,啟動(dòng) IoTDB 的 shell 工具:
% ./sbin/start-cli.sh---------------------Starting IoTDB Cli---------------------_____       _________ ______   ______|_   _|     | _   _ ||_   _ `.|_   _ \| |   .--.|_/ | | \_| | | `. \ | |_) || | / .'`\ \ | |     | | | | | __'._| |_| \__. | _| |_   _| |_.' /_| |__) ||_____|'.__.' |_____| |______.'|_______/ version 0.12.4IoTDB> login successfullyIoTDB>

至此 IoTDB 環(huán)境就準(zhǔn)備好了。如要了解 IoTDB 的基本使用方法,可以參考官網(wǎng)的快速上手頁(yè)面。
安裝、配置 EMQX下載和啟動(dòng) EMQX
我們直接使用命令行下載 macOS 版本的 EMQX 開(kāi)源版,更多安裝包請(qǐng)?jiān)L問(wèn) EMQX 開(kāi)源版下載頁(yè)面
% wget https://www.emqx.com/en/downloads/broker/4.3.11/emqx-macos-4.3.11-amd64.zip

然后解壓并啟動(dòng) EMQX:
% unzip -q emqx-macos-4.3.11-amd64.zip% cd emqx% ./bin/emqx consolelog.to = "console"Erlang/OTP 23 [erts-11.1.8] [emqx] [64-bit] [smp:8:8] [ds:8:8:8] [async-threads:4] [hipe]Starting emqx on node emqx@127.0.0.1Start mqtt:tcp:internal listener on 127.0.0.1:11883 successfully.Start mqtt:tcp:external listener on 0.0.0.0:1883 successfully.Start mqtt:ws:external listener on 0.0.0.0:8083 successfully.Start mqtt:ssl:external listener on 0.0.0.0:8883 successfully.Start mqtt:wss:external listener on 0.0.0.0:8084 successfully.Start http:management listener on 8081 successfully.Start http:dashboard listener on 18083 successfully.EMQX Broker 4.3.11 is running now!Eshell V11.1.8 (abort with ^G)(emqx@127.0.0.1)1>

配置規(guī)則
使用瀏覽器打開(kāi) EMQX Dashboard,在規(guī)則引擎頁(yè)面創(chuàng)建一條規(guī)則:
SQL 語(yǔ)句為:
SELECT    clientid,    now_timestamp('millisecond') as now_ts_ms,    payload.bar as barFROM    "t/#"

然后我們?cè)陧?yè)面的底部,給規(guī)則加一個(gè) "橋接數(shù)據(jù)到 MQTT Broker" 動(dòng)作:
這個(gè)動(dòng)作需要關(guān)聯(lián)一個(gè)資源,我們點(diǎn)擊右上角的 “新建資源” 來(lái)創(chuàng)建一個(gè) MQTT Bridge 資源:
遠(yuǎn)程 Broker 地址要填寫(xiě) IoTDB 的 MQTT 服務(wù)地址,即 "127.0.0.1:2883"?蛻舳 Id、用戶名、密碼都填寫(xiě) root,因?yàn)?root 是 IoTDB 默認(rèn)的用戶名和密碼。
其他選項(xiàng)保持默認(rèn)值不變,點(diǎn)擊 ”測(cè)試連接“ 按鈕確保配置無(wú)誤,然后再點(diǎn)擊右下角的 ”新建“ 按鈕創(chuàng)建資源。
現(xiàn)在返回到動(dòng)作創(chuàng)建頁(yè)面,關(guān)聯(lián)資源的下拉框里自動(dòng)填充了我們剛才創(chuàng)建的資源。
現(xiàn)在我們繼續(xù)填寫(xiě)更多的動(dòng)作參數(shù):
IoTDB 不關(guān)心消息主題,我們填一個(gè)任意的主題:foo。
IoTDB 要求消息內(nèi)容是一個(gè) JSON 格式,消息內(nèi)容模板可以按照上圖中樣式填寫(xiě)。詳情請(qǐng)參見(jiàn) IoTDB 的通信服務(wù)協(xié)議文檔。
{ "device": "root.sg.${clientid}", "timestamp": ${now_ts_ms}, "measurements": [   "bar" ], "values": [   ${bar} ]}

注意其中的 "${clientid}", "${now_ts_ms}" 以及 "${bar}" 都是從規(guī)則的 SQL 語(yǔ)句的輸出中提取的變量,所以必須保證這些變量跟 SQL 語(yǔ)句的 SELECT 字句對(duì)應(yīng)上。
現(xiàn)在可以點(diǎn)擊 ”確認(rèn)“ 保存動(dòng)作配置,然后再次點(diǎn)擊 ”新建“ 完成規(guī)則的創(chuàng)建。
使用 MQTT Client 發(fā)送消息
接下來(lái)我們使用 MQTT 客戶端工具 - MQTT X,來(lái)發(fā)送一條消息給 EMQX:
MQTT X 是 EMQ 發(fā)布的一款完全開(kāi)源的 MQTT 5.0 跨平臺(tái)桌面客戶端。支持快速創(chuàng)建多個(gè)同時(shí)在線的 MQTT 客戶端連接,方便測(cè)試 MQTT/TCP、MQTT/TLS、MQTT/WebSocket 的連接、發(fā)布、訂閱功能及其他 MQTT 協(xié)議特性。
MQTT 客戶端的連接參數(shù)里面,我們只需要填一個(gè)參數(shù),Client ID:"abc",其他的保持默認(rèn)值不變。
連接成功之后,我們發(fā)送 2 條主題為:"t/1" 的消息,消息內(nèi)容格式為:
{ "bar": 0.2}

然后回到 EMQX Dashboard 的規(guī)則引擎頁(yè)面,觀察規(guī)則的命中次數(shù),確認(rèn)規(guī)則被觸發(fā)了 2 次:
最后我們回到命令行終端的 IoTDB 客戶端窗口,使用下面的 SQL 語(yǔ)句查詢數(shù)據(jù):
IoTDB> SHOW TIMESERIES root.sg.abc+---------------+-----+-------------+--------+--------+-----------+----+----------+|     timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|+---------------+-----+-------------+--------+--------+-----------+----+----------+|root.sg.abc.bar| null|     root.sg|   FLOAT| GORILLA|     SNAPPY|null|      null|+---------------+-----+-------------+--------+--------+-----------+----+----------+Total line number = 1It costs 0.006sIoTDB> SELECT * FROM root.sg.abc+-----------------------------+---------------+|                         Time|root.sg.abc.bar|+-----------------------------+---------------+|2022-01-10T17:39:41.724+08:00|            0.3||2022-01-10T17:40:32.805+08:00|            0.2|+-----------------------------+---------------+Total line number = 2It costs 0.007sIoTDB>

數(shù)據(jù)插入成功!
結(jié)語(yǔ)
至此,我們完成了通過(guò) EMQX 規(guī)則引擎功能將消息持久化到 IoTDB 時(shí)序數(shù)據(jù)庫(kù)。
在實(shí)際生產(chǎn)場(chǎng)景中,我們可以使用 EMQX 處理海量的物聯(lián)網(wǎng)設(shè)備并發(fā)連接,并通過(guò)規(guī)則引擎靈活地處理業(yè)務(wù)功能,然后將設(shè)備發(fā)送的消息持久化到 IoTDB 數(shù)據(jù)庫(kù),最后使用 Hadoop/Spark、Flink 或 Grafana 等對(duì)接 IoTDB 實(shí)現(xiàn)大數(shù)據(jù)分析、可視化展示等。
EMQX + IoTDB 的組合是一個(gè)簡(jiǎn)潔、高效且易擴(kuò)展、高可用的服務(wù)端集成方案,對(duì)于物聯(lián)網(wǎng)設(shè)備管理和數(shù)據(jù)處理場(chǎng)景來(lái)說(shuō),是一個(gè)不錯(cuò)的選擇。

原創(chuàng)文章,作者:EMQ,如若轉(zhuǎn)載,請(qǐng)注明出處:https://www.emqx.com/zh/blog/store-mqtt-messages-to-time-series-database-iotdb





歡迎光臨 電子工程網(wǎng) (http://www.4huy16.com/) Powered by Discuz! X3.4