summaryrefslogtreecommitdiff
path: root/bin/relay_mqtt_bot.py
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2024-02-17 03:08:25 +0300
committerEvgeny Zinoviev <me@ch1p.io>2024-02-17 03:08:25 +0300
commit0ce2e41a2bad790c5232fafb4b6ed631ca8cd957 (patch)
treefd401495b87cae8c95a4c4edf2c851c8177b6069 /bin/relay_mqtt_bot.py
parente9fc2c1835f7ac8e072919df81a6661c6308dea9 (diff)
parentb7f1d55c9b4de4d21b11e5615a5dc8be0d4e883c (diff)
merge with master
Diffstat (limited to 'bin/relay_mqtt_bot.py')
-rwxr-xr-xbin/relay_mqtt_bot.py164
1 files changed, 164 insertions, 0 deletions
diff --git a/bin/relay_mqtt_bot.py b/bin/relay_mqtt_bot.py
new file mode 100755
index 0000000..3ad0a9b
--- /dev/null
+++ b/bin/relay_mqtt_bot.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python3
+import sys
+import __py_include
+
+from enum import Enum
+from typing import Optional, Union
+from telegram import ReplyKeyboardMarkup
+from functools import partial
+
+from homekit.config import config, AppConfigUnit, Translation
+from homekit.telegram import bot
+from homekit.telegram.config import TelegramBotConfig
+from homekit.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule, MqttNodesConfig
+from homekit.mqtt.module.relay import MqttRelayModule, MqttRelayState
+from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
+
+
+if __name__ != '__main__':
+ print(f'this script can not be imported as module', file=sys.stderr)
+ sys.exit(1)
+
+
+mqtt_nodes_config = MqttNodesConfig()
+
+
+class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig):
+ NAME = 'relay_mqtt_bot'
+
+ _strings: Translation
+
+ def __init__(self):
+ super().__init__()
+ self._strings = Translation('mqtt_nodes')
+
+ @classmethod
+ def schema(cls) -> Optional[dict]:
+ return {
+ **super(TelegramBotConfig).schema(),
+ 'relay_nodes': {
+ 'type': 'list',
+ 'required': True,
+ 'schema': {
+ 'type': 'string'
+ }
+ },
+ }
+
+ @staticmethod
+ def custom_validator(data):
+ relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
+ for node in data['relay_nodes']:
+ if node not in relay_node_names:
+ raise ValueError(f'unknown relay node "{node}"')
+
+ def get_relay_name_translated(self, lang: str, relay_name: str) -> str:
+ return self._strings.get(lang)[relay_name]['relay']
+
+
+config.load_app(RelayMqttBotConfig)
+
+bot.initialize()
+bot.lang.ru(
+ start_message="Выберите команду на клавиатуре",
+ unknown_command="Неизвестная команда",
+ done="Готово 👌",
+)
+bot.lang.en(
+ start_message="Select command on the keyboard",
+ unknown_command="Unknown command",
+ done="Done 👌",
+)
+
+
+type_emojis = {
+ 'lamp': '💡'
+}
+status_emoji = {
+ 'on': '✅',
+ 'off': '❌'
+}
+
+
+mqtt: MqttWrapper
+relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {}
+relay_states: dict[str, MqttRelayState] = {}
+
+
+class UserAction(Enum):
+ ON = 'on'
+ OFF = 'off'
+
+
+def on_mqtt_message(node: MqttNode,
+ message: MqttPayload):
+ if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload):
+ kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
+ if isinstance(message, InitialDiagnosticsPayload):
+ kwargs['fw_version'] = message.fw_version
+ if node.id not in relay_states:
+ relay_states[node.id] = MqttRelayState()
+ relay_states[node.id].update(**kwargs)
+
+
+async def enable_handler(node_id: str, ctx: bot.Context) -> None:
+ relay_nodes[node_id].switchpower(True)
+ await ctx.reply(ctx.lang('done'))
+
+
+async def disable_handler(node_id: str, ctx: bot.Context) -> None:
+ relay_nodes[node_id].switchpower(False)
+ await ctx.reply(ctx.lang('done'))
+
+
+async def start(ctx: bot.Context) -> None:
+ await ctx.reply(ctx.lang('start_message'))
+
+
+@bot.exceptionhandler
+async def exception_handler(e: Exception, ctx: bot.Context) -> bool:
+ return False
+
+
+@bot.defaultreplymarkup
+def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
+ buttons = []
+ for node_id in config.app_config['relay_nodes']:
+ node_data = mqtt_nodes_config.get_node(node_id)
+ type_emoji = type_emojis[node_data['relay']['device_type']]
+ row = [f'{type_emoji}{status_emoji[i.value]} {config.app_config.get_relay_name_translated(ctx.user_lang, node_id)}'
+ for i in UserAction]
+ buttons.append(row)
+ return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
+
+
+devices = []
+mqtt = MqttWrapper(client_id='relay_mqtt_bot')
+for node_id in config.app_config['relay_nodes']:
+ node_data = mqtt_nodes_config.get_node(node_id)
+ mqtt_node = MqttNode(node_id=node_id,
+ node_secret=node_data['password'])
+ module_kwargs = {}
+ try:
+ if node_data['relay']['legacy_topics']:
+ module_kwargs['legacy_topics'] = True
+ except KeyError:
+ pass
+ relay_nodes[node_id] = mqtt_node.load_module('relay', **module_kwargs)
+ mqtt_node.add_payload_callback(on_mqtt_message)
+ mqtt.add_node(mqtt_node)
+
+ type_emoji = type_emojis[node_data['relay']['device_type']]
+
+ for action in UserAction:
+ messages = []
+ for _lang in Translation.LANGUAGES:
+ _label = config.app_config.get_relay_name_translated(_lang, node_id)
+ messages.append(f'{type_emoji}{status_emoji[action.value]} {_label}')
+ bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, node_id))
+
+mqtt.connect_and_loop(loop_forever=False)
+
+bot.run(start_handler=start)
+
+mqtt.disconnect()