From 95ac1f0d6786d6f4331cfc8387ef816c1db24618 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 20 Feb 2024 00:56:00 +0300 Subject: comletely delete old lws, rewrite vk_sms_checker on python --- bin/vk_sms_checker.py | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100755 bin/vk_sms_checker.py (limited to 'bin') diff --git a/bin/vk_sms_checker.py b/bin/vk_sms_checker.py new file mode 100755 index 0000000..07d9953 --- /dev/null +++ b/bin/vk_sms_checker.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +import __py_include +import re + +from html import escape +from typing import Optional +from homekit.config import AppConfigUnit, config +from homekit.modem import ModemsConfig, E3372 +from homekit.database import MySQLHomeDatabase +from homekit.telegram import send_message + +db: Optional[MySQLHomeDatabase] = None + + +class VkSmsCheckerConfig(AppConfigUnit): + NAME = 'vk_sms_checker' + + @classmethod + def schema(cls) -> Optional[dict]: + return { + 'modem': {'type': 'string', 'required': True} + } + + @staticmethod + def custom_validator(data): + if data['modem'] not in ModemsConfig(): + raise ValueError('invalid modem') + + +def get_last_time() -> int: + cur = db.cursor() + cur.execute("SELECT last_message_time FROM vk_sms LIMIT 1") + return int(cur.fetchone()[0]) + + +def set_last_time(timestamp: int) -> None: + cur = db.cursor() + cur.execute("UPDATE vk_sms SET last_message_time=%s", (timestamp,)) + db.commit() + + +def check_sms(): + modem = ModemsConfig()[config.app_config['modem']] + cl = E3372(modem['ip'], legacy_token_auth=modem['legacy_auth']) + messages = cl.sms_list() + messages.reverse() + + last_time = get_last_time() + new_last_time = None + results = [] + + if not messages: + return + + for m in messages: + if m['UnixTime'] <= last_time: + continue + new_last_time = m['UnixTime'] + if re.match(r'^vk', m['Phone'], flags=re.IGNORECASE) or re.match(r'vk', m['Content'], flags=re.IGNORECASE): + results.append(m) + + if results: + for m in results: + text = ''+escape(m['Phone'])+' ('+m['Date']+')' + text += "\n"+escape(m['Content']) + send_message(text=text, chat='vk_sms_checker') + + if new_last_time: + set_last_time(new_last_time) + + +if __name__ == '__main__': + db = MySQLHomeDatabase() + config.load_app(VkSmsCheckerConfig) + check_sms() \ No newline at end of file -- cgit v1.2.3