summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2023-06-09 00:13:08 +0300
committerEvgeny Zinoviev <me@ch1p.io>2023-06-09 00:13:08 +0300
commit2d6bb8278750ab0c79604a6c14726cd67bcf4cc2 (patch)
tree32a184d0f1d3cf4332e52b458080cbd81c783a33
parent0026ec62566f3d5fc9f3b6db86a0d3b5f8905dba (diff)
wip
-rw-r--r--src/home/telegram/config.py75
-rwxr-xr-xsrc/inverter_bot.py25
-rwxr-xr-xsrc/mqtt_node_util.py3
-rwxr-xr-xsrc/relay_mqtt_bot.py88
4 files changed, 148 insertions, 43 deletions
diff --git a/src/home/telegram/config.py b/src/home/telegram/config.py
new file mode 100644
index 0000000..8ca4c09
--- /dev/null
+++ b/src/home/telegram/config.py
@@ -0,0 +1,75 @@
+from ..config import ConfigUnit
+from typing import Optional, Union
+from abc import ABC
+from enum import Enum
+
+
+class TelegramUserListType(Enum):
+ USERS = 'users'
+ NOTIFY = 'notify_users'
+
+
+class TelegramUserIdsConfig(ConfigUnit):
+ NAME = 'telegram_user_ids'
+
+ @staticmethod
+ def schema() -> Optional[dict]:
+ return {
+ 'type': 'dict',
+ 'schema': {'type': 'int'}
+ }
+
+
+_user_ids_config = TelegramUserIdsConfig()
+
+
+def _user_id_mapper(user: Union[str, int]) -> int:
+ if isinstance(user, int):
+ return user
+ return _user_ids_config[user]
+
+
+class TelegramChatsConfig(ConfigUnit):
+ NAME = 'telegram_chats'
+
+ @staticmethod
+ def schema() -> Optional[dict]:
+ return {
+ 'type': 'dict',
+ 'schema': {
+ 'id': {'type': 'string', 'required': True},
+ 'token': {'type': 'string', 'required': True},
+ }
+ }
+
+
+class TelegramBotConfig(ConfigUnit, ABC):
+ @staticmethod
+ def schema() -> Optional[dict]:
+ return {
+ 'bot': {
+ 'type': 'dict',
+ 'schema': {
+ 'token': {'type': 'string', 'required': True},
+ TelegramUserListType.USERS: {**TelegramBotConfig._userlist_schema(), 'required': True},
+ TelegramUserListType.NOTIFY: TelegramBotConfig._userlist_schema(),
+ }
+ }
+ }
+
+ @staticmethod
+ def _userlist_schema() -> dict:
+ return {'type': 'list', 'schema': {'type': ['string', 'int']}}
+
+ @staticmethod
+ def custom_validator(data):
+ for ult in TelegramUserListType:
+ users = data['bot'][ult.value]
+ for user in users:
+ if isinstance(user, str):
+ if user not in _user_ids_config:
+ raise ValueError(f'user {user} not found in {TelegramUserIdsConfig.NAME}')
+
+ def get_user_ids(self,
+ ult: TelegramUserListType = TelegramUserListType.USERS) -> list[int]:
+ return list(map(_user_id_mapper, self['bot'][ult.value])) \ No newline at end of file
diff --git a/src/inverter_bot.py b/src/inverter_bot.py
index 8a9c363..ecf01fc 100755
--- a/src/inverter_bot.py
+++ b/src/inverter_bot.py
@@ -4,6 +4,7 @@ import re
import datetime
import json
import itertools
+import sys
from inverterd import Format, InverterError
from html import escape
@@ -12,6 +13,7 @@ from typing import Optional, Tuple, Union
from home.util import chunks
from home.config import config, AppConfigUnit
from home.telegram import bot
+from home.telegram.config import TelegramBotConfig, TelegramUserListType
from home.inverter import (
wrapper_instance as inverter,
beautify_table,
@@ -29,6 +31,12 @@ from home.api.types import BotType
from home.api import WebAPIClient
from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton
+
+if __name__ != '__main__':
+ print(f'this script can not be imported as module', file=sys.stderr)
+ sys.exit(1)
+
+
db = None
LT = escape('<=')
flags_map = {
@@ -44,16 +52,11 @@ flags_map = {
logger = logging.getLogger(__name__)
-class InverterBotConfig(AppConfigUnit):
+class InverterBotConfig(AppConfigUnit, TelegramBotConfig):
NAME = 'inverter_bot'
@staticmethod
def schema() -> Optional[dict]:
- userlist_schema = {
- 'type': 'list',
- 'empty': False,
- 'schema': {'type': 'integer'}
- }
acmode_item_schema = {
'thresholds': {
'type': 'list',
@@ -68,15 +71,7 @@ class InverterBotConfig(AppConfigUnit):
}
return {
- 'bot': {
- 'type': 'dict',
- 'required': True,
- 'schema': {
- 'token': {'type': 'string'},
- 'users': userlist_schema,
- 'notify_users': userlist_schema
- }
- },
+ **super(TelegramBotConfig).schema(),
'ac_mode': {
'type': 'dict',
'required': True,
diff --git a/src/mqtt_node_util.py b/src/mqtt_node_util.py
index 43830f9..e2ec838 100755
--- a/src/mqtt_node_util.py
+++ b/src/mqtt_node_util.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
import os.path
+
from time import sleep
from typing import Optional
from argparse import ArgumentParser, ArgumentError
@@ -22,7 +23,7 @@ if __name__ == '__main__':
parser.add_argument('--switch-relay', choices=[0, 1], type=int,
help='send relay state')
parser.add_argument('--push-ota', type=str, metavar='OTA_FILENAME',
- help='push ota, argument receives filename')
+ help='push OTA, receives path to firmware.bin')
config.load_app(parser=parser, no_config=True)
arg = parser.parse_args()
diff --git a/src/relay_mqtt_bot.py b/src/relay_mqtt_bot.py
index f6a1532..24ee6e1 100755
--- a/src/relay_mqtt_bot.py
+++ b/src/relay_mqtt_bot.py
@@ -1,17 +1,53 @@
#!/usr/bin/env python3
+import sys
+
from enum import Enum
-from typing import Optional
+from typing import Optional, Union
from telegram import ReplyKeyboardMarkup
from functools import partial
-from home.config import config
+from home.config import config, AppConfigUnit
from home.telegram import bot
-from home.mqtt import MqttPayload, MqttNode, MqttWrapper
+from home.telegram.config import TelegramBotConfig
+from home.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule
+from home.mqtt import MqttNodesConfig
from home.mqtt.module.relay import MqttRelayModule, MqttRelayState
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
-config.load_app('relay_mqtt_bot')
+if __name__ != '__main__':
+ print(f'this script can not be imported as module', file=sys.stderr)
+ sys.exit(1)
+
+
+mqtt_nodes_config = MqttNodesConfig()
+
+
+class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig):
+ NAME = 'relay_mqtt_bot'
+
+ @staticmethod
+ def schema() -> Optional[dict]:
+ return {
+ **super(TelegramBotConfig).schema(),
+ 'relay_nodes': {
+ 'type': 'list',
+ 'required': True,
+ 'schema': {
+ 'type': 'string'
+ }
+ },
+ }
+
+ @staticmethod
+ def custom_validator(data):
+ relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
+ for node in data['relay_nodes']:
+ if node not in relay_node_names:
+ raise ValueError(f'unknown relay node "{node}"')
+
+
+config.load_app(RelayMqttBotConfig)
bot.initialize()
bot.lang.ru(
@@ -35,9 +71,8 @@ status_emoji = {
}
-# mqtt_relay: Optional[MqttRelayModule] = None
mqtt: Optional[MqttWrapper] = None
-relay_nodes: dict[str, MqttRelayModule] = {}
+relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {}
relay_states: dict[str, MqttRelayState] = {}
@@ -88,30 +123,29 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
-if __name__ == '__main__':
- devices = []
- mqtt = MqttWrapper(client_id='relay_mqtt_bot')
- for device_id, data in config['relays'].items():
- mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret'])
- relay_nodes[device_id] = mqtt_node.load_module('relay')
- mqtt_node.add_payload_callback(on_mqtt_message)
- mqtt.add_node(mqtt_node)
+devices = []
+mqtt = MqttWrapper(client_id='relay_mqtt_bot')
+for device_id, data in config['relays'].items():
+ mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret'])
+ relay_nodes[device_id] = mqtt_node.load_module('relay')
+ mqtt_node.add_payload_callback(on_mqtt_message)
+ mqtt.add_node(mqtt_node)
- labels = data['labels']
- bot.lang.ru(**{device_id: labels['ru']})
- bot.lang.en(**{device_id: labels['en']})
+ labels = data['labels']
+ bot.lang.ru(**{device_id: labels['ru']})
+ bot.lang.en(**{device_id: labels['en']})
- type_emoji = type_emojis[data['type']]
+ type_emoji = type_emojis[data['type']]
- for action in UserAction:
- messages = []
- for _lang, _label in labels.items():
- messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}')
- bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id))
+ for action in UserAction:
+ messages = []
+ for _lang, _label in labels.items():
+ messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}')
+ bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id))
- mqtt.configure_tls()
- mqtt.connect_and_loop(loop_forever=False)
+mqtt.configure_tls()
+mqtt.connect_and_loop(loop_forever=False)
- bot.run(start_handler=start)
+bot.run(start_handler=start)
- mqtt.disconnect()
+mqtt.disconnect()