summaryrefslogtreecommitdiff
path: root/include/pio/libs/mqtt
diff options
context:
space:
mode:
Diffstat (limited to 'include/pio/libs/mqtt')
-rw-r--r--include/pio/libs/mqtt/homekit/mqtt/module.cpp26
-rw-r--r--include/pio/libs/mqtt/homekit/mqtt/module.h56
-rw-r--r--include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp162
-rw-r--r--include/pio/libs/mqtt/homekit/mqtt/mqtt.h48
-rw-r--r--include/pio/libs/mqtt/homekit/mqtt/payload.h15
-rw-r--r--include/pio/libs/mqtt/library.json7
6 files changed, 314 insertions, 0 deletions
diff --git a/include/pio/libs/mqtt/homekit/mqtt/module.cpp b/include/pio/libs/mqtt/homekit/mqtt/module.cpp
new file mode 100644
index 0000000..0ac7637
--- /dev/null
+++ b/include/pio/libs/mqtt/homekit/mqtt/module.cpp
@@ -0,0 +1,26 @@
+#include "./module.h"
+#include <homekit/logging.h>
+
+namespace homekit::mqtt {
+
+bool MqttModule::tickElapsed() {
+ if (!tickSw.elapsed(tickInterval*1000))
+ return false;
+
+ tickSw.save();
+ return true;
+}
+
+void MqttModule::handlePayload(Mqtt& mqtt, String& topic, uint16_t packetId, const uint8_t* payload, size_t length,
+ size_t index, size_t total) {
+ if (length != total)
+ PRINTLN("mqtt: received partial message, not supported");
+
+ // TODO
+}
+
+void MqttModule::handleOnPublish(uint16_t packetId) {}
+
+void MqttModule::onDisconnect(Mqtt& mqtt, espMqttClientTypes::DisconnectReason reason) {}
+
+}
diff --git a/include/pio/libs/mqtt/homekit/mqtt/module.h b/include/pio/libs/mqtt/homekit/mqtt/module.h
new file mode 100644
index 0000000..0a328f3
--- /dev/null
+++ b/include/pio/libs/mqtt/homekit/mqtt/module.h
@@ -0,0 +1,56 @@
+#ifndef HOMEKIT_LIB_MQTT_MODULE_H
+#define HOMEKIT_LIB_MQTT_MODULE_H
+
+#include "./mqtt.h"
+#include "./payload.h"
+#include <homekit/stopwatch.h>
+
+
+namespace homekit::mqtt {
+
+class Mqtt;
+
+class MqttModule {
+protected:
+ bool initialized;
+ StopWatch tickSw;
+ short tickInterval;
+
+ bool receiveOnPublish;
+ bool receiveOnDisconnect;
+
+ bool tickElapsed();
+
+public:
+ MqttModule(short _tickInterval, bool _receiveOnPublish = false, bool _receiveOnDisconnect = false)
+ : initialized(false)
+ , tickInterval(_tickInterval)
+ , receiveOnPublish(_receiveOnPublish)
+ , receiveOnDisconnect(_receiveOnDisconnect) {}
+
+ virtual void tick(Mqtt& mqtt) = 0;
+
+ virtual void onConnect(Mqtt& mqtt) = 0;
+ virtual void onDisconnect(Mqtt& mqtt, espMqttClientTypes::DisconnectReason reason);
+
+ virtual void handlePayload(Mqtt& mqtt, String& topic, uint16_t packetId, const uint8_t *payload, size_t length, size_t index, size_t total);
+ virtual void handleOnPublish(uint16_t packetId);
+
+ inline void setInitialized() {
+ initialized = true;
+ }
+
+ inline void unsetInitialized() {
+ initialized = false;
+ }
+
+ inline short getTickInterval() const {
+ return tickInterval;
+ }
+
+ friend class Mqtt;
+};
+
+}
+
+#endif //HOMEKIT_LIB_MQTT_MODULE_H
diff --git a/include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp b/include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp
new file mode 100644
index 0000000..aa769a5
--- /dev/null
+++ b/include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp
@@ -0,0 +1,162 @@
+#include "./mqtt.h"
+
+#include <homekit/config.h>
+#include <homekit/wifi.h>
+#include <homekit/logging.h>
+
+namespace homekit::mqtt {
+
+const uint8_t MQTT_CA_FINGERPRINT[] = { \
+ 0x0e, 0xb6, 0x3a, 0x02, 0x1f, \
+ 0x4e, 0x1e, 0xe1, 0x6a, 0x67, \
+ 0x62, 0xec, 0x64, 0xd4, 0x84, \
+ 0x8a, 0xb0, 0xc9, 0x9c, 0xbb \
+};;
+const char MQTT_SERVER[] = "mqtt.solarmon.ru";
+const uint16_t MQTT_PORT = 8883;
+const char MQTT_USERNAME[] = CONFIG_MQTT_USERNAME;
+const char MQTT_PASSWORD[] = CONFIG_MQTT_PASSWORD;
+const char MQTT_CLIENT_ID[] = CONFIG_MQTT_CLIENT_ID;
+const char MQTT_SECRET[CONFIG_NODE_SECRET_SIZE+1] = CONFIG_NODE_SECRET;
+
+static const uint16_t MQTT_KEEPALIVE = 30;
+
+using namespace espMqttClientTypes;
+
+Mqtt::Mqtt() {
+ auto cfg = config::read();
+ nodeId = String(cfg.flags.node_configured ? cfg.node_id : wifi::NODE_ID);
+
+ randomSeed(micros());
+
+ client.onConnect([&](bool sessionPresent) {
+ PRINTLN("mqtt: connected");
+
+ for (auto* module: modules) {
+ if (!module->initialized) {
+ module->onConnect(*this);
+ module->setInitialized();
+ }
+ }
+
+ connected = true;
+ });
+
+ client.onDisconnect([&](DisconnectReason reason) {
+ PRINTF("mqtt: disconnected, reason=%d\n", static_cast<int>(reason));
+#ifdef DEBUG
+ if (reason == DisconnectReason::TLS_BAD_FINGERPRINT)
+ PRINTLN("reason: bad fingerprint");
+#endif
+
+ for (auto* module: modules) {
+ module->onDisconnect(*this, reason);
+ module->unsetInitialized();
+ }
+
+ reconnectTimer.once(2, [&]() {
+ reconnect();
+ });
+ });
+
+ client.onSubscribe([&](uint16_t packetId, const SubscribeReturncode* returncodes, size_t len) {
+ PRINTF("mqtt: subscribe ack, packet_id=%d\n", packetId);
+ for (size_t i = 0; i < len; i++) {
+ PRINTF(" return code: %u\n", static_cast<unsigned int>(*(returncodes+i)));
+ }
+ });
+
+ client.onUnsubscribe([&](uint16_t packetId) {
+ PRINTF("mqtt: unsubscribe ack, packet_id=%d\n", packetId);
+ });
+
+ client.onMessage([&](const MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) {
+ PRINTF("mqtt: message received, topic=%s, qos=%d, dup=%d, retain=%d, len=%ul, index=%ul, total=%ul\n",
+ topic, properties.qos, (int)properties.dup, (int)properties.retain, len, index, total);
+
+ const char *ptr = topic + nodeId.length() + 4;
+ String relevantTopic(ptr);
+
+ auto it = moduleSubscriptions.find(relevantTopic);
+ if (it != moduleSubscriptions.end()) {
+ auto module = it->second;
+ module->handlePayload(*this, relevantTopic, properties.packetId, payload, len, index, total);
+ } else {
+ PRINTF("error: module subscription for topic %s not found\n", relevantTopic.c_str());
+ }
+ });
+
+ client.onPublish([&](uint16_t packetId) {
+ PRINTF("mqtt: publish ack, packet_id=%d\n", packetId);
+
+ for (auto* module: modules) {
+ if (module->receiveOnPublish) {
+ module->handleOnPublish(packetId);
+ }
+ }
+ });
+
+ client.setServer(MQTT_SERVER, MQTT_PORT);
+ client.setClientId(MQTT_CLIENT_ID);
+ client.setCredentials(MQTT_USERNAME, MQTT_PASSWORD);
+ client.setCleanSession(true);
+ client.setFingerprint(MQTT_CA_FINGERPRINT);
+ client.setKeepAlive(MQTT_KEEPALIVE);
+}
+
+void Mqtt::connect() {
+ reconnect();
+}
+
+void Mqtt::reconnect() {
+ if (client.connected()) {
+ PRINTLN("warning: already connected");
+ return;
+ }
+ client.connect();
+}
+
+void Mqtt::disconnect() {
+ // TODO test how this works???
+ reconnectTimer.detach();
+ client.disconnect();
+}
+
+void Mqtt::loop() {
+ client.loop();
+ for (auto& module: modules) {
+ if (module->getTickInterval() != 0)
+ module->tick(*this);
+ }
+}
+
+uint16_t Mqtt::publish(const String& topic, uint8_t* payload, size_t length) {
+ String fullTopic = "hk/" + nodeId + "/" + topic;
+ return client.publish(fullTopic.c_str(), 1, false, payload, length);
+}
+
+uint16_t Mqtt::subscribe(const String& topic, uint8_t qos) {
+ String fullTopic = "hk/" + nodeId + "/" + topic;
+ PRINTF("mqtt: subscribing to %s...\n", fullTopic.c_str());
+
+ uint16_t packetId = client.subscribe(fullTopic.c_str(), qos);
+ if (!packetId)
+ PRINTF("error: failed to subscribe to %s\n", fullTopic.c_str());
+
+ return packetId;
+}
+
+void Mqtt::addModule(MqttModule* module) {
+ modules.emplace_back(module);
+ if (connected) {
+ module->onConnect(*this);
+ module->setInitialized();
+ }
+}
+
+void Mqtt::subscribeModule(String& topic, MqttModule* module, uint8_t qos) {
+ moduleSubscriptions[topic] = module;
+ subscribe(topic, qos);
+}
+
+}
diff --git a/include/pio/libs/mqtt/homekit/mqtt/mqtt.h b/include/pio/libs/mqtt/homekit/mqtt/mqtt.h
new file mode 100644
index 0000000..9e0c2be
--- /dev/null
+++ b/include/pio/libs/mqtt/homekit/mqtt/mqtt.h
@@ -0,0 +1,48 @@
+#ifndef HOMEKIT_LIB_MQTT_H
+#define HOMEKIT_LIB_MQTT_H
+
+#include <vector>
+#include <map>
+#include <cstdint>
+#include <espMqttClient.h>
+#include <Ticker.h>
+#include "./module.h"
+
+namespace homekit::mqtt {
+
+extern const uint8_t MQTT_CA_FINGERPRINT[];
+extern const char MQTT_SERVER[];
+extern const uint16_t MQTT_PORT;
+extern const char MQTT_USERNAME[];
+extern const char MQTT_PASSWORD[];
+extern const char MQTT_CLIENT_ID[];
+extern const char MQTT_SECRET[CONFIG_NODE_SECRET_SIZE+1];
+
+class MqttModule;
+
+class Mqtt {
+private:
+ String nodeId;
+ WiFiClientSecure httpsSecureClient;
+ espMqttClientSecure client;
+ Ticker reconnectTimer;
+ std::vector<MqttModule*> modules;
+ std::map<String, MqttModule*> moduleSubscriptions;
+ bool connected;
+
+ uint16_t subscribe(const String& topic, uint8_t qos = 0);
+
+public:
+ Mqtt();
+ void connect();
+ void disconnect();
+ void reconnect();
+ void loop();
+ void addModule(MqttModule* module);
+ void subscribeModule(String& topic, MqttModule* module, uint8_t qos = 0);
+ uint16_t publish(const String& topic, uint8_t* payload, size_t length);
+};
+
+}
+
+#endif //HOMEKIT_LIB_MQTT_H
diff --git a/include/pio/libs/mqtt/homekit/mqtt/payload.h b/include/pio/libs/mqtt/homekit/mqtt/payload.h
new file mode 100644
index 0000000..3e0fe0c
--- /dev/null
+++ b/include/pio/libs/mqtt/homekit/mqtt/payload.h
@@ -0,0 +1,15 @@
+#ifndef HOMEKIT_MQTT_PAYLOAD_H
+#define HOMEKIT_MQTT_PAYLOAD_H
+
+#include <unistd.h>
+
+namespace homekit::mqtt {
+
+struct MqttPayload {
+ virtual ~MqttPayload() = default;
+ virtual size_t size() const = 0;
+};
+
+}
+
+#endif \ No newline at end of file
diff --git a/include/pio/libs/mqtt/library.json b/include/pio/libs/mqtt/library.json
new file mode 100644
index 0000000..f3f2504
--- /dev/null
+++ b/include/pio/libs/mqtt/library.json
@@ -0,0 +1,7 @@
+{
+ "name": "homekit_mqtt",
+ "version": "1.0.11",
+ "build": {
+ "flags": "-I../../include"
+ }
+}