1.計測用デバイスの基本要件
まず、ブロードキャスター(計測用デバイス)としてどのような機能が必要になるかをまとめておきましょう。
○計測と送信
決められた時間間隔で温度と湿度を計測して、計測値メッセージをトピック「device1/measured_value」で
パブリッシュする。
さほど厳密さを要求しないので、QoSレベルはゼロとする。
○ボタン押下情報の送信
赤色ボタンをクリックすると、トピック「device1/button_on」で押下メッセージをパブリッシュする。
これによって受信側のLEDをON/OFFするので、取りこぼしがないようQoSレベルは2とする。
○デバイスIDの発信
複数のデバイスからパブリッシュするケースを考えて、適切なトピックを設定する。
○ボタン押下情報の受信
監視用デバイスのボタン押下を受信するために、トピック「device2/button_on」をサブスクライブする。
これによってLEDをON/OFFするので、取りこぼしがないようQoSレベルは2とする。
受信時には黄色LEDの状態を反転(点灯または消灯)させる。
○キープアライブ間隔の設定
計測間隔10秒より大きめの30秒を設定する。
○ラーストウィルメッセージの設定
ネットワークエラーによる通信断に備えてLWTを設定する。
設定の内容は、監視用デバイスのOLEDディスプレイに表示する文字列「*Error!,Device1」とする。
2.Async MQTTクライアントライブラリー
ESP32でMQTT通信を行うにはクライアントライブラリーを使用します。さまざまなメソッド(関数)が準備されていますが、ここでは今回使用するものを中心に見ておきましょう。使用に当たって、まず次のようにMQTTクライアントをインスタンス化します。
AsyncMqttClient mqttClient;
インスタンス化すると、以降はmqttClient.~のようにメソッドを指定して実行させることができます。
(1)動作設定
①AsyncMqttClient& setServer(IPAddress、uint16_t port)
ブローカーを設定します。
・IPAddress: IPAddress(192,168,~,~)のように指定します。
・port: 1883(標準で1883/TCPポートを使用します)。
②AsyncMqttClient& setKeepAlive(uint16_t keepAlive)
キープアライブ間隔を設定します。
・keepAlive: 秒数を指定します。
③AsyncMqttClient& setWill(const char* topic, uint8_t qos, bool retain,
const char* payload = nullptr, size_t length = 0)
LWT(Last Will and Testament)を設定します。
・topic: LWTを送信するトピック。
・qos: LWTのQoS。
・retain: trueならLWTのメッセージを保持する。
・payload: LWTの本文。
・length: ペイロード長。設定しないかゼロならstrlen(payload)を採用する。
(2)イベントハンドラー
6種類のイベントハンドラーがあります。いずれもユーザー定義のコールバック関数を設定するためのもので、引数で指定した関数を該当するイベントハンドラーに追加します。それぞれのイベントが発生すると、ここで設定したコールバック関数が呼び出されて実行されます。
○ AsyncMqttClient& onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback)
○ AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback)
○ AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback)
○ AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback)
○ AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback)
○ AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback)
○ AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback)
○ AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback)
○ AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback)
○ AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback)
○ AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback)
(3)操作関数
①void connect()
ブローカーに接続します。
②uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr,
size_t length = 0, bool dup = false, uint16_t message_id = 0)
指定したトピックでメッセージをパブリッシュします。
・topic: トピック。
・qos: QoS。
・retain: trueならメッセージを保持する。
・payload: メッセージの本文。
・length: ペイロード長。設定しないかゼロならstrlen(payload)を採用する。
・dup: trueならペイロードの重複フラグが立てられる。
・message_id: メッセージID。設定しないかゼロなら自動的に割り付けられる。
③uint16_t subscribe(const char* topic, uint8_t qos)
指定したトピックをサブスクライブします。
・topic: トピック。
・qos: QoS。
3.LWT(Last Will and Testament)の設定
すでに述べたようにLWTとは、ブローカーが意図しない原因でPublisherと通信できなくなった場合に、Subscriberへ送信するメッセージです。SubscriberはLWTを受信したことで、受信対象のPublisherが異常停止していることを知ることができます。
このようにとても重要なメッセージですが、その実装方法についての情報が少なく、あれこれ試行錯誤することになりました。ちょとした解説か例示があればきわめて簡単であるにもかかわらず、それが得られず無駄な時間を費やすことは何としても避けたいですね。そんなことから、LWTの設定を独立した項目でまとめておくことにします。
AsyncMqttClient mqttClient; // AsyncMqttClientをインスタンス化する const char* publishMeasuredValue = "device1/measured_value"; // トピック String measuredString = ""; // 計測結果文字列 /* パブリッシュ処理 */ void publishProcess() { // トピックpublishMeasuredValueのメッセージとして、QoS0で測定文字列をパブリッシュする uint16_t packetId = mqttClient.publish( publishMeasuredValue, 0, true, measuredString.c_str()); } /* ESP32をMQTTブローカーに接続する */ void connectToMqtt() { Serial.println("Connecting to MQTT..."); // KeepAliveとLWTを設定して mqttClient.setKeepAlive(30); // 30秒 mqttClient.setWill(publishMeasuredValue, 1, true, "*Error!,Device1"); // ブローカーに接続する mqttClient.connect(); } |
例えばpublishProcess()で示すようなパブリッシュ処理をするとして、この場合のLWTの設定はブローカー接続処理connectToMqtt(この名前は何でもいいのですが)で行うのが適当かと思います。
それに先だって、setKeepAliveでキープアライブ間隔を秒単位で設定します。設定値の1.5倍を超えてブローカーへの通信がなければ、LWTで設定したメッセージがブローカーからサブスクライバーへ送信されます。
さてそのLWTですが、setWill関数で設定します。第1引数では、publishProcess()のpublish関数の第1引数で指定したトピックと同じものを指定していることに注意してください。つまり、LWTは正常時と同じトピックとしてパブリッシュしています。第2・3引数ではQosとretainをそれぞれ1, trueとし、続くpayloadで通知文字列「*Error!,Device1」を指定しています。payloadは正常時と同様に処理されるので、OLEDディスプレイへの表示を考慮して、温度表示部に"*Error!"を、湿度表示部にパブリッシャーのデバイス番号を表示するよう記述しています。
これらの設定を行った後で、ブローカーへの接続connect()を実行します。setKeepAlive()とSetWill()で設定した諸条件は、connect()実行時にブローカーへ送られて準備が完了します。
4.コードの解説
async-mqtt-clientライブラリーのサンプルプログラムであるFullyFeatured-ESP32.inoをベースにして開発します。MQTTクライアントとの切断時に再接続する仕組みは、サンプルと同様にFreeRTOSソフトウェアタイマーを使用しています。コードにはかなり細かくコメントを記述しているので、以下ではポイントだけを解説します。
①ヘッダーファイルとデータ等の定義
・18行目 PlatformIO+VSCode用。
・21~22行 AmazonのFreeRTOSのタイマーAPI(xTimerCreate, xTimerStart, xTimerStop)用。
・24行目 Async MQTTクライアントライブラリー用。
・27~40行 PlatformIO+VSCode用の関数プロトタイプ宣言。
・47~48、51行 環境に合った情報を設定してください!
・52行目 MQTT専用ポートです。
・55行目 AsyncMqttClientをインスタンス化します。
・56行目 MQTT再接続用ソフトタイマー。
・57行目 WiFi再接続用ソフトタイマー。
・59~62行 計測値データ送信、ボタン押下情報送信、ボタン押下情報受信のトピックス。
#include <Arduino.h>
#include <WiFi.h>
extern "C" {
#include "freertos/FreeRTOS.h"
#include "freertos/timers.h"
}
#include <AsyncMqttClient.h>
#include <DHT.h> // DHTセンサー用
/* Function Prototype */
void doInitialize();
void doPrepare();
void doProcess();
void connectToWifi();
void connectToMqtt();
void WiFiEvent(WiFiEvent_t);
void onMqttConnect(bool);
void onMqttDisconnect(AsyncMqttClientDisconnectReason);
void onMqttSubscribe(uint16_t, uint8_t);
void onMqttUnsubscribe(uint16_t);
void onMqttPublish(uint16_t);
void onMqttMessage(char*, char*, AsyncMqttClientMessageProperties, size_t, size_t, size_t);
bool isPushbuttonClicked();
/* 基本属性定義 */
#define SPI_SPEED 115200 // SPI通信速度
#define DHTTYPE DHT11 // DHTセンサーの型式
// ルーター接続情報
#define WIFI_SSID "your_ssid"
#define WIFI_PASSWORD "your_password"
// Mosquitto MQTT(Raspberry Pi)ブローカー接続情報
#define MQTT_HOST IPAddress(192, 168, ?, ??)
#define MQTT_PORT 1883
// MQTTクライアント操作用オブジェクト
AsyncMqttClient mqttClient;
TimerHandle_t mqttReconnectTimer;
TimerHandle_t wifiReconnectTimer;
// パブリッシュ/サブスクライブ・トピックス
const char* publishMeasuredValue = "device1/measured_value";
const char* publishPushbutton = "device1/button_on";
const char* subscribePushbutton = "device2/button_on";
String measuredString = ""; // 計測結果
unsigned long LastPubTime = 0; // 最新パブリッシュ時刻
const long iInterval = 10000; // 計測間隔(10秒ごとに設定)
/* DHTセンサー*/
const int DHTPin = 14; // DHTセンサーの接続ピン
DHT dht(DHTPin, DHTTYPE); // DHTクラスの生成
/* LEDピン */
const int ledPin = 25; // LED出力接続ピン
int ledState = LOW; // 出力ピンの状態
/* プッシュボタン */
const int buttonPin = 32; // 接続ピン
int buttonState; // 状態
int lastButtonState = LOW; // 直前の状態
unsigned long lastDebounceTime = 0; // 直前の切替時刻
unsigned long debounceDelay = 50; // デバウンスタイム(mSec.)
/* 受信制御用 */
bool bReady = false;
②setup()初期化処理
図のように個別の関数コールで構成するシンプルなものです。GPIOの設定とセンサーの「初期化処理」をして、再接続用タイマーの作成、コールバック関数の割り当て、ブローカーの設定などの「準備処理」を行い、Wi-Fiルーターに接続しています。
void setup() {
doInitialize(); // 初期化処理をする
doPrepare(); // 準備処理をして
connectToWifi(); // Wi-Fiルーターに接続する
}
③loop()反復処理
所定の間隔で計測値をパブリッシュします。プッシュボタンが押されていたら、ボタン押下メッセージをパブリッシュします。
void loop() {
unsigned long now = millis();
// 所定の間隔で計測値をパブリッシュする
if (now - LastPubTime >= iInterval) {
// 最新時刻を控え
LastPubTime = now;
doProcess();
}
if (isPushbuttonClicked()) { // プッシュボタンが押されていたら
// トピックpublishPushbuttonのメッセージとして、QoS2で測定文字列をパブリッシュする
mqttClient.publish(publishPushbutton, 2, true, "clicked");
Serial.println("Publish 'publishPushbutton' QoS2");
}
}
④初期化処理
GPIOの設定とセンサーを起動します。
void doInitialize() {
Serial.begin(SPI_SPEED);
pinMode(buttonPin, INPUT); // GPIO設定:プッシュボタン
pinMode(ledPin, OUTPUT); // GPIO設定:LED
digitalWrite(ledPin, LOW);
dht.begin(); // DHTセンサー起動
}
⑤準備処理
再接続用タイマーの作成、すべてのコールバック関数の割り当てとブローカーの設定をします。
・123~126行 MQTT、Wi-Fi再接続タイマーを作成する。
「xTimerCreate() API関数のプロトタイプ」
TimerHandle_t xTimerCreate(const char* const pcTimerName, TickType_t xTimerPeriodInTicks,
UBaseType_t uxAutoReload, void* pvTimerID, TimerCallbackFunction_t pxCallbackFunction );
・pcTimerName: タイマーの名前。
・xTimerPeriodInTicks: pdMS_TO_TICKS() マクロによるタイマーの期間(ミリ秒)。
・uxAutoReload: 自動再ロードタイマーならpdTRUE、ワンショットタイマーならpdFALSE。
・pvTimerID: ソフトウェアタイマーごとのID値。必要ならIDをvoidポインターで指定。
・pxCallbackFunction: 作成するソフトウェアタイマーのコールバック関数。
・戻り値: メモリー不足ならNULL。
・140行目 ブローカーのIPアドレスとポート番号で対応付ける。
void doPrepare() {
// 再接続タイマーを作成する
mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE,
(void*)0, reinterpret_cast<TimerCallbackFunction_t>(connectToMqtt));
wifiReconnectTimer = xTimerCreate("wifiTimer", pdMS_TO_TICKS(2000), pdFALSE,
(void*)0, reinterpret_cast<TimerCallbackFunction_t>(connectToWifi));
// WiFiイベントのコールバック関数を割り当てて
WiFi.onEvent(WiFiEvent);
// すべてのMQTTコールバック関数を割り当てる
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onSubscribe(onMqttSubscribe);
mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onMessage(onMqttMessage);
mqttClient.onPublish(onMqttPublish);
// ブローカーを設定する
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
}
⑥主処理ロジック
温湿度の計測と計測値の編集を行ってパブリッシュします。
・155~160行 測定値をOLEDディスプレイ表示用文字列に編集する。
・163~164行 測定値文字列をQoS0、retain=trueでパブリッシュする。
void doProcess() {
// 温度と湿度を読み取る
float t = dht.readTemperature();
float h = dht.readHumidity();
// 読み取りに失敗したかどうかを確認し、失敗なら再試行させる
if (isnan(h) || isnan(t)) {
Serial.println("Failed to read from DHT!");
return;
}
//温度と湿度の測定値を文字列に変換する
char temperatureTemp[7];
dtostrf(t, 6, 2, temperatureTemp);
char humidityTemp[7];
dtostrf(h, 6, 2, humidityTemp);
measuredString = String(temperatureTemp) + "C," + String(humidityTemp) + "%";
// トピックpublishMeasuredValueのメッセージとして、QoS0で測定文字列をパブリッシュする
uint16_t packetId = mqttClient.publish(
publishMeasuredValue, 0, true, measuredString.c_str());
Serial.print("Publish 'MeasuredValuee' QoS0, packetId:");
Serial.print(packetId); Serial.print(" ");
Serial.println(measuredString);
}
⑦コネクション確立用
・174行目 所定のSSIDとパスワードでWi-Fiルーターに接続する。
・181行目 キープアライブ間隔を30秒に設定する。
・182行目 LWTメッセージを設定する。
・184行目 ブローカーに接続する。
/* ESP32をWi-Fiルーターに接続する */
void connectToWifi() {
Serial.println("Connecting to Wi-Fi...");
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
}
/* ESP32をMQTTブローカーに接続する */
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
// 必要であれば、KeepAliveとラーストウィル・メッセージを設定して
mqttClient.setKeepAlive(30);
mqttClient.setWill(publishMeasuredValue, 1, true, "*Error!,Device1");
// ブローカーに接続する
mqttClient.connect();
}
⑧コールバック関数
・195行目 MQTTブローカーに接続する。
・199行目 実行中ソフトウェアタイマーを停止する。
「xTimerStop() API関数のプロトタイプ」
TimerHandle_t xTimerStop(TimerHandle_t xTimer, TickType_t xTicksToWait);
・xTimer: xTimerCreate()で作成したソフトウェアタイマーのハンドル。
・xTicksToWait: 待機する最大時間。
・200行目 停止中のソフトウェアタイマーを開始する。
「xTimerStart() API関数のプロトタイプ」
TimerHandle_t xTimerStart(TimerHandle_t xTimer, TickType_t xTicksToWait);
・xTimer: xTimerCreate()で作成したソフトウェアタイマーのハンドル。
・xTicksToWait: 待機する最大時間。
・211行目 監視用デバイスからのボタンシグナルのサブスクライブを宣言する。
・243~244行 メッセージ受信時のコールバック関数
onMqttMessage(char* topic, char* payload,
AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total);
・topic: トピック。
・payload: ペイロード(メッセージ本文)。
・properties
properties.qos: QoSレベル。
properties.dup: 重複フラグ
properties.retain:
・len: ペイロード長
・index: インデックス
・total: 合計ペイロード長
・246~248行 メッセージ本文をString変数に取り出す。
・260行目 受信したトピックが目的とするボタンシグナルかどうかを確認する。
・262,273行目 起動直後に1回だけサブスクライブが自動実行される。
これによってLEDが点灯してしまうのを防ぐために、bReady変数で実行タイミングを調整している。
/* WiFiEvent関数 */
void WiFiEvent(WiFiEvent_t event) {
switch(event) {
case SYSTEM_EVENT_STA_GOT_IP: // WiFi接続が完了したらMQTTブローカーに接続する
Serial.println("WiFi connected!");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
connectToMqtt();
break;
case SYSTEM_EVENT_STA_DISCONNECTED: // WiFiが切断されたらタイマーを起動して再接続を試行
Serial.println("WiFi disconnected!");
xTimerStop(mqttReconnectTimer, 0); // WiFi再接続中はMQTTをストップさせる
xTimerStart(wifiReconnectTimer, 0);
break;
default:
break;
}
}
/* MQTT割込: MQTT接続 */
void onMqttConnect(bool sessionPresent) {
Serial.println("MQTT connected!");
// subscribePushbuttonトピックをQoS2でサブスクライブする
uint16_t packetId = mqttClient.subscribe(subscribePushbutton, 2);
Serial.print("Prepare 'subscribePushbutton' QoS2, packetId: ");
Serial.println(packetId);
}
/* MQTT割込: MQTT切断 */
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
Serial.println("MQTT disconnected!");
if (WiFi.isConnected()) {
xTimerStart(mqttReconnectTimer, 0);
}
}
/* MQTT割込: サブスクライブ */
void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
Serial.print("Subscribe now. packetId:");
Serial.println(packetId);
}
/* MQTT割込: サブスクライブ解除 */
void onMqttUnsubscribe(uint16_t packetId) {
Serial.print("Unsubscribe now. packet Id:");
Serial.println(packetId);
}
/* MQTT割込: パブリッシュ */
void onMqttPublish(uint16_t packetId) {
Serial.print("Publish now. packetId:");
Serial.println(packetId);
}
/* MQTT割込: メッセージ受信 */
void onMqttMessage(char* topic, char* payload,
AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
String messageTemp;
for (int i = 0; i < len; i++) {
messageTemp += (char)payload[i];
}
Serial.println("Received message.");
Serial.print(" ・Topic: ");
Serial.println(topic);
Serial.print(" ・Message: ");
Serial.println(messageTemp);
Serial.print(" ・QoS: "); Serial.println(properties.qos);
Serial.print(" ・Retain: "); Serial.println(properties.retain);
Serial.print(" ・Len: "); Serial.println(len);
Serial.print(" ・Index: "); Serial.println(index);
Serial.print(" ・Total: "); Serial.println(total);
// トピックがボタンシグナルならLEDを制御する
if (strcmp(topic, subscribePushbutton) == 0) {
// 準備ができていれば、LEDが消灯状態なら点灯し点灯状態なら消灯する
if (bReady) {
if (ledState == LOW) {
ledState = HIGH;
Serial.println(" *LED: ON!");
} else {
ledState = LOW;
Serial.println(" *LED: OFF!");
}
}
// 設定したLEDの状態を物理的に反映させる
digitalWrite(ledPin, ledState);
bReady = true;
}
}
5.実行結果
〔コンパイルエラーへの対処〕
非同期MQTTクライアントが使用する非同期TCPライブラリー(AsyncTCP)においてコンパイルエラーが発生しました。使用しているバージョンが原因かも知れませんが、エラーの内容と原因、対処方法は次の通りです。なお、以下の~~の部分はユーザー名です。
○エラーメッセージ(部分)
C:\Users\~~\AppData\Local\Arduino15\packages\esp32\hardware\esp32\1.0.1\libraries\ async-mqtt-client\src\AsyncMqttClient.cpp:40:61: error: format '%x' expects argument of type 'unsigned int', but argument 3 has type 'uint64_t {aka long long unsigned int}' [-Werror=format=] cc1plus.exe: some warnings being treated as errors 次のフォルダのライブラリasync-mqtt-clientバージョン0.8.2を使用中:C:\Users\~~\App Data\Local\Arduino15\packages\esp32\hardware\esp32\1.0.1\libraries\async-mqtt-client 次のフォルダのライブラリAsyncTCPバージョン1.0.3を使用中:C:\Users\~~\AppData\Local \Arduino15\packages\esp32\hardware\esp32\1.0.1\libraries\AsyncTCP exit status 1 ボードDOIT ESP32 DEVKIT V1に対するコンパイル時にエラーが発生しました。 |
○原因の特定
「AsyncMqttClient.cpp:40:61: error: format '%x' expects argument of」の部分に注目して、AsyncMqttClient.cppの40行目を確認しました。
#ifdef ESP32 sprintf(_generatedClientId, "esp32%06x", ESP.getEfuseMac()); <== ★ #elif defined(ESP8266) sprintf(_generatedClientId, "esp8266%06x", ESP.getChipId()); #endif |
○対処方法
uint64_tに対応できるよう(long long unsigned intが表示できるよう)、次のように修正しました。
sprintf(_generatedClientId, "esp32%06llx", ESP.getEfuseMac()); |
これでOKです。
〔実行結果〕
事前にブローカーのRaspberry Piを立ち上げてください。
準備ができたところで、今回作成したスケッチMQTTDevice1.inoをコンパイルしてボードに書き込みます。相方の監視用ボードのスケッチが未完成のため、実行の確認はシリアルモニターだけで行うことになります。
まずWi-Fiに接続し、MQTTのコネクションを確立します。成功すると、監視用ボードからのボタン押下情報をサブスクライブします。(手元ではすでに監視用ボード側のスケッチのテストを行っているので、それによってブローカーに「Retainフラグ=true」で送信されている内容が受信されています。)
その後は設定した時間間隔で計測データをパブリッシュしますが、プッシュボタンをクリックするとボタンシグナルをパブリッシュします。QoS0の計測データは毎回Packet Idが同じですが、QoS2のボタンシグナルではPacket Idがカウントアップされています。これによって、MQTTがどのようにサービス品質(QoS)を管理しているかを窺うことができますね。
スケッチ全体は、ページ先頭の[Download]ボタンでダウンロードしてください。
次回は監視デバイス用のスケッチを作成して、双方向通信のテストを行います。
お楽しみに!