summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/home/mqtt/_module.py5
-rw-r--r--src/home/mqtt/_node.py16
-rw-r--r--src/home/mqtt/module/diagnostics.py6
-rw-r--r--src/home/mqtt/module/ota.py4
-rw-r--r--src/home/mqtt/module/relay.py21
-rw-r--r--src/home/mqtt/module/temphum.py10
-rwxr-xr-xsrc/pump_bot.py72
7 files changed, 107 insertions, 27 deletions
diff --git a/src/home/mqtt/_module.py b/src/home/mqtt/_module.py
index 949c344..ef50e70 100644
--- a/src/home/mqtt/_module.py
+++ b/src/home/mqtt/_module.py
@@ -3,9 +3,10 @@ from __future__ import annotations
import abc
import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from ._node import MqttNode
+ from ._payload import MqttPayload
class MqttModule(abc.ABC):
@@ -29,5 +30,5 @@ class MqttModule(abc.ABC):
def tick(self, mqtt: MqttNode):
pass
- def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes):
+ def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
pass
diff --git a/src/home/mqtt/_node.py b/src/home/mqtt/_node.py
index c76610f..688b30b 100644
--- a/src/home/mqtt/_node.py
+++ b/src/home/mqtt/_node.py
@@ -3,30 +3,32 @@ import paho.mqtt.client as mqtt
from .mqtt import MqttBase
from typing import List
from ._module import MqttModule
+from ._payload import MqttPayload
class MqttNode(MqttBase):
_modules: List[MqttModule]
_module_subscriptions: dict[str, MqttModule]
_node_id: str
+ _payload_callbacks: list[callable]
# _devices: list[MqttEspDevice]
# _message_callback: Optional[callable]
# _ota_publish_callback: Optional[callable]
def __init__(self,
node_id: str,
- # devices: Union[MqttEspDevice, list[MqttEspDevice]],
- subscribe_to_updates=True):
+ # devices: Union[MqttEspDevice, list[MqttEspDevice]]
+ ):
super().__init__(clean_session=True)
self._modules = []
self._module_subscriptions = {}
self._node_id = node_id
+ self._payload_callbacks = []
# if not isinstance(devices, list):
# devices = [devices]
# self._devices = devices
# self._message_callback = None
# self._ota_publish_callback = None
- # self._subscribe_to_updates = subscribe_to_updates
# self._ota_mid = None
def on_connect(self, client: mqtt.Client, userdata, flags, rc):
@@ -47,7 +49,10 @@ class MqttNode(MqttBase):
actual_topic = topic[len(f'hk/{self._node_id}/'):]
if actual_topic in self._module_subscriptions:
- self._module_subscriptions[actual_topic].handle_payload(self, actual_topic, msg.payload)
+ payload = self._module_subscriptions[actual_topic].handle_payload(self, actual_topic, msg.payload)
+ if isinstance(payload, MqttPayload):
+ for f in self._payload_callbacks:
+ f(payload)
except Exception as e:
self._logger.exception(str(e))
@@ -85,3 +90,6 @@ class MqttNode(MqttBase):
def publish(self, topic: str, payload: bytes, qos: int = 1):
self._client.publish(f'hk/{self._node_id}/{topic}', payload, qos)
self._client.loop_write()
+
+ def add_payload_callback(self, callback: callable):
+ self._payload_callbacks.append(callback) \ No newline at end of file
diff --git a/src/home/mqtt/module/diagnostics.py b/src/home/mqtt/module/diagnostics.py
index 8b5ea16..c31cce2 100644
--- a/src/home/mqtt/module/diagnostics.py
+++ b/src/home/mqtt/module/diagnostics.py
@@ -1,5 +1,6 @@
from ..mqtt import MqttPayload, MqttPayloadCustomField
from .._node import MqttNode, MqttModule
+from typing import Optional
MODULE_NAME = 'MqttDiagnosticsModule'
@@ -51,9 +52,10 @@ class MqttDiagnosticsModule(MqttModule):
for topic in ('diag', 'd1ag', 'stat', 'stat1'):
mqtt.subscribe_module(topic, self)
- def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes):
+ def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
if topic in ('stat', 'diag'):
message = DiagnosticsPayload.unpack(payload)
elif topic in ('stat1', 'd1ag'):
message = InitialDiagnosticsPayload.unpack(payload)
- self._logger.debug(message) \ No newline at end of file
+ self._logger.debug(message)
+ return message
diff --git a/src/home/mqtt/module/ota.py b/src/home/mqtt/module/ota.py
index 1d472d1..86d6839 100644
--- a/src/home/mqtt/module/ota.py
+++ b/src/home/mqtt/module/ota.py
@@ -1,5 +1,6 @@
import hashlib
+from typing import Optional
from ..mqtt import MqttPayload
from .._node import MqttModule, MqttNode
@@ -43,10 +44,11 @@ class MqttOtaModule(MqttModule):
def init(self, mqtt: MqttNode):
mqtt.subscribe_module("otares", self)
- def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes):
+ def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
if topic == 'otares':
message = OtaResultPayload.unpack(payload)
self._logger.debug(message)
+ return message
# def push_ota(self,
# node_id,
diff --git a/src/home/mqtt/module/relay.py b/src/home/mqtt/module/relay.py
index 16877f6..721ceac 100644
--- a/src/home/mqtt/module/relay.py
+++ b/src/home/mqtt/module/relay.py
@@ -1,7 +1,6 @@
-import paho.mqtt.client as mqtt
-import re
import datetime
+from typing import Optional
from .. import MqttModule, MqttPayload, MqttNode
MODULE_NAME = 'MqttRelayModule'
@@ -22,6 +21,18 @@ class MqttPowerSwitchPayload(MqttPayload):
state: bool
+class MqttPowerStatusPayload(MqttPayload):
+ FORMAT = '=B'
+ PACKER = {
+ 'opened': lambda n: int(n),
+ }
+ UNPACKER = {
+ 'opened': lambda n: bool(n),
+ }
+
+ opened: bool
+
+
class MqttRelayState:
enabled: bool
update_time: datetime.datetime
@@ -57,9 +68,11 @@ class MqttRelayModule(MqttModule):
payload = MqttPowerSwitchPayload(secret=secret, state=enable)
mqtt.publish('relay/switch', payload=payload.pack())
- def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes):
+ def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
if topic != 'relay/switch':
return
message = MqttPowerSwitchPayload.unpack(payload)
- self._logger.debug(message) \ No newline at end of file
+ self._logger.debug(message)
+
+ return message \ No newline at end of file
diff --git a/src/home/mqtt/module/temphum.py b/src/home/mqtt/module/temphum.py
index e1c4567..0e43f1b 100644
--- a/src/home/mqtt/module/temphum.py
+++ b/src/home/mqtt/module/temphum.py
@@ -3,13 +3,14 @@ from .._node import MqttNode
from .._module import MqttModule
from .._payload import MqttPayload
from ...util import HashableEnum
+from typing import Optional
two_digits_precision = lambda x: round(x, 2)
MODULE_NAME = 'MqttTempHumModule'
-class TempHumDataPayload(MqttPayload):
+class MqttTemphumDataPayload(MqttPayload):
FORMAT = '=ddb'
UNPACKER = {
'temp': two_digits_precision,
@@ -49,7 +50,8 @@ class MqttTempHumModule(MqttModule):
def handle_payload(self,
mqtt: MqttNode,
topic: str,
- payload: bytes):
+ payload: bytes) -> Optional[MqttPayload]:
if topic == 'temphum/data':
- message = TempHumDataPayload.unpack(payload)
- self._logger.debug(message) \ No newline at end of file
+ message = MqttTemphumDataPayload.unpack(payload)
+ self._logger.debug(message)
+ return message \ No newline at end of file
diff --git a/src/pump_bot.py b/src/pump_bot.py
index d3aa6b0..0e2b71d 100755
--- a/src/pump_bot.py
+++ b/src/pump_bot.py
@@ -2,18 +2,33 @@
from enum import Enum
from typing import Optional
from telegram import ReplyKeyboardMarkup, User
+from time import time
+from datetime import datetime
from home.config import config, is_development_mode
from home.telegram import bot
from home.telegram._botutil import user_any_name
from home.relay.sunxi_h3_client import RelayClient
from home.api.types import BotType
-from home.mqtt import MqttNode, MqttModule, add_mqtt_module
+from home.mqtt import MqttNode, MqttModule, MqttPayload, add_mqtt_module
+from home.mqtt.module.relay import MqttPowerStatusPayload
+from home.mqtt.module.temphum import MqttTemphumDataPayload
+from home.mqtt.module.diagnostics import InitialDiagnosticsPayload
+
config.load('pump_bot')
mqtt: Optional[MqttNode] = None
mqtt_relay_module: Optional[MqttModule] = None
+time_format = '%d.%m.%Y, %H:%M:%S'
+
+watering_mcu_status = {
+ 'last_time': 0,
+ 'last_boot_time': 0,
+ 'relay_opened': False,
+ 'ambient_temp': 0.0,
+ 'ambient_rh': 0.0,
+}
bot.initialize()
bot.lang.ru(
@@ -31,7 +46,9 @@ bot.lang.ru(
start_watering="Включить полив",
stop_watering="Отключить полив",
- status="Статус",
+ status="Статус насоса",
+ watering_status="Статус полива",
+
done="Готово 👌",
user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.',
user_watering_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> полив.',
@@ -55,7 +72,9 @@ bot.lang.en(
start_watering="Start watering",
stop_watering="Stop watering",
- status="Status",
+ status="Pump status",
+ watering_status="Watering status",
+
done="Done 👌",
user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.',
user_watering_notification='User <a href="tg://user?id=%d">%s</a> <b>%s</b> the watering.',
@@ -153,27 +172,60 @@ def status(ctx: bot.Context) -> None:
)
+def _get_timestamp_as_string(timestamp: int) -> str:
+ if timestamp != 0:
+ return datetime.fromtimestamp(timestamp).strftime(time_format)
+ else:
+ return 'unknown'
+
+
+@bot.handler(message='watering_status')
+def watering_status(ctx: bot.Context) -> None:
+ buf = f'last report time: <b>{_get_timestamp_as_string(watering_mcu_status["last_time"])}</b>\n'
+ if watering_mcu_status["last_boot_time"] != 0:
+ buf += f'boot time: <b>{_get_timestamp_as_string(watering_mcu_status["last_boot_time"])}</b>\n'
+ buf += 'relay opened: <b>' + ('yes' if watering_mcu_status['relay_opened'] else 'no') + '</b>\n'
+ buf += f'ambient temp & humidity: <b>{watering_mcu_status["ambient_temp"]} C, {watering_mcu_status["ambient_rh"]}%</b>'
+ ctx.reply(buf)
+
+
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
- buttons = [
- [ctx.lang('enable'), ctx.lang('disable')],
- ]
-
+ buttons = []
if ctx.user_id in config['bot']['silent_users']:
buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')])
-
- buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering')])
- buttons.append([ctx.lang('status')])
+ buttons.append([ctx.lang('enable'), ctx.lang('disable'), ctx.lang('status')],)
+ buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering'), ctx.lang('watering_status')])
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
+def mqtt_payload_callback(payload: MqttPayload):
+ global watering_mcu_status
+
+ watering_mcu_status['last_time'] = int(time())
+
+ if isinstance(payload, InitialDiagnosticsPayload):
+ watering_mcu_status['last_boot_time'] = int(time())
+
+ elif isinstance(payload, MqttTemphumDataPayload):
+ watering_mcu_status['ambient_temp'] = payload.temp
+ watering_mcu_status['ambient_rh'] = payload.rh
+
+ elif isinstance(payload, MqttPowerStatusPayload):
+ watering_mcu_status['relay_opened'] = payload.opened
+
+
if __name__ == '__main__':
mqtt = MqttNode(node_id=config.get('mqtt_water_relay.node_id'))
if is_development_mode():
add_mqtt_module(mqtt, 'diagnostics')
+
+ mqtt_relay_module = add_mqtt_module(mqtt, 'temphum')
mqtt_relay_module = add_mqtt_module(mqtt, 'relay')
+ mqtt.add_payload_callback(mqtt_payload_callback)
+
mqtt.configure_tls()
mqtt.connect_and_loop(loop_forever=False)