summaryrefslogtreecommitdiff
path: root/src/relay_mqtt_bot.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/relay_mqtt_bot.py')
-rwxr-xr-xsrc/relay_mqtt_bot.py154
1 files changed, 103 insertions, 51 deletions
diff --git a/src/relay_mqtt_bot.py b/src/relay_mqtt_bot.py
index ebbff82..9de8c7e 100755
--- a/src/relay_mqtt_bot.py
+++ b/src/relay_mqtt_bot.py
@@ -1,18 +1,62 @@
#!/usr/bin/env python3
+import sys
+
from enum import Enum
-from typing import Optional
+from typing import Optional, Union
from telegram import ReplyKeyboardMarkup
from functools import partial
-from home.config import config
+from home.config import config, AppConfigUnit, Translation
from home.telegram import bot
-from home.mqtt import MqttRelay, MqttRelayState
-from home.mqtt.esp import MqttEspDevice
-from home.mqtt.payload import MqttPayload
-from home.mqtt.payload.relay import InitialDiagnosticsPayload, DiagnosticsPayload
+from home.telegram.config import TelegramBotConfig
+from home.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule
+from home.mqtt import MqttNodesConfig
+from home.mqtt.module.relay import MqttRelayModule, MqttRelayState
+from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
+
+
+if __name__ != '__main__':
+ print(f'this script can not be imported as module', file=sys.stderr)
+ sys.exit(1)
+
+
+mqtt_nodes_config = MqttNodesConfig()
+
+
+class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig):
+ NAME = 'relay_mqtt_bot'
+
+ _strings: Translation
+
+ def __init__(self):
+ super().__init__()
+ self._strings = Translation('mqtt_nodes')
+
+ @staticmethod
+ def schema() -> Optional[dict]:
+ return {
+ **super(TelegramBotConfig).schema(),
+ 'relay_nodes': {
+ 'type': 'list',
+ 'required': True,
+ 'schema': {
+ 'type': 'string'
+ }
+ },
+ }
+ @staticmethod
+ def custom_validator(data):
+ relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
+ for node in data['relay_nodes']:
+ if node not in relay_node_names:
+ raise ValueError(f'unknown relay node "{node}"')
-config.load('relay_mqtt_bot')
+ def get_relay_name_translated(self, lang: str, relay_name: str) -> str:
+ return self._strings.get(lang)[relay_name]['relay']
+
+
+config.load_app(RelayMqttBotConfig)
bot.initialize()
bot.lang.ru(
@@ -34,7 +78,10 @@ status_emoji = {
'on': '✅',
'off': '❌'
}
-mqtt_relay: Optional[MqttRelay] = None
+
+
+mqtt: MqttWrapper
+relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {}
relay_states: dict[str, MqttRelayState] = {}
@@ -43,70 +90,75 @@ class UserAction(Enum):
OFF = 'off'
-def on_mqtt_message(home_id, message: MqttPayload):
+def on_mqtt_message(node: MqttNode,
+ message: MqttPayload):
if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload):
kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
if isinstance(message, InitialDiagnosticsPayload):
kwargs['fw_version'] = message.fw_version
- if home_id not in relay_states:
- relay_states[home_id] = MqttRelayState()
- relay_states[home_id].update(**kwargs)
+ if node.id not in relay_states:
+ relay_states[node.id] = MqttRelayState()
+ relay_states[node.id].update(**kwargs)
-def enable_handler(home_id: str, ctx: bot.Context) -> None:
- mqtt_relay.set_power(home_id, True)
- ctx.reply(ctx.lang('done'))
+async def enable_handler(node_id: str, ctx: bot.Context) -> None:
+ relay_nodes[node_id].switchpower(True)
+ await ctx.reply(ctx.lang('done'))
-def disable_handler(home_id: str, ctx: bot.Context) -> None:
- mqtt_relay.set_power(home_id, False)
- ctx.reply(ctx.lang('done'))
+async def disable_handler(node_id: str, ctx: bot.Context) -> None:
+ relay_nodes[node_id].switchpower(False)
+ await ctx.reply(ctx.lang('done'))
-def start(ctx: bot.Context) -> None:
- ctx.reply(ctx.lang('start_message'))
+async def start(ctx: bot.Context) -> None:
+ await ctx.reply(ctx.lang('start_message'))
@bot.exceptionhandler
-def exception_handler(e: Exception, ctx: bot.Context) -> bool:
+async def exception_handler(e: Exception, ctx: bot.Context) -> bool:
return False
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = []
- for device_id, data in config['relays'].items():
- labels = data['labels']
- type_emoji = type_emojis[data['type']]
- row = [f'{type_emoji}{status_emoji[i.value]} {labels[ctx.user_lang]}'
+ for node_id in config.app_config['relay_nodes']:
+ node_data = mqtt_nodes_config.get_node(node_id)
+ type_emoji = type_emojis[node_data['relay']['device_type']]
+ row = [f'{type_emoji}{status_emoji[i.value]} {config.app_config.get_relay_name_translated(ctx.user_lang, node_id)}'
for i in UserAction]
buttons.append(row)
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
-if __name__ == '__main__':
- devices = []
- for device_id, data in config['relays'].items():
- devices.append(MqttEspDevice(id=device_id,
- secret=data['secret']))
- labels = data['labels']
- bot.lang.ru(**{device_id: labels['ru']})
- bot.lang.en(**{device_id: labels['en']})
-
- type_emoji = type_emojis[data['type']]
-
- for action in UserAction:
- messages = []
- for _lang, _label in labels.items():
- messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}')
- bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id))
-
- mqtt_relay = MqttRelay(devices=devices)
- mqtt_relay.set_message_callback(on_mqtt_message)
- mqtt_relay.configure_tls()
- mqtt_relay.connect_and_loop(loop_forever=False)
-
- # bot.enable_logging(BotType.RELAY_MQTT)
- bot.run(start_handler=start)
-
- mqtt_relay.disconnect()
+devices = []
+mqtt = MqttWrapper(client_id='relay_mqtt_bot')
+for node_id in config.app_config['relay_nodes']:
+ node_data = mqtt_nodes_config.get_node(node_id)
+ mqtt_node = MqttNode(node_id=node_id,
+ node_secret=node_data['password'])
+ module_kwargs = {}
+ try:
+ if node_data['relay']['legacy_topics']:
+ module_kwargs['legacy_topics'] = True
+ except KeyError:
+ pass
+ relay_nodes[node_id] = mqtt_node.load_module('relay', **module_kwargs)
+ mqtt_node.add_payload_callback(on_mqtt_message)
+ mqtt.add_node(mqtt_node)
+
+ type_emoji = type_emojis[node_data['relay']['device_type']]
+
+ for action in UserAction:
+ messages = []
+ for _lang in Translation.LANGUAGES:
+ _label = config.app_config.get_relay_name_translated(_lang, node_id)
+ messages.append(f'{type_emoji}{status_emoji[action.value]} {_label}')
+ bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, node_id))
+
+mqtt.connect_and_loop(loop_forever=False)
+
+bot.run(start_handler=start)
+
+mqtt.disconnect()