aboutsummaryrefslogtreecommitdiff
path: root/include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp')
-rw-r--r--include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp162
1 files changed, 162 insertions, 0 deletions
diff --git a/include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp b/include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp
new file mode 100644
index 0000000..aa769a5
--- /dev/null
+++ b/include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp
@@ -0,0 +1,162 @@
+#include "./mqtt.h"
+
+#include <homekit/config.h>
+#include <homekit/wifi.h>
+#include <homekit/logging.h>
+
+namespace homekit::mqtt {
+
+const uint8_t MQTT_CA_FINGERPRINT[] = { \
+ 0x0e, 0xb6, 0x3a, 0x02, 0x1f, \
+ 0x4e, 0x1e, 0xe1, 0x6a, 0x67, \
+ 0x62, 0xec, 0x64, 0xd4, 0x84, \
+ 0x8a, 0xb0, 0xc9, 0x9c, 0xbb \
+};;
+const char MQTT_SERVER[] = "mqtt.solarmon.ru";
+const uint16_t MQTT_PORT = 8883;
+const char MQTT_USERNAME[] = CONFIG_MQTT_USERNAME;
+const char MQTT_PASSWORD[] = CONFIG_MQTT_PASSWORD;
+const char MQTT_CLIENT_ID[] = CONFIG_MQTT_CLIENT_ID;
+const char MQTT_SECRET[CONFIG_NODE_SECRET_SIZE+1] = CONFIG_NODE_SECRET;
+
+static const uint16_t MQTT_KEEPALIVE = 30;
+
+using namespace espMqttClientTypes;
+
+Mqtt::Mqtt() {
+ auto cfg = config::read();
+ nodeId = String(cfg.flags.node_configured ? cfg.node_id : wifi::NODE_ID);
+
+ randomSeed(micros());
+
+ client.onConnect([&](bool sessionPresent) {
+ PRINTLN("mqtt: connected");
+
+ for (auto* module: modules) {
+ if (!module->initialized) {
+ module->onConnect(*this);
+ module->setInitialized();
+ }
+ }
+
+ connected = true;
+ });
+
+ client.onDisconnect([&](DisconnectReason reason) {
+ PRINTF("mqtt: disconnected, reason=%d\n", static_cast<int>(reason));
+#ifdef DEBUG
+ if (reason == DisconnectReason::TLS_BAD_FINGERPRINT)
+ PRINTLN("reason: bad fingerprint");
+#endif
+
+ for (auto* module: modules) {
+ module->onDisconnect(*this, reason);
+ module->unsetInitialized();
+ }
+
+ reconnectTimer.once(2, [&]() {
+ reconnect();
+ });
+ });
+
+ client.onSubscribe([&](uint16_t packetId, const SubscribeReturncode* returncodes, size_t len) {
+ PRINTF("mqtt: subscribe ack, packet_id=%d\n", packetId);
+ for (size_t i = 0; i < len; i++) {
+ PRINTF(" return code: %u\n", static_cast<unsigned int>(*(returncodes+i)));
+ }
+ });
+
+ client.onUnsubscribe([&](uint16_t packetId) {
+ PRINTF("mqtt: unsubscribe ack, packet_id=%d\n", packetId);
+ });
+
+ client.onMessage([&](const MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) {
+ PRINTF("mqtt: message received, topic=%s, qos=%d, dup=%d, retain=%d, len=%ul, index=%ul, total=%ul\n",
+ topic, properties.qos, (int)properties.dup, (int)properties.retain, len, index, total);
+
+ const char *ptr = topic + nodeId.length() + 4;
+ String relevantTopic(ptr);
+
+ auto it = moduleSubscriptions.find(relevantTopic);
+ if (it != moduleSubscriptions.end()) {
+ auto module = it->second;
+ module->handlePayload(*this, relevantTopic, properties.packetId, payload, len, index, total);
+ } else {
+ PRINTF("error: module subscription for topic %s not found\n", relevantTopic.c_str());
+ }
+ });
+
+ client.onPublish([&](uint16_t packetId) {
+ PRINTF("mqtt: publish ack, packet_id=%d\n", packetId);
+
+ for (auto* module: modules) {
+ if (module->receiveOnPublish) {
+ module->handleOnPublish(packetId);
+ }
+ }
+ });
+
+ client.setServer(MQTT_SERVER, MQTT_PORT);
+ client.setClientId(MQTT_CLIENT_ID);
+ client.setCredentials(MQTT_USERNAME, MQTT_PASSWORD);
+ client.setCleanSession(true);
+ client.setFingerprint(MQTT_CA_FINGERPRINT);
+ client.setKeepAlive(MQTT_KEEPALIVE);
+}
+
+void Mqtt::connect() {
+ reconnect();
+}
+
+void Mqtt::reconnect() {
+ if (client.connected()) {
+ PRINTLN("warning: already connected");
+ return;
+ }
+ client.connect();
+}
+
+void Mqtt::disconnect() {
+ // TODO test how this works???
+ reconnectTimer.detach();
+ client.disconnect();
+}
+
+void Mqtt::loop() {
+ client.loop();
+ for (auto& module: modules) {
+ if (module->getTickInterval() != 0)
+ module->tick(*this);
+ }
+}
+
+uint16_t Mqtt::publish(const String& topic, uint8_t* payload, size_t length) {
+ String fullTopic = "hk/" + nodeId + "/" + topic;
+ return client.publish(fullTopic.c_str(), 1, false, payload, length);
+}
+
+uint16_t Mqtt::subscribe(const String& topic, uint8_t qos) {
+ String fullTopic = "hk/" + nodeId + "/" + topic;
+ PRINTF("mqtt: subscribing to %s...\n", fullTopic.c_str());
+
+ uint16_t packetId = client.subscribe(fullTopic.c_str(), qos);
+ if (!packetId)
+ PRINTF("error: failed to subscribe to %s\n", fullTopic.c_str());
+
+ return packetId;
+}
+
+void Mqtt::addModule(MqttModule* module) {
+ modules.emplace_back(module);
+ if (connected) {
+ module->onConnect(*this);
+ module->setInitialized();
+ }
+}
+
+void Mqtt::subscribeModule(String& topic, MqttModule* module, uint8_t qos) {
+ moduleSubscriptions[topic] = module;
+ subscribe(topic, qos);
+}
+
+}