diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2022-12-18 06:31:24 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2022-12-24 12:57:55 +0300 |
commit | 0a065f48be99d4ebae49de622a335f23e50c6ca0 (patch) | |
tree | b591d91fac26e5bf7a4dd6d37178b978061ef060 /src | |
parent | 022ec129bb8f511a7bf8cf537f165afce2303262 (diff) |
pump-mqtt-bot: wip; relayctl: somewhat stable
Diffstat (limited to 'src')
-rw-r--r-- | src/home/api/types/types.py | 1 | ||||
-rw-r--r-- | src/home/api/web_api_client.py | 6 | ||||
-rw-r--r-- | src/home/mqtt/__init__.py | 1 | ||||
-rw-r--r-- | src/home/mqtt/mqtt.py | 21 | ||||
-rw-r--r-- | src/home/mqtt/payload/__init__.py | 1 | ||||
-rw-r--r-- | src/home/mqtt/payload/relay.py | 45 | ||||
-rw-r--r-- | src/home/mqtt/relay.py | 93 | ||||
-rw-r--r-- | src/home/telegram/bot.py | 2 | ||||
-rwxr-xr-x | src/pump_mqtt_bot.py | 191 |
9 files changed, 350 insertions, 11 deletions
diff --git a/src/home/api/types/types.py b/src/home/api/types/types.py index 1d5596f..3f77b19 100644 --- a/src/home/api/types/types.py +++ b/src/home/api/types/types.py @@ -8,6 +8,7 @@ class BotType(Enum): ADMIN = auto() SOUND = auto() POLARIS_KETTLE = auto() + PUMP_MQTT = auto() class TemperatureSensorLocation(Enum): diff --git a/src/home/api/web_api_client.py b/src/home/api/web_api_client.py index f74b5a1..ca9a9ee 100644 --- a/src/home/api/web_api_client.py +++ b/src/home/api/web_api_client.py @@ -183,9 +183,9 @@ class WebAPIClient: data = json.loads(r.text) if r.status_code != 200: raise ApiResponseError(r.status_code, - data['error']['type'], - data['error']['message'], - data['error']['stacktrace'] if 'stacktrace' in data['error'] else None) + data['error'], + data['message'], + data['stacktrace'] if 'stacktrace' in data['error'] else None) return data['response'] if 'response' in data else True finally: diff --git a/src/home/mqtt/__init__.py b/src/home/mqtt/__init__.py index c0ef9ba..a6f5f5e 100644 --- a/src/home/mqtt/__init__.py +++ b/src/home/mqtt/__init__.py @@ -1,2 +1,3 @@ from .mqtt import MQTTBase from .util import poll_tick +from .relay import MQTTRelay
\ No newline at end of file diff --git a/src/home/mqtt/mqtt.py b/src/home/mqtt/mqtt.py index b3334b5..9dd973b 100644 --- a/src/home/mqtt/mqtt.py +++ b/src/home/mqtt/mqtt.py @@ -6,8 +6,6 @@ import logging from typing import Tuple from ..config import config -logger = logging.getLogger(__name__) - def username_and_password() -> Tuple[str, str]: username = config['mqtt']['username'] if 'username' in config['mqtt'] else None @@ -23,11 +21,15 @@ class MQTTBase: self._client.on_connect = self.on_connect self._client.on_disconnect = self.on_disconnect self._client.on_message = self.on_message + self._client.on_log = self.on_log + self._client.on_publish = self.on_publish + self._loop_started = False self._logger = logging.getLogger(self.__class__.__name__) username, password = username_and_password() if username and password: + self._logger.debug(f'username={username} password={password}') self._client.username_pw_set(username, password) def configure_tls(self): @@ -50,6 +52,12 @@ class MQTTBase: self._client.loop_forever() else: self._client.loop_start() + self._loop_started = True + + def disconnect(self): + self._client.disconnect() + self._client.loop_write() + self._client.loop_stop() def on_connect(self, client: mqtt.Client, userdata, flags, rc): self._logger.info("Connected with result code " + str(rc)) @@ -57,5 +65,12 @@ class MQTTBase: def on_disconnect(self, client: mqtt.Client, userdata, rc): self._logger.info("Disconnected with result code " + str(rc)) + def on_log(self, client: mqtt.Client, userdata, level, buf): + level = mqtt.LOGGING_LEVEL[level] if level in mqtt.LOGGING_LEVEL else logging.INFO + self._logger.log(level, f'MQTT: {buf}') + def on_message(self, client: mqtt.Client, userdata, msg): - self._logger.info(msg.topic + ": " + str(msg.payload)) + self._logger.debug(msg.topic + ": " + str(msg.payload)) + + def on_publish(self, client: mqtt.Client, userdata, mid): + self._logger.debug(f'publish done, mid={mid}')
\ No newline at end of file diff --git a/src/home/mqtt/payload/__init__.py b/src/home/mqtt/payload/__init__.py index e69de29..9fcaf3e 100644 --- a/src/home/mqtt/payload/__init__.py +++ b/src/home/mqtt/payload/__init__.py @@ -0,0 +1 @@ +from .base_payload import MQTTPayload
\ No newline at end of file diff --git a/src/home/mqtt/payload/relay.py b/src/home/mqtt/payload/relay.py index 2a327ba..debc2c8 100644 --- a/src/home/mqtt/payload/relay.py +++ b/src/home/mqtt/payload/relay.py @@ -1,6 +1,10 @@ +import hashlib + from .base_payload import MQTTPayload, MQTTPayloadCustomField +# _logger = logging.getLogger(__name__) + class StatFlags(MQTTPayloadCustomField): state: bool config_changed_value_present: bool @@ -8,9 +12,11 @@ class StatFlags(MQTTPayloadCustomField): @staticmethod def unpack(flags: int): + # _logger.debug(f'StatFlags.unpack: flags={flags}') state = flags & 0x1 ccvp = (flags >> 1) & 0x1 cc = (flags >> 2) & 0x1 + # _logger.debug(f'StatFlags.unpack: state={state}') return StatFlags(state=(state == 1), config_changed_value_present=(ccvp == 1), config_changed=(cc == 1)) @@ -24,7 +30,7 @@ class StatFlags(MQTTPayloadCustomField): class InitialStatPayload(MQTTPayload): - FORMAT = 'IBbIB' + FORMAT = '=IBbIB' ip: int fw_version: int @@ -34,7 +40,7 @@ class InitialStatPayload(MQTTPayload): class StatPayload(MQTTPayload): - FORMAT = 'bIB' + FORMAT = '=bIB' rssi: int free_heap: int @@ -42,13 +48,42 @@ class StatPayload(MQTTPayload): class PowerPayload(MQTTPayload): - FORMAT = '12sB' + FORMAT = '=12sB' PACKER = { - 'state': lambda n: int(n) + 'state': lambda n: int(n), + 'secret': lambda s: s.encode('utf-8') } UNPACKER = { - 'state': lambda n: bool(n) + 'state': lambda n: bool(n), + 'secret': lambda s: s.decode('utf-8') } secret: str state: bool + + +class OTAPayload(MQTTPayload): + secret: str + filename: str + + # structure of returned data: + # + # uint8_t[len(secret)] secret; + # uint8_t[16] md5; + # *uint8_t data + + def pack(self): + buf = bytearray(self.secret.encode()) + m = hashlib.md5() + with open(self.filename, 'rb') as fd: + content = fd.read() + m.update(content) + buf.extend(m.digest()) + buf.extend(content) + return buf + + def unpack(cls, buf: bytes): + raise RuntimeError(f'{cls.__class__.__name__}.unpack: not implemented') + # secret = buf[:12].decode() + # filename = buf[12:].decode() + # return OTAPayload(secret=secret, filename=filename) diff --git a/src/home/mqtt/relay.py b/src/home/mqtt/relay.py new file mode 100644 index 0000000..0f97b5b --- /dev/null +++ b/src/home/mqtt/relay.py @@ -0,0 +1,93 @@ +import paho.mqtt.client as mqtt +import re + +from .mqtt import MQTTBase +from typing import Optional, Union +from .payload.relay import ( + InitialStatPayload, + StatPayload, + PowerPayload, + OTAPayload +) + + +class MQTTRelay(MQTTBase): + _home_id: Union[str, int] + _secret: str + _message_callback: Optional[callable] + _ota_publish_callback: Optional[callable] + + def __init__(self, + home_id: Union[str, int], + secret: str, + subscribe_to_updates=True): + super().__init__(clean_session=True) + self._home_id = home_id + self._secret = secret + self._message_callback = None + self._ota_publish_callback = None + self._subscribe_to_updates = subscribe_to_updates + self._ota_mid = None + + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + + if self._subscribe_to_updates: + topic = f'hk/{self._home_id}/relay/#' + self._logger.info(f"subscribing to {topic}") + + client.subscribe(topic, qos=1) + + def on_publish(self, client: mqtt.Client, userdata, mid): + if self._ota_mid is not None and mid == self._ota_mid and self._ota_publish_callback: + self._ota_publish_callback() + + def set_message_callback(self, callback: callable): + self._message_callback = callback + + def on_message(self, client: mqtt.Client, userdata, msg): + try: + match = re.match(r'^hk/(.*?)/relay/(stat|stat1|power|otares)$', msg.topic) + self._logger.debug(f'topic: {msg.topic}') + if not match: + return + + name = match.group(1) + subtopic = match.group(2) + + if name != self._home_id: + return + + message = None + if subtopic == 'stat': + message = StatPayload.unpack(msg.payload) + elif subtopic == 'stat1': + message = InitialStatPayload.unpack(msg.payload) + elif subtopic == 'power': + message = PowerPayload.unpack(msg.payload) + + if message and self._message_callback: + self._message_callback(message) + + except Exception as e: + self._logger.exception(str(e)) + + def set_power(self, enable: bool): + payload = PowerPayload(secret=self._secret, + state=enable) + self._client.publish(f'hk/{self._home_id}/relay/power', + payload=payload.pack(), + qos=1) + self._client.loop_write() + + def push_ota(self, + filename: str, + publish_callback: callable, + qos: int): + self._ota_publish_callback = publish_callback + payload = OTAPayload(secret=self._secret, filename=filename) + publish_result = self._client.publish(f'hk/{self._home_id}/relay/admin/ota', + payload=payload.pack(), + qos=qos) + self._ota_mid = publish_result.mid + self._client.loop_write() diff --git a/src/home/telegram/bot.py b/src/home/telegram/bot.py index 3c82587..bd09f42 100644 --- a/src/home/telegram/bot.py +++ b/src/home/telegram/bot.py @@ -110,6 +110,8 @@ def _handler_of_handler(*args, **kwargs): ctx.reply_exc(e) else: notify_user(ctx.user_id, exc2text(e)) + else: + _logger.exception(e) def handler(**kwargs): diff --git a/src/pump_mqtt_bot.py b/src/pump_mqtt_bot.py new file mode 100755 index 0000000..d87234b --- /dev/null +++ b/src/pump_mqtt_bot.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +import datetime + +from enum import Enum +from typing import Optional +from telegram import ReplyKeyboardMarkup, User + +from home.config import config +from home.telegram import bot +from home.telegram._botutil import user_any_name +from home.api.types import BotType +from home.mqtt import MQTTRelay +from home.mqtt.payload import MQTTPayload +from home.mqtt.payload.relay import InitialStatPayload, StatPayload + + +config.load('pump_mqtt_bot') + +bot.initialize() +bot.lang.ru( + start_message="Выберите команду на клавиатуре", + start_message_no_access="Доступ запрещён. Вы можете отправить заявку на получение доступа.", + unknown_command="Неизвестная команда", + send_access_request="Отправить заявку", + management="Админка", + + enable="Включить", + enabled="Включен ✅", + + disable="Выключить", + disabled="Выключен ❌", + + status="Статус", + status_updated=' (обновлено %s)', + + done="Готово 👌", + user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.', + user_action_on="включил", + user_action_off="выключил", + date_yday="вчера", + date_yyday="позавчера", + date_at="в" +) +bot.lang.en( + start_message="Select command on the keyboard", + start_message_no_access="You have no access.", + unknown_command="Unknown command", + send_access_request="Send request", + management="Admin options", + + enable="Turn ON", + enable_silently="Turn ON silently", + enabled="Turned ON ✅", + + disable="Turn OFF", + disable_silently="Turn OFF silently", + disabled="Turned OFF ❌", + + status="Status", + status_updated=' (updated %s)', + + done="Done 👌", + user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.', + user_action_on="ON", + user_action_off="OFF", + + date_yday="yesterday", + date_yyday="the day before yesterday", + date_at="at" +) + + +class RelayState: + enabled: bool + update_time: datetime.datetime + rssi: int + fw_version: int + ever_updated: bool + + def __init__(self): + self.ever_updated = False + + def update(self, + enabled: bool, + rssi: int, + fw_version=None): + self.ever_updated = True + self.enabled = enabled + self.rssi = rssi + self.update_time = datetime.datetime.now() + if fw_version: + self.fw_version = fw_version + + +mqtt_relay: Optional[MQTTRelay] = None +relay_state = RelayState() + + +class UserAction(Enum): + ON = 'on' + OFF = 'off' + + +def on_mqtt_message(message: MQTTPayload): + if isinstance(message, InitialStatPayload) or isinstance(message, StatPayload): + kwargs = dict(rssi=message.rssi, enabled=message.flags.state) + if isinstance(message, InitialStatPayload): + kwargs['fw_version'] = message.fw_version + relay_state.update(**kwargs) + + +def notify(user: User, action: UserAction) -> None: + def text_getter(lang: str): + action_name = bot.lang.get(f'user_action_{action.value}', lang) + user_name = user_any_name(user) + return 'ℹ ' + bot.lang.get('user_action_notification', lang, + user.id, user_name, action_name) + + bot.notify_all(text_getter, exclude=(user.id,)) + + +@bot.handler(message='enable') +def enable_handler(ctx: bot.Context) -> None: + mqtt_relay.set_power(True) + ctx.reply(ctx.lang('done')) + notify(ctx.user, UserAction.ON) + + +@bot.handler(message='disable') +def disable_handler(ctx: bot.Context) -> None: + mqtt_relay.set_power(False) + ctx.reply(ctx.lang('done')) + notify(ctx.user, UserAction.OFF) + + +@bot.handler(message='status') +def status(ctx: bot.Context) -> None: + label = ctx.lang('enabled') if relay_state.enabled else ctx.lang('disabled') + if relay_state.ever_updated: + date_label = '' + today = datetime.date.today() + if today != relay_state.update_time.date(): + yday = today - datetime.timedelta(days=1) + yyday = today - datetime.timedelta(days=2) + if yday == relay_state.update_time.date(): + date_label = ctx.lang('date_yday') + elif yyday == relay_state.update_time.date(): + date_label = ctx.lang('date_yyday') + else: + date_label = relay_state.update_time.strftime('%d.%m.%Y') + date_label += ' ' + date_label += ctx.lang('date_at') + ' ' + date_label += relay_state.update_time.strftime('%H:%M') + label += ctx.lang('status_updated', date_label) + ctx.reply(label) + + +def start(ctx: bot.Context) -> None: + if ctx.user_id in config['bot']['users'] or ctx.user_id in config['bot']['admin_users']: + ctx.reply(ctx.lang('start_message')) + else: + buttons = [ + [ctx.lang('send_access_request')] + ] + ctx.reply(ctx.lang('start_message_no_access'), markup=ReplyKeyboardMarkup(buttons, one_time_keyboard=False)) + + +@bot.exceptionhandler +def exception_handler(e: Exception, ctx: bot.Context) -> bool: + return False + + +@bot.defaultreplymarkup +def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [[ctx.lang('enable'), ctx.lang('disable')], [ctx.lang('status')]] + if ctx.user_id in config['bot']['admin_users']: + buttons.append([ctx.lang('management')]) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +if __name__ == '__main__': + mqtt_relay = MQTTRelay(home_id=config['mqtt']['home_id'], + secret=config['mqtt']['relay']['secret']) + mqtt_relay.set_message_callback(on_mqtt_message) + mqtt_relay.configure_tls() + mqtt_relay.connect_and_loop(loop_forever=False) + + # bot.enable_logging(BotType.PUMP_MQTT) + bot.run(start_handler=start) + + mqtt_relay.disconnect() |