#!/usr/bin/env python3 import sys from enum import Enum from typing import Optional, Union from telegram import ReplyKeyboardMarkup from functools import partial from home.config import config, AppConfigUnit, Translation from home.telegram import bot from home.telegram.config import TelegramBotConfig from home.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule from home.mqtt import MqttNodesConfig from home.mqtt.module.relay import MqttRelayModule, MqttRelayState from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload if __name__ != '__main__': print(f'this script can not be imported as module', file=sys.stderr) sys.exit(1) mqtt_nodes_config = MqttNodesConfig() class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig): NAME = 'relay_mqtt_bot' _strings: Translation def __init__(self): super().__init__() self._strings = Translation('mqtt_nodes') @staticmethod def schema() -> Optional[dict]: return { **super(TelegramBotConfig).schema(), 'relay_nodes': { 'type': 'list', 'required': True, 'schema': { 'type': 'string' } }, } @staticmethod def custom_validator(data): relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) for node in data['relay_nodes']: if node not in relay_node_names: raise ValueError(f'unknown relay node "{node}"') def get_relay_name_translated(self, lang: str, relay_name: str) -> str: return self._strings.get(lang)[relay_name]['relay'] config.load_app(RelayMqttBotConfig) bot.initialize() bot.lang.ru( start_message="Выберите команду на клавиатуре", unknown_command="Неизвестная команда", done="Готово 👌", ) bot.lang.en( start_message="Select command on the keyboard", unknown_command="Unknown command", done="Done 👌", ) type_emojis = { 'lamp': '💡' } status_emoji = { 'on': '✅', 'off': '❌' } mqtt: MqttWrapper relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {} relay_states: dict[str, MqttRelayState] = {} class UserAction(Enum): ON = 'on' OFF = 'off' def on_mqtt_message(node: MqttNode, message: MqttPayload): if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): kwargs = dict(rssi=message.rssi, enabled=message.flags.state) if isinstance(message, InitialDiagnosticsPayload): kwargs['fw_version'] = message.fw_version if node.id not in relay_states: relay_states[node.id] = MqttRelayState() relay_states[node.id].update(**kwargs) async def enable_handler(node_id: str, ctx: bot.Context) -> None: relay_nodes[node_id].switchpower(True) await ctx.reply(ctx.lang('done')) async def disable_handler(node_id: str, ctx: bot.Context) -> None: relay_nodes[node_id].switchpower(False) await ctx.reply(ctx.lang('done')) async def start(ctx: bot.Context) -> None: await ctx.reply(ctx.lang('start_message')) @bot.exceptionhandler async def exception_handler(e: Exception, ctx: bot.Context) -> bool: return False @bot.defaultreplymarkup def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: buttons = [] for node_id in config.app_config['relay_nodes']: node_data = mqtt_nodes_config.get_node(node_id) type_emoji = type_emojis[node_data['relay']['device_type']] row = [f'{type_emoji}{status_emoji[i.value]} {config.app_config.get_relay_name_translated(ctx.user_lang, node_id)}' for i in UserAction] buttons.append(row) return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) devices = [] mqtt = MqttWrapper(client_id='relay_mqtt_bot') for node_id in config.app_config['relay_nodes']: node_data = mqtt_nodes_config.get_node(node_id) mqtt_node = MqttNode(node_id=node_id, node_secret=node_data['password']) module_kwargs = {} try: if node_data['relay']['legacy_topics']: module_kwargs['legacy_topics'] = True except KeyError: pass relay_nodes[node_id] = mqtt_node.load_module('relay', **module_kwargs) mqtt_node.add_payload_callback(on_mqtt_message) mqtt.add_node(mqtt_node) type_emoji = type_emojis[node_data['relay']['device_type']] for action in UserAction: messages = [] for _lang in Translation.LANGUAGES: _label = config.app_config.get_relay_name_translated(_lang, node_id) messages.append(f'{type_emoji}{status_emoji[action.value]} {_label}') bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, node_id)) mqtt.connect_and_loop(loop_forever=False) bot.run(start_handler=start) mqtt.disconnect()