1.監視用デバイスの基本要件
まず、ブロードキャスター(監視用デバイス)としてどのような機能が必要になるかをまとめておきましょう。
○計測データの受信と表示
トピック「device1/measured_value」で計測デバイスからパブリッシュされた計測値データを受信して、
OLEDディスプレイに編集・表示する。
計測デバイスのパブリッシュと同様に、QoSレベルはゼロとする。
○ボタン押下情報の受信
計測用デバイスのボタン押下を受信するために、トピック「device1/button_on」をサブスクライブする。
これによってLEDをON/OFFするので、取りこぼしがないようQoSレベルは2とする。
受信時には赤色LEDの状態を反転(点灯または消灯)させる。
○ボタン押下情報の送信
黄色ボタンをクリックすると、トピック「device2/button_on」で押下メッセージをパブリッシュする。
これによって受信側のLEDをON/OFFするので、取りこぼしがないようQoSレベルは2とする。
○デバイスIDの発信
複数のデバイスからパブリッシュするケースを考えて、適切なトピックを設定する。
2.ブロッキング関数の実行制限
うっかりやってしまって、デバッグで時間を費やしてしまいがちな問題です。実際、今回もコールバック関数onMqttMessage()内にOLEDディスプレイのハンドリングを書いて、誤表示に悩まされました。
基本的に、コールバック関数内ではネットワーク通信、シリアル通信、ファイル読み書きなどのブロッキングI/O処理を行ってはいけませんし、それらを使用した関数も呼び出さないのが無難です。そうしないと、いろんな誤動作が発生して右往左往することになります。
監視用デバイスでの表示は、グローバル変数「bReceived=false;」を設け、onMqttMessage()内で計測値を受信した場合は、bReceivedを trueに設定し、loop()内で bReceivedを判定してtrueであれば表示させるようにしています。
3.コードの解説
監視用デバイスのスケッチの大半は、計測用デバイスと同じです。計測とパブリッシュの処理を取り除いて、計測値のサブスクライブとOLEDディスプレイへの編集・表示を処理を加えると完成します。
ほとんど既存コードを修正するだけで出来上がるので、MQTT通信の弾力性と融通性には感心させられますね。的確なトピックの設定と、デバイス固有のセンサー類等の処理方法が決まれば、比較的簡単に通信できてしまいます。
以下では、計測用コードと異なる部分だけをマークし要点のみ解説します。
①ヘッダーファイルとデータ等の定義
・48~49、52行 環境に合った情報を設定してください!
・86行目 「受信データあり」を表す。コールバック関数onMqttMessage()内でのブロッキング関数実行回避用。
#include <Arduino.h>
#include <WiFi.h>
extern "C" {
#include "freertos/FreeRTOS.h"
#include "freertos/timers.h"
}
#include <AsyncMqttClient.h>
#include <Wire.h> // For I2C interface
#include <Adafruit_SSD1306.h> // For SSD1306 display
#include <Adafruit_GFX.h> // For SSD1306 display
/* Function Prototype */
void doInitialize();
void doPrepare();
void connectToWifi();
void connectToMqtt();
void WiFiEvent(WiFiEvent_t);
void onMqttConnect(bool);
void onMqttDisconnect(AsyncMqttClientDisconnectReason);
void onMqttSubscribe(uint16_t, uint8_t);
void onMqttUnsubscribe(uint16_t);
void onMqttPublish(uint16_t);
void onMqttMessage(char*, char*, AsyncMqttClientMessageProperties, size_t, size_t, size_t);
void displayValues(char*, char*, char*);
bool isPushbuttonClicked();
/* 基本属性定義 */
#define SPI_SPEED 115200 // SPI通信速度
// ルーター接続情報
#define WIFI_SSID "your_ssid"
#define WIFI_PASSWORD "your_password"
// Mosquitto MQTT(Raspberry Pi)ブローカー接続情報
#define MQTT_HOST IPAddress(192, 168, ?, ??)
#define MQTT_PORT 1883
// MQTTクライアント操作用オブジェクト
AsyncMqttClient mqttClient;
TimerHandle_t mqttReconnectTimer;
TimerHandle_t wifiReconnectTimer;
// パブリッシュ/サブスクライブ・トピックス
const char* publishPushbutton = "device2/button_on";
const char* subscribeMeasuredValue = "device1/measured_value";
const char* subscribePushbutton = "device1/button_on";
/* OLEDディスプレイの設定 */
#define SCREEN_WIDTH 128 // 幅 (単位:ピクセル)
#define SCREEN_HEIGHT 64 // 高さ(単位:ピクセル)
Adafruit_SSD1306 display(SCREEN_WIDTH, SCREEN_HEIGHT, &Wire);
/* LEDピン */
const int ledPin = 25; // LED出力接続ピン
int ledState = LOW; // 出力ピンの状態
/* プッシュボタン */
const int buttonPin = 32; // 接続ピン
int buttonState; // 状態
int lastButtonState = LOW; // 直前の状態
unsigned long lastDebounceTime = 0; // 直前の切替時刻
unsigned long debounceDelay = 50; // デバウンスタイム(mSec.)
/* 温度と湿度用の変数 */
char* temperature; // 温度
char* humidity; // 湿度
/* 受信制御用 */
bool bReceived = false;
bool bReady = false;
②setup()初期化処理
GPIOの設定とセンサーの「初期化処理」をして、再接続用タイマーの作成、コールバック関数の割り当て、ブローカーの設定などの「準備処理」を行い、Wi-Fiルーターに接続します。
void setup() {
doInitialize(); // 初期化処理をする
doPrepare(); // 準備処理をして
connectToWifi(); // Wi-Fiルーターに接続する
}
③loop()反復処理
コールバック関数onMqttMessage()で計測値が受信されていたら計測値を表示させます。そして、プッシュボタンが押されていたら、ボタン押下シグナルをパブリッシュします。
・100~101行 計測値が受信されていればOLEDディスプレイに表示する。
・104~106行 プッシュボタンが押されていたら、ボタン押下メッセージをパブリッシュする。
void loop() {
// 測定値が受信されていればOLEDディスプレイに表示する
if (bReceived) {
displayValues("DHT SENSOR", temperature, humidity);
bReceived = false;
}
if (isPushbuttonClicked()) { // プッシュボタンが押されていたら
// トピックpublishPushbuttonのメッセージとして、QoS2でパブリッシュする
mqttClient.publish(publishPushbutton, 2, true, "clicked");
Serial.println("Publish 'publishPushbutton' QoS2");
}
}
④初期化処理
GPIOを設定しOLEDディスプレイを初期化します。
void doInitialize() {
Serial.begin(SPI_SPEED);
pinMode(buttonPin, INPUT); // GPIO設定:プッシュボタン
pinMode(ledPin, OUTPUT); // GPIO設定:LED
digitalWrite(ledPin, LOW);
// OLEDディスプレイを初期化する
if(!display.begin(SSD1306_SWITCHCAPVCC, 0x3C)) { // Address 0x3C for 128x32
Serial.println(F("** SSD1306 doesn't work! **"));
for(;;); // 処理を進ませない!
}
displayValues("DHT SENSOR", "", "");
}
⑤準備処理
計測用デバイスと同じです。
void doPrepare() {
// 再接続タイマーを作成する
mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE,
(void*)0, reinterpret_cast<TimerCallbackFunction_t>(connectToMqtt));
wifiReconnectTimer = xTimerCreate("wifiTimer", pdMS_TO_TICKS(2000), pdFALSE,
(void*)0, reinterpret_cast<TimerCallbackFunction_t>(connectToWifi));
// WiFiイベントのコールバック関数を割り当てて
WiFi.onEvent(WiFiEvent);
// すべてのMQTTコールバック関数を割り当てる
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onSubscribe(onMqttSubscribe);
mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onMessage(onMqttMessage);
mqttClient.onPublish(onMqttPublish);
// ブローカーを設定する
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
}
⑥コネクション確立用
・「計測デバイス版」からキープアライブとLWTの設定を除去しています。
/* ESP32をWi-Fiルーターに接続する */
void connectToWifi() {
Serial.println("Connecting to Wi-Fi...");
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
}
/* ESP32をMQTTブローカーに接続する */
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
}
⑦コールバック関数
・186行目 計測データのサブスクライブを宣言する。
・190行目 計測デバイスからのボタンシグナルのサブスクライブを宣言する。。
・226~233行 ペイロード(計測データ)から温度と湿度を取得する。
・245行目 受信したトピックが計測データかどうかを確認する。
・248~249行 温度と湿度を編集する。
・250行目 loop()部でOLEDディスプレイ表示をするため、bReceivedを設定する(ブロッキング処理実行回避用)。
/* WiFiEvent関数 */
void WiFiEvent(WiFiEvent_t event) {
switch(event) {
case SYSTEM_EVENT_STA_GOT_IP:
Serial.println("WiFi connected!");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
connectToMqtt();
break;
case SYSTEM_EVENT_STA_DISCONNECTED:
Serial.println("WiFi disconnected!");
xTimerStop(mqttReconnectTimer, 0); // WiFi再接続中はMQTTをストップさせる
xTimerStart(wifiReconnectTimer, 0);
break;
default:
break;
}
}
/* MQTT割込: MQTT接続 */
void onMqttConnect(bool sessionPresent) {
Serial.println("MQTT connected!");
// subscribeMeasuredValueトピックをQoS0でサブスクライブする
uint16_t packetId1 = mqttClient.subscribe(subscribeMeasuredValue, 0);
Serial.print("Prepare 'subscribeMeasuredValue' QoS0, packetId:");
Serial.println(packetId1);
// subscribePushbuttonトピックをQoS2でサブスクライブする
uint16_t packetId2 = mqttClient.subscribe(subscribePushbutton, 2);
Serial.print("Prepare 'subscribePushbutton' QoS2, packetId:");
Serial.println(packetId2);
}
/* MQTT割込: MQTT切断 */
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
Serial.println("MQTT disconnected!");
if (WiFi.isConnected()) {
xTimerStart(mqttReconnectTimer, 0);
}
}
/* MQTT割込: サブスクライブ */
void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
Serial.print("Subscribe now. packetId:");
Serial.println(packetId);
}
/* MQTT割込: サブスクライブ解除 */
void onMqttUnsubscribe(uint16_t packetId) {
Serial.print("Unsubscribe now. packet Id:");
Serial.println(packetId);
}
/* MQTT割込: パブリッシュ */
void onMqttPublish(uint16_t packetId) {
Serial.print("Publish now. packetId:");
Serial.println(packetId);
}
/* MQTT割込: メッセージ受信 */
void onMqttMessage(char* topic, char* payload,
AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
String messageTemp;
int pos = 0;
for (int i = 0; i < len; i++) {
messageTemp += (char)payload[i];
if ((char)payload[i] == ',') {
payload[i] = 0x00;
pos = i + 1;
}
}
payload[len] = 0x00;
Serial.println("Received message.");
Serial.print(" ・Topic: ");
Serial.println(topic);
Serial.print(" ・Message: ");
Serial.println(messageTemp);
Serial.print(" ・QoS: "); Serial.println(properties.qos);
Serial.print(" ・Retain: "); Serial.println(properties.retain);
Serial.print(" ・Len: "); Serial.println(len);
Serial.print(" ・Index: "); Serial.println(index);
Serial.print(" ・Total: "); Serial.println(total);
// トピックがsubscribeMeasuredValueなら受信計測値を編集する
if (strcmp(topic, subscribeMeasuredValue) == 0) {
Serial.print("Receive measured data: ");
Serial.println(messageTemp);
temperature = payload;
humidity = payload + pos;
bReceived = true;
}
// トピックがボタンシグナルならLEDを制御する
else if (strcmp(topic, subscribePushbutton) == 0) {
// 準備ができていれば、LEDが消灯状態なら点灯し点灯状態なら消灯する
if (bReady) {
if (ledState == LOW) {
ledState = HIGH;
Serial.println(" *LED: ON!");
} else {
ledState = LOW;
Serial.println(" *LED: OFF!");
}
}
// 設定したLEDの状態を物理的に反映させる
digitalWrite(ledPin, ledState);
bReady = true;
}
}
4.動作検証
〔計測用デバイス〕
ブローカー、計測用デバイス、監視用デバイスを立ち上げます。
まずは計測用デバイス側のシリアルモニターを起動して、リセットします。一連の接続を確立した後、計測データの送信が始まりシリアルモニターに表示されます。同時に、監視用デバイスのOLEDディスプレイに温度と湿度が表示されます。
計測用デバイスの赤色ボタンを押すと、シリアルモニターに「publishPushbutton」と表示され、監視用デバイスの赤色LEDが点灯します。もう一度押すと同様の内容をシリアルモニターに表示して赤色LEDは消灯します。
監視用デバイスに移動して黄色ボタンを押すと、計測用デバイスの黄色LEDが点灯し、もう一度押すと消灯します。シリアルモニター上には、ボタン押下の受信情報と共にLEDの点滅状態が表示されます。
〔監視用デバイス〕
シリアルモニターを監視用デバイス側に切り替えます。
一連の接続処理を行った後、直ちに初回の計測データとボタン押下情報のサブスクライブが始まります。これは、計測用デバイスで、どちらもRetainフラグをtrueにしてパブリッシュしているためで、ブローカーに残っている前回のパブリッシュしたメッセージを受信しています。シリアルモニター上のRetainは1(true)が表示されています。
続いてRetainがゼロの計測データ、つまり計測用デバイスで現在パブリッシュされた計測データをサブスクライブして表示しています。いずれの場合も、サブスクライブと同時にOLEDディスプレイの温度と湿度の表示が更新されます。
計測用デバイスの赤色ボタンを押すと、監視用ボード上の赤色LEDが点灯して、シリアルモニターにサブスクライブ情報の詳細とLEDの状態が表示されます。もう一度押すと同様の内容をシリアルモニターに表示して赤色LEDは消灯します。
次に監視用ボードの黄色ボタンを押すと、シリアルモニターに「publishPushbutton」を表示してパブリッシュし、計測用ボードの黄色LEDが点灯します。もう一度押すと、同様に表示してLEDは消灯します。
〔通信断絶とLWTの表示〕
計測用デバイスとブローカー間の通信が遮断された場合に、計画通りにLWTで設定した内容が表示されるかどうかを試してみましょう。
正常に通信が行われている状態で、計測用デバイスのUSBケーブルを引き抜きます。約40~45秒経過すると、写真のように、監視用デバイスのOLEDディスプレイにエラーメッセージとデバイス番号が表示されます。
シリアルモニターには、通常の計測データ受信用と同じトピック「device1/measured_value」のMessageに、LWTの本文「*Error!,Device1」を受信していることがわかります。
スケッチ全体は、ページ先頭の[Download]ボタンでダウンロードしてください。
このように、複数の事象の双方向通信を比較的簡単に行えることがわかりました。
次回から赤外線通信を取り入れて、家電製品のリモートコントロールに取り組む予定です。
お楽しみに!