From f3b9d50496257d87757802dfb472b5ffae11962c Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sat, 10 Jun 2023 22:44:31 +0300 Subject: new config: port openwrt_log_analyzer --- src/home/telegram/telegram.py | 28 +++++++++--------- src/home/util.py | 8 ++++++ src/openwrt_log_analyzer.py | 66 +++++++++++++++++++++++-------------------- 3 files changed, 59 insertions(+), 43 deletions(-) diff --git a/src/home/telegram/telegram.py b/src/home/telegram/telegram.py index 2f94f93..f42363e 100644 --- a/src/home/telegram/telegram.py +++ b/src/home/telegram/telegram.py @@ -2,25 +2,27 @@ import requests import logging from typing import Tuple -from ..config import config - +from .config import TelegramChatsConfig +_chats = TelegramChatsConfig() _logger = logging.getLogger(__name__) def send_message(text: str, - parse_mode: str = None, - disable_web_page_preview: bool = False): - data, token = _send_telegram_data(text, parse_mode, disable_web_page_preview) + chat: str, + parse_mode: str = 'HTML', + disable_web_page_preview: bool = False,): + data, token = _send_telegram_data(text, chat, parse_mode, disable_web_page_preview) req = requests.post('https://api.telegram.org/bot%s/sendMessage' % token, data=data) return req.json() -def send_photo(filename: str): +def send_photo(filename: str, chat: str): + chat_data = _chats[chat] data = { - 'chat_id': config['telegram']['chat_id'], + 'chat_id': chat_data['id'], } - token = config['telegram']['token'] + token = chat_data['token'] url = f'https://api.telegram.org/bot{token}/sendPhoto' with open(filename, "rb") as fd: @@ -29,19 +31,19 @@ def send_photo(filename: str): def _send_telegram_data(text: str, + chat: str, parse_mode: str = None, disable_web_page_preview: bool = False) -> Tuple[dict, str]: + chat_data = _chats[chat] data = { - 'chat_id': config['telegram']['chat_id'], + 'chat_id': chat_data['id'], 'text': text } if parse_mode is not None: data['parse_mode'] = parse_mode - elif 'parse_mode' in config['telegram']: - data['parse_mode'] = config['telegram']['parse_mode'] - if disable_web_page_preview or 'disable_web_page_preview' in config['telegram']: + if disable_web_page_preview: data['disable_web_page_preview'] = 1 - return data, config['telegram']['token'] + return data, chat_data['token'] diff --git a/src/home/util.py b/src/home/util.py index 1e12243..11e7116 100644 --- a/src/home/util.py +++ b/src/home/util.py @@ -36,6 +36,14 @@ def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bo return False +def validate_mac_address(mac_address: str) -> bool: + mac_pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$' + if re.match(mac_pattern, mac_address): + return True + else: + return False + + class Addr: host: str port: Optional[int] diff --git a/src/openwrt_log_analyzer.py b/src/openwrt_log_analyzer.py index c1c4fbe..96023cd 100755 --- a/src/openwrt_log_analyzer.py +++ b/src/openwrt_log_analyzer.py @@ -1,33 +1,39 @@ #!/usr/bin/env python3 import home.telegram as telegram -from home.config import config +from home.telegram.config import TelegramChatsConfig +from home.util import validate_mac_address +from typing import Optional +from home.config import config, AppConfigUnit from home.database import BotsDatabase, SimpleState -""" -config.toml example: -[simple_state] -file = "/home/user/.config/openwrt_log_analyzer/state.txt" - -[mysql] -host = "localhost" -database = ".." -user = ".." -password = ".." - -[devices] -Device1 = "00:00:00:00:00:00" -Device2 = "01:01:01:01:01:01" - -[telegram] -chat_id = ".." -token = ".." -parse_mode = "HTML" - -[openwrt_log_analyzer] -limit = 10 -""" +class OpenwrtLogAnalyzerConfig(AppConfigUnit): + @classmethod + def schema(cls) -> Optional[dict]: + return { + 'database_name': {'type': 'string', 'required': True}, + 'devices': { + 'type': 'dict', + 'keysrules': {'type': 'string'}, + 'valuesrules': { + 'type': 'string', + 'check_with': validate_mac_address + } + }, + 'limit': {'type': 'integer'}, + 'telegram_chat': {'type': 'string'}, + 'aps': { + 'type': 'list', + 'schema': {'type': 'integer'} + } + } + + @staticmethod + def custom_validator(data): + chats = TelegramChatsConfig() + if data['telegram_chat'] not in chats: + return ValueError(f'unknown telegram chat {data["telegram_chat"]}') def main(mac: str, @@ -48,18 +54,18 @@ def main(mac: str, max_id = log.id text = '\n'.join(map(lambda s: str(s), data)) - telegram.send_message(f'{title} (AP #{ap})\n\n' + text) + telegram.send_message(f'{title} (AP #{ap})\n\n' + text, config.app_config['telegram_chat']) return max_id if __name__ == '__main__': - config.load_app('openwrt_log_analyzer') - for ap in config['openwrt_log_analyzer']['aps']: - state_file = config['simple_state']['file'] - state_file = state_file.replace('.txt', f'-{ap}.txt') + config.load_app(OpenwrtLogAnalyzerConfig) + for ap in config.app_config['aps']: + dbname = config.app_config['database_name'] + dbname = dbname.replace('.txt', f'-{ap}.txt') - state = SimpleState(name=state_file, + state = SimpleState(name=dbname, default={'last_id': 0}) max_last_id = 0 -- cgit v1.2.3