diff options
Diffstat (limited to 'platformio/common/libs/mqtt/homekit/mqtt/mqtt.cpp')
-rw-r--r-- | platformio/common/libs/mqtt/homekit/mqtt/mqtt.cpp | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/platformio/common/libs/mqtt/homekit/mqtt/mqtt.cpp b/platformio/common/libs/mqtt/homekit/mqtt/mqtt.cpp new file mode 100644 index 0000000..9d96f9f --- /dev/null +++ b/platformio/common/libs/mqtt/homekit/mqtt/mqtt.cpp @@ -0,0 +1,164 @@ +#include "./mqtt.h" + +#include <homekit/config.h> +#include <homekit/wifi.h> +#include <homekit/logging.h> + +namespace homekit::mqtt { + +const uint8_t MQTT_CA_FINGERPRINT[] = { \ + 0x0e, 0xb6, 0x3a, 0x02, 0x1f, \ + 0x4e, 0x1e, 0xe1, 0x6a, 0x67, \ + 0x62, 0xec, 0x64, 0xd4, 0x84, \ + 0x8a, 0xb0, 0xc9, 0x9c, 0xbb \ +};; +const char MQTT_SERVER[] = "mqtt.solarmon.ru"; +const uint16_t MQTT_PORT = 8883; +const char MQTT_USERNAME[] = CONFIG_MQTT_USERNAME; +const char MQTT_PASSWORD[] = CONFIG_MQTT_PASSWORD; +const char MQTT_CLIENT_ID[] = CONFIG_MQTT_CLIENT_ID; +const char MQTT_SECRET[CONFIG_NODE_SECRET_SIZE+1] = CONFIG_NODE_SECRET; + +static const uint16_t MQTT_KEEPALIVE = 30; + +using namespace espMqttClientTypes; + +Mqtt::Mqtt() { + auto cfg = config::read(); + homeId = String(cfg.flags.node_configured ? cfg.node_id : wifi::NODE_ID); + + randomSeed(micros()); + + client.onConnect([&](bool sessionPresent) { + PRINTLN("mqtt: connected"); + + for (auto* module: modules) { + if (!module->initialized) { + module->init(*this); + module->setInitialized(); + } + } + + connected = true; + }); + + client.onDisconnect([&](DisconnectReason reason) { + PRINTF("mqtt: disconnected, reason=%d\n", static_cast<int>(reason)); +#ifdef DEBUG + if (reason == DisconnectReason::TLS_BAD_FINGERPRINT) + PRINTLN("reason: bad fingerprint"); +#endif + + for (auto* module: modules) { + if (module->receiveOnDisconnect) { + module->handleOnDisconnect(reason); + } + } + +// if (ota.readyToRestart) { +// restartTimer.once(1, restart); +// } else { + reconnectTimer.once(2, [&]() { + reconnect(); + }); +// } + }); + + client.onSubscribe([&](uint16_t packetId, const SubscribeReturncode* returncodes, size_t len) { + PRINTF("mqtt: subscribe ack, packet_id=%d\n", packetId); + for (size_t i = 0; i < len; i++) { + PRINTF(" return code: %u\n", static_cast<unsigned int>(*(returncodes+i))); + } + }); + + client.onUnsubscribe([&](uint16_t packetId) { + PRINTF("mqtt: unsubscribe ack, packet_id=%d\n", packetId); + }); + + client.onMessage([&](const MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + PRINTF("mqtt: message received, topic=%s, qos=%d, dup=%d, retain=%d, len=%ul, index=%ul, total=%ul\n", + topic, properties.qos, (int)properties.dup, (int)properties.retain, len, index, total); + + const char *ptr = topic + homeId.length() + 10; + String relevantTopic(ptr); + + auto it = moduleSubscriptions.find(relevantTopic); + if (it != moduleSubscriptions.end()) { + auto module = it->second; + module->handlePayload(*this, relevantTopic, properties.packetId, payload, len, index, total); + } else { + PRINTF("error: module subscription for topic %s not found\n", topic); + } + }); + + client.onPublish([&](uint16_t packetId) { + PRINTF("mqtt: publish ack, packet_id=%d\n", packetId); + + for (auto* module: modules) { + if (module->receiveOnPublish) { + module->handleOnPublish(packetId); + } + } + }); + + client.setServer(MQTT_SERVER, MQTT_PORT); + client.setClientId(MQTT_CLIENT_ID); + client.setCredentials(MQTT_USERNAME, MQTT_PASSWORD); + client.setCleanSession(true); + client.setFingerprint(MQTT_CA_FINGERPRINT); + client.setKeepAlive(MQTT_KEEPALIVE); +} + +void Mqtt::connect() { + reconnect(); +} + +void Mqtt::reconnect() { + if (client.connected()) { + PRINTLN("warning: already connected"); + return; + } + client.connect(); +} + +void Mqtt::disconnect() { + // TODO test how this works??? + reconnectTimer.detach(); + client.disconnect(); +} + +void Mqtt::loop() { + client.loop(); + for (auto& module: modules) { + module->tick(*this); + } +} + +uint16_t Mqtt::publish(const String& topic, uint8_t* payload, size_t length) { + String fullTopic = "hk/" + homeId + "/temphum/" + topic; + return client.publish(fullTopic.c_str(), 1, false, payload, length); +} + +uint16_t Mqtt::subscribe(const String& topic, uint8_t qos) { + String fullTopic = "hk/" + homeId + "/temphum/" + topic; + PRINTF("mqtt: subscribing to %s...\n", fullTopic.c_str()); + + uint16_t packetId = client.subscribe(fullTopic.c_str(), qos); + if (!packetId) + PRINTF("error: failed to subscribe to %s\n", fullTopic.c_str()); + return packetId; +} + +void Mqtt::addModule(MqttModule* module) { + modules.emplace_back(module); + if (connected) { + module->init(*this); + module->setInitialized(); + } +} + +void Mqtt::subscribeModule(String& topic, MqttModule* module) { + moduleSubscriptions[topic] = module; +} + +} |