diff options
Diffstat (limited to 'src/relay_mqtt_bot.py')
-rwxr-xr-x | src/relay_mqtt_bot.py | 154 |
1 files changed, 103 insertions, 51 deletions
diff --git a/src/relay_mqtt_bot.py b/src/relay_mqtt_bot.py index ebbff82..9de8c7e 100755 --- a/src/relay_mqtt_bot.py +++ b/src/relay_mqtt_bot.py @@ -1,18 +1,62 @@ #!/usr/bin/env python3 +import sys + from enum import Enum -from typing import Optional +from typing import Optional, Union from telegram import ReplyKeyboardMarkup from functools import partial -from home.config import config +from home.config import config, AppConfigUnit, Translation from home.telegram import bot -from home.mqtt import MqttRelay, MqttRelayState -from home.mqtt.esp import MqttEspDevice -from home.mqtt.payload import MqttPayload -from home.mqtt.payload.relay import InitialDiagnosticsPayload, DiagnosticsPayload +from home.telegram.config import TelegramBotConfig +from home.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule +from home.mqtt import MqttNodesConfig +from home.mqtt.module.relay import MqttRelayModule, MqttRelayState +from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload + + +if __name__ != '__main__': + print(f'this script can not be imported as module', file=sys.stderr) + sys.exit(1) + + +mqtt_nodes_config = MqttNodesConfig() + + +class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig): + NAME = 'relay_mqtt_bot' + + _strings: Translation + + def __init__(self): + super().__init__() + self._strings = Translation('mqtt_nodes') + + @staticmethod + def schema() -> Optional[dict]: + return { + **super(TelegramBotConfig).schema(), + 'relay_nodes': { + 'type': 'list', + 'required': True, + 'schema': { + 'type': 'string' + } + }, + } + @staticmethod + def custom_validator(data): + relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) + for node in data['relay_nodes']: + if node not in relay_node_names: + raise ValueError(f'unknown relay node "{node}"') -config.load('relay_mqtt_bot') + def get_relay_name_translated(self, lang: str, relay_name: str) -> str: + return self._strings.get(lang)[relay_name]['relay'] + + +config.load_app(RelayMqttBotConfig) bot.initialize() bot.lang.ru( @@ -34,7 +78,10 @@ status_emoji = { 'on': '✅', 'off': '❌' } -mqtt_relay: Optional[MqttRelay] = None + + +mqtt: MqttWrapper +relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {} relay_states: dict[str, MqttRelayState] = {} @@ -43,70 +90,75 @@ class UserAction(Enum): OFF = 'off' -def on_mqtt_message(home_id, message: MqttPayload): +def on_mqtt_message(node: MqttNode, + message: MqttPayload): if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): kwargs = dict(rssi=message.rssi, enabled=message.flags.state) if isinstance(message, InitialDiagnosticsPayload): kwargs['fw_version'] = message.fw_version - if home_id not in relay_states: - relay_states[home_id] = MqttRelayState() - relay_states[home_id].update(**kwargs) + if node.id not in relay_states: + relay_states[node.id] = MqttRelayState() + relay_states[node.id].update(**kwargs) -def enable_handler(home_id: str, ctx: bot.Context) -> None: - mqtt_relay.set_power(home_id, True) - ctx.reply(ctx.lang('done')) +async def enable_handler(node_id: str, ctx: bot.Context) -> None: + relay_nodes[node_id].switchpower(True) + await ctx.reply(ctx.lang('done')) -def disable_handler(home_id: str, ctx: bot.Context) -> None: - mqtt_relay.set_power(home_id, False) - ctx.reply(ctx.lang('done')) +async def disable_handler(node_id: str, ctx: bot.Context) -> None: + relay_nodes[node_id].switchpower(False) + await ctx.reply(ctx.lang('done')) -def start(ctx: bot.Context) -> None: - ctx.reply(ctx.lang('start_message')) +async def start(ctx: bot.Context) -> None: + await ctx.reply(ctx.lang('start_message')) @bot.exceptionhandler -def exception_handler(e: Exception, ctx: bot.Context) -> bool: +async def exception_handler(e: Exception, ctx: bot.Context) -> bool: return False @bot.defaultreplymarkup def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: buttons = [] - for device_id, data in config['relays'].items(): - labels = data['labels'] - type_emoji = type_emojis[data['type']] - row = [f'{type_emoji}{status_emoji[i.value]} {labels[ctx.user_lang]}' + for node_id in config.app_config['relay_nodes']: + node_data = mqtt_nodes_config.get_node(node_id) + type_emoji = type_emojis[node_data['relay']['device_type']] + row = [f'{type_emoji}{status_emoji[i.value]} {config.app_config.get_relay_name_translated(ctx.user_lang, node_id)}' for i in UserAction] buttons.append(row) return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) -if __name__ == '__main__': - devices = [] - for device_id, data in config['relays'].items(): - devices.append(MqttEspDevice(id=device_id, - secret=data['secret'])) - labels = data['labels'] - bot.lang.ru(**{device_id: labels['ru']}) - bot.lang.en(**{device_id: labels['en']}) - - type_emoji = type_emojis[data['type']] - - for action in UserAction: - messages = [] - for _lang, _label in labels.items(): - messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}') - bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id)) - - mqtt_relay = MqttRelay(devices=devices) - mqtt_relay.set_message_callback(on_mqtt_message) - mqtt_relay.configure_tls() - mqtt_relay.connect_and_loop(loop_forever=False) - - # bot.enable_logging(BotType.RELAY_MQTT) - bot.run(start_handler=start) - - mqtt_relay.disconnect() +devices = [] +mqtt = MqttWrapper(client_id='relay_mqtt_bot') +for node_id in config.app_config['relay_nodes']: + node_data = mqtt_nodes_config.get_node(node_id) + mqtt_node = MqttNode(node_id=node_id, + node_secret=node_data['password']) + module_kwargs = {} + try: + if node_data['relay']['legacy_topics']: + module_kwargs['legacy_topics'] = True + except KeyError: + pass + relay_nodes[node_id] = mqtt_node.load_module('relay', **module_kwargs) + mqtt_node.add_payload_callback(on_mqtt_message) + mqtt.add_node(mqtt_node) + + type_emoji = type_emojis[node_data['relay']['device_type']] + + for action in UserAction: + messages = [] + for _lang in Translation.LANGUAGES: + _label = config.app_config.get_relay_name_translated(_lang, node_id) + messages.append(f'{type_emoji}{status_emoji[action.value]} {_label}') + bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, node_id)) + +mqtt.connect_and_loop(loop_forever=False) + +bot.run(start_handler=start) + +mqtt.disconnect() |