diff options
Diffstat (limited to 'bin/relay_mqtt_bot.py')
-rwxr-xr-x | bin/relay_mqtt_bot.py | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/bin/relay_mqtt_bot.py b/bin/relay_mqtt_bot.py new file mode 100755 index 0000000..3ad0a9b --- /dev/null +++ b/bin/relay_mqtt_bot.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +import sys +import __py_include + +from enum import Enum +from typing import Optional, Union +from telegram import ReplyKeyboardMarkup +from functools import partial + +from homekit.config import config, AppConfigUnit, Translation +from homekit.telegram import bot +from homekit.telegram.config import TelegramBotConfig +from homekit.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule, MqttNodesConfig +from homekit.mqtt.module.relay import MqttRelayModule, MqttRelayState +from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload + + +if __name__ != '__main__': + print(f'this script can not be imported as module', file=sys.stderr) + sys.exit(1) + + +mqtt_nodes_config = MqttNodesConfig() + + +class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig): + NAME = 'relay_mqtt_bot' + + _strings: Translation + + def __init__(self): + super().__init__() + self._strings = Translation('mqtt_nodes') + + @classmethod + def schema(cls) -> Optional[dict]: + return { + **super(TelegramBotConfig).schema(), + 'relay_nodes': { + 'type': 'list', + 'required': True, + 'schema': { + 'type': 'string' + } + }, + } + + @staticmethod + def custom_validator(data): + relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) + for node in data['relay_nodes']: + if node not in relay_node_names: + raise ValueError(f'unknown relay node "{node}"') + + def get_relay_name_translated(self, lang: str, relay_name: str) -> str: + return self._strings.get(lang)[relay_name]['relay'] + + +config.load_app(RelayMqttBotConfig) + +bot.initialize() +bot.lang.ru( + start_message="Выберите команду на клавиатуре", + unknown_command="Неизвестная команда", + done="Готово 👌", +) +bot.lang.en( + start_message="Select command on the keyboard", + unknown_command="Unknown command", + done="Done 👌", +) + + +type_emojis = { + 'lamp': '💡' +} +status_emoji = { + 'on': '✅', + 'off': '❌' +} + + +mqtt: MqttWrapper +relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {} +relay_states: dict[str, MqttRelayState] = {} + + +class UserAction(Enum): + ON = 'on' + OFF = 'off' + + +def on_mqtt_message(node: MqttNode, + message: MqttPayload): + if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): + kwargs = dict(rssi=message.rssi, enabled=message.flags.state) + if isinstance(message, InitialDiagnosticsPayload): + kwargs['fw_version'] = message.fw_version + if node.id not in relay_states: + relay_states[node.id] = MqttRelayState() + relay_states[node.id].update(**kwargs) + + +async def enable_handler(node_id: str, ctx: bot.Context) -> None: + relay_nodes[node_id].switchpower(True) + await ctx.reply(ctx.lang('done')) + + +async def disable_handler(node_id: str, ctx: bot.Context) -> None: + relay_nodes[node_id].switchpower(False) + await ctx.reply(ctx.lang('done')) + + +async def start(ctx: bot.Context) -> None: + await ctx.reply(ctx.lang('start_message')) + + +@bot.exceptionhandler +async def exception_handler(e: Exception, ctx: bot.Context) -> bool: + return False + + +@bot.defaultreplymarkup +def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [] + for node_id in config.app_config['relay_nodes']: + node_data = mqtt_nodes_config.get_node(node_id) + type_emoji = type_emojis[node_data['relay']['device_type']] + row = [f'{type_emoji}{status_emoji[i.value]} {config.app_config.get_relay_name_translated(ctx.user_lang, node_id)}' + for i in UserAction] + buttons.append(row) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +devices = [] +mqtt = MqttWrapper(client_id='relay_mqtt_bot') +for node_id in config.app_config['relay_nodes']: + node_data = mqtt_nodes_config.get_node(node_id) + mqtt_node = MqttNode(node_id=node_id, + node_secret=node_data['password']) + module_kwargs = {} + try: + if node_data['relay']['legacy_topics']: + module_kwargs['legacy_topics'] = True + except KeyError: + pass + relay_nodes[node_id] = mqtt_node.load_module('relay', **module_kwargs) + mqtt_node.add_payload_callback(on_mqtt_message) + mqtt.add_node(mqtt_node) + + type_emoji = type_emojis[node_data['relay']['device_type']] + + for action in UserAction: + messages = [] + for _lang in Translation.LANGUAGES: + _label = config.app_config.get_relay_name_translated(_lang, node_id) + messages.append(f'{type_emoji}{status_emoji[action.value]} {_label}') + bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, node_id)) + +mqtt.connect_and_loop(loop_forever=False) + +bot.run(start_handler=start) + +mqtt.disconnect() |