diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/home/mqtt/_module.py | 5 | ||||
-rw-r--r-- | src/home/mqtt/_node.py | 16 | ||||
-rw-r--r-- | src/home/mqtt/module/diagnostics.py | 6 | ||||
-rw-r--r-- | src/home/mqtt/module/ota.py | 4 | ||||
-rw-r--r-- | src/home/mqtt/module/relay.py | 21 | ||||
-rw-r--r-- | src/home/mqtt/module/temphum.py | 10 | ||||
-rwxr-xr-x | src/pump_bot.py | 72 |
7 files changed, 107 insertions, 27 deletions
diff --git a/src/home/mqtt/_module.py b/src/home/mqtt/_module.py index 949c344..ef50e70 100644 --- a/src/home/mqtt/_module.py +++ b/src/home/mqtt/_module.py @@ -3,9 +3,10 @@ from __future__ import annotations import abc import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from ._node import MqttNode + from ._payload import MqttPayload class MqttModule(abc.ABC): @@ -29,5 +30,5 @@ class MqttModule(abc.ABC): def tick(self, mqtt: MqttNode): pass - def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes): + def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: pass diff --git a/src/home/mqtt/_node.py b/src/home/mqtt/_node.py index c76610f..688b30b 100644 --- a/src/home/mqtt/_node.py +++ b/src/home/mqtt/_node.py @@ -3,30 +3,32 @@ import paho.mqtt.client as mqtt from .mqtt import MqttBase from typing import List from ._module import MqttModule +from ._payload import MqttPayload class MqttNode(MqttBase): _modules: List[MqttModule] _module_subscriptions: dict[str, MqttModule] _node_id: str + _payload_callbacks: list[callable] # _devices: list[MqttEspDevice] # _message_callback: Optional[callable] # _ota_publish_callback: Optional[callable] def __init__(self, node_id: str, - # devices: Union[MqttEspDevice, list[MqttEspDevice]], - subscribe_to_updates=True): + # devices: Union[MqttEspDevice, list[MqttEspDevice]] + ): super().__init__(clean_session=True) self._modules = [] self._module_subscriptions = {} self._node_id = node_id + self._payload_callbacks = [] # if not isinstance(devices, list): # devices = [devices] # self._devices = devices # self._message_callback = None # self._ota_publish_callback = None - # self._subscribe_to_updates = subscribe_to_updates # self._ota_mid = None def on_connect(self, client: mqtt.Client, userdata, flags, rc): @@ -47,7 +49,10 @@ class MqttNode(MqttBase): actual_topic = topic[len(f'hk/{self._node_id}/'):] if actual_topic in self._module_subscriptions: - self._module_subscriptions[actual_topic].handle_payload(self, actual_topic, msg.payload) + payload = self._module_subscriptions[actual_topic].handle_payload(self, actual_topic, msg.payload) + if isinstance(payload, MqttPayload): + for f in self._payload_callbacks: + f(payload) except Exception as e: self._logger.exception(str(e)) @@ -85,3 +90,6 @@ class MqttNode(MqttBase): def publish(self, topic: str, payload: bytes, qos: int = 1): self._client.publish(f'hk/{self._node_id}/{topic}', payload, qos) self._client.loop_write() + + def add_payload_callback(self, callback: callable): + self._payload_callbacks.append(callback)
\ No newline at end of file diff --git a/src/home/mqtt/module/diagnostics.py b/src/home/mqtt/module/diagnostics.py index 8b5ea16..c31cce2 100644 --- a/src/home/mqtt/module/diagnostics.py +++ b/src/home/mqtt/module/diagnostics.py @@ -1,5 +1,6 @@ from ..mqtt import MqttPayload, MqttPayloadCustomField from .._node import MqttNode, MqttModule +from typing import Optional MODULE_NAME = 'MqttDiagnosticsModule' @@ -51,9 +52,10 @@ class MqttDiagnosticsModule(MqttModule): for topic in ('diag', 'd1ag', 'stat', 'stat1'): mqtt.subscribe_module(topic, self) - def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes): + def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: if topic in ('stat', 'diag'): message = DiagnosticsPayload.unpack(payload) elif topic in ('stat1', 'd1ag'): message = InitialDiagnosticsPayload.unpack(payload) - self._logger.debug(message)
\ No newline at end of file + self._logger.debug(message) + return message diff --git a/src/home/mqtt/module/ota.py b/src/home/mqtt/module/ota.py index 1d472d1..86d6839 100644 --- a/src/home/mqtt/module/ota.py +++ b/src/home/mqtt/module/ota.py @@ -1,5 +1,6 @@ import hashlib +from typing import Optional from ..mqtt import MqttPayload from .._node import MqttModule, MqttNode @@ -43,10 +44,11 @@ class MqttOtaModule(MqttModule): def init(self, mqtt: MqttNode): mqtt.subscribe_module("otares", self) - def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes): + def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: if topic == 'otares': message = OtaResultPayload.unpack(payload) self._logger.debug(message) + return message # def push_ota(self, # node_id, diff --git a/src/home/mqtt/module/relay.py b/src/home/mqtt/module/relay.py index 16877f6..721ceac 100644 --- a/src/home/mqtt/module/relay.py +++ b/src/home/mqtt/module/relay.py @@ -1,7 +1,6 @@ -import paho.mqtt.client as mqtt -import re import datetime +from typing import Optional from .. import MqttModule, MqttPayload, MqttNode MODULE_NAME = 'MqttRelayModule' @@ -22,6 +21,18 @@ class MqttPowerSwitchPayload(MqttPayload): state: bool +class MqttPowerStatusPayload(MqttPayload): + FORMAT = '=B' + PACKER = { + 'opened': lambda n: int(n), + } + UNPACKER = { + 'opened': lambda n: bool(n), + } + + opened: bool + + class MqttRelayState: enabled: bool update_time: datetime.datetime @@ -57,9 +68,11 @@ class MqttRelayModule(MqttModule): payload = MqttPowerSwitchPayload(secret=secret, state=enable) mqtt.publish('relay/switch', payload=payload.pack()) - def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes): + def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: if topic != 'relay/switch': return message = MqttPowerSwitchPayload.unpack(payload) - self._logger.debug(message)
\ No newline at end of file + self._logger.debug(message) + + return message
\ No newline at end of file diff --git a/src/home/mqtt/module/temphum.py b/src/home/mqtt/module/temphum.py index e1c4567..0e43f1b 100644 --- a/src/home/mqtt/module/temphum.py +++ b/src/home/mqtt/module/temphum.py @@ -3,13 +3,14 @@ from .._node import MqttNode from .._module import MqttModule from .._payload import MqttPayload from ...util import HashableEnum +from typing import Optional two_digits_precision = lambda x: round(x, 2) MODULE_NAME = 'MqttTempHumModule' -class TempHumDataPayload(MqttPayload): +class MqttTemphumDataPayload(MqttPayload): FORMAT = '=ddb' UNPACKER = { 'temp': two_digits_precision, @@ -49,7 +50,8 @@ class MqttTempHumModule(MqttModule): def handle_payload(self, mqtt: MqttNode, topic: str, - payload: bytes): + payload: bytes) -> Optional[MqttPayload]: if topic == 'temphum/data': - message = TempHumDataPayload.unpack(payload) - self._logger.debug(message)
\ No newline at end of file + message = MqttTemphumDataPayload.unpack(payload) + self._logger.debug(message) + return message
\ No newline at end of file diff --git a/src/pump_bot.py b/src/pump_bot.py index d3aa6b0..0e2b71d 100755 --- a/src/pump_bot.py +++ b/src/pump_bot.py @@ -2,18 +2,33 @@ from enum import Enum from typing import Optional from telegram import ReplyKeyboardMarkup, User +from time import time +from datetime import datetime from home.config import config, is_development_mode from home.telegram import bot from home.telegram._botutil import user_any_name from home.relay.sunxi_h3_client import RelayClient from home.api.types import BotType -from home.mqtt import MqttNode, MqttModule, add_mqtt_module +from home.mqtt import MqttNode, MqttModule, MqttPayload, add_mqtt_module +from home.mqtt.module.relay import MqttPowerStatusPayload +from home.mqtt.module.temphum import MqttTemphumDataPayload +from home.mqtt.module.diagnostics import InitialDiagnosticsPayload + config.load('pump_bot') mqtt: Optional[MqttNode] = None mqtt_relay_module: Optional[MqttModule] = None +time_format = '%d.%m.%Y, %H:%M:%S' + +watering_mcu_status = { + 'last_time': 0, + 'last_boot_time': 0, + 'relay_opened': False, + 'ambient_temp': 0.0, + 'ambient_rh': 0.0, +} bot.initialize() bot.lang.ru( @@ -31,7 +46,9 @@ bot.lang.ru( start_watering="Включить полив", stop_watering="Отключить полив", - status="Статус", + status="Статус насоса", + watering_status="Статус полива", + done="Готово 👌", user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.', user_watering_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> полив.', @@ -55,7 +72,9 @@ bot.lang.en( start_watering="Start watering", stop_watering="Stop watering", - status="Status", + status="Pump status", + watering_status="Watering status", + done="Done 👌", user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.', user_watering_notification='User <a href="tg://user?id=%d">%s</a> <b>%s</b> the watering.', @@ -153,27 +172,60 @@ def status(ctx: bot.Context) -> None: ) +def _get_timestamp_as_string(timestamp: int) -> str: + if timestamp != 0: + return datetime.fromtimestamp(timestamp).strftime(time_format) + else: + return 'unknown' + + +@bot.handler(message='watering_status') +def watering_status(ctx: bot.Context) -> None: + buf = f'last report time: <b>{_get_timestamp_as_string(watering_mcu_status["last_time"])}</b>\n' + if watering_mcu_status["last_boot_time"] != 0: + buf += f'boot time: <b>{_get_timestamp_as_string(watering_mcu_status["last_boot_time"])}</b>\n' + buf += 'relay opened: <b>' + ('yes' if watering_mcu_status['relay_opened'] else 'no') + '</b>\n' + buf += f'ambient temp & humidity: <b>{watering_mcu_status["ambient_temp"]} C, {watering_mcu_status["ambient_rh"]}%</b>' + ctx.reply(buf) + + @bot.defaultreplymarkup def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: - buttons = [ - [ctx.lang('enable'), ctx.lang('disable')], - ] - + buttons = [] if ctx.user_id in config['bot']['silent_users']: buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')]) - - buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering')]) - buttons.append([ctx.lang('status')]) + buttons.append([ctx.lang('enable'), ctx.lang('disable'), ctx.lang('status')],) + buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering'), ctx.lang('watering_status')]) return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) +def mqtt_payload_callback(payload: MqttPayload): + global watering_mcu_status + + watering_mcu_status['last_time'] = int(time()) + + if isinstance(payload, InitialDiagnosticsPayload): + watering_mcu_status['last_boot_time'] = int(time()) + + elif isinstance(payload, MqttTemphumDataPayload): + watering_mcu_status['ambient_temp'] = payload.temp + watering_mcu_status['ambient_rh'] = payload.rh + + elif isinstance(payload, MqttPowerStatusPayload): + watering_mcu_status['relay_opened'] = payload.opened + + if __name__ == '__main__': mqtt = MqttNode(node_id=config.get('mqtt_water_relay.node_id')) if is_development_mode(): add_mqtt_module(mqtt, 'diagnostics') + + mqtt_relay_module = add_mqtt_module(mqtt, 'temphum') mqtt_relay_module = add_mqtt_module(mqtt, 'relay') + mqtt.add_payload_callback(mqtt_payload_callback) + mqtt.configure_tls() mqtt.connect_and_loop(loop_forever=False) |