diff options
Diffstat (limited to 'include/pio/libs/mqtt')
-rw-r--r-- | include/pio/libs/mqtt/homekit/mqtt/module.cpp | 26 | ||||
-rw-r--r-- | include/pio/libs/mqtt/homekit/mqtt/module.h | 56 | ||||
-rw-r--r-- | include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp | 162 | ||||
-rw-r--r-- | include/pio/libs/mqtt/homekit/mqtt/mqtt.h | 48 | ||||
-rw-r--r-- | include/pio/libs/mqtt/homekit/mqtt/payload.h | 15 | ||||
-rw-r--r-- | include/pio/libs/mqtt/library.json | 7 |
6 files changed, 314 insertions, 0 deletions
diff --git a/include/pio/libs/mqtt/homekit/mqtt/module.cpp b/include/pio/libs/mqtt/homekit/mqtt/module.cpp new file mode 100644 index 0000000..0ac7637 --- /dev/null +++ b/include/pio/libs/mqtt/homekit/mqtt/module.cpp @@ -0,0 +1,26 @@ +#include "./module.h" +#include <homekit/logging.h> + +namespace homekit::mqtt { + +bool MqttModule::tickElapsed() { + if (!tickSw.elapsed(tickInterval*1000)) + return false; + + tickSw.save(); + return true; +} + +void MqttModule::handlePayload(Mqtt& mqtt, String& topic, uint16_t packetId, const uint8_t* payload, size_t length, + size_t index, size_t total) { + if (length != total) + PRINTLN("mqtt: received partial message, not supported"); + + // TODO +} + +void MqttModule::handleOnPublish(uint16_t packetId) {} + +void MqttModule::onDisconnect(Mqtt& mqtt, espMqttClientTypes::DisconnectReason reason) {} + +} diff --git a/include/pio/libs/mqtt/homekit/mqtt/module.h b/include/pio/libs/mqtt/homekit/mqtt/module.h new file mode 100644 index 0000000..0a328f3 --- /dev/null +++ b/include/pio/libs/mqtt/homekit/mqtt/module.h @@ -0,0 +1,56 @@ +#ifndef HOMEKIT_LIB_MQTT_MODULE_H +#define HOMEKIT_LIB_MQTT_MODULE_H + +#include "./mqtt.h" +#include "./payload.h" +#include <homekit/stopwatch.h> + + +namespace homekit::mqtt { + +class Mqtt; + +class MqttModule { +protected: + bool initialized; + StopWatch tickSw; + short tickInterval; + + bool receiveOnPublish; + bool receiveOnDisconnect; + + bool tickElapsed(); + +public: + MqttModule(short _tickInterval, bool _receiveOnPublish = false, bool _receiveOnDisconnect = false) + : initialized(false) + , tickInterval(_tickInterval) + , receiveOnPublish(_receiveOnPublish) + , receiveOnDisconnect(_receiveOnDisconnect) {} + + virtual void tick(Mqtt& mqtt) = 0; + + virtual void onConnect(Mqtt& mqtt) = 0; + virtual void onDisconnect(Mqtt& mqtt, espMqttClientTypes::DisconnectReason reason); + + virtual void handlePayload(Mqtt& mqtt, String& topic, uint16_t packetId, const uint8_t *payload, size_t length, size_t index, size_t total); + virtual void handleOnPublish(uint16_t packetId); + + inline void setInitialized() { + initialized = true; + } + + inline void unsetInitialized() { + initialized = false; + } + + inline short getTickInterval() const { + return tickInterval; + } + + friend class Mqtt; +}; + +} + +#endif //HOMEKIT_LIB_MQTT_MODULE_H diff --git a/include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp b/include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp new file mode 100644 index 0000000..aa769a5 --- /dev/null +++ b/include/pio/libs/mqtt/homekit/mqtt/mqtt.cpp @@ -0,0 +1,162 @@ +#include "./mqtt.h" + +#include <homekit/config.h> +#include <homekit/wifi.h> +#include <homekit/logging.h> + +namespace homekit::mqtt { + +const uint8_t MQTT_CA_FINGERPRINT[] = { \ + 0x0e, 0xb6, 0x3a, 0x02, 0x1f, \ + 0x4e, 0x1e, 0xe1, 0x6a, 0x67, \ + 0x62, 0xec, 0x64, 0xd4, 0x84, \ + 0x8a, 0xb0, 0xc9, 0x9c, 0xbb \ +};; +const char MQTT_SERVER[] = "mqtt.solarmon.ru"; +const uint16_t MQTT_PORT = 8883; +const char MQTT_USERNAME[] = CONFIG_MQTT_USERNAME; +const char MQTT_PASSWORD[] = CONFIG_MQTT_PASSWORD; +const char MQTT_CLIENT_ID[] = CONFIG_MQTT_CLIENT_ID; +const char MQTT_SECRET[CONFIG_NODE_SECRET_SIZE+1] = CONFIG_NODE_SECRET; + +static const uint16_t MQTT_KEEPALIVE = 30; + +using namespace espMqttClientTypes; + +Mqtt::Mqtt() { + auto cfg = config::read(); + nodeId = String(cfg.flags.node_configured ? cfg.node_id : wifi::NODE_ID); + + randomSeed(micros()); + + client.onConnect([&](bool sessionPresent) { + PRINTLN("mqtt: connected"); + + for (auto* module: modules) { + if (!module->initialized) { + module->onConnect(*this); + module->setInitialized(); + } + } + + connected = true; + }); + + client.onDisconnect([&](DisconnectReason reason) { + PRINTF("mqtt: disconnected, reason=%d\n", static_cast<int>(reason)); +#ifdef DEBUG + if (reason == DisconnectReason::TLS_BAD_FINGERPRINT) + PRINTLN("reason: bad fingerprint"); +#endif + + for (auto* module: modules) { + module->onDisconnect(*this, reason); + module->unsetInitialized(); + } + + reconnectTimer.once(2, [&]() { + reconnect(); + }); + }); + + client.onSubscribe([&](uint16_t packetId, const SubscribeReturncode* returncodes, size_t len) { + PRINTF("mqtt: subscribe ack, packet_id=%d\n", packetId); + for (size_t i = 0; i < len; i++) { + PRINTF(" return code: %u\n", static_cast<unsigned int>(*(returncodes+i))); + } + }); + + client.onUnsubscribe([&](uint16_t packetId) { + PRINTF("mqtt: unsubscribe ack, packet_id=%d\n", packetId); + }); + + client.onMessage([&](const MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { + PRINTF("mqtt: message received, topic=%s, qos=%d, dup=%d, retain=%d, len=%ul, index=%ul, total=%ul\n", + topic, properties.qos, (int)properties.dup, (int)properties.retain, len, index, total); + + const char *ptr = topic + nodeId.length() + 4; + String relevantTopic(ptr); + + auto it = moduleSubscriptions.find(relevantTopic); + if (it != moduleSubscriptions.end()) { + auto module = it->second; + module->handlePayload(*this, relevantTopic, properties.packetId, payload, len, index, total); + } else { + PRINTF("error: module subscription for topic %s not found\n", relevantTopic.c_str()); + } + }); + + client.onPublish([&](uint16_t packetId) { + PRINTF("mqtt: publish ack, packet_id=%d\n", packetId); + + for (auto* module: modules) { + if (module->receiveOnPublish) { + module->handleOnPublish(packetId); + } + } + }); + + client.setServer(MQTT_SERVER, MQTT_PORT); + client.setClientId(MQTT_CLIENT_ID); + client.setCredentials(MQTT_USERNAME, MQTT_PASSWORD); + client.setCleanSession(true); + client.setFingerprint(MQTT_CA_FINGERPRINT); + client.setKeepAlive(MQTT_KEEPALIVE); +} + +void Mqtt::connect() { + reconnect(); +} + +void Mqtt::reconnect() { + if (client.connected()) { + PRINTLN("warning: already connected"); + return; + } + client.connect(); +} + +void Mqtt::disconnect() { + // TODO test how this works??? + reconnectTimer.detach(); + client.disconnect(); +} + +void Mqtt::loop() { + client.loop(); + for (auto& module: modules) { + if (module->getTickInterval() != 0) + module->tick(*this); + } +} + +uint16_t Mqtt::publish(const String& topic, uint8_t* payload, size_t length) { + String fullTopic = "hk/" + nodeId + "/" + topic; + return client.publish(fullTopic.c_str(), 1, false, payload, length); +} + +uint16_t Mqtt::subscribe(const String& topic, uint8_t qos) { + String fullTopic = "hk/" + nodeId + "/" + topic; + PRINTF("mqtt: subscribing to %s...\n", fullTopic.c_str()); + + uint16_t packetId = client.subscribe(fullTopic.c_str(), qos); + if (!packetId) + PRINTF("error: failed to subscribe to %s\n", fullTopic.c_str()); + + return packetId; +} + +void Mqtt::addModule(MqttModule* module) { + modules.emplace_back(module); + if (connected) { + module->onConnect(*this); + module->setInitialized(); + } +} + +void Mqtt::subscribeModule(String& topic, MqttModule* module, uint8_t qos) { + moduleSubscriptions[topic] = module; + subscribe(topic, qos); +} + +} diff --git a/include/pio/libs/mqtt/homekit/mqtt/mqtt.h b/include/pio/libs/mqtt/homekit/mqtt/mqtt.h new file mode 100644 index 0000000..9e0c2be --- /dev/null +++ b/include/pio/libs/mqtt/homekit/mqtt/mqtt.h @@ -0,0 +1,48 @@ +#ifndef HOMEKIT_LIB_MQTT_H +#define HOMEKIT_LIB_MQTT_H + +#include <vector> +#include <map> +#include <cstdint> +#include <espMqttClient.h> +#include <Ticker.h> +#include "./module.h" + +namespace homekit::mqtt { + +extern const uint8_t MQTT_CA_FINGERPRINT[]; +extern const char MQTT_SERVER[]; +extern const uint16_t MQTT_PORT; +extern const char MQTT_USERNAME[]; +extern const char MQTT_PASSWORD[]; +extern const char MQTT_CLIENT_ID[]; +extern const char MQTT_SECRET[CONFIG_NODE_SECRET_SIZE+1]; + +class MqttModule; + +class Mqtt { +private: + String nodeId; + WiFiClientSecure httpsSecureClient; + espMqttClientSecure client; + Ticker reconnectTimer; + std::vector<MqttModule*> modules; + std::map<String, MqttModule*> moduleSubscriptions; + bool connected; + + uint16_t subscribe(const String& topic, uint8_t qos = 0); + +public: + Mqtt(); + void connect(); + void disconnect(); + void reconnect(); + void loop(); + void addModule(MqttModule* module); + void subscribeModule(String& topic, MqttModule* module, uint8_t qos = 0); + uint16_t publish(const String& topic, uint8_t* payload, size_t length); +}; + +} + +#endif //HOMEKIT_LIB_MQTT_H diff --git a/include/pio/libs/mqtt/homekit/mqtt/payload.h b/include/pio/libs/mqtt/homekit/mqtt/payload.h new file mode 100644 index 0000000..3e0fe0c --- /dev/null +++ b/include/pio/libs/mqtt/homekit/mqtt/payload.h @@ -0,0 +1,15 @@ +#ifndef HOMEKIT_MQTT_PAYLOAD_H +#define HOMEKIT_MQTT_PAYLOAD_H + +#include <unistd.h> + +namespace homekit::mqtt { + +struct MqttPayload { + virtual ~MqttPayload() = default; + virtual size_t size() const = 0; +}; + +} + +#endif
\ No newline at end of file diff --git a/include/pio/libs/mqtt/library.json b/include/pio/libs/mqtt/library.json new file mode 100644 index 0000000..f3f2504 --- /dev/null +++ b/include/pio/libs/mqtt/library.json @@ -0,0 +1,7 @@ +{ + "name": "homekit_mqtt", + "version": "1.0.11", + "build": { + "flags": "-I../../include" + } +} |