歡迎光臨
每天分享高質量文章

使用 MQTT 在專案中實現資料收發 | Linux 中國

從開源資料到開源事件流,瞭解一下 MQTT 釋出/訂閱(pubsub)線路協議。
— Sean Dague


致謝
編譯自 | 
https://opensource.com/article/18/6/mqtt
 
 作者 | Sean Dague
 譯者 | Andy Song (pinewall) ????共計翻譯:28 篇 貢獻時間:132 天

從開源資料到開源事件流,瞭解一下 MQTT 釋出/訂閱(pubsub)線路協議。

去年 11 月我們購買了一輛電動汽車,同時也引發了有趣的思考:我們應該什麼時候為電動汽車充電?對於電動汽車充電所用的電,我希望能夠對應最小的二氧化碳排放,歸結為一個特定的問題:對於任意給定時刻,每千瓦時對應的二氧化碳排放量是多少,一天中什麼時間這個值最低?

尋找資料

我住在紐約州,大約 80% 的電力消耗可以自給自足,主要來自天然氣、水壩(大部分來自於尼亞加拉Niagara大瀑布)、核能發電,少部分來自風力、太陽能和其它化石燃料發電。非盈利性組織 紐約獨立電網運營商New York Independent System Operator[1] (NYISO)負責整個系統的運作,實現發電機組發電與用電之間的平衡,同時也是紐約路燈系統的監管部門。

儘管沒有為公眾提供公開 API,NYISO 還是盡責提供了不少公開資料[2]供公眾使用。每隔 5 分鐘彙報全州各個發電機組消耗的燃料資料。資料以 CSV 檔案的形式釋出於公開的檔案庫中,全天更新。如果你瞭解不同燃料對發電瓦數的貢獻比例,你可以比較準確的估計任意時刻的二氧化碳排放情況。

在構建收集處理公開資料的工具時,我們應該時刻避免過度使用這些資源。相比將這些資料打包併傳送給所有人,我們有更好的方案。我們可以建立一個低開銷的事件流event stream,人們可以訂閱並第一時間得到訊息。我們可以使用 MQTT[3] 實現該方案。我的專案(ny-power.org[4])標的是收錄到 Home Assistant[5] 專案中;後者是一個開源的家庭自動化home automation平臺,擁有數十萬使用者。如果所有使用者同時訪問 CSV 檔案伺服器,估計 NYISO 不得不增加訪問限制。

MQTT 是什麼?

MQTT 是一個釋出訂閱線路協議publish/subscription wire protocol,為小規模裝置設計。釋出訂閱系統工作原理類似於訊息匯流排。你將一條訊息釋出到一個主題topic上,那麼所有訂閱了該主題的客戶端都可以獲得該訊息的一份複製。對於訊息傳送者而言,無需知道哪些人在訂閱訊息;你只需將訊息釋出到一系列主題,並訂閱一些你感興趣的主題。就像參加了一場聚會,你選取並加入感興趣的對話。

MQTT 能夠構建極為高效的應用。客戶端訂閱有限的幾個主題,也只收到它們感興趣的內容。不僅節省了處理時間,還降低了網路頻寬使用。

作為一個開放標準,MQTT 有很多開源的客戶端和服務端實現。對於你能想到的每種程式語言,都有對應的客戶端庫;甚至有嵌入到 Arduino 的庫,可以構建感測器網路。服務端可供選擇的也很多,我的選擇是 Eclipse 專案提供的 Mosquitto[6] 服務端,這是因為它體積小、用 C 編寫,可以承載數以萬計的訂閱者。

為何我喜愛 MQTT

在過去二十年間,我們為軟體應用設計了可靠且準確的模型,用於解決服務遇到的問題。我還有其它郵件嗎?當前的天氣情況如何?我應該此刻購買這種產品嗎?在絕大多數情況下,這種問答式ask/receive的模型工作良好;但對於一個資料爆炸的世界,我們需要其它的模型。MQTT 的釋出訂閱模型十分強大,可以將大量資料傳送到系統中。客戶可以訂閱資料中的一小部分併在訂閱資料釋出的第一時間收到更新。

MQTT 還有一些有趣的特性,其中之一是遺囑last-will-and-testament訊息,可以用於區分兩種不同的靜默,一種是沒有主題相關資料推送,另一種是你的資料接收器出現故障。MQTT 還包括保留訊息retained message,當客戶端初次連線時會提供相關主題的最後一條訊息。這對那些更新緩慢的主題來說很有必要。

我在 Home Assistant 專案開發過程中,發現這種訊息匯流排模型對異構系統heterogeneous systems尤為適合。如果你深入物聯網Internet of Things領域,你會發現 MQTT 無處不在。

我們的第一個 MQTT 流

NYSO 公佈的 CSV 檔案中有一個是實時的燃料混合使用情況。每 5 分鐘,NYSO 釋出這 5 分鐘內發電使用的燃料型別和相應的發電量(以兆瓦為單位)。

這個 CSV 檔案看起來像這樣:

< 如顯示不全,請左右滑動 >
時間戳 時區 燃料型別 兆瓦為單位的發電量
05/09/2018 00:05:00 EDT 混合燃料 1400
05/09/2018 00:05:00 EDT 天然氣 2144
05/09/2018 00:05:00 EDT 核能 4114
05/09/2018 00:05:00 EDT 其它化石燃料 4
05/09/2018 00:05:00 EDT 其它可再生資源 226
05/09/2018 00:05:00 EDT 風力 1
05/09/2018 00:05:00 EDT 水力 3229
05/09/2018 00:10:00 EDT 混合燃料 1307
05/09/2018 00:10:00 EDT 天然氣 2092
05/09/2018 00:10:00 EDT 核能 4115
05/09/2018 00:10:00 EDT 其它化石燃料 4
05/09/2018 00:10:00 EDT 其它可再生資源 224
05/09/2018 00:10:00 EDT 風力 40
05/09/2018 00:10:00 EDT 水力 3166

表中唯一令人不解就是燃料類別中的混合燃料。紐約的大多數天然氣工廠也透過燃燒其它型別的化石燃料發電。在冬季寒潮到來之際,家庭供暖的優先順序高於發電;但這種情況出現的次數不多,(在我們計算中)可以將混合燃料型別看作天然氣型別。

CSV 檔案全天更新。我編寫了一個簡單的資料泵,每隔 1 分鐘檢查是否有資料更新,並將新條目釋出到 MQTT 伺服器的一系列主題上,主題名稱基本與 CSV 檔案有一定的對應關係。資料內容被轉換為 JSON 物件,方便各種程式語言處理。

  1. ny-power/upstream/fuel-mix/Hydro {"units": "MW", "value": 3229, "ts": "05/09/2018 00:05:00"}

  2. ny-power/upstream/fuel-mix/Dual Fuel {"units": "MW", "value": 1400, "ts": "05/09/2018 00:05:00"}

  3. ny-power/upstream/fuel-mix/Natural Gas {"units": "MW", "value": 2144, "ts": "05/09/2018 00:05:00"}

  4. ny-power/upstream/fuel-mix/Other Fossil Fuels {"units": "MW", "value": 4, "ts": "05/09/2018 00:05:00"}

  5. ny-power/upstream/fuel-mix/Wind {"units": "MW", "value": 41, "ts": "05/09/2018 00:05:00"}

  6. ny-power/upstream/fuel-mix/Other Renewables {"units": "MW", "value": 226, "ts": "05/09/2018 00:05:00"}

  7. ny-power/upstream/fuel-mix/Nuclear {"units": "MW", "value": 4114, "ts": "05/09/2018 00:05:00"}

這種直接的轉換是種不錯的嘗試,可將公開資料轉換為公開事件。我們後續會繼續將資料轉換為二氧化碳排放強度,但這些原始資料還可被其它應用使用,用於其它計算用途。

MQTT 主題

主題和主題結構topic structure是 MQTT 的一個主要特色。與其它標準的企業級訊息匯流排不同,MQTT 的主題無需事先註冊。傳送者可以憑空建立主題,唯一的限制是主題的長度,不超過 220 字元。其中 / 字元有特殊含義,用於建立主題的層次結構。我們即將看到,你可以訂閱這些層次中的一些分片。

基於開箱即用的 Mosquitto,任何一個客戶端都可以向任何主題釋出訊息。在原型設計過程中,這種方式十分便利;但一旦部署到生產環境,你需要增加訪問控制串列access control list(ACL)只允許授權的應用釋出訊息。例如,任何人都能以只讀的方式訪問我的應用的主題層級,但只有那些具有特定憑證credentials的客戶端可以釋出內容。

主題中不包含自動樣式automatic schema,也沒有方法查詢客戶端可以釋出的全部主題。因此,對於那些從 MQTT 匯流排消費資料的應用,你需要讓其直接使用已知的主題和訊息格式樣式。

那麼應該如何設計主題呢?最佳實踐包括使用應用相關的根名稱,例如在我的應用中使用 ny-power。接著,為提高訂閱效率,構建足夠深的層次結構。upstream 層次結構包含了直接從資料源獲取的、不經處理的原始資料,而 fuel-mix 層次結構包含特定型別的資料;我們後續還可以增加其它的層次結構。

訂閱主題

在 MQTT 中,訂閱僅僅是簡單的字串匹配。為提高處理效率,只允許如下兩種萬用字元:

◈ # 以遞迴方式匹配,直到字串結束
◈ + 匹配下一個 / 之前的內容

為便於理解,下麵給出幾個例子:

  1. ny-power/#  - 匹配 ny-power 應用釋出的全部主題

  2. ny-power/upstream/#  - 匹配全部原始資料的主題

  3. ny-power/upstream/fuel-mix/+  - 匹配全部燃料型別的主題

  4. ny-power/+/+/Hydro - 匹配全部兩次層級之後為 Hydro 型別的主題(即使不位於 upstream 層次結構下)

類似 ny-power/# 的大範圍訂閱適用於低資料量low-volume的應用,應用從網路獲取全部資料並處理。但對高資料量high-volume應用而言則是一個災難,由於絕大多數訊息並不會被使用,大部分的網路頻寬被白白浪費了。

在大資料量情況下,為確保效能,應用需要使用恰當的主題篩選(如 ny-power/+/+/Hydro)儘量準確獲取業務所需的資料。

增加我們自己的資料層次

接下來,應用中的一切都依賴於已有的 MQTT 流並構建新流。第一個額外的資料層用於計算發電對應的二氧化碳排放。

利用美國能源情報署U.S. Energy Information Administration[7] 給出的 2016 年紐約各類燃料發電及排放情況,我們可以給出各類燃料的平均排放率[8],單位為克/兆瓦時。

上述結果被封裝到一個專用的微服務中。該微服務訂閱 ny-power/upstream/fuel-mix/+,即資料泵中燃料組成情況的原始資料,接著完成計算並將結果(單位為克/千瓦時)釋出到新的主題層次結構上:

  1. ny-power/computed/co2 {"units": "g / kWh", "value": 152.9486, "ts": "05/09/2018 00:05:00"}

接著,另一個服務會訂閱該主題層次結構並將資料打包到 InfluxDB[9] 行程中;同時,釋出 24 小時內的時間序列資料到 ny-power/archive/co2/24h 主題,這樣可以大大簡化當前變化資料的繪製。

這種層次結構的主題模型效果不錯,可以將上述程式之間的邏輯解耦合。在複雜系統中,各個元件可能使用不同的程式語言,但這並不重要,因為交換格式都是 MQTT 訊息,即主題和 JSON 格式的訊息內容。

從終端消費資料

為了更好的瞭解 MQTT 完成了什麼工作,將其系結到一個訊息匯流排並檢視訊息流是個不錯的方法。mosquitto-clients 包中的 mosquitto_sub 可以讓我們輕鬆實現該標的。

安裝程式後,你需要提供伺服器名稱以及你要訂閱的主題。如果有需要,使用引數 -v 可以讓你看到有新訊息釋出的那些主題;否則,你只能看到主題內的訊息資料。

  1. mosquitto_sub -h mqtt.ny-power.org -t ny-power/# -v

只要我編寫或除錯 MQTT 應用,我總會在一個終端中執行 mosquitto_sub

從網頁直接訪問 MQTT

到目前為止,我們已經有提供公開事件流的應用,可以用微服務或命令列工具訪問該應用。但考慮到網際網路仍佔據主導地位,因此讓使用者可以從瀏覽器直接獲取事件流是很重要。

MQTT 的設計者已經考慮到了這一點。協議標準支援三種不同的傳輸協議:TCP[10]UDP[11]和 WebSockets[12]。主流瀏覽器都支援 WebSockets,可以維持持久連線,用於實時應用。

Eclipse 專案提供了 MQTT 的一個 JavaScript 實現,叫做 Paho[13],可包含在你的應用中。工作樣式為與伺服器建立連線、建立一些訂閱,然後根據接收到的訊息進行響應。

  1. // ny-power web console application

  2. var client = new Paho.MQTT.Client(mqttHost, Number("80"), "client-" + Math.random());

  3. // set callback handlers

  4. client.onMessageArrived = onMessageArrived;

  5. // connect the client

  6. client.reconnect = true;

  7. client.connect({onSuccess: onConnect});

  8. // called when the client connects

  9. function onConnect() {

  10.     // Once a connection has been made, make a subscription and send a message.

  11.     console.log("onConnect");

  12.     client.subscribe("ny-power/computed/co2");

  13.     client.subscribe("ny-power/archive/co2/24h");

  14.     client.subscribe("ny-power/upstream/fuel-mix/#");

  15. }

  16. // called when a message arrives

  17. function onMessageArrived(message) {

  18.     console.log("onMessageArrived:"+message.destinationName + message.payloadString);

  19.     if (message.destinationName == "ny-power/computed/co2") {

  20.         var data = JSON.parse(message.payloadString);

  21.         $("#co2-per-kwh").html(Math.round(data.value));

  22.         $("#co2-units").html(data.units);

  23.         $("#co2-updated").html(data.ts);

  24.     }

  25.     if (message.destinationName.startsWith("ny-power/upstream/fuel-mix")) {

  26.         fuel_mix_graph(message);

  27.     }

  28.     if (message.destinationName == "ny-power/archive/co2/24h") {

  29.         var data = JSON.parse(message.payloadString);

  30.         var plot = [

  31.             {

  32.                 x: data.ts,

  33.                 y: data.values,

  34.                 type: 'scatter'

  35.             }

  36.         ];

  37.         var layout = {

  38.             yaxis: {

  39.                 title: "g CO2 / kWh",

  40.             }

  41.         };

  42.         Plotly.newPlot('co2_graph', plot, layout);

  43.     }

上述應用訂閱了不少主題,因為我們將要呈現若干種不同型別的資料;其中 ny-power/computed/co2 主題為我們提供當前二氧化碳排放的參考值。一旦收到該主題的新訊息,網站上的相應內容會被相應替換。

ny-power.org[4] 網站提供的 NYISO 二氧化碳排放圖。

ny-power/archive/co2/24h 主題提供了時間序列資料,用於為 Plotly[14] 線表提供資料。ny-power/upstream/fuel-mix 主題提供當前燃料組成情況,為漂亮的柱狀圖提供資料。

ny-power.org[4] 網站提供的燃料組成情況。

這是一個動態網站,資料不從伺服器拉取,而是結合 MQTT 訊息匯流排,監聽對外開放的 WebSocket。就像資料泵和打包器程式那樣,網站頁面也是一個釋出訂閱客戶端,只不過是在你的瀏覽器中執行,而不是在公有雲的微服務上。

你可以在 http://ny-power.org 站點點看到動態變更,包括影象和可以看到訊息到達的實時 MQTT 終端。

繼續深入

ny-power.org 應用的完整內容開源在 GitHub[16] 中。你也可以查閱 架構簡介[17],學習如何使用 Helm[18] 部署一系列 Kubernetes 微服務構建應用。另一個有趣的 MQTT 示例使用 MQTT 和 OpenWhisk 進行實時文字訊息翻譯,程式碼樣式code pattern參考連結[19]

MQTT 被廣泛應用於物聯網領域,更多關於 MQTT 用途的例子可以在 Home Assistant[20] 專案中找到。

如果你希望深入瞭解協議內容,可以從 mqtt.org[3] 獲得該公開標準的全部細節。

想瞭解更多,可以參加 Sean Dague 在 OSCON[21] 上的演講,主題為 將 MQTT 加入到你的工具箱[22],會議將於 7 月 16-19 日在奧爾良州波特蘭舉辦。


via: https://opensource.com/article/18/6/mqtt

作者:Sean Dague[24] 選題:lujun9972 譯者:pinewall 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖