From 0f0a5fd44867b684bcbafd102b6b769ae61229b4 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Wed, 31 May 2023 09:22:00 +0300 Subject: wip --- src/esp_mqtt_util.py | 42 ---------- src/home/media/__init__.py | 1 + src/home/mqtt/__init__.py | 13 ++- src/home/mqtt/_module.py | 34 ++++++++ src/home/mqtt/_node.py | 95 ++++++++++++++++++++++ src/home/mqtt/_payload.py | 145 ++++++++++++++++++++++++++++++++++ src/home/mqtt/esp.py | 106 ------------------------- src/home/mqtt/module/diagnostics.py | 61 ++++++++++++++ src/home/mqtt/module/inverter.py | 77 ++++++++++++++++++ src/home/mqtt/module/ota.py | 67 ++++++++++++++++ src/home/mqtt/module/relay.py | 82 +++++++++++++++++++ src/home/mqtt/module/temphum.py | 57 +++++++++++++ src/home/mqtt/mqtt.py | 24 +++++- src/home/mqtt/payload/__init__.py | 1 - src/home/mqtt/payload/base_payload.py | 145 ---------------------------------- src/home/mqtt/payload/esp.py | 78 ------------------ src/home/mqtt/payload/inverter.py | 73 ----------------- src/home/mqtt/payload/relay.py | 22 ------ src/home/mqtt/payload/sensors.py | 20 ----- src/home/mqtt/payload/temphum.py | 15 ---- src/home/mqtt/relay.py | 86 +++++++++----------- src/home/mqtt/temphum.py | 54 ------------- src/home/mqtt/util.py | 30 +++++++ src/home/pio/products.py | 4 - src/mqtt_node_util.py | 58 ++++++++++++++ src/pio_ini.py | 10 ++- src/pump_bot.py | 143 +++++++++++++++++++++++++++++---- src/pump_mqtt_bot.py | 25 +++--- 28 files changed, 924 insertions(+), 644 deletions(-) delete mode 100755 src/esp_mqtt_util.py create mode 100644 src/home/mqtt/_module.py create mode 100644 src/home/mqtt/_node.py create mode 100644 src/home/mqtt/_payload.py delete mode 100644 src/home/mqtt/esp.py create mode 100644 src/home/mqtt/module/diagnostics.py create mode 100644 src/home/mqtt/module/inverter.py create mode 100644 src/home/mqtt/module/ota.py create mode 100644 src/home/mqtt/module/relay.py create mode 100644 src/home/mqtt/module/temphum.py delete mode 100644 src/home/mqtt/payload/__init__.py delete mode 100644 src/home/mqtt/payload/base_payload.py delete mode 100644 src/home/mqtt/payload/esp.py delete mode 100644 src/home/mqtt/payload/inverter.py delete mode 100644 src/home/mqtt/payload/relay.py delete mode 100644 src/home/mqtt/payload/sensors.py delete mode 100644 src/home/mqtt/payload/temphum.py delete mode 100644 src/home/mqtt/temphum.py create mode 100755 src/mqtt_node_util.py (limited to 'src') diff --git a/src/esp_mqtt_util.py b/src/esp_mqtt_util.py deleted file mode 100755 index 263128c..0000000 --- a/src/esp_mqtt_util.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python3 -from typing import Optional -from argparse import ArgumentParser -from enum import Enum - -from home.config import config -from home.mqtt import MqttRelay -from home.mqtt.esp import MqttEspBase -from home.mqtt.temphum import MqttTempHum -from home.mqtt.esp import MqttEspDevice - -mqtt_client: Optional[MqttEspBase] = None - - -class NodeType(Enum): - RELAY = 'relay' - TEMPHUM = 'temphum' - - -if __name__ == '__main__': - parser = ArgumentParser() - parser.add_argument('--device-id', type=str, required=True) - parser.add_argument('--type', type=str, required=True, - choices=[i.name.lower() for i in NodeType]) - - config.load('mqtt_util', parser=parser) - arg = parser.parse_args() - - mqtt_node_type = NodeType(arg.type) - devices = MqttEspDevice(id=arg.device_id) - - if mqtt_node_type == NodeType.RELAY: - mqtt_client = MqttRelay(devices=devices) - elif mqtt_node_type == NodeType.TEMPHUM: - mqtt_client = MqttTempHum(devices=devices) - - mqtt_client.set_message_callback(lambda device_id, payload: print(payload)) - mqtt_client.configure_tls() - try: - mqtt_client.connect_and_loop() - except KeyboardInterrupt: - mqtt_client.disconnect() diff --git a/src/home/media/__init__.py b/src/home/media/__init__.py index 976c990..6923105 100644 --- a/src/home/media/__init__.py +++ b/src/home/media/__init__.py @@ -12,6 +12,7 @@ __map__ = { __all__ = list(itertools.chain(*__map__.values())) + def __getattr__(name): if name in __all__: for file, names in __map__.items(): diff --git a/src/home/mqtt/__init__.py b/src/home/mqtt/__init__.py index 982e2b6..c95061f 100644 --- a/src/home/mqtt/__init__.py +++ b/src/home/mqtt/__init__.py @@ -1,4 +1,9 @@ -from .mqtt import MqttBase -from .util import poll_tick -from .relay import MqttRelay, MqttRelayState -from .temphum import MqttTempHum \ No newline at end of file +from .mqtt import MqttBase, MqttPayload, MqttPayloadCustomField +from ._node import MqttNode +from ._module import MqttModule +from .util import ( + poll_tick, + get_modules as get_mqtt_modules, + import_module as import_mqtt_module, + add_module as add_mqtt_module +) \ No newline at end of file diff --git a/src/home/mqtt/_module.py b/src/home/mqtt/_module.py new file mode 100644 index 0000000..ef50e70 --- /dev/null +++ b/src/home/mqtt/_module.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import abc +import logging + +from typing import TYPE_CHECKING, Optional +if TYPE_CHECKING: + from ._node import MqttNode + from ._payload import MqttPayload + + +class MqttModule(abc.ABC): + tick_interval: int + _initialized: bool + + def __init__(self, tick_interval=0): + self.tick_interval = tick_interval + self._initialized = False + self._logger = logging.getLogger(self.__class__.__name__) + + def init(self, mqtt: MqttNode): + pass + + def is_initialized(self): + return self._initialized + + def set_initialized(self): + self._initialized = True + + def tick(self, mqtt: MqttNode): + pass + + def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: + pass diff --git a/src/home/mqtt/_node.py b/src/home/mqtt/_node.py new file mode 100644 index 0000000..688b30b --- /dev/null +++ b/src/home/mqtt/_node.py @@ -0,0 +1,95 @@ +import paho.mqtt.client as mqtt + +from .mqtt import MqttBase +from typing import List +from ._module import MqttModule +from ._payload import MqttPayload + + +class MqttNode(MqttBase): + _modules: List[MqttModule] + _module_subscriptions: dict[str, MqttModule] + _node_id: str + _payload_callbacks: list[callable] + # _devices: list[MqttEspDevice] + # _message_callback: Optional[callable] + # _ota_publish_callback: Optional[callable] + + def __init__(self, + node_id: str, + # devices: Union[MqttEspDevice, list[MqttEspDevice]] + ): + super().__init__(clean_session=True) + self._modules = [] + self._module_subscriptions = {} + self._node_id = node_id + self._payload_callbacks = [] + # if not isinstance(devices, list): + # devices = [devices] + # self._devices = devices + # self._message_callback = None + # self._ota_publish_callback = None + # self._ota_mid = None + + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + for module in self._modules: + if not module.is_initialized(): + module.init(self) + module.set_initialized() + + def on_publish(self, client: mqtt.Client, userdata, mid): + pass # FIXME + # if self._ota_mid is not None and mid == self._ota_mid and self._ota_publish_callback: + # self._ota_publish_callback() + + def on_message(self, client: mqtt.Client, userdata, msg): + try: + topic = msg.topic + actual_topic = topic[len(f'hk/{self._node_id}/'):] + + if actual_topic in self._module_subscriptions: + payload = self._module_subscriptions[actual_topic].handle_payload(self, actual_topic, msg.payload) + if isinstance(payload, MqttPayload): + for f in self._payload_callbacks: + f(payload) + + except Exception as e: + self._logger.exception(str(e)) + + # def push_ota(self, + # device_id, + # filename: str, + # publish_callback: callable, + # qos: int): + # device = next(d for d in self._devices if d.id == device_id) + # assert device.secret is not None, 'device secret not specified' + # + # self._ota_publish_callback = publish_callback + # payload = OtaPayload(secret=device.secret, filename=filename) + # publish_result = self._client.publish(f'hk/{device.id}/{self.TOPIC_LEAF}/admin/ota', + # payload=payload.pack(), + # qos=qos) + # self._ota_mid = publish_result.mid + # self._client.loop_write() + # + # @classmethod + # def get_mqtt_topics(cls, additional_topics: Optional[list[str]] = None): + # return rf'^hk/(.*?)/{cls.TOPIC_LEAF}/(stat|stat1|otares'+('|'+('|'.join(additional_topics)) if additional_topics else '')+')$' + + def add_module(self, module: MqttModule): + self._modules.append(module) + if self._connected: + module.init(self) + module.set_initialized() + + def subscribe_module(self, topic: str, module: MqttModule, qos: int = 1): + self._module_subscriptions[topic] = module + self._client.subscribe(f'hk/{self._node_id}/{topic}', qos) + + def publish(self, topic: str, payload: bytes, qos: int = 1): + self._client.publish(f'hk/{self._node_id}/{topic}', payload, qos) + self._client.loop_write() + + def add_payload_callback(self, callback: callable): + self._payload_callbacks.append(callback) \ No newline at end of file diff --git a/src/home/mqtt/_payload.py b/src/home/mqtt/_payload.py new file mode 100644 index 0000000..58eeae3 --- /dev/null +++ b/src/home/mqtt/_payload.py @@ -0,0 +1,145 @@ +import struct +import abc +import re + +from typing import Optional, Tuple + + +def pldstr(self) -> str: + attrs = [] + for field in self.__class__.__annotations__: + if hasattr(self, field): + attr = getattr(self, field) + attrs.append(f'{field}={attr}') + if attrs: + attrs_s = ' ' + attrs_s += ', '.join(attrs) + else: + attrs_s = '' + return f'<%s{attrs_s}>' % (self.__class__.__name__,) + + +class MqttPayload(abc.ABC): + FORMAT = '' + PACKER = {} + UNPACKER = {} + + def __init__(self, **kwargs): + for field in self.__class__.__annotations__: + setattr(self, field, kwargs[field]) + + def pack(self): + args = [] + bf_number = -1 + bf_arg = 0 + bf_progress = 0 + + for field, field_type in self.__class__.__annotations__.items(): + bfp = _bit_field_params(field_type) + if bfp: + n, s, b = bfp + if n != bf_number: + if bf_number != -1: + args.append(bf_arg) + bf_number = n + bf_progress = 0 + bf_arg = 0 + bf_arg |= (getattr(self, field) & (2 ** b - 1)) << bf_progress + bf_progress += b + + else: + if bf_number != -1: + args.append(bf_arg) + bf_number = -1 + bf_progress = 0 + bf_arg = 0 + + args.append(self._pack_field(field)) + + if bf_number != -1: + args.append(bf_arg) + + return struct.pack(self.FORMAT, *args) + + @classmethod + def unpack(cls, buf: bytes): + data = struct.unpack(cls.FORMAT, buf) + kwargs = {} + i = 0 + bf_number = -1 + bf_progress = 0 + + for field, field_type in cls.__annotations__.items(): + bfp = _bit_field_params(field_type) + if bfp: + n, s, b = bfp + if n != bf_number: + bf_number = n + bf_progress = 0 + kwargs[field] = (data[i] >> bf_progress) & (2 ** b - 1) + bf_progress += b + continue # don't increment i + + if bf_number != -1: + bf_number = -1 + i += 1 + + if issubclass(field_type, MqttPayloadCustomField): + kwargs[field] = field_type.unpack(data[i]) + else: + kwargs[field] = cls._unpack_field(field, data[i]) + i += 1 + + return cls(**kwargs) + + def _pack_field(self, name): + val = getattr(self, name) + if self.PACKER and name in self.PACKER: + return self.PACKER[name](val) + else: + return val + + @classmethod + def _unpack_field(cls, name, val): + if isinstance(val, MqttPayloadCustomField): + return + if cls.UNPACKER and name in cls.UNPACKER: + return cls.UNPACKER[name](val) + else: + return val + + def __str__(self): + return pldstr(self) + + +class MqttPayloadCustomField(abc.ABC): + def __init__(self, **kwargs): + for field in self.__class__.__annotations__: + setattr(self, field, kwargs[field]) + + @abc.abstractmethod + def __index__(self): + pass + + @classmethod + @abc.abstractmethod + def unpack(cls, *args, **kwargs): + pass + + def __str__(self): + return pldstr(self) + + +def bit_field(seq_no: int, total_bits: int, bits: int): + return type(f'MQTTPayloadBitField_{seq_no}_{total_bits}_{bits}', (object,), { + 'seq_no': seq_no, + 'total_bits': total_bits, + 'bits': bits + }) + + +def _bit_field_params(cl) -> Optional[Tuple[int, ...]]: + match = re.match(r'MQTTPayloadBitField_(\d+)_(\d+)_(\d)$', cl.__name__) + if match is not None: + return tuple([int(match.group(i)) for i in range(1, 4)]) + return None \ No newline at end of file diff --git a/src/home/mqtt/esp.py b/src/home/mqtt/esp.py deleted file mode 100644 index 56ced83..0000000 --- a/src/home/mqtt/esp.py +++ /dev/null @@ -1,106 +0,0 @@ -import re -import paho.mqtt.client as mqtt - -from .mqtt import MqttBase -from typing import Optional, Union -from .payload.esp import ( - OTAPayload, - OTAResultPayload, - DiagnosticsPayload, - InitialDiagnosticsPayload -) - - -class MqttEspDevice: - id: str - secret: Optional[str] - - def __init__(self, id: str, secret: Optional[str] = None): - self.id = id - self.secret = secret - - -class MqttEspBase(MqttBase): - _devices: list[MqttEspDevice] - _message_callback: Optional[callable] - _ota_publish_callback: Optional[callable] - - TOPIC_LEAF = 'esp' - - def __init__(self, - devices: Union[MqttEspDevice, list[MqttEspDevice]], - subscribe_to_updates=True): - super().__init__(clean_session=True) - if not isinstance(devices, list): - devices = [devices] - self._devices = devices - self._message_callback = None - self._ota_publish_callback = None - self._subscribe_to_updates = subscribe_to_updates - self._ota_mid = None - - def on_connect(self, client: mqtt.Client, userdata, flags, rc): - super().on_connect(client, userdata, flags, rc) - - if self._subscribe_to_updates: - for device in self._devices: - topic = f'hk/{device.id}/{self.TOPIC_LEAF}/#' - self._logger.debug(f"subscribing to {topic}") - client.subscribe(topic, qos=1) - - def on_publish(self, client: mqtt.Client, userdata, mid): - if self._ota_mid is not None and mid == self._ota_mid and self._ota_publish_callback: - self._ota_publish_callback() - - def set_message_callback(self, callback: callable): - self._message_callback = callback - - def on_message(self, client: mqtt.Client, userdata, msg): - try: - match = re.match(self.get_mqtt_topics(), msg.topic) - self._logger.debug(f'topic: {msg.topic}') - if not match: - return - - device_id = match.group(1) - subtopic = match.group(2) - - # try: - next(d for d in self._devices if d.id == device_id) - # except StopIteration:h - # return - - message = None - if subtopic == 'stat': - message = DiagnosticsPayload.unpack(msg.payload) - elif subtopic == 'stat1': - message = InitialDiagnosticsPayload.unpack(msg.payload) - elif subtopic == 'otares': - message = OTAResultPayload.unpack(msg.payload) - - if message and self._message_callback: - self._message_callback(device_id, message) - return True - - except Exception as e: - self._logger.exception(str(e)) - - def push_ota(self, - device_id, - filename: str, - publish_callback: callable, - qos: int): - device = next(d for d in self._devices if d.id == device_id) - assert device.secret is not None, 'device secret not specified' - - self._ota_publish_callback = publish_callback - payload = OTAPayload(secret=device.secret, filename=filename) - publish_result = self._client.publish(f'hk/{device.id}/{self.TOPIC_LEAF}/admin/ota', - payload=payload.pack(), - qos=qos) - self._ota_mid = publish_result.mid - self._client.loop_write() - - @classmethod - def get_mqtt_topics(cls, additional_topics: Optional[list[str]] = None): - return rf'^hk/(.*?)/{cls.TOPIC_LEAF}/(stat|stat1|otares'+('|'+('|'.join(additional_topics)) if additional_topics else '')+')$' \ No newline at end of file diff --git a/src/home/mqtt/module/diagnostics.py b/src/home/mqtt/module/diagnostics.py new file mode 100644 index 0000000..c31cce2 --- /dev/null +++ b/src/home/mqtt/module/diagnostics.py @@ -0,0 +1,61 @@ +from ..mqtt import MqttPayload, MqttPayloadCustomField +from .._node import MqttNode, MqttModule +from typing import Optional + +MODULE_NAME = 'MqttDiagnosticsModule' + + +class DiagnosticsFlags(MqttPayloadCustomField): + state: bool + config_changed_value_present: bool + config_changed: bool + + @staticmethod + def unpack(flags: int): + # _logger.debug(f'StatFlags.unpack: flags={flags}') + state = flags & 0x1 + ccvp = (flags >> 1) & 0x1 + cc = (flags >> 2) & 0x1 + # _logger.debug(f'StatFlags.unpack: state={state}') + return DiagnosticsFlags(state=(state == 1), + config_changed_value_present=(ccvp == 1), + config_changed=(cc == 1)) + + def __index__(self): + bits = 0 + bits |= (int(self.state) & 0x1) + bits |= (int(self.config_changed_value_present) & 0x1) << 1 + bits |= (int(self.config_changed) & 0x1) << 2 + return bits + + +class InitialDiagnosticsPayload(MqttPayload): + FORMAT = '=IBbIB' + + ip: int + fw_version: int + rssi: int + free_heap: int + flags: DiagnosticsFlags + + +class DiagnosticsPayload(MqttPayload): + FORMAT = '=bIB' + + rssi: int + free_heap: int + flags: DiagnosticsFlags + + +class MqttDiagnosticsModule(MqttModule): + def init(self, mqtt: MqttNode): + for topic in ('diag', 'd1ag', 'stat', 'stat1'): + mqtt.subscribe_module(topic, self) + + def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: + if topic in ('stat', 'diag'): + message = DiagnosticsPayload.unpack(payload) + elif topic in ('stat1', 'd1ag'): + message = InitialDiagnosticsPayload.unpack(payload) + self._logger.debug(message) + return message diff --git a/src/home/mqtt/module/inverter.py b/src/home/mqtt/module/inverter.py new file mode 100644 index 0000000..9cf2978 --- /dev/null +++ b/src/home/mqtt/module/inverter.py @@ -0,0 +1,77 @@ +import struct + +from .._node import MqttNode +from .._payload import MqttPayload, bit_field + +_mult_10 = lambda n: int(n*10) +_div_10 = lambda n: n/10 + + +class Status(MqttPayload): + # 46 bytes + FORMAT = 'IHHHHHHBHHHHHBHHHHHHHH' + + PACKER = { + 'grid_voltage': _mult_10, + 'grid_freq': _mult_10, + 'ac_output_voltage': _mult_10, + 'ac_output_freq': _mult_10, + 'battery_voltage': _mult_10, + 'battery_voltage_scc': _mult_10, + 'battery_voltage_scc2': _mult_10, + 'pv1_input_voltage': _mult_10, + 'pv2_input_voltage': _mult_10 + } + UNPACKER = { + 'grid_voltage': _div_10, + 'grid_freq': _div_10, + 'ac_output_voltage': _div_10, + 'ac_output_freq': _div_10, + 'battery_voltage': _div_10, + 'battery_voltage_scc': _div_10, + 'battery_voltage_scc2': _div_10, + 'pv1_input_voltage': _div_10, + 'pv2_input_voltage': _div_10 + } + + time: int + grid_voltage: float + grid_freq: float + ac_output_voltage: float + ac_output_freq: float + ac_output_apparent_power: int + ac_output_active_power: int + output_load_percent: int + battery_voltage: float + battery_voltage_scc: float + battery_voltage_scc2: float + battery_discharge_current: int + battery_charge_current: int + battery_capacity: int + inverter_heat_sink_temp: int + mppt1_charger_temp: int + mppt2_charger_temp: int + pv1_input_power: int + pv2_input_power: int + pv1_input_voltage: float + pv2_input_voltage: float + + # H + mppt1_charger_status: bit_field(0, 16, 2) + mppt2_charger_status: bit_field(0, 16, 2) + battery_power_direction: bit_field(0, 16, 2) + dc_ac_power_direction: bit_field(0, 16, 2) + line_power_direction: bit_field(0, 16, 2) + load_connected: bit_field(0, 16, 1) + + +class Generation(MqttPayload): + # 8 bytes + FORMAT = 'II' + + time: int + wh: int + + +class MqttInverterModule(MqttNode): + pass diff --git a/src/home/mqtt/module/ota.py b/src/home/mqtt/module/ota.py new file mode 100644 index 0000000..86d6839 --- /dev/null +++ b/src/home/mqtt/module/ota.py @@ -0,0 +1,67 @@ +import hashlib + +from typing import Optional +from ..mqtt import MqttPayload +from .._node import MqttModule, MqttNode + +MODULE_NAME = 'MqttOtaModule' + + +class OtaResultPayload(MqttPayload): + FORMAT = '=BB' + result: int + error_code: int + + +class OtaPayload(MqttPayload): + secret: str + filename: str + + # structure of returned data: + # + # uint8_t[len(secret)] secret; + # uint8_t[16] md5; + # *uint8_t data + + def pack(self): + buf = bytearray(self.secret.encode()) + m = hashlib.md5() + with open(self.filename, 'rb') as fd: + content = fd.read() + m.update(content) + buf.extend(m.digest()) + buf.extend(content) + return buf + + def unpack(cls, buf: bytes): + raise RuntimeError(f'{cls.__class__.__name__}.unpack: not implemented') + # secret = buf[:12].decode() + # filename = buf[12:].decode() + # return OTAPayload(secret=secret, filename=filename) + + +class MqttOtaModule(MqttModule): + def init(self, mqtt: MqttNode): + mqtt.subscribe_module("otares", self) + + def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: + if topic == 'otares': + message = OtaResultPayload.unpack(payload) + self._logger.debug(message) + return message + + # def push_ota(self, + # node_id, + # filename: str, + # publish_callback: callable, + # qos: int): + # device = next(d for d in self._devices if d.id == device_id) + # assert device.secret is not None, 'device secret not specified' + # + # self._ota_publish_callback = publish_callback + # payload = OtaPayload(secret=device.secret, filename=filename) + # publish_result = self._client.publish(f'hk/{device.id}/{self.TOPIC_LEAF}/admin/ota', + # payload=payload.pack(), + # qos=qos) + # self._ota_mid = publish_result.mid + # self._client.loop_write() \ No newline at end of file diff --git a/src/home/mqtt/module/relay.py b/src/home/mqtt/module/relay.py new file mode 100644 index 0000000..bf22bfe --- /dev/null +++ b/src/home/mqtt/module/relay.py @@ -0,0 +1,82 @@ +import datetime + +from typing import Optional +from .. import MqttModule, MqttPayload, MqttNode + +MODULE_NAME = 'MqttRelayModule' + + +class MqttPowerSwitchPayload(MqttPayload): + FORMAT = '=12sB' + PACKER = { + 'state': lambda n: int(n), + 'secret': lambda s: s.encode('utf-8') + } + UNPACKER = { + 'state': lambda n: bool(n), + 'secret': lambda s: s.decode('utf-8') + } + + secret: str + state: bool + + +class MqttPowerStatusPayload(MqttPayload): + FORMAT = '=B' + PACKER = { + 'opened': lambda n: int(n), + } + UNPACKER = { + 'opened': lambda n: bool(n), + } + + opened: bool + + +class MqttRelayState: + enabled: bool + update_time: datetime.datetime + rssi: int + fw_version: int + ever_updated: bool + + def __init__(self): + self.ever_updated = False + self.enabled = False + self.rssi = 0 + + def update(self, + enabled: bool, + rssi: int, + fw_version=None): + self.ever_updated = True + self.enabled = enabled + self.rssi = rssi + self.update_time = datetime.datetime.now() + if fw_version: + self.fw_version = fw_version + + +class MqttRelayModule(MqttModule): + def init(self, mqtt: MqttNode): + mqtt.subscribe_module('relay/switch', self) + mqtt.subscribe_module('relay/status', self) + + @staticmethod + def switchpower(mqtt: MqttNode, + enable: bool, + secret: str): + payload = MqttPowerSwitchPayload(secret=secret, state=enable) + mqtt.publish('relay/switch', payload=payload.pack()) + + def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: + message = None + + if topic == 'relay/switch': + message = MqttPowerSwitchPayload.unpack(payload) + elif topic == 'relay/status': + message = MqttPowerStatusPayload.unpack(payload) + + if message is not None: + self._logger.debug(message) + return message diff --git a/src/home/mqtt/module/temphum.py b/src/home/mqtt/module/temphum.py new file mode 100644 index 0000000..0e43f1b --- /dev/null +++ b/src/home/mqtt/module/temphum.py @@ -0,0 +1,57 @@ +from enum import auto +from .._node import MqttNode +from .._module import MqttModule +from .._payload import MqttPayload +from ...util import HashableEnum +from typing import Optional + +two_digits_precision = lambda x: round(x, 2) + +MODULE_NAME = 'MqttTempHumModule' + + +class MqttTemphumDataPayload(MqttPayload): + FORMAT = '=ddb' + UNPACKER = { + 'temp': two_digits_precision, + 'rh': two_digits_precision + } + + temp: float + rh: float + error: int + + +class MqttTempHumNodes(HashableEnum): + KBN_SH_HALL = auto() + KBN_SH_BATHROOM = auto() + KBN_SH_LIVINGROOM = auto() + KBN_SH_BEDROOM = auto() + + KBN_BH_2FL = auto() + KBN_BH_2FL_STREET = auto() + KBN_BH_1FL_LIVINGROOM = auto() + KBN_BH_1FL_BEDROOM = auto() + KBN_BH_1FL_BATHROOM = auto() + + KBN_NH_1FL_INV = auto() + KBN_NH_1FL_CENTER = auto() + KBN_NH_1LF_KT = auto() + KBN_NH_1FL_DS = auto() + KBN_NH_1FS_EZ = auto() + + SPB_FLAT120_CABINET = auto() + + +class MqttTempHumModule(MqttModule): + def init(self, mqtt: MqttNode): + mqtt.subscribe_module('temphum/data', self) + + def handle_payload(self, + mqtt: MqttNode, + topic: str, + payload: bytes) -> Optional[MqttPayload]: + if topic == 'temphum/data': + message = MqttTemphumDataPayload.unpack(payload) + self._logger.debug(message) + return message \ No newline at end of file diff --git a/src/home/mqtt/mqtt.py b/src/home/mqtt/mqtt.py index 4acd4f6..fad5d26 100644 --- a/src/home/mqtt/mqtt.py +++ b/src/home/mqtt/mqtt.py @@ -3,8 +3,8 @@ import paho.mqtt.client as mqtt import ssl import logging -from typing import Tuple from ..config import config +from ._payload import * def username_and_password() -> Tuple[str, str]: @@ -14,6 +14,8 @@ def username_and_password() -> Tuple[str, str]: class MqttBase: + _connected: bool + def __init__(self, clean_session=True): self._client = mqtt.Client(client_id=config['mqtt']['client_id'], protocol=mqtt.MQTTv311, @@ -24,6 +26,7 @@ class MqttBase: self._client.on_log = self.on_log self._client.on_publish = self.on_publish self._loop_started = False + self._connected = False self._logger = logging.getLogger(self.__class__.__name__) @@ -41,7 +44,9 @@ class MqttBase: 'assets', 'mqtt_ca.crt' )) - self._client.tls_set(ca_certs=ca_certs, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2) + self._client.tls_set(ca_certs=ca_certs, + cert_reqs=ssl.CERT_REQUIRED, + tls_version=ssl.PROTOCOL_TLSv1_2) def connect_and_loop(self, loop_forever=True): host = config['mqtt']['host'] @@ -61,9 +66,11 @@ class MqttBase: def on_connect(self, client: mqtt.Client, userdata, flags, rc): self._logger.info("Connected with result code " + str(rc)) + self._connected = True def on_disconnect(self, client: mqtt.Client, userdata, rc): self._logger.info("Disconnected with result code " + str(rc)) + self._connected = False def on_log(self, client: mqtt.Client, userdata, level, buf): level = mqtt.LOGGING_LEVEL[level] if level in mqtt.LOGGING_LEVEL else logging.INFO @@ -73,4 +80,15 @@ class MqttBase: self._logger.debug(msg.topic + ": " + str(msg.payload)) def on_publish(self, client: mqtt.Client, userdata, mid): - self._logger.debug(f'publish done, mid={mid}') \ No newline at end of file + self._logger.debug(f'publish done, mid={mid}') + + +class MqttEspDevice: + id: str + secret: Optional[str] + + def __init__(self, + node_id: str, + secret: Optional[str] = None): + self.id = node_id + self.secret = secret diff --git a/src/home/mqtt/payload/__init__.py b/src/home/mqtt/payload/__init__.py deleted file mode 100644 index eee6709..0000000 --- a/src/home/mqtt/payload/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .base_payload import MqttPayload \ No newline at end of file diff --git a/src/home/mqtt/payload/base_payload.py b/src/home/mqtt/payload/base_payload.py deleted file mode 100644 index 1abd898..0000000 --- a/src/home/mqtt/payload/base_payload.py +++ /dev/null @@ -1,145 +0,0 @@ -import abc -import struct -import re - -from typing import Optional, Tuple - - -def pldstr(self) -> str: - attrs = [] - for field in self.__class__.__annotations__: - if hasattr(self, field): - attr = getattr(self, field) - attrs.append(f'{field}={attr}') - if attrs: - attrs_s = ' ' - attrs_s += ', '.join(attrs) - else: - attrs_s = '' - return f'<%s{attrs_s}>' % (self.__class__.__name__,) - - -class MqttPayload(abc.ABC): - FORMAT = '' - PACKER = {} - UNPACKER = {} - - def __init__(self, **kwargs): - for field in self.__class__.__annotations__: - setattr(self, field, kwargs[field]) - - def pack(self): - args = [] - bf_number = -1 - bf_arg = 0 - bf_progress = 0 - - for field, field_type in self.__class__.__annotations__.items(): - bfp = _bit_field_params(field_type) - if bfp: - n, s, b = bfp - if n != bf_number: - if bf_number != -1: - args.append(bf_arg) - bf_number = n - bf_progress = 0 - bf_arg = 0 - bf_arg |= (getattr(self, field) & (2 ** b - 1)) << bf_progress - bf_progress += b - - else: - if bf_number != -1: - args.append(bf_arg) - bf_number = -1 - bf_progress = 0 - bf_arg = 0 - - args.append(self._pack_field(field)) - - if bf_number != -1: - args.append(bf_arg) - - return struct.pack(self.FORMAT, *args) - - @classmethod - def unpack(cls, buf: bytes): - data = struct.unpack(cls.FORMAT, buf) - kwargs = {} - i = 0 - bf_number = -1 - bf_progress = 0 - - for field, field_type in cls.__annotations__.items(): - bfp = _bit_field_params(field_type) - if bfp: - n, s, b = bfp - if n != bf_number: - bf_number = n - bf_progress = 0 - kwargs[field] = (data[i] >> bf_progress) & (2 ** b - 1) - bf_progress += b - continue # don't increment i - - if bf_number != -1: - bf_number = -1 - i += 1 - - if issubclass(field_type, MqttPayloadCustomField): - kwargs[field] = field_type.unpack(data[i]) - else: - kwargs[field] = cls._unpack_field(field, data[i]) - i += 1 - - return cls(**kwargs) - - def _pack_field(self, name): - val = getattr(self, name) - if self.PACKER and name in self.PACKER: - return self.PACKER[name](val) - else: - return val - - @classmethod - def _unpack_field(cls, name, val): - if isinstance(val, MqttPayloadCustomField): - return - if cls.UNPACKER and name in cls.UNPACKER: - return cls.UNPACKER[name](val) - else: - return val - - def __str__(self): - return pldstr(self) - - -class MqttPayloadCustomField(abc.ABC): - def __init__(self, **kwargs): - for field in self.__class__.__annotations__: - setattr(self, field, kwargs[field]) - - @abc.abstractmethod - def __index__(self): - pass - - @classmethod - @abc.abstractmethod - def unpack(cls, *args, **kwargs): - pass - - def __str__(self): - return pldstr(self) - - -def bit_field(seq_no: int, total_bits: int, bits: int): - return type(f'MQTTPayloadBitField_{seq_no}_{total_bits}_{bits}', (object,), { - 'seq_no': seq_no, - 'total_bits': total_bits, - 'bits': bits - }) - - -def _bit_field_params(cl) -> Optional[Tuple[int, ...]]: - match = re.match(r'MQTTPayloadBitField_(\d+)_(\d+)_(\d)$', cl.__name__) - if match is not None: - return tuple([int(match.group(i)) for i in range(1, 4)]) - return None diff --git a/src/home/mqtt/payload/esp.py b/src/home/mqtt/payload/esp.py deleted file mode 100644 index 171cdb9..0000000 --- a/src/home/mqtt/payload/esp.py +++ /dev/null @@ -1,78 +0,0 @@ -import hashlib - -from .base_payload import MqttPayload, MqttPayloadCustomField - - -class OTAResultPayload(MqttPayload): - FORMAT = '=BB' - result: int - error_code: int - - -class OTAPayload(MqttPayload): - secret: str - filename: str - - # structure of returned data: - # - # uint8_t[len(secret)] secret; - # uint8_t[16] md5; - # *uint8_t data - - def pack(self): - buf = bytearray(self.secret.encode()) - m = hashlib.md5() - with open(self.filename, 'rb') as fd: - content = fd.read() - m.update(content) - buf.extend(m.digest()) - buf.extend(content) - return buf - - def unpack(cls, buf: bytes): - raise RuntimeError(f'{cls.__class__.__name__}.unpack: not implemented') - # secret = buf[:12].decode() - # filename = buf[12:].decode() - # return OTAPayload(secret=secret, filename=filename) - - -class DiagnosticsFlags(MqttPayloadCustomField): - state: bool - config_changed_value_present: bool - config_changed: bool - - @staticmethod - def unpack(flags: int): - # _logger.debug(f'StatFlags.unpack: flags={flags}') - state = flags & 0x1 - ccvp = (flags >> 1) & 0x1 - cc = (flags >> 2) & 0x1 - # _logger.debug(f'StatFlags.unpack: state={state}') - return DiagnosticsFlags(state=(state == 1), - config_changed_value_present=(ccvp == 1), - config_changed=(cc == 1)) - - def __index__(self): - bits = 0 - bits |= (int(self.state) & 0x1) - bits |= (int(self.config_changed_value_present) & 0x1) << 1 - bits |= (int(self.config_changed) & 0x1) << 2 - return bits - - -class InitialDiagnosticsPayload(MqttPayload): - FORMAT = '=IBbIB' - - ip: int - fw_version: int - rssi: int - free_heap: int - flags: DiagnosticsFlags - - -class DiagnosticsPayload(MqttPayload): - FORMAT = '=bIB' - - rssi: int - free_heap: int - flags: DiagnosticsFlags diff --git a/src/home/mqtt/payload/inverter.py b/src/home/mqtt/payload/inverter.py deleted file mode 100644 index 09388df..0000000 --- a/src/home/mqtt/payload/inverter.py +++ /dev/null @@ -1,73 +0,0 @@ -import struct - -from .base_payload import MqttPayload, bit_field -from typing import Tuple - -_mult_10 = lambda n: int(n*10) -_div_10 = lambda n: n/10 - - -class Status(MqttPayload): - # 46 bytes - FORMAT = 'IHHHHHHBHHHHHBHHHHHHHH' - - PACKER = { - 'grid_voltage': _mult_10, - 'grid_freq': _mult_10, - 'ac_output_voltage': _mult_10, - 'ac_output_freq': _mult_10, - 'battery_voltage': _mult_10, - 'battery_voltage_scc': _mult_10, - 'battery_voltage_scc2': _mult_10, - 'pv1_input_voltage': _mult_10, - 'pv2_input_voltage': _mult_10 - } - UNPACKER = { - 'grid_voltage': _div_10, - 'grid_freq': _div_10, - 'ac_output_voltage': _div_10, - 'ac_output_freq': _div_10, - 'battery_voltage': _div_10, - 'battery_voltage_scc': _div_10, - 'battery_voltage_scc2': _div_10, - 'pv1_input_voltage': _div_10, - 'pv2_input_voltage': _div_10 - } - - time: int - grid_voltage: float - grid_freq: float - ac_output_voltage: float - ac_output_freq: float - ac_output_apparent_power: int - ac_output_active_power: int - output_load_percent: int - battery_voltage: float - battery_voltage_scc: float - battery_voltage_scc2: float - battery_discharge_current: int - battery_charge_current: int - battery_capacity: int - inverter_heat_sink_temp: int - mppt1_charger_temp: int - mppt2_charger_temp: int - pv1_input_power: int - pv2_input_power: int - pv1_input_voltage: float - pv2_input_voltage: float - - # H - mppt1_charger_status: bit_field(0, 16, 2) - mppt2_charger_status: bit_field(0, 16, 2) - battery_power_direction: bit_field(0, 16, 2) - dc_ac_power_direction: bit_field(0, 16, 2) - line_power_direction: bit_field(0, 16, 2) - load_connected: bit_field(0, 16, 1) - - -class Generation(MqttPayload): - # 8 bytes - FORMAT = 'II' - - time: int - wh: int diff --git a/src/home/mqtt/payload/relay.py b/src/home/mqtt/payload/relay.py deleted file mode 100644 index 4902991..0000000 --- a/src/home/mqtt/payload/relay.py +++ /dev/null @@ -1,22 +0,0 @@ -from .base_payload import MqttPayload -from .esp import ( - OTAResultPayload, - OTAPayload, - InitialDiagnosticsPayload, - DiagnosticsPayload -) - - -class PowerPayload(MqttPayload): - FORMAT = '=12sB' - PACKER = { - 'state': lambda n: int(n), - 'secret': lambda s: s.encode('utf-8') - } - UNPACKER = { - 'state': lambda n: bool(n), - 'secret': lambda s: s.decode('utf-8') - } - - secret: str - state: bool diff --git a/src/home/mqtt/payload/sensors.py b/src/home/mqtt/payload/sensors.py deleted file mode 100644 index f99b307..0000000 --- a/src/home/mqtt/payload/sensors.py +++ /dev/null @@ -1,20 +0,0 @@ -from .base_payload import MqttPayload - -_mult_100 = lambda n: int(n*100) -_div_100 = lambda n: n/100 - - -class Temperature(MqttPayload): - FORMAT = 'IhH' - PACKER = { - 'temp': _mult_100, - 'rh': _mult_100, - } - UNPACKER = { - 'temp': _div_100, - 'rh': _div_100, - } - - time: int - temp: float - rh: float diff --git a/src/home/mqtt/payload/temphum.py b/src/home/mqtt/payload/temphum.py deleted file mode 100644 index c0b744e..0000000 --- a/src/home/mqtt/payload/temphum.py +++ /dev/null @@ -1,15 +0,0 @@ -from .base_payload import MqttPayload - -two_digits_precision = lambda x: round(x, 2) - - -class TempHumDataPayload(MqttPayload): - FORMAT = '=ddb' - UNPACKER = { - 'temp': two_digits_precision, - 'rh': two_digits_precision - } - - temp: float - rh: float - error: int diff --git a/src/home/mqtt/relay.py b/src/home/mqtt/relay.py index a90f19c..cf657f7 100644 --- a/src/home/mqtt/relay.py +++ b/src/home/mqtt/relay.py @@ -1,71 +1,59 @@ +#!/usr/bin/env python3 import paho.mqtt.client as mqtt import re -import datetime +import logging -from .payload.relay import ( - PowerPayload, -) -from .esp import MqttEspBase +from .mqtt import MQTTBase -class MqttRelay(MqttEspBase): - TOPIC_LEAF = 'relay' +class MQTTRelayClient(MQTTBase): + _home_id: str - def set_power(self, device_id, enable: bool, secret=None): - device = next(d for d in self._devices if d.id == device_id) - secret = secret if secret else device.secret + def __init__(self, home_id: str): + super().__init__(clean_session=True) + self._home_id = home_id - assert secret is not None, 'device secret not specified' + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) - payload = PowerPayload(secret=secret, - state=enable) - self._client.publish(f'hk/{device.id}/{self.TOPIC_LEAF}/power', - payload=payload.pack(), - qos=1) - self._client.loop_write() + topic = f'home/{self._home_id}/#' + self._logger.info(f"subscribing to {topic}") - def on_message(self, client: mqtt.Client, userdata, msg): - if super().on_message(client, userdata, msg): - return + client.subscribe(topic, qos=1) + def on_message(self, client: mqtt.Client, userdata, msg): try: - match = re.match(self.get_mqtt_topics(['power']), msg.topic) + match = re.match(r'^home/(.*?)/relay/(stat|power)(?:/(.+))?$', msg.topic) + self._logger.info(f'topic: {msg.topic}') if not match: return - device_id = match.group(1) + name = match.group(1) subtopic = match.group(2) - message = None - if subtopic == 'power': - message = PowerPayload.unpack(msg.payload) + if name != self._home_id: + return - if message and self._message_callback: - self._message_callback(device_id, message) + if subtopic == 'stat': + stat_name, stat_value = match.group(3).split('/') + self._logger.info(f'stat: {stat_name} = {stat_value}') except Exception as e: self._logger.exception(str(e)) -class MqttRelayState: - enabled: bool - update_time: datetime.datetime - rssi: int - fw_version: int - ever_updated: bool - - def __init__(self): - self.ever_updated = False - self.enabled = False - self.rssi = 0 - - def update(self, - enabled: bool, - rssi: int, - fw_version=None): - self.ever_updated = True - self.enabled = enabled - self.rssi = rssi - self.update_time = datetime.datetime.now() - if fw_version: - self.fw_version = fw_version +class MQTTRelayController(MQTTBase): + _home_id: str + + def __init__(self, home_id: str): + super().__init__(clean_session=True) + self._home_id = home_id + + def set_power(self, enable: bool): + self._client.publish(f'home/{self._home_id}/relay/power', + payload=int(enable), + qos=1) + self._client.loop_write() + + def send_stat(self, stat: dict): + pass diff --git a/src/home/mqtt/temphum.py b/src/home/mqtt/temphum.py deleted file mode 100644 index 44810ef..0000000 --- a/src/home/mqtt/temphum.py +++ /dev/null @@ -1,54 +0,0 @@ -import paho.mqtt.client as mqtt -import re - -from enum import auto -from .payload.temphum import TempHumDataPayload -from .esp import MqttEspBase -from ..util import HashableEnum - - -class MqttTempHumNodes(HashableEnum): - KBN_SH_HALL = auto() - KBN_SH_BATHROOM = auto() - KBN_SH_LIVINGROOM = auto() - KBN_SH_BEDROOM = auto() - - KBN_BH_2FL = auto() - KBN_BH_2FL_STREET = auto() - KBN_BH_1FL_LIVINGROOM = auto() - KBN_BH_1FL_BEDROOM = auto() - KBN_BH_1FL_BATHROOM = auto() - - KBN_NH_1FL_INV = auto() - KBN_NH_1FL_CENTER = auto() - KBN_NH_1LF_KT = auto() - KBN_NH_1FL_DS = auto() - KBN_NH_1FS_EZ = auto() - - SPB_FLAT120_CABINET = auto() - - -class MqttTempHum(MqttEspBase): - TOPIC_LEAF = 'temphum' - - def on_message(self, client: mqtt.Client, userdata, msg): - if super().on_message(client, userdata, msg): - return - - try: - match = re.match(self.get_mqtt_topics(['data']), msg.topic) - if not match: - return - - device_id = match.group(1) - subtopic = match.group(2) - - message = None - if subtopic == 'data': - message = TempHumDataPayload.unpack(msg.payload) - - if message and self._message_callback: - self._message_callback(device_id, message) - - except Exception as e: - self._logger.exception(str(e)) diff --git a/src/home/mqtt/util.py b/src/home/mqtt/util.py index f71ffd8..91b6baf 100644 --- a/src/home/mqtt/util.py +++ b/src/home/mqtt/util.py @@ -1,4 +1,11 @@ import time +import os +import re +import importlib + +from ._node import MqttNode +from . import MqttModule +from typing import List def poll_tick(freq): @@ -6,3 +13,26 @@ def poll_tick(freq): while True: t += freq yield max(t - time.time(), 0) + + +def get_modules() -> List[str]: + modules = [] + for name in os.listdir(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'module')): + name = re.sub(r'\.py$', '', name) + modules.append(name) + return modules + + +def import_module(module: str): + return importlib.import_module( + f'..module.{module}', __name__) + + +def add_module(mqtt_node: MqttNode, module: str) -> MqttModule: + module = import_module(module) + if not hasattr(module, 'MODULE_NAME'): + raise RuntimeError(f'MODULE_NAME not found in module {module}') + cl = getattr(module, getattr(module, 'MODULE_NAME')) + instance = cl() + mqtt_node.add_module(instance) + return instance \ No newline at end of file diff --git a/src/home/pio/products.py b/src/home/pio/products.py index 7649078..388da03 100644 --- a/src/home/pio/products.py +++ b/src/home/pio/products.py @@ -16,10 +16,6 @@ _products_dir = os.path.join( def get_products(): products = [] for f in os.listdir(_products_dir): - # temp hack - if f.endswith('-esp01'): - continue - # skip the common dir if f in ('common',): continue diff --git a/src/mqtt_node_util.py b/src/mqtt_node_util.py new file mode 100755 index 0000000..674b60c --- /dev/null +++ b/src/mqtt_node_util.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +from typing import Optional +from argparse import ArgumentParser, ArgumentError + +from home.config import config +from home.mqtt import MqttNode, get_mqtt_modules, import_mqtt_module, MqttModule + +mqtt: Optional[MqttNode] = None + + +def add_module(module: str) -> MqttModule: + module = import_mqtt_module(module) + if not hasattr(module, 'MODULE_NAME'): + raise RuntimeError(f'MODULE_NAME not found in module {m}') + cl = getattr(module, getattr(module, 'MODULE_NAME')) + instance = cl() + mqtt.add_module(instance) + return instance + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--node-id', type=str, required=True) + parser.add_argument('--modules', type=str, choices=get_mqtt_modules(), nargs='*', + help='mqtt modules to include') + parser.add_argument('--switch-relay', choices=[0, 1], type=int, + help='send relay state') + parser.add_argument('--switch-relay-secret', type=str, + help='secret password to switch relay') + + config.load('mqtt_util', parser=parser) + arg = parser.parse_args() + + if (arg.switch_relay is not None or arg.switch_relay_secret is not None) and 'relay' not in arg.modules: + raise ArgumentError(None, '--relay is only allowed when \'relay\' module included in --modules') + + if (arg.switch_relay is not None and arg.switch_relay_secret is None) or (arg.switch_relay is None and arg.switch_relay_secret is not None): + raise ArgumentError(None, 'both --switch-relay and --switch-relay-secret are required') + + mqtt = MqttNode(node_id=arg.node_id) + + # must-have modules + add_module('ota') + add_module('diagnostics') + + if arg.modules: + for m in arg.modules: + module_instance = add_module(m) + if m == 'relay' and arg.switch_relay is not None: + module_instance.switchpower(mqtt, + arg.switch_relay == 1, + arg.switch_relay_secret) + + mqtt.configure_tls() + try: + mqtt.connect_and_loop() + except KeyboardInterrupt: + mqtt.disconnect() diff --git a/src/pio_ini.py b/src/pio_ini.py index 19dd707..920c3e5 100755 --- a/src/pio_ini.py +++ b/src/pio_ini.py @@ -54,12 +54,17 @@ def bsd_parser(product_config: dict, arg_kwargs['type'] = int elif kwargs['type'] == 'int': arg_kwargs['type'] = int + elif kwargs['type'] == 'bool': + arg_kwargs['action'] = 'store_true' + arg_kwargs['required'] = False else: raise TypeError(f'unsupported type {kwargs["type"]} for define {define_name}') else: arg_kwargs['action'] = 'store_true' - parser.add_argument(f'--{define_name}', required=True, **arg_kwargs) + if 'required' not in arg_kwargs: + arg_kwargs['required'] = True + parser.add_argument(f'--{define_name}', **arg_kwargs) bsd_walk(product_config, f) @@ -76,6 +81,9 @@ def bsd_get(product_config: dict, enums.append(f'CONFIG_{define_name}') defines[f'CONFIG_{define_name}'] = f'HOMEKIT_{attr_value.upper()}' return + if kwargs['type'] == 'bool': + defines[f'CONFIG_{define_name}'] = True + return defines[f'CONFIG_{define_name}'] = str(attr_value) bsd_walk(product_config, f) return defines, enums diff --git a/src/pump_bot.py b/src/pump_bot.py index de925db..fa884ab 100755 --- a/src/pump_bot.py +++ b/src/pump_bot.py @@ -2,15 +2,34 @@ from enum import Enum from typing import Optional from telegram import ReplyKeyboardMarkup, User +from time import time +from datetime import datetime -from home.config import config +from home.config import config, is_development_mode from home.telegram import bot from home.telegram._botutil import user_any_name from home.relay.sunxi_h3_client import RelayClient from home.api.types import BotType +from home.mqtt import MqttNode, MqttModule, MqttPayload, add_mqtt_module +from home.mqtt.module.relay import MqttPowerStatusPayload +from home.mqtt.module.temphum import MqttTemphumDataPayload +from home.mqtt.module.diagnostics import InitialDiagnosticsPayload + config.load('pump_bot') +mqtt: Optional[MqttNode] = None +mqtt_relay_module: Optional[MqttModule] = None +time_format = '%d.%m.%Y, %H:%M:%S' + +watering_mcu_status = { + 'last_time': 0, + 'last_boot_time': 0, + 'relay_opened': False, + 'ambient_temp': 0.0, + 'ambient_rh': 0.0, +} + bot.initialize() bot.lang.ru( start_message="Выберите команду на клавиатуре", @@ -18,17 +37,27 @@ bot.lang.ru( enable="Включить", enable_silently="Включить тихо", - enabled="Включен ✅", + enabled="Насос включен ✅", disable="Выключить", disable_silently="Выключить тихо", - disabled="Выключен ❌", + disabled="Насос выключен ❌", + + start_watering="Включить полив", + stop_watering="Отключить полив", + + status="Статус насоса", + watering_status="Статус полива", - status="Статус", done="Готово 👌", + sent="Команда отправлена", + user_action_notification='Пользователь %s %s насос.', + user_watering_notification='Пользователь %s %s полив.', user_action_on="включил", user_action_off="выключил", + user_action_watering_on="включил", + user_action_watering_off="выключил", ) bot.lang.en( start_message="Select command on the keyboard", @@ -36,23 +65,35 @@ bot.lang.en( enable="Turn ON", enable_silently="Turn ON silently", - enabled="Turned ON ✅", + enabled="The pump is turned ON ✅", disable="Turn OFF", disable_silently="Turn OFF silently", - disabled="Turned OFF ❌", + disabled="The pump is turned OFF ❌", + + start_watering="Start watering", + stop_watering="Stop watering", + + status="Pump status", + watering_status="Watering status", - status="Status", done="Done 👌", + sent="Request sent", + user_action_notification='User %s turned the pump %s.', + user_watering_notification='User %s %s the watering.', user_action_on="ON", user_action_off="OFF", + user_action_watering_on="started", + user_action_watering_off="stopped", ) class UserAction(Enum): ON = 'on' OFF = 'off' + WATERING_ON = 'watering_on' + WATERING_OFF = 'watering_off' def get_relay() -> RelayClient: @@ -75,11 +116,24 @@ def off(ctx: bot.Context, silent=False) -> None: notify(ctx.user, UserAction.OFF) +def watering_on(ctx: bot.Context) -> None: + mqtt_relay_module.switchpower(mqtt, True, config.get('mqtt_water_relay.secret')) + ctx.reply(ctx.lang('sent')) + notify(ctx.user, UserAction.WATERING_ON) + + +def watering_off(ctx: bot.Context) -> None: + mqtt_relay_module.switchpower(mqtt, False, config.get('mqtt_water_relay.secret')) + ctx.reply(ctx.lang('sent')) + notify(ctx.user, UserAction.WATERING_OFF) + + def notify(user: User, action: UserAction) -> None: + notification_key = 'user_watering_notification' if action in (UserAction.WATERING_ON, UserAction.WATERING_OFF) else 'user_action_notification' def text_getter(lang: str): action_name = bot.lang.get(f'user_action_{action.value}', lang) user_name = user_any_name(user) - return 'ℹ ' + bot.lang.get('user_action_notification', lang, + return 'ℹ ' + bot.lang.get(notification_key, lang, user.id, user_name, action_name) bot.notify_all(text_getter, exclude=(user.id,)) @@ -100,6 +154,16 @@ def disable_handler(ctx: bot.Context) -> None: off(ctx) +@bot.handler(message='start_watering') +def start_watering(ctx: bot.Context) -> None: + watering_on(ctx) + + +@bot.handler(message='stop_watering') +def stop_watering(ctx: bot.Context) -> None: + watering_off(ctx) + + @bot.handler(message='disable_silently') def disable_s_handler(ctx: bot.Context) -> None: off(ctx, True) @@ -112,20 +176,71 @@ def status(ctx: bot.Context) -> None: ) +def _get_timestamp_as_string(timestamp: int) -> str: + if timestamp != 0: + return datetime.fromtimestamp(timestamp).strftime(time_format) + else: + return 'unknown' + + +@bot.handler(message='watering_status') +def watering_status(ctx: bot.Context) -> None: + buf = '' + if 0 < watering_mcu_status["last_time"] < time()-1800: + buf += 'WARNING! long time no reports from mcu! maybe something\'s wrong\n' + buf += f'last report time: {_get_timestamp_as_string(watering_mcu_status["last_time"])}\n' + if watering_mcu_status["last_boot_time"] != 0: + buf += f'boot time: {_get_timestamp_as_string(watering_mcu_status["last_boot_time"])}\n' + buf += 'relay opened: ' + ('yes' if watering_mcu_status['relay_opened'] else 'no') + '\n' + buf += f'ambient temp & humidity: {watering_mcu_status["ambient_temp"]} °C, {watering_mcu_status["ambient_rh"]}%' + ctx.reply(buf) + + @bot.defaultreplymarkup def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: - buttons = [ - [ctx.lang('enable'), ctx.lang('disable')], - ] - + buttons = [] if ctx.user_id in config['bot']['silent_users']: buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')]) - - buttons.append([ctx.lang('status')]) + buttons.append([ctx.lang('enable'), ctx.lang('disable'), ctx.lang('status')],) + buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering'), ctx.lang('watering_status')]) return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) +def mqtt_payload_callback(payload: MqttPayload): + global watering_mcu_status + + watering_mcu_status['last_time'] = int(time()) + + if isinstance(payload, InitialDiagnosticsPayload): + watering_mcu_status['last_boot_time'] = int(time()) + + elif isinstance(payload, MqttTemphumDataPayload): + watering_mcu_status['ambient_temp'] = payload.temp + watering_mcu_status['ambient_rh'] = payload.rh + + elif isinstance(payload, MqttPowerStatusPayload): + watering_mcu_status['relay_opened'] = payload.opened + + if __name__ == '__main__': + mqtt = MqttNode(node_id=config.get('mqtt_water_relay.node_id')) + if is_development_mode(): + add_mqtt_module(mqtt, 'diagnostics') + + mqtt_relay_module = add_mqtt_module(mqtt, 'temphum') + mqtt_relay_module = add_mqtt_module(mqtt, 'relay') + + mqtt.add_payload_callback(mqtt_payload_callback) + + mqtt.configure_tls() + mqtt.connect_and_loop(loop_forever=False) + bot.enable_logging(BotType.PUMP) bot.run() + + try: + mqtt.disconnect() + except: + pass + diff --git a/src/pump_mqtt_bot.py b/src/pump_mqtt_bot.py index d3b6de4..6a63caf 100755 --- a/src/pump_mqtt_bot.py +++ b/src/pump_mqtt_bot.py @@ -8,10 +8,9 @@ from telegram import ReplyKeyboardMarkup, User from home.config import config from home.telegram import bot from home.telegram._botutil import user_any_name -from home.mqtt.esp import MqttEspDevice -from home.mqtt import MqttRelay, MqttRelayState -from home.mqtt.payload import MqttPayload -from home.mqtt.payload.relay import InitialDiagnosticsPayload, DiagnosticsPayload +from home.mqtt import MqttNode, MqttPayload +from home.mqtt.module.relay import MqttRelayState +from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload config.load('pump_mqtt_bot') @@ -70,7 +69,7 @@ bot.lang.en( ) -mqtt_relay: Optional[MqttRelay] = None +mqtt: Optional[MqttNode] = None relay_state = MqttRelayState() @@ -99,14 +98,14 @@ def notify(user: User, action: UserAction) -> None: @bot.handler(message='enable') def enable_handler(ctx: bot.Context) -> None: - mqtt_relay.set_power(config['mqtt']['home_id'], True) + mqtt.set_power(config['mqtt']['home_id'], True) ctx.reply(ctx.lang('done')) notify(ctx.user, UserAction.ON) @bot.handler(message='disable') def disable_handler(ctx: bot.Context) -> None: - mqtt_relay.set_power(config['mqtt']['home_id'], False) + mqtt.set_power(config['mqtt']['home_id'], False) ctx.reply(ctx.lang('done')) notify(ctx.user, UserAction.OFF) @@ -157,13 +156,13 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: if __name__ == '__main__': - mqtt_relay = MqttRelay(devices=MqttEspDevice(id=config['mqtt']['home_id'], - secret=config['mqtt']['home_secret'])) - mqtt_relay.set_message_callback(on_mqtt_message) - mqtt_relay.configure_tls() - mqtt_relay.connect_and_loop(loop_forever=False) + mqtt = MqttRelay(devices=MqttEspDevice(id=config['mqtt']['home_id'], + secret=config['mqtt']['home_secret'])) + mqtt.set_message_callback(on_mqtt_message) + mqtt.configure_tls() + mqtt.connect_and_loop(loop_forever=False) # bot.enable_logging(BotType.PUMP_MQTT) bot.run(start_handler=start) - mqtt_relay.disconnect() + mqtt.disconnect() -- cgit v1.2.3