diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2023-06-09 00:13:08 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2023-06-09 00:13:08 +0300 |
commit | 2d6bb8278750ab0c79604a6c14726cd67bcf4cc2 (patch) | |
tree | 32a184d0f1d3cf4332e52b458080cbd81c783a33 | |
parent | 0026ec62566f3d5fc9f3b6db86a0d3b5f8905dba (diff) |
wip
-rw-r--r-- | src/home/telegram/config.py | 75 | ||||
-rwxr-xr-x | src/inverter_bot.py | 25 | ||||
-rwxr-xr-x | src/mqtt_node_util.py | 3 | ||||
-rwxr-xr-x | src/relay_mqtt_bot.py | 88 |
4 files changed, 148 insertions, 43 deletions
diff --git a/src/home/telegram/config.py b/src/home/telegram/config.py new file mode 100644 index 0000000..8ca4c09 --- /dev/null +++ b/src/home/telegram/config.py @@ -0,0 +1,75 @@ +from ..config import ConfigUnit +from typing import Optional, Union +from abc import ABC +from enum import Enum + + +class TelegramUserListType(Enum): + USERS = 'users' + NOTIFY = 'notify_users' + + +class TelegramUserIdsConfig(ConfigUnit): + NAME = 'telegram_user_ids' + + @staticmethod + def schema() -> Optional[dict]: + return { + 'type': 'dict', + 'schema': {'type': 'int'} + } + + +_user_ids_config = TelegramUserIdsConfig() + + +def _user_id_mapper(user: Union[str, int]) -> int: + if isinstance(user, int): + return user + return _user_ids_config[user] + + +class TelegramChatsConfig(ConfigUnit): + NAME = 'telegram_chats' + + @staticmethod + def schema() -> Optional[dict]: + return { + 'type': 'dict', + 'schema': { + 'id': {'type': 'string', 'required': True}, + 'token': {'type': 'string', 'required': True}, + } + } + + +class TelegramBotConfig(ConfigUnit, ABC): + @staticmethod + def schema() -> Optional[dict]: + return { + 'bot': { + 'type': 'dict', + 'schema': { + 'token': {'type': 'string', 'required': True}, + TelegramUserListType.USERS: {**TelegramBotConfig._userlist_schema(), 'required': True}, + TelegramUserListType.NOTIFY: TelegramBotConfig._userlist_schema(), + } + } + } + + @staticmethod + def _userlist_schema() -> dict: + return {'type': 'list', 'schema': {'type': ['string', 'int']}} + + @staticmethod + def custom_validator(data): + for ult in TelegramUserListType: + users = data['bot'][ult.value] + for user in users: + if isinstance(user, str): + if user not in _user_ids_config: + raise ValueError(f'user {user} not found in {TelegramUserIdsConfig.NAME}') + + def get_user_ids(self, + ult: TelegramUserListType = TelegramUserListType.USERS) -> list[int]: + return list(map(_user_id_mapper, self['bot'][ult.value]))
\ No newline at end of file diff --git a/src/inverter_bot.py b/src/inverter_bot.py index 8a9c363..ecf01fc 100755 --- a/src/inverter_bot.py +++ b/src/inverter_bot.py @@ -4,6 +4,7 @@ import re import datetime import json import itertools +import sys from inverterd import Format, InverterError from html import escape @@ -12,6 +13,7 @@ from typing import Optional, Tuple, Union from home.util import chunks from home.config import config, AppConfigUnit from home.telegram import bot +from home.telegram.config import TelegramBotConfig, TelegramUserListType from home.inverter import ( wrapper_instance as inverter, beautify_table, @@ -29,6 +31,12 @@ from home.api.types import BotType from home.api import WebAPIClient from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton + +if __name__ != '__main__': + print(f'this script can not be imported as module', file=sys.stderr) + sys.exit(1) + + db = None LT = escape('<=') flags_map = { @@ -44,16 +52,11 @@ flags_map = { logger = logging.getLogger(__name__) -class InverterBotConfig(AppConfigUnit): +class InverterBotConfig(AppConfigUnit, TelegramBotConfig): NAME = 'inverter_bot' @staticmethod def schema() -> Optional[dict]: - userlist_schema = { - 'type': 'list', - 'empty': False, - 'schema': {'type': 'integer'} - } acmode_item_schema = { 'thresholds': { 'type': 'list', @@ -68,15 +71,7 @@ class InverterBotConfig(AppConfigUnit): } return { - 'bot': { - 'type': 'dict', - 'required': True, - 'schema': { - 'token': {'type': 'string'}, - 'users': userlist_schema, - 'notify_users': userlist_schema - } - }, + **super(TelegramBotConfig).schema(), 'ac_mode': { 'type': 'dict', 'required': True, diff --git a/src/mqtt_node_util.py b/src/mqtt_node_util.py index 43830f9..e2ec838 100755 --- a/src/mqtt_node_util.py +++ b/src/mqtt_node_util.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import os.path + from time import sleep from typing import Optional from argparse import ArgumentParser, ArgumentError @@ -22,7 +23,7 @@ if __name__ == '__main__': parser.add_argument('--switch-relay', choices=[0, 1], type=int, help='send relay state') parser.add_argument('--push-ota', type=str, metavar='OTA_FILENAME', - help='push ota, argument receives filename') + help='push OTA, receives path to firmware.bin') config.load_app(parser=parser, no_config=True) arg = parser.parse_args() diff --git a/src/relay_mqtt_bot.py b/src/relay_mqtt_bot.py index f6a1532..24ee6e1 100755 --- a/src/relay_mqtt_bot.py +++ b/src/relay_mqtt_bot.py @@ -1,17 +1,53 @@ #!/usr/bin/env python3 +import sys + from enum import Enum -from typing import Optional +from typing import Optional, Union from telegram import ReplyKeyboardMarkup from functools import partial -from home.config import config +from home.config import config, AppConfigUnit from home.telegram import bot -from home.mqtt import MqttPayload, MqttNode, MqttWrapper +from home.telegram.config import TelegramBotConfig +from home.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule +from home.mqtt import MqttNodesConfig from home.mqtt.module.relay import MqttRelayModule, MqttRelayState from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload -config.load_app('relay_mqtt_bot') +if __name__ != '__main__': + print(f'this script can not be imported as module', file=sys.stderr) + sys.exit(1) + + +mqtt_nodes_config = MqttNodesConfig() + + +class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig): + NAME = 'relay_mqtt_bot' + + @staticmethod + def schema() -> Optional[dict]: + return { + **super(TelegramBotConfig).schema(), + 'relay_nodes': { + 'type': 'list', + 'required': True, + 'schema': { + 'type': 'string' + } + }, + } + + @staticmethod + def custom_validator(data): + relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) + for node in data['relay_nodes']: + if node not in relay_node_names: + raise ValueError(f'unknown relay node "{node}"') + + +config.load_app(RelayMqttBotConfig) bot.initialize() bot.lang.ru( @@ -35,9 +71,8 @@ status_emoji = { } -# mqtt_relay: Optional[MqttRelayModule] = None mqtt: Optional[MqttWrapper] = None -relay_nodes: dict[str, MqttRelayModule] = {} +relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {} relay_states: dict[str, MqttRelayState] = {} @@ -88,30 +123,29 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) -if __name__ == '__main__': - devices = [] - mqtt = MqttWrapper(client_id='relay_mqtt_bot') - for device_id, data in config['relays'].items(): - mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret']) - relay_nodes[device_id] = mqtt_node.load_module('relay') - mqtt_node.add_payload_callback(on_mqtt_message) - mqtt.add_node(mqtt_node) +devices = [] +mqtt = MqttWrapper(client_id='relay_mqtt_bot') +for device_id, data in config['relays'].items(): + mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret']) + relay_nodes[device_id] = mqtt_node.load_module('relay') + mqtt_node.add_payload_callback(on_mqtt_message) + mqtt.add_node(mqtt_node) - labels = data['labels'] - bot.lang.ru(**{device_id: labels['ru']}) - bot.lang.en(**{device_id: labels['en']}) + labels = data['labels'] + bot.lang.ru(**{device_id: labels['ru']}) + bot.lang.en(**{device_id: labels['en']}) - type_emoji = type_emojis[data['type']] + type_emoji = type_emojis[data['type']] - for action in UserAction: - messages = [] - for _lang, _label in labels.items(): - messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}') - bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id)) + for action in UserAction: + messages = [] + for _lang, _label in labels.items(): + messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}') + bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id)) - mqtt.configure_tls() - mqtt.connect_and_loop(loop_forever=False) +mqtt.configure_tls() +mqtt.connect_and_loop(loop_forever=False) - bot.run(start_handler=start) +bot.run(start_handler=start) - mqtt.disconnect() +mqtt.disconnect() |