diff options
Diffstat (limited to 'src/home')
53 files changed, 3824 insertions, 0 deletions
diff --git a/src/home/__init__.py b/src/home/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/home/__init__.py diff --git a/src/home/api/__init__.py b/src/home/api/__init__.py new file mode 100644 index 0000000..782a61e --- /dev/null +++ b/src/home/api/__init__.py @@ -0,0 +1,11 @@ +import importlib + +__all__ = ['WebAPIClient', 'RequestParams'] + + +def __getattr__(name): + if name in __all__: + module = importlib.import_module(f'.web_api_client', __name__) + return getattr(module, name) + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/home/api/__init__.pyi b/src/home/api/__init__.pyi new file mode 100644 index 0000000..1b812d6 --- /dev/null +++ b/src/home/api/__init__.pyi @@ -0,0 +1,4 @@ +from .web_api_client import ( + RequestParams as RequestParams, + WebAPIClient as WebAPIClient +) diff --git a/src/home/api/errors/__init__.py b/src/home/api/errors/__init__.py new file mode 100644 index 0000000..efb06aa --- /dev/null +++ b/src/home/api/errors/__init__.py @@ -0,0 +1 @@ +from .api_response_error import ApiResponseError diff --git a/src/home/api/errors/api_response_error.py b/src/home/api/errors/api_response_error.py new file mode 100644 index 0000000..6910b2d --- /dev/null +++ b/src/home/api/errors/api_response_error.py @@ -0,0 +1,28 @@ +from typing import Optional + + +class ApiResponseError(Exception): + def __init__(self, + status_code: int, + error_type: str, + error_message: str, + error_stacktrace: Optional[list[str]] = None): + super().__init__() + self.status_code = status_code + self.error_message = error_message + self.error_type = error_type + self.error_stacktrace = error_stacktrace + + def __str__(self): + def st_formatter(line: str): + return f'Remote| {line}' + + s = f'{self.error_type}: {self.error_message} (HTTP {self.status_code})' + if self.error_stacktrace is not None: + st = [] + for st_line in self.error_stacktrace: + st.append('\n'.join(st_formatter(st_subline) for st_subline in st_line.split('\n'))) + s += '\nRemote stacktrace:\n' + s += '\n'.join(st) + + return s diff --git a/src/home/api/types/__init__.py b/src/home/api/types/__init__.py new file mode 100644 index 0000000..9f27ff6 --- /dev/null +++ b/src/home/api/types/__init__.py @@ -0,0 +1,6 @@ +from .types import ( + BotType, + TemperatureSensorDataType, + TemperatureSensorLocation, + SoundSensorLocation +) diff --git a/src/home/api/types/types.py b/src/home/api/types/types.py new file mode 100644 index 0000000..b6233e6 --- /dev/null +++ b/src/home/api/types/types.py @@ -0,0 +1,29 @@ +from enum import Enum, auto + + +class BotType(Enum): + INVERTER = auto() + PUMP = auto() + SENSORS = auto() + ADMIN = auto() + SOUND = auto() + + +class TemperatureSensorLocation(Enum): + BIG_HOUSE_1 = auto() + BIG_HOUSE_2 = auto() + STREET = auto() + DIANA = auto() + SPB1 = auto() + + +class TemperatureSensorDataType(Enum): + TEMPERATURE = auto() + RELATIVE_HUMIDITY = auto() + + +class SoundSensorLocation(Enum): + DIANA = auto() + BIG_HOUSE = auto() + SPB1 = auto() + diff --git a/src/home/api/web_api_client.py b/src/home/api/web_api_client.py new file mode 100644 index 0000000..e3b0988 --- /dev/null +++ b/src/home/api/web_api_client.py @@ -0,0 +1,210 @@ +import requests +import json +import threading +import logging + +from collections import namedtuple +from datetime import datetime +from enum import Enum, auto +from typing import Optional, Callable, Union +from requests.auth import HTTPBasicAuth + +from .errors import ApiResponseError +from .types import * +from ..config import config +from ..util import stringify +from ..sound import RecordFile, SoundNodeClient + +logger = logging.getLogger(__name__) + + +RequestParams = namedtuple('RequestParams', 'params, files, method') + + +class HTTPMethod(Enum): + GET = auto() + POST = auto() + + +class WebAPIClient: + token: str + timeout: Union[float, tuple[float, float]] + basic_auth: Optional[HTTPBasicAuth] + do_async: bool + async_error_handler: Optional[Callable] + async_success_handler: Optional[Callable] + + def __init__(self, timeout: Union[float, tuple[float, float]] = 5): + self.token = config['api']['token'] + self.timeout = timeout + self.basic_auth = None + self.do_async = False + self.async_error_handler = None + self.async_success_handler = None + + if 'basic_auth' in config['api']: + ba = config['api']['basic_auth'] + col = ba.index(':') + + user = ba[:col] + pw = ba[col+1:] + + logger.debug(f'enabling basic auth: {user}:{pw}') + self.basic_auth = HTTPBasicAuth(user, pw) + + # api methods + # ----------- + + def log_bot_request(self, + bot: BotType, + user_id: int, + message: str): + return self._post('logs/bot-request/', { + 'bot': bot.value, + 'user_id': str(user_id), + 'message': message + }) + + def log_openwrt(self, + lines: list[tuple[int, str]]): + return self._post('logs/openwrt', { + 'logs': stringify(lines) + }) + + def get_sensors_data(self, + sensor: TemperatureSensorLocation, + hours: int): + data = self._get('sensors/data/', { + 'sensor': sensor.value, + 'hours': hours + }) + return [(datetime.fromtimestamp(date), temp, hum) for date, temp, hum in data] + + def add_sound_sensor_hits(self, + hits: list[tuple[str, int]]): + return self._post('sound_sensors/hits/', { + 'hits': stringify(hits) + }) + + def get_sound_sensor_hits(self, + location: SoundSensorLocation, + after: datetime) -> list[dict]: + return self._process_sound_sensor_hits_data(self._get('sound_sensors/hits/', { + 'after': int(after.timestamp()), + 'location': location.value + })) + + def get_last_sound_sensor_hits(self, location: SoundSensorLocation, last: int): + return self._process_sound_sensor_hits_data(self._get('sound_sensors/hits/', { + 'last': last, + 'location': location.value + })) + + def recordings_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: + files = self._get('recordings/list/', {'extended': int(extended)})['data'] + if as_objects: + return SoundNodeClient.record_list_from_serialized(files) + return files + + def _process_sound_sensor_hits_data(self, data: list[dict]) -> list[dict]: + for item in data: + item['time'] = datetime.fromtimestamp(item['time']) + return data + + # internal methods + # ---------------- + + def _get(self, *args, **kwargs): + return self._call(method=HTTPMethod.GET, *args, **kwargs) + + def _post(self, *args, **kwargs): + return self._call(method=HTTPMethod.POST, *args, **kwargs) + + def _call(self, + name: str, + params: dict, + method: HTTPMethod, + files: Optional[dict[str, str]] = None): + if not self.do_async: + return self._make_request(name, params, method, files) + else: + t = threading.Thread(target=self._make_request_in_thread, args=(name, params, method, files)) + t.start() + return None + + def _make_request(self, + name: str, + params: dict, + method: HTTPMethod = HTTPMethod.GET, + files: Optional[dict[str, str]] = None) -> Optional[any]: + domain = config['api']['host'] + kwargs = {} + + if self.basic_auth is not None: + kwargs['auth'] = self.basic_auth + + if method == HTTPMethod.GET: + if files: + raise RuntimeError('can\'t upload files using GET, please use me properly') + kwargs['params'] = params + f = requests.get + else: + kwargs['data'] = params + f = requests.post + + fd = {} + if files: + for fname, fpath in files.items(): + fd[fname] = open(fpath, 'rb') + kwargs['files'] = fd + + try: + r = f(f'https://{domain}/api/{name}', + headers={'X-Token': self.token}, + timeout=self.timeout, + **kwargs) + + if r.headers['content-type'] != 'application/json': + raise ApiResponseError(r.status_code, 'TypeError', 'content-type is not application/json') + + data = json.loads(r.text) + if r.status_code != 200 or data['result'] == 'error': + raise ApiResponseError(r.status_code, + data['error']['type'], + data['error']['message'], + data['error']['stacktrace'] if 'stacktrace' in data['error'] else None) + + return data['data'] if 'data' in data else True + finally: + for fname, f in fd.items(): + # logger.debug(f'closing file {fname} (fd={f})') + try: + f.close() + except Exception as exc: + logger.exception(exc) + pass + + def _make_request_in_thread(self, name, params, method, files): + try: + result = self._make_request(name, params, method, files) + self._report_async_success(result, name, RequestParams(params=params, method=method, files=files)) + except Exception as e: + logger.exception(e) + self._report_async_error(e, name, RequestParams(params=params, method=method, files=files)) + + def enable_async(self, + success_handler: Optional[Callable] = None, + error_handler: Optional[Callable] = None): + self.do_async = True + if error_handler: + self.async_error_handler = error_handler + if success_handler: + self.async_success_handler = success_handler + + def _report_async_error(self, *args): + if self.async_error_handler: + self.async_error_handler(*args) + + def _report_async_success(self, *args): + if self.async_success_handler: + self.async_success_handler(*args)
\ No newline at end of file diff --git a/src/home/bot/__init__.py b/src/home/bot/__init__.py new file mode 100644 index 0000000..5e68af7 --- /dev/null +++ b/src/home/bot/__init__.py @@ -0,0 +1,6 @@ +from .reporting import ReportingHelper +from .lang import LangPack +from .wrapper import Wrapper, Context, text_filter +from .store import Store +from .errors import * +from .util import command_usage, user_any_name
\ No newline at end of file diff --git a/src/home/bot/errors.py b/src/home/bot/errors.py new file mode 100644 index 0000000..74eee6f --- /dev/null +++ b/src/home/bot/errors.py @@ -0,0 +1,2 @@ +class StoreNotEnabledError(Exception): + pass
\ No newline at end of file diff --git a/src/home/bot/lang.py b/src/home/bot/lang.py new file mode 100644 index 0000000..2f10358 --- /dev/null +++ b/src/home/bot/lang.py @@ -0,0 +1,76 @@ +import logging + +from typing import Union, Optional + +logger = logging.getLogger(__name__) + + +class LangStrings(dict): + _lang: Optional[str] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._lang = None + + def setlang(self, lang: str): + self._lang = lang + + def __missing__(self, key): + logger.warning(f'key {key} is missing in language {self._lang}') + return '{%s}' % key + + def __setitem__(self, key, value): + raise NotImplementedError(f'setting translation strings this way is prohibited (was trying to set {key}={value})') + + +class LangPack: + strings: dict[str, LangStrings[str, str]] + default_lang: str + + def __init__(self): + self.strings = {} + self.default_lang = 'en' + + def ru(self, **kwargs) -> None: + self.set(kwargs, 'ru') + + def en(self, **kwargs) -> None: + self.set(kwargs, 'en') + + def set(self, + strings: Union[LangStrings, dict], + lang: str) -> None: + + if isinstance(strings, dict) and not isinstance(strings, LangStrings): + strings = LangStrings(**strings) + strings.setlang(lang) + + if lang not in self.strings: + self.strings[lang] = strings + else: + self.strings[lang].update(strings) + + def all(self, key): + result = [] + for strings in self.strings.values(): + result.append(strings[key]) + return result + + @property + def languages(self) -> list[str]: + return list(self.strings.keys()) + + def get(self, key: str, lang: str, *args) -> str: + return self.strings[lang][key] % args + + def __call__(self, *args, **kwargs): + return self.strings[self.default_lang][args[0]] + + def __getitem__(self, key): + return self.strings[self.default_lang][key] + + def __setitem__(self, key, value): + raise NotImplementedError('setting translation strings this way is prohibited') + + def __contains__(self, key): + return key in self.strings[self.default_lang] diff --git a/src/home/bot/reporting.py b/src/home/bot/reporting.py new file mode 100644 index 0000000..df3da2a --- /dev/null +++ b/src/home/bot/reporting.py @@ -0,0 +1,22 @@ +import logging + +from telegram import Message +from ..api import WebAPIClient as APIClient +from ..api.errors import ApiResponseError +from ..api.types import BotType + +logger = logging.getLogger(__name__) + + +class ReportingHelper: + def __init__(self, client: APIClient, bot_type: BotType): + self.client = client + self.bot_type = bot_type + + def report(self, message, text: str = None) -> None: + if text is None: + text = message.text + try: + self.client.log_bot_request(self.bot_type, message.chat_id, text) + except ApiResponseError as error: + logger.exception(error) diff --git a/src/home/bot/store.py b/src/home/bot/store.py new file mode 100644 index 0000000..aeedc47 --- /dev/null +++ b/src/home/bot/store.py @@ -0,0 +1,80 @@ +import sqlite3 +import os.path +import logging + +from ..config import config + +logger = logging.getLogger(__name__) + + +def _get_database_path() -> str: + return os.path.join(os.environ['HOME'], '.config', config.app_name, 'bot.db') + + +class Store: + SCHEMA_VERSION = 1 + + def __init__(self): + self.sqlite = sqlite3.connect(_get_database_path(), check_same_thread=False) + + sqlite_version = self._get_sqlite_version() + logger.info(f'SQLite version: {sqlite_version}') + + schema_version = self._get_schema_version() + logger.info(f'Schema version: {schema_version}') + + if schema_version < 1: + self._database_init() + elif schema_version < Store.SCHEMA_VERSION: + self._database_upgrade(Store.SCHEMA_VERSION) + + def __del__(self): + if self.sqlite: + self.sqlite.commit() + self.sqlite.close() + + def _get_sqlite_version(self) -> str: + cursor = self.sqlite.cursor() + cursor.execute("SELECT sqlite_version()") + + return cursor.fetchone()[0] + + def _get_schema_version(self) -> int: + cursor = self.sqlite.execute('PRAGMA user_version') + return int(cursor.fetchone()[0]) + + def _set_schema_version(self, v) -> None: + self.sqlite.execute('PRAGMA user_version={:d}'.format(v)) + logger.info(f'Schema set to {v}') + + def _database_init(self) -> None: + cursor = self.sqlite.cursor() + cursor.execute("""CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY, + lang TEXT NOT NULL + )""") + self.sqlite.commit() + self._set_schema_version(1) + + def _database_upgrade(self, version: int) -> None: + # do the upgrade here + + # self.sqlite.commit() + self._set_schema_version(version) + + def get_user_lang(self, user_id: int, default: str = 'en') -> str: + cursor = self.sqlite.cursor() + cursor.execute('SELECT lang FROM users WHERE id=?', (user_id,)) + row = cursor.fetchone() + + if row is None: + cursor.execute('INSERT INTO users (id, lang) VALUES (?, ?)', (user_id, default)) + self.sqlite.commit() + return default + else: + return row[0] + + def set_user_lang(self, user_id: int, lang: str) -> None: + cursor = self.sqlite.cursor() + cursor.execute('UPDATE users SET lang=? WHERE id=?', (lang, user_id)) + self.sqlite.commit()
\ No newline at end of file diff --git a/src/home/bot/util.py b/src/home/bot/util.py new file mode 100644 index 0000000..4f80a67 --- /dev/null +++ b/src/home/bot/util.py @@ -0,0 +1,57 @@ +from telegram import User +from .lang import LangStrings + +_strings = { + 'en': LangStrings( + usage='Usage', + arguments='Arguments' + ), + 'ru': LangStrings( + usage='Использование', + arguments='Аргументы' + ) +} + + +def command_usage(command: str, arguments: dict, language='en') -> str: + if language not in _strings: + raise ValueError('unsupported language') + + blocks = [] + argument_names = [] + argument_lines = [] + for k, v in arguments.items(): + argument_names.append(k) + argument_lines.append( + f'<code>{k}</code>: {v}' + ) + + command = f'/{command}' + if argument_names: + command += ' ' + ' '.join(argument_names) + + blocks.append( + f'<b>{_strings[language]["usage"]}</b>\n' + f'<code>{command}</code>' + ) + + if argument_lines: + blocks.append( + f'<b>{_strings[language]["arguments"]}</b>\n' + '\n'.join(argument_lines) + ) + + return '\n\n'.join(blocks) + + +def user_any_name(user: User) -> str: + name = [user.first_name, user.last_name] + name = list(filter(lambda s: s is not None, name)) + name = ' '.join(name).strip() + + if not name: + name = user.username + + if not name: + name = str(user.id) + + return name diff --git a/src/home/bot/wrapper.py b/src/home/bot/wrapper.py new file mode 100644 index 0000000..8651e90 --- /dev/null +++ b/src/home/bot/wrapper.py @@ -0,0 +1,339 @@ +import logging +import traceback + +from html import escape +from telegram import ( + Update, + ParseMode, + ReplyKeyboardMarkup, + CallbackQuery, + User, +) +from telegram.ext import ( + Updater, + Filters, + BaseFilter, + Handler, + CommandHandler, + MessageHandler, + CallbackQueryHandler, + CallbackContext, + ConversationHandler +) +from telegram.error import TimedOut +from ..config import config +from typing import Optional, Union +from .store import Store +from .lang import LangPack +from ..api.types import BotType +from ..api import WebAPIClient +from .reporting import ReportingHelper + +logger = logging.getLogger(__name__) +languages = { + 'en': 'English', + 'ru': 'Русский' +} +LANG_STARTED = range(1) +user_filter: Optional[BaseFilter] = None + + +def default_langpack() -> LangPack: + lang = LangPack() + lang.en( + start_message="Select command on the keyboard.", + unknown_message="Unknown message", + cancel="Cancel", + select_language="Select language on the keyboard.", + invalid_language="Invalid language. Please try again.", + language_saved='Saved.', + ) + lang.ru( + start_message="Выберите команду на клавиатуре.", + unknown_message="Неизвестная команда", + cancel="Отмена", + select_language="Выберите язык на клавиатуре.", + invalid_language="Неверный язык. Пожалуйста, попробуйте снова", + language_saved="Настройки сохранены." + ) + return lang + + +def init_user_filter(): + global user_filter + if user_filter is None: + if 'users' in config['bot']: + logger.info('allowed users: ' + str(config['bot']['users'])) + user_filter = Filters.user(config['bot']['users']) + else: + user_filter = Filters.all # not sure if this is correct + + +def text_filter(*args): + init_user_filter() + return Filters.text(args[0] if isinstance(args[0], list) else [*args]) & user_filter + + +def exc2text(e: Exception) -> str: + tb = ''.join(traceback.format_tb(e.__traceback__)) + return f'{e.__class__.__name__}: ' + escape(str(e)) + "\n\n" + escape(tb) + + +class IgnoreMarkup: + pass + + +class Context: + _update: Optional[Update] + _callback_context: Optional[CallbackContext] + _markup_getter: callable + _lang: LangPack + _store: Optional[Store] + _user_lang: Optional[str] + + def __init__(self, + update: Optional[Update], + callback_context: Optional[CallbackContext], + markup_getter: callable, + lang: LangPack, + store: Optional[Store]): + self._update = update + self._callback_context = callback_context + self._markup_getter = markup_getter + self._lang = lang + self._store = store + self._user_lang = None + + def reply(self, text, markup=None): + if markup is None: + markup = self._markup_getter(self) + kwargs = dict(parse_mode=ParseMode.HTML) + if not isinstance(markup, IgnoreMarkup): + kwargs['reply_markup'] = markup + self._update.message.reply_text(text, **kwargs) + + def reply_exc(self, e: Exception) -> None: + self.reply(exc2text(e)) + + def answer(self, text: str = None): + self.callback_query.answer(text) + + def edit(self, text, markup=None): + kwargs = dict(parse_mode=ParseMode.HTML) + if not isinstance(markup, IgnoreMarkup): + kwargs['reply_markup'] = markup + self.callback_query.edit_message_text(text, **kwargs) + + @property + def text(self) -> str: + return self._update.message.text + + @property + def callback_query(self) -> CallbackQuery: + return self._update.callback_query + + @property + def args(self) -> Optional[list[str]]: + return self._callback_context.args + + @property + def user_id(self) -> int: + return self.user.id + + @property + def user(self) -> User: + return self._update.effective_user + + @property + def user_lang(self) -> str: + if self._user_lang is None: + self._user_lang = self._store.get_user_lang(self.user_id) + return self._user_lang + + def lang(self, key: str, *args) -> str: + return self._lang.get(key, self.user_lang, *args) + + def is_callback_context(self) -> bool: + return self._update.callback_query and self._update.callback_query.data and self._update.callback_query.data != '' + + +class Wrapper: + store: Optional[Store] + updater: Updater + lang: LangPack + reporting: Optional[ReportingHelper] + + def __init__(self): + self.updater = Updater(config['bot']['token'], + request_kwargs={'read_timeout': 6, 'connect_timeout': 7}) + self.lang = default_langpack() + self.store = Store() + self.reporting = None + + init_user_filter() + + dispatcher = self.updater.dispatcher + dispatcher.add_handler(CommandHandler('start', self.wrap(self.start), user_filter)) + + # transparently log all messages + self.add_handler(MessageHandler(Filters.all & user_filter, self.logging_message_handler), group=10) + self.add_handler(CallbackQueryHandler(self.logging_callback_handler), group=10) + + def run(self): + self._lang_setup() + self.updater.dispatcher.add_handler( + MessageHandler(Filters.all & user_filter, self.wrap(self.any)) + ) + + # start the bot + self.updater.start_polling() + + # run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT + self.updater.idle() + + def enable_logging(self, bot_type: BotType): + api = WebAPIClient(timeout=3) + api.enable_async() + + self.reporting = ReportingHelper(api, bot_type) + + def logging_message_handler(self, update: Update, context: CallbackContext): + if self.reporting is None: + return + + self.reporting.report(update.message) + + def logging_callback_handler(self, update: Update, context: CallbackContext): + if self.reporting is None: + return + + self.reporting.report(update.callback_query.message, text=update.callback_query.data) + + def wrap(self, f: callable): + def handler(update: Update, context: CallbackContext): + ctx = Context(update, + callback_context=context, + markup_getter=self.markup, + lang=self.lang, + store=self.store) + + try: + return f(ctx) + except Exception as e: + if not self.exception_handler(e, ctx) and not isinstance(e, TimedOut): + logger.exception(e) + if not ctx.is_callback_context(): + ctx.reply_exc(e) + else: + self.notify_user(ctx.user_id, exc2text(e)) + + return handler + + def add_handler(self, handler: Handler, group=0): + self.updater.dispatcher.add_handler(handler, group=group) + + def start(self, ctx: Context): + if 'start_message' not in self.lang: + ctx.reply('Please define start_message or override start()') + return + + ctx.reply(ctx.lang('start_message')) + + def any(self, ctx: Context): + if 'invalid_command' not in self.lang: + ctx.reply('Please define invalid_command or override any()') + return + + ctx.reply(ctx.lang('invalid_command')) + + def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: + return None + + def exception_handler(self, e: Exception, ctx: Context) -> Optional[bool]: + pass + + def notify_all(self, text_getter: callable, exclude: tuple[int] = ()) -> None: + if 'notify_users' not in config['bot']: + logger.error('notify_all() called but no notify_users directive found in the config') + return + + for user_id in config['bot']['notify_users']: + if user_id in exclude: + continue + + text = text_getter(self.store.get_user_lang(user_id)) + self.updater.bot.send_message(chat_id=user_id, + text=text, + parse_mode='HTML') + + def notify_user(self, user_id: int, text: Union[str, Exception]) -> None: + if isinstance(text, Exception): + text = exc2text(text) + self.updater.bot.send_message(chat_id=user_id, text=text, parse_mode='HTML') + + def send_audio(self, user_id, **kwargs): + self.updater.bot.send_audio(chat_id=user_id, **kwargs) + + def send_file(self, user_id, **kwargs): + self.updater.bot.send_document(chat_id=user_id, **kwargs) + + # + # Language Selection + # + + def _lang_setup(self): + supported = self.lang.languages + if len(supported) > 1: + cancel_filter = Filters.text(self.lang.all('cancel')) + + self.add_handler(ConversationHandler( + entry_points=[CommandHandler('lang', self.wrap(self._lang_command), user_filter)], + states={ + LANG_STARTED: [ + *list(map(lambda key: MessageHandler(text_filter(languages[key]), + self.wrap(self._lang_input)), supported)), + MessageHandler(user_filter & ~cancel_filter, self.wrap(self._lang_invalid_input)) + ] + }, + fallbacks=[MessageHandler(user_filter & cancel_filter, self.wrap(self._lang_cancel_input))] + )) + + def _lang_command(self, ctx: Context): + logger.debug(f'current language: {ctx.user_lang}') + + buttons = [] + for name in languages.values(): + buttons.append(name) + markup = ReplyKeyboardMarkup([buttons, [ctx.lang('cancel')]], one_time_keyboard=False) + + ctx.reply(ctx.lang('select_language'), markup=markup) + return LANG_STARTED + + def _lang_input(self, ctx: Context): + lang = None + for key, value in languages.items(): + if value == ctx.text: + lang = key + break + + if lang is None: + ValueError('could not find the language') + + self.store.set_user_lang(ctx.user_id, lang) + + ctx.reply(ctx.lang('language_saved'), markup=IgnoreMarkup()) + + self.start(ctx) + return ConversationHandler.END + + def _lang_invalid_input(self, ctx: Context): + ctx.reply(self.lang('invalid_language'), markup=IgnoreMarkup()) + return LANG_STARTED + + def _lang_cancel_input(self, ctx: Context): + self.start(ctx) + return ConversationHandler.END + + @property + def user_filter(self): + return user_filter diff --git a/src/home/config/__init__.py b/src/home/config/__init__.py new file mode 100644 index 0000000..d4b1c27 --- /dev/null +++ b/src/home/config/__init__.py @@ -0,0 +1 @@ +from .config import ConfigStore, config, is_development_mode diff --git a/src/home/config/config.py b/src/home/config/config.py new file mode 100644 index 0000000..75cfc3a --- /dev/null +++ b/src/home/config/config.py @@ -0,0 +1,110 @@ +import toml +import logging +import os + +from os.path import join, isdir, isfile +from typing import Optional, Any, MutableMapping +from argparse import ArgumentParser + + +def _get_config_path(name: str) -> str: + dirname = join(os.environ['HOME'], '.config', name) + filename = join(os.environ['HOME'], '.config', f'{name}.toml') + if isdir(dirname): + return join(dirname, 'config.toml') + elif isfile(filename): + return filename + else: + raise IOError(f'configuration file not found (tried {dirname}/config.toml and {filename})') + + +class ConfigStore: + data: MutableMapping[str, Any] + app_name: Optional[str] + + def __int__(self): + self.data = {} + self.app_name = None + + def load(self, name: Optional[str] = None, + use_cli=True, + parser: ArgumentParser = None): + self.app_name = name + + if (name is None) and (not use_cli): + raise RuntimeError('either config name must be none or use_cli must be True') + + log_default_fmt = False + log_file = None + log_verbose = False + + path = None + if use_cli: + if parser is None: + parser = ArgumentParser() + parser.add_argument('--config', type=str, required=name is None, + help='Path to the config in TOML format') + parser.add_argument('--verbose', action='store_true') + parser.add_argument('--log-file', type=str) + parser.add_argument('--log-default-fmt', action='store_true') + args = parser.parse_args() + + if args.config: + path = args.config + if args.verbose: + log_verbose = True + if args.log_file: + log_file = args.log_file + if args.log_default_fmt: + log_default_fmt = args.log_default_fmt + + if name and path is None: + path = _get_config_path(name) + + self.data = toml.load(path) + + if 'logging' in self: + if not log_file and 'file' in self['logging']: + log_file = self['logging']['file'] + if log_default_fmt and 'default_fmt' in self['logging']: + log_default_fmt = self['logging']['default_fmt'] + + setup_logging(log_verbose, log_file, log_default_fmt) + + if use_cli: + return args + + def __getitem__(self, key): + return self.data[key] + + def __setitem__(self, key, value): + raise NotImplementedError('overwriting config values is prohibited') + + def __contains__(self, key): + return key in self.data + + +config = ConfigStore() + + +def is_development_mode() -> bool: + if 'FLASK_ENV' in os.environ and os.environ['FLASK_ENV'] == 'development': + return True + + return ('logging' in config) and ('verbose' in config['logging']) and (config['logging']['verbose'] is True) + + +def setup_logging(verbose=False, log_file=None, default_fmt=False): + logging_level = logging.INFO + if is_development_mode() or verbose: + logging_level = logging.DEBUG + + log_config = {'level': logging_level} + if not default_fmt: + log_config['format'] = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + + if log_file is not None: + log_config['filename'] = log_file + log_config['encoding'] = 'utf-8' + + logging.basicConfig(**log_config) diff --git a/src/home/database/__init__.py b/src/home/database/__init__.py new file mode 100644 index 0000000..b50cbce --- /dev/null +++ b/src/home/database/__init__.py @@ -0,0 +1,29 @@ +import importlib + +__all__ = [ + 'get_mysql', + 'mysql_now', + 'get_clickhouse', + 'SimpleState', + + 'SensorsDatabase', + 'InverterDatabase', + 'BotsDatabase' +] + + +def __getattr__(name: str): + if name in __all__: + if name.endswith('Database'): + file = name[:-8].lower() + elif 'mysql' in name: + file = 'mysql' + elif 'clickhouse' in name: + file = 'clickhouse' + else: + file = 'simple_state' + + module = importlib.import_module(f'.{file}', __name__) + return getattr(module, name) + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/home/database/__init__.pyi b/src/home/database/__init__.pyi new file mode 100644 index 0000000..31aae5d --- /dev/null +++ b/src/home/database/__init__.pyi @@ -0,0 +1,11 @@ +from .mysql import ( + get_mysql as get_mysql, + mysql_now as mysql_now +) +from .clickhouse import get_clickhouse as get_clickhouse + +from simple_state import SimpleState as SimpleState + +from .sensors import SensorsDatabase as SensorsDatabase +from .inverter import InverterDatabase as InverterDatabase +from .bots import BotsDatabase as BotsDatabase diff --git a/src/home/database/bots.py b/src/home/database/bots.py new file mode 100644 index 0000000..bc490e1 --- /dev/null +++ b/src/home/database/bots.py @@ -0,0 +1,104 @@ +import pytz + +from .mysql import mysql_now, MySQLDatabase, datetime_fmt +from ..api.types import ( + BotType, + SoundSensorLocation +) +from typing import Optional +from datetime import datetime +from html import escape + + +class OpenwrtLogRecord: + id: int + log_time: datetime + received_time: datetime + text: str + + def __init__(self, id, text, log_time, received_time): + self.id = id + self.text = text + self.log_time = log_time + self.received_time = received_time + + def __repr__(self): + return f"<b>{self.log_time.strftime('%H:%M:%S')}</b> {escape(self.text)}" + + +class BotsDatabase(MySQLDatabase): + def add_request(self, + bot: BotType, + user_id: int, + message: str): + with self.cursor() as cursor: + cursor.execute("INSERT INTO requests_log (user_id, message, bot, time) VALUES (%s, %s, %s, %s)", + (user_id, message, bot.name.lower(), mysql_now())) + self.commit() + + def add_openwrt_logs(self, + lines: list[tuple[datetime, str]]): + now = datetime.now() + with self.cursor() as cursor: + for line in lines: + time, text = line + cursor.execute("INSERT INTO openwrt (log_time, received_time, text) VALUES (%s, %s, %s)", + (time.strftime(datetime_fmt), now.strftime(datetime_fmt), text)) + self.commit() + + def add_sound_hits(self, + hits: list[tuple[SoundSensorLocation, int]], + time: datetime): + with self.cursor() as cursor: + for loc, count in hits: + cursor.execute("INSERT INTO sound_hits (location, `time`, hits) VALUES (%s, %s, %s)", + (loc.name.lower(), time.strftime(datetime_fmt), count)) + self.commit() + + def get_sound_hits(self, + location: SoundSensorLocation, + after: Optional[datetime] = None, + last: Optional[int] = None) -> list[dict]: + with self.cursor(dictionary=True) as cursor: + sql = "SELECT `time`, hits FROM sound_hits WHERE location=%s" + args = [location.name.lower()] + + if after: + sql += ' AND `time` >= %s ORDER BY time DESC' + args.append(after) + elif last: + sql += ' ORDER BY time DESC LIMIT 0, %s' + args.append(last) + else: + raise ValueError('no `after`, no `last`, what do you expect?') + + cursor.execute(sql, tuple(args)) + data = [] + for row in cursor.fetchall(): + data.append({ + 'time': row['time'], + 'hits': row['hits'] + }) + return data + + def get_openwrt_logs(self, + filter_text: str, + min_id: int, + limit: int = None) -> list[OpenwrtLogRecord]: + tz = pytz.timezone('Europe/Moscow') + with self.cursor(dictionary=True) as cursor: + sql = "SELECT * FROM openwrt WHERE text LIKE %s AND id > %s" + if limit is not None: + sql += f" LIMIT {limit}" + + cursor.execute(sql, (f'%{filter_text}%', min_id)) + data = [] + for row in cursor.fetchall(): + data.append(OpenwrtLogRecord( + id=int(row['id']), + text=row['text'], + log_time=row['log_time'].astimezone(tz), + received_time=row['received_time'].astimezone(tz) + )) + + return data diff --git a/src/home/database/clickhouse.py b/src/home/database/clickhouse.py new file mode 100644 index 0000000..4a2a247 --- /dev/null +++ b/src/home/database/clickhouse.py @@ -0,0 +1,10 @@ +from clickhouse_driver import Client as ClickhouseClient + +_links = {} + + +def get_clickhouse(db: str) -> ClickhouseClient: + if db not in _links: + _links[db] = ClickhouseClient.from_url(f'clickhouse://localhost/{db}') + + return _links[db] diff --git a/src/home/database/inverter.py b/src/home/database/inverter.py new file mode 100644 index 0000000..8902f04 --- /dev/null +++ b/src/home/database/inverter.py @@ -0,0 +1,102 @@ +from .clickhouse import get_clickhouse +from time import time + + +class InverterDatabase: + def __init__(self): + self.db = get_clickhouse('solarmon') + + def add_generation(self, home_id: int, client_time: int, watts: int) -> None: + self.db.execute( + 'INSERT INTO generation (ClientTime, ReceivedTime, HomeID, Watts) VALUES', + [[client_time, round(time()), home_id, watts]] + ) + + def add_status(self, home_id: int, + client_time: int, + grid_voltage: int, + grid_freq: int, + ac_output_voltage: int, + ac_output_freq: int, + ac_output_apparent_power: int, + ac_output_active_power: int, + output_load_percent: int, + battery_voltage: int, + battery_voltage_scc: int, + battery_voltage_scc2: int, + battery_discharging_current: int, + battery_charging_current: int, + battery_capacity: int, + inverter_heat_sink_temp: int, + mppt1_charger_temp: int, + mppt2_charger_temp: int, + pv1_input_power: int, + pv2_input_power: int, + pv1_input_voltage: int, + pv2_input_voltage: int, + mppt1_charger_status: int, + mppt2_charger_status: int, + battery_power_direction: int, + dc_ac_power_direction: int, + line_power_direction: int, + load_connected: int) -> None: + self.db.execute("""INSERT INTO status ( + ClientTime, + ReceivedTime, + HomeID, + GridVoltage, + GridFrequency, + ACOutputVoltage, + ACOutputFrequency, + ACOutputApparentPower, + ACOutputActivePower, + OutputLoadPercent, + BatteryVoltage, + BatteryVoltageSCC, + BatteryVoltageSCC2, + BatteryDischargingCurrent, + BatteryChargingCurrent, + BatteryCapacity, + HeatSinkTemp, + MPPT1ChargerTemp, + MPPT2ChargerTemp, + PV1InputPower, + PV2InputPower, + PV1InputVoltage, + PV2InputVoltage, + MPPT1ChargerStatus, + MPPT2ChargerStatus, + BatteryPowerDirection, + DCACPowerDirection, + LinePowerDirection, + LoadConnected) VALUES""", [[ + client_time, + round(time()), + home_id, + grid_voltage, + grid_freq, + ac_output_voltage, + ac_output_freq, + ac_output_apparent_power, + ac_output_active_power, + output_load_percent, + battery_voltage, + battery_voltage_scc, + battery_voltage_scc2, + battery_discharging_current, + battery_charging_current, + battery_capacity, + inverter_heat_sink_temp, + mppt1_charger_temp, + mppt2_charger_temp, + pv1_input_power, + pv2_input_power, + pv1_input_voltage, + pv2_input_voltage, + mppt1_charger_status, + mppt2_charger_status, + battery_power_direction, + dc_ac_power_direction, + line_power_direction, + load_connected + ]]) diff --git a/src/home/database/mysql.py b/src/home/database/mysql.py new file mode 100644 index 0000000..fe97cd4 --- /dev/null +++ b/src/home/database/mysql.py @@ -0,0 +1,47 @@ +import time +import logging + +from mysql.connector import connect, MySQLConnection, Error +from typing import Optional +from ..config import config + +link: Optional[MySQLConnection] = None +logger = logging.getLogger(__name__) + +datetime_fmt = '%Y-%m-%d %H:%M:%S' + + +def get_mysql() -> MySQLConnection: + global link + + if link is not None: + return link + + link = connect( + host=config['mysql']['host'], + user=config['mysql']['user'], + password=config['mysql']['password'], + database=config['mysql']['database'], + ) + link.time_zone = '+01:00' + return link + + +def mysql_now() -> str: + return time.strftime('%Y-%m-%d %H:%M:%S') + + +class MySQLDatabase: + def __init__(self): + self.db = get_mysql() + + def cursor(self, **kwargs): + try: + self.db.ping(reconnect=True, attempts=2) + except Error as e: + logger.exception(e) + self.db = get_mysql() + return self.db.cursor(**kwargs) + + def commit(self): + self.db.commit() diff --git a/src/home/database/sensors.py b/src/home/database/sensors.py new file mode 100644 index 0000000..ca53dd0 --- /dev/null +++ b/src/home/database/sensors.py @@ -0,0 +1,66 @@ +from time import time +from datetime import datetime +from typing import Tuple, List +from .clickhouse import get_clickhouse +from ..api.types import TemperatureSensorLocation + + +def get_temperature_table(sensor: TemperatureSensorLocation) -> str: + if sensor == TemperatureSensorLocation.DIANA: + return 'temp_diana' + + elif sensor == TemperatureSensorLocation.STREET: + return 'temp_street' + + elif sensor == TemperatureSensorLocation.BIG_HOUSE_1: + return 'temp' + + elif sensor == TemperatureSensorLocation.BIG_HOUSE_2: + return 'temp_roof' + + elif sensor == TemperatureSensorLocation.SPB1: + return 'temp_spb1' + + +class SensorsDatabase: + def __init__(self): + self.db = get_clickhouse('home') + + def add_temperature(self, + home_id: int, + client_time: int, + sensor: TemperatureSensorLocation, + temp: int, + rh: int): + table = get_temperature_table(sensor) + sql = """INSERT INTO """ + table + """ ( + ClientTime, + ReceivedTime, + HomeID, + Temperature, + RelativeHumidity + ) VALUES""" + self.db.execute(sql, [[ + client_time, + int(time()), + home_id, + temp, + rh + ]]) + + def get_temperature_recordings(self, + sensor: TemperatureSensorLocation, + time_range: Tuple[datetime, datetime], + home_id=1) -> List[tuple]: + table = get_temperature_table(sensor) + sql = f"""SELECT ClientTime, Temperature, RelativeHumidity + FROM {table} + WHERE ClientTime >= %(from)s AND ClientTime <= %(to)s + ORDER BY ClientTime""" + dt_from, dt_to = time_range + + data = self.db.execute(sql, { + 'from': dt_from, + 'to': dt_to + }) + return [(date, temp/100, humidity/100) for date, temp, humidity in data] diff --git a/src/home/database/simple_state.py b/src/home/database/simple_state.py new file mode 100644 index 0000000..cada9c8 --- /dev/null +++ b/src/home/database/simple_state.py @@ -0,0 +1,46 @@ +import os +import json +import atexit + + +class SimpleState: + def __init__(self, + file: str, + default: dict = None, + **kwargs): + if default is None: + default = {} + elif type(default) is not dict: + raise TypeError('default must be dictionary') + + if not os.path.exists(file): + self._data = default + else: + with open(file, 'r') as f: + self._data = json.loads(f.read()) + + self._file = file + atexit.register(self.__cleanup) + + def __cleanup(self): + if hasattr(self, '_file'): + with open(self._file, 'w') as f: + f.write(json.dumps(self._data)) + atexit.unregister(self.__cleanup) + + def __del__(self): + if 'open' in __builtins__: + self.__cleanup() + + def __getitem__(self, key): + return self._data[key] + + def __setitem__(self, key, value): + self._data[key] = value + + def __contains__(self, key): + return key in self._data + + def __delitem__(self, key): + if key in self._data: + del self._data[key] diff --git a/src/home/inverter/__init__.py b/src/home/inverter/__init__.py new file mode 100644 index 0000000..b184580 --- /dev/null +++ b/src/home/inverter/__init__.py @@ -0,0 +1,8 @@ +from .monitor import ( + ChargingEvent, + InverterMonitor, + BatteryState, + BatteryPowerDirection +) +from .inverter_wrapper import wrapper_instance +from .util import beautify_table diff --git a/src/home/inverter/inverter_wrapper.py b/src/home/inverter/inverter_wrapper.py new file mode 100644 index 0000000..df2c2fc --- /dev/null +++ b/src/home/inverter/inverter_wrapper.py @@ -0,0 +1,48 @@ +import json + +from threading import Lock +from inverterd import ( + Format, + Client as InverterClient, + InverterError +) + +_lock = Lock() + + +class InverterClientWrapper: + def __init__(self): + self._inverter = None + self._host = None + self._port = None + + def init(self, host: str, port: int): + self._host = host + self._port = port + self.create() + + def create(self): + self._inverter = InverterClient(host=self._host, port=self._port) + self._inverter.connect() + + def exec(self, command: str, arguments: tuple = (), format=Format.JSON): + with _lock: + try: + self._inverter.format(format) + response = self._inverter.exec(command, arguments) + if format == Format.JSON: + response = json.loads(response) + return response + except InverterError as e: + raise e + except Exception as e: + # silently try to reconnect + try: + self.create() + except Exception: + pass + raise e + + +wrapper_instance = InverterClientWrapper() + diff --git a/src/home/inverter/monitor.py b/src/home/inverter/monitor.py new file mode 100644 index 0000000..02ae155 --- /dev/null +++ b/src/home/inverter/monitor.py @@ -0,0 +1,448 @@ +import logging +import time + +from enum import Enum, auto +from threading import Thread +from typing import Callable, Optional +from .inverter_wrapper import wrapper_instance as inverter +from inverterd import InverterError +from ..util import Stopwatch, StopwatchError +from ..config import config + +logger = logging.getLogger(__name__) + + +class BatteryPowerDirection(Enum): + DISCHARGING = auto() + CHARGING = auto() + DO_NOTHING = auto() + + +class ChargingEvent(Enum): + AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR = auto() + AC_NOT_CHARGING = auto() + AC_CHARGING_STARTED = auto() + AC_DISCONNECTED = auto() + AC_CURRENT_CHANGED = auto() + AC_MOSTLY_CHARGED = auto() + AC_CHARGING_FINISHED = auto() + + +class ChargingState(Enum): + NOT_CHARGING = auto() + AC_BUT_SOLAR = auto() + AC_WAITING = auto() + AC_OK = auto() + AC_DONE = auto() + + +class CurrentChangeDirection(Enum): + UP = auto() + DOWN = auto() + + +class BatteryState(Enum): + NORMAL = auto() + LOW = auto() + CRITICAL = auto() + + +def _pd_from_string(pd: str) -> BatteryPowerDirection: + if pd == 'Discharge': + return BatteryPowerDirection.DISCHARGING + elif pd == 'Charge': + return BatteryPowerDirection.CHARGING + elif pd == 'Do nothing': + return BatteryPowerDirection.DO_NOTHING + else: + raise ValueError(f'invalid power direction: {pd}') + + +class MonitorConfig: + def __getattr__(self, item): + return config['monitor'][item] + + +cfg = MonitorConfig() + + +class InverterMonitor(Thread): + charging_event_handler: Optional[Callable] + battery_event_handler: Optional[Callable] + error_handler: Optional[Callable] + + def __init__(self): + super().__init__() + self.setName('InverterMonitor') + + self.interrupted = False + self.min_allowed_current = 0 + + # Event handlers for the bot. + self.charging_event_handler = None + self.battery_event_handler = None + self.error_handler = None + + # Currents list, defined in the bot config. + self.currents = cfg.gen_currents + self.currents.sort() + + # We start charging at lowest possible current, then increase it once per minute (or so) to the maximum level. + # This is done so that the load on the generator increases smoothly, not abruptly. Generator will thank us. + self.current_change_direction = CurrentChangeDirection.UP + self.next_current_enter_time = 0 + self.active_current_idx = -1 + + self.battery_state = BatteryState.NORMAL + self.charging_state = ChargingState.NOT_CHARGING + + # 'Mostly-charged' means that we've already lowered the charging current to the level + # at which batteries are charging pretty slow. So instead of burning gasoline and shaking the air, + # we can just turn the generator off at this point. + self.mostly_charged = False + + # The stopwatch is used to measure how long does the battery voltage exceeds the float voltage level. + # We don't want to damage our batteries, right? + self.floating_stopwatch = Stopwatch() + + @property + def active_current(self) -> Optional[int]: + try: + if self.active_current_idx < 0: + return None + return self.currents[self.active_current_idx] + except IndexError: + return None + + def run(self): + # Check allowed currents and validate the config. + allowed_currents = list(inverter.exec('get-allowed-ac-charging-currents')['data']) + allowed_currents.sort() + + for a in self.currents: + if a not in allowed_currents: + raise ValueError(f'invalid value {a} in gen_currents list') + + self.min_allowed_current = min(allowed_currents) + + # Read data and run implemented programs every 2 seconds. + while not self.interrupted: + try: + response = inverter.exec('get-status') + if response['result'] != 'ok': + logger.error('get-status failed:', response) + else: + gs = response['data'] + + ac = gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0 + solar = gs['pv1_input_power']['value'] > 0 + v = float(gs['battery_voltage']['value']) + load_watts = int(gs['ac_output_active_power']['value']) + pd = _pd_from_string(gs['battery_power_direction']) + + logger.debug(f'got status: ac={ac}, solar={solar}, v={v}, pd={pd}') + + self.gen_charging_program(ac, solar, v, pd) + + if not ac or pd != BatteryPowerDirection.CHARGING: + # if AC is disconnected or not charging, run the low voltage checking program + self.low_voltage_program(v, load_watts) + + elif self.battery_state != BatteryState.NORMAL: + # AC is connected and the battery is charging, assume battery level is normal + self.battery_state = BatteryState.NORMAL + + except InverterError as e: + logger.exception(e) + + time.sleep(2) + + def gen_charging_program(self, + ac: bool, # whether AC is connected + solar: bool, # whether MPPT is active + v: float, # current battery voltage + pd: BatteryPowerDirection # current power direction + ): + if self.charging_state == ChargingState.NOT_CHARGING: + if ac and solar: + # Not charging because MPPT is active (solar line is connected). + # Notify users about it and change the current state. + self.charging_state = ChargingState.AC_BUT_SOLAR + self.charging_event_handler(ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR) + logger.info('entering AC_BUT_SOLAR state') + elif ac: + # Not charging, but AC is connected and ready to use. + # Start the charging program. + self.gen_start(pd) + + elif self.charging_state == ChargingState.AC_BUT_SOLAR: + if not ac: + # AC charger has been disconnected. Since the state is AC_BUT_SOLAR, + # charging probably never even started. Stop the charging program. + self.gen_stop(ChargingState.NOT_CHARGING) + elif not solar: + # MPPT has been disconnected, and, since AC is still connected, we can + # try to start the charging program. + self.gen_start(pd) + + elif self.charging_state in (ChargingState.AC_OK, ChargingState.AC_WAITING): + if not ac: + # Charging was in progress, but AC has been suddenly disconnected. + # Sad, but what can we do? Stop the charging program and return. + self.gen_stop(ChargingState.NOT_CHARGING) + return + + if solar: + # Charging was in progress, but MPPT has been detected. Inverter doesn't charge + # batteries from AC when MPPT is active, so we have to pause our program. + self.charging_state = ChargingState.AC_BUT_SOLAR + self.charging_event_handler(ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR) + try: + self.floating_stopwatch.pause() + except StopwatchError: + msg = 'gen_charging_program: floating_stopwatch.pause() failed at (1)' + logger.warning(msg) + self.error_handler(msg) + logger.info('solar power connected during charging, entering AC_BUT_SOLAR state') + + # No surprises at this point, just check the values and make decisions based on them. + # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + # We've reached the 'mostly-charged' point, the voltage level is not float, + # but inverter decided to stop charging (or somebody used a kettle, lol). + # Anyway, assume that charging is complete, stop the program, notify users and return. + if self.mostly_charged and v > (cfg.gen_floating_v - 1) and pd != BatteryPowerDirection.CHARGING: + self.gen_stop(ChargingState.AC_DONE) + return + + # Monitor inverter power direction and notify users when it changes. + state = ChargingState.AC_OK if pd == BatteryPowerDirection.CHARGING else ChargingState.AC_WAITING + if state != self.charging_state: + self.charging_state = state + + evt = ChargingEvent.AC_CHARGING_STARTED if state == ChargingState.AC_OK else ChargingEvent.AC_NOT_CHARGING + self.charging_event_handler(evt) + + if self.floating_stopwatch.get_elapsed_time() >= cfg.gen_floating_time_max: + # We've been at a bulk voltage level too long, so we have to stop charging. + # Set the minimum current possible. + + if self.current_change_direction == CurrentChangeDirection.UP: + # This shouldn't happen, obviously an error. + msg = 'gen_charging_program:' + msg += ' been at bulk voltage level too long, but current change direction is still \'up\'!' + msg += ' This is obviously an error, please fix it' + logger.warning(msg) + self.error_handler(msg) + + self.gen_next_current(current=self.min_allowed_current) + + elif self.active_current is not None: + # If voltage is greater than float voltage, keep the stopwatch ticking + if v > cfg.gen_floating_v and self.floating_stopwatch.is_paused(): + try: + self.floating_stopwatch.go() + except StopwatchError: + msg = 'gen_charging_program: floating_stopwatch.go() failed at (2)' + logger.warning(msg) + self.error_handler(msg) + # Otherwise, pause it + elif v <= cfg.gen_floating_v and not self.floating_stopwatch.is_paused(): + try: + self.floating_stopwatch.pause() + except StopwatchError: + msg = 'gen_charging_program: floating_stopwatch.pause() failed at (3)' + logger.warning(msg) + self.error_handler(msg) + + # Charging current monitoring + if self.current_change_direction == CurrentChangeDirection.UP: + # Generator is warming up in this code path + + if self.next_current_enter_time != 0 and pd != BatteryPowerDirection.CHARGING: + # Generator was warming up and charging, but stopped (pd has changed). + # Resetting to the minimum possible pd + logger.info(f'gen_charging_program (warming path): was charging but power direction suddeny changed. resetting to minimum current') + self.next_current_enter_time = 0 + self.gen_next_current(current=self.min_allowed_current) + + elif self.next_current_enter_time == 0 and pd == BatteryPowerDirection.CHARGING: + self.next_current_enter_time = time.time() + cfg.gen_raise_intervals[self.active_current_idx] + logger.info(f'gen_charging_program (warming path): set next_current_enter_time to {self.next_current_enter_time}') + + elif self.next_current_enter_time != 0 and time.time() >= self.next_current_enter_time: + logger.info('gen_charging_program (warming path): hit next_current_enter_time, calling gen_next_current()') + self.gen_next_current() + else: + # Gradually lower the current level, based on how close + # battery voltage has come to the bulk level. + if self.active_current >= 30: + upper_bound = cfg.gen_cur30_v_limit + elif self.active_current == 20: + upper_bound = cfg.gen_cur20_v_limit + else: + upper_bound = cfg.gen_cur10_v_limit + + # Voltage is high enough already and it's close to bulk level; we hit the upper bound, + # so let's lower the current + if v >= upper_bound: + self.gen_next_current() + + elif self.charging_state == ChargingState.AC_DONE: + # We've already finished charging, but AC was connected. Not that it's disconnected, + # set the appropriate state and notify users. + if not ac: + self.gen_stop(ChargingState.NOT_CHARGING) + + def gen_start(self, pd: BatteryPowerDirection): + if pd == BatteryPowerDirection.CHARGING: + self.charging_state = ChargingState.AC_OK + self.charging_event_handler(ChargingEvent.AC_CHARGING_STARTED) + logger.info('AC line connected and charging, entering AC_OK state') + + # Continue the stopwatch, if needed + try: + self.floating_stopwatch.go() + except StopwatchError: + msg = 'floating_stopwatch.go() failed at ac_charging_start(), AC_OK path' + logger.warning(msg) + self.error_handler(msg) + else: + self.charging_state = ChargingState.AC_WAITING + self.charging_event_handler(ChargingEvent.AC_NOT_CHARGING) + logger.info('AC line connected but not charging yet, entering AC_WAITING state') + + # Pause the stopwatch, if needed + try: + if not self.floating_stopwatch.is_paused(): + self.floating_stopwatch.pause() + except StopwatchError: + msg = 'floating_stopwatch.pause() failed at ac_charging_start(), AC_WAITING path' + logger.warning(msg) + self.error_handler(msg) + + # idx == -1 means haven't started our program yet. + if self.active_current_idx == -1: + self.gen_next_current() + # self.set_hw_charging_current(self.min_allowed_current) + + def gen_stop(self, reason: ChargingState): + self.charging_state = reason + + if reason == ChargingState.AC_DONE: + event = ChargingEvent.AC_CHARGING_FINISHED + elif reason == ChargingState.NOT_CHARGING: + event = ChargingEvent.AC_DISCONNECTED + else: + raise ValueError(f'ac_charging_stop: unexpected reason {reason}') + + logger.info(f'charging is finished, entering {reason} state') + self.charging_event_handler(event) + + # Let Mr. Proper do his job + if self.active_current_idx != -1: + self.next_current_enter_time = 0 + self.mostly_charged = False + self.active_current_idx = -1 + self.floating_stopwatch.reset() + + def gen_next_current(self, current=None): + if current is None: + try: + current = self._next_current() + logger.debug(f'gen_next_current: ready to change charging current to {current} A') + except IndexError: + logger.debug('gen_next_current: was going to change charging current, but no currents left; finishing charging program') + self.gen_stop(ChargingState.AC_DONE) + return + + else: + try: + idx = self.currents.index(current) + except ValueError: + msg = f'gen_next_current: got current={current} but it\'s not in the currents list' + logger.error(msg) + self.error_handler(msg) + return + self.active_current_idx = idx + + if self.current_change_direction == CurrentChangeDirection.DOWN: + if current == self.currents[0]: + self.mostly_charged = True + self.gen_stop(ChargingState.AC_DONE) + + elif current == self.currents[1] and not self.mostly_charged: + self.mostly_charged = True + self.charging_event_handler(ChargingEvent.AC_MOSTLY_CHARGED) + + self.set_hw_charging_current(current) + + def set_hw_charging_current(self, current: int): + try: + response = inverter.exec('set-max-ac-charging-current', (0, current)) + if response['result'] != 'ok': + logger.error(f'failed to change AC charging current to {current} A') + raise InverterError('set-max-ac-charging-current: inverterd reported error') + else: + self.charging_event_handler(ChargingEvent.AC_CURRENT_CHANGED, current=current) + logger.info(f'changed AC charging current to {current} A') + except InverterError as e: + self.error_handler(f'failed to set charging current to {current} A (caught InverterError)') + logger.exception(e) + + def _next_current(self): + if self.current_change_direction == CurrentChangeDirection.UP: + self.active_current_idx += 1 + if self.active_current_idx == len(self.currents)-1: + logger.info('_next_current: charging current power direction to DOWN') + self.current_change_direction = CurrentChangeDirection.DOWN + self.next_current_enter_time = 0 + else: + if self.active_current_idx == 0: + raise IndexError('can\'t go lower') + self.active_current_idx -= 1 + + logger.info(f'_next_current: active_current_idx set to {self.active_current_idx}, returning current of {self.currents[self.active_current_idx]} A') + return self.currents[self.active_current_idx] + + def low_voltage_program(self, v: float, load_watts: int): + crit_level = cfg.vcrit + low_level = cfg.vlow + + if v <= crit_level: + state = BatteryState.CRITICAL + elif v <= low_level: + state = BatteryState.LOW + else: + state = BatteryState.NORMAL + + if state != self.battery_state: + self.battery_state = state + self.battery_event_handler(state, v, load_watts) + + def set_charging_event_handler(self, handler: Callable): + self.charging_event_handler = handler + + def set_battery_event_handler(self, handler: Callable): + self.battery_event_handler = handler + + def set_error_handler(self, handler: Callable): + self.error_handler = handler + + def stop(self): + self.interrupted = True + + def dump_status(self) -> dict: + return { + 'interrupted': self.interrupted, + 'currents': self.currents, + 'active_current': self.active_current, + 'current_change_direction': self.current_change_direction.name, + 'battery_state': self.battery_state.name, + 'charging_state': self.charging_state.name, + 'mostly_charged': self.mostly_charged, + 'floating_stopwatch_paused': self.floating_stopwatch.is_paused(), + 'floating_stopwatch_elapsed': self.floating_stopwatch.get_elapsed_time(), + 'time_now': time.time(), + 'next_current_enter_time': self.next_current_enter_time, + } diff --git a/src/home/inverter/util.py b/src/home/inverter/util.py new file mode 100644 index 0000000..a577e6a --- /dev/null +++ b/src/home/inverter/util.py @@ -0,0 +1,8 @@ +import re + + +def beautify_table(s): + lines = s.split('\n') + lines = list(map(lambda line: re.sub(r'\s+', ' ', line), lines)) + lines = list(map(lambda line: re.sub(r'(.*?): (.*)', r'<b>\1:</b> \2', line), lines)) + return '\n'.join(lines) diff --git a/src/home/mqtt/__init__.py b/src/home/mqtt/__init__.py new file mode 100644 index 0000000..c0ef9ba --- /dev/null +++ b/src/home/mqtt/__init__.py @@ -0,0 +1,2 @@ +from .mqtt import MQTTBase +from .util import poll_tick diff --git a/src/home/mqtt/message/__init__.py b/src/home/mqtt/message/__init__.py new file mode 100644 index 0000000..2a2221b --- /dev/null +++ b/src/home/mqtt/message/__init__.py @@ -0,0 +1,2 @@ +from .inverter import Status, Generation +from .sensors import Temperature diff --git a/src/home/mqtt/message/inverter.py b/src/home/mqtt/message/inverter.py new file mode 100644 index 0000000..2df17e5 --- /dev/null +++ b/src/home/mqtt/message/inverter.py @@ -0,0 +1,86 @@ +import struct + +from typing import Tuple + + +class Status: + # 46 bytes + format = 'IHHHHHHBHHHHHBHHHHHHHH' + + def pack(self, time: int, data: dict) -> bytes: + bits = 0 + bits |= (data['mppt1_charger_status'] & 0x3) + bits |= (data['mppt2_charger_status'] & 0x3) << 2 + bits |= (data['battery_power_direction'] & 0x3) << 4 + bits |= (data['dc_ac_power_direction'] & 0x3) << 6 + bits |= (data['line_power_direction'] & 0x3) << 8 + bits |= (data['load_connected'] & 0x1) << 10 + + return struct.pack( + self.format, + time, + int(data['grid_voltage'] * 10), + int(data['grid_freq'] * 10), + int(data['ac_output_voltage'] * 10), + int(data['ac_output_freq'] * 10), + data['ac_output_apparent_power'], + data['ac_output_active_power'], + data['output_load_percent'], + int(data['battery_voltage'] * 10), + int(data['battery_voltage_scc'] * 10), + int(data['battery_voltage_scc2'] * 10), + data['battery_discharging_current'], + data['battery_charging_current'], + data['battery_capacity'], + data['inverter_heat_sink_temp'], + data['mppt1_charger_temp'], + data['mppt2_charger_temp'], + data['pv1_input_power'], + data['pv2_input_power'], + int(data['pv1_input_voltage'] * 10), + int(data['pv2_input_voltage'] * 10), + bits + ) + + def unpack(self, buf: bytes) -> Tuple[int, dict]: + data = struct.unpack(self.format, buf) + return data[0], { + 'grid_voltage': data[1] / 10, + 'grid_freq': data[2] / 10, + 'ac_output_voltage': data[3] / 10, + 'ac_output_freq': data[4] / 10, + 'ac_output_apparent_power': data[5], + 'ac_output_active_power': data[6], + 'output_load_percent': data[7], + 'battery_voltage': data[8] / 10, + 'battery_voltage_scc': data[9] / 10, + 'battery_voltage_scc2': data[10] / 10, + 'battery_discharging_current': data[11], + 'battery_charging_current': data[12], + 'battery_capacity': data[13], + 'inverter_heat_sink_temp': data[14], + 'mppt1_charger_temp': data[15], + 'mppt2_charger_temp': data[16], + 'pv1_input_power': data[17], + 'pv2_input_power': data[18], + 'pv1_input_voltage': data[19] / 10, + 'pv2_input_voltage': data[20] / 10, + 'mppt1_charger_status': data[21] & 0x03, + 'mppt2_charger_status': (data[21] >> 2) & 0x03, + 'battery_power_direction': (data[21] >> 4) & 0x03, + 'dc_ac_power_direction': (data[21] >> 6) & 0x03, + 'line_power_direction': (data[21] >> 8) & 0x03, + 'load_connected': (data[21] >> 10) & 0x01, + } + + +class Generation: + # 8 bytes + format = 'II' + + def pack(self, time: int, wh: int) -> bytes: + return struct.pack(self.format, int(time), wh) + + def unpack(self, buf: bytes) -> tuple: + data = struct.unpack(self.format, buf) + return tuple(data) diff --git a/src/home/mqtt/message/sensors.py b/src/home/mqtt/message/sensors.py new file mode 100644 index 0000000..ee522f0 --- /dev/null +++ b/src/home/mqtt/message/sensors.py @@ -0,0 +1,19 @@ +import struct + +from typing import Tuple + + +class Temperature: + format = 'IhH' + + def pack(self, time: int, temp: float, rh: float) -> bytes: + return struct.pack( + self.format, + time, + int(temp*100), + int(rh*100) + ) + + def unpack(self, buf: bytes) -> Tuple[int, float, float]: + data = struct.unpack(self.format, buf) + return data[0], data[1]/100, data[2]/100 diff --git a/src/home/mqtt/mqtt.py b/src/home/mqtt/mqtt.py new file mode 100644 index 0000000..b360d22 --- /dev/null +++ b/src/home/mqtt/mqtt.py @@ -0,0 +1,61 @@ +import os.path +import paho.mqtt.client as mqtt +import ssl +import logging + +from typing import Tuple +from ..config import config + +logger = logging.getLogger(__name__) + + +def username_and_password() -> Tuple[str, str]: + username = config['mqtt']['username'] if 'username' in config['mqtt'] else None + password = config['mqtt']['password'] if 'password' in config['mqtt'] else None + return username, password + + +class MQTTBase: + def __init__(self, clean_session=True): + self.client = mqtt.Client(client_id=config['mqtt']['client_id'], + protocol=mqtt.MQTTv311, + clean_session=clean_session) + self.client.on_connect = self.on_connect + self.client.on_disconnect = self.on_disconnect + self.client.on_message = self.on_message + + self.home_id = 1 + + username, password = username_and_password() + if username and password: + self.client.username_pw_set(username, password) + + def configure_tls(self): + ca_certs = os.path.realpath(os.path.join( + os.path.dirname(os.path.realpath(__file__)), + '..', + '..', + '..', + 'assets', + 'mqtt_ca.crt' + )) + self.client.tls_set(ca_certs=ca_certs, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2) + + def connect_and_loop(self, loop_forever=True): + host = config['mqtt']['host'] + port = config['mqtt']['port'] + + self.client.connect(host, port, 60) + if loop_forever: + self.client.loop_forever() + else: + self.client.loop_start() + + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + logger.info("Connected with result code " + str(rc)) + + def on_disconnect(self, client: mqtt.Client, userdata, rc): + logger.info("Disconnected with result code " + str(rc)) + + def on_message(self, client: mqtt.Client, userdata, msg): + logger.info(msg.topic + ": " + str(msg.payload)) diff --git a/src/home/mqtt/util.py b/src/home/mqtt/util.py new file mode 100644 index 0000000..f71ffd8 --- /dev/null +++ b/src/home/mqtt/util.py @@ -0,0 +1,8 @@ +import time + + +def poll_tick(freq): + t = time.time() + while True: + t += freq + yield max(t - time.time(), 0) diff --git a/src/home/relay/__init__.py b/src/home/relay/__init__.py new file mode 100644 index 0000000..f1568be --- /dev/null +++ b/src/home/relay/__init__.py @@ -0,0 +1,16 @@ +import importlib + +__all__ = ['RelayClient', 'RelayServer'] + + +def __getattr__(name): + _map = { + 'RelayClient': '.client', + 'RelayServer': '.server' + } + + if name in __all__: + module = importlib.import_module(_map[name], __name__) + return getattr(module, name) + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/home/relay/__init__.pyi b/src/home/relay/__init__.pyi new file mode 100644 index 0000000..94341f6 --- /dev/null +++ b/src/home/relay/__init__.pyi @@ -0,0 +1,2 @@ +from .client import RelayClient as RelayClient +from .server import RelayServer as RelayServer diff --git a/src/home/relay/client.py b/src/home/relay/client.py new file mode 100644 index 0000000..8c8d6c4 --- /dev/null +++ b/src/home/relay/client.py @@ -0,0 +1,39 @@ +import socket + + +class RelayClient: + def __init__(self, port=8307, host='127.0.0.1'): + self._host = host + self._port = port + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def __del__(self): + self.sock.close() + + def connect(self): + self.sock.connect((self._host, self._port)) + + def _write(self, line): + self.sock.sendall((line+'\r\n').encode()) + + def _read(self): + buf = bytearray() + while True: + buf.extend(self.sock.recv(256)) + if b'\r\n' in buf: + break + + response = buf.decode().strip() + return response + + def on(self): + self._write('on') + return self._read() + + def off(self): + self._write('off') + return self._read() + + def status(self): + self._write('get') + return self._read() diff --git a/src/home/relay/server.py b/src/home/relay/server.py new file mode 100644 index 0000000..1f33969 --- /dev/null +++ b/src/home/relay/server.py @@ -0,0 +1,82 @@ +import asyncio +import logging + +from pyA20.gpio import gpio +from pyA20.gpio import port as gpioport +from ..util import Addr + +logger = logging.getLogger(__name__) + + +class RelayServer: + OFF = 1 + ON = 0 + + def __init__(self, + pinname: str, + addr: Addr): + if not hasattr(gpioport, pinname): + raise ValueError(f'invalid pin {pinname}') + + self.pin = getattr(gpioport, pinname) + self.addr = addr + + gpio.init() + gpio.setcfg(self.pin, gpio.OUTPUT) + + self.lock = asyncio.Lock() + + def run(self): + asyncio.run(self.run_server()) + + async def relay_set(self, value): + async with self.lock: + gpio.output(self.pin, value) + + async def relay_get(self): + async with self.lock: + return int(gpio.input(self.pin)) == RelayServer.ON + + async def handle_client(self, reader, writer): + request = None + while request != 'quit': + try: + request = await reader.read(255) + if request == b'\x04': + break + request = request.decode('utf-8').strip() + except Exception: + break + + data = 'unknown' + if request == 'on': + await self.relay_set(RelayServer.ON) + logger.debug('set on') + data = 'ok' + + elif request == 'off': + await self.relay_set(RelayServer.OFF) + logger.debug('set off') + data = 'ok' + + elif request == 'get': + status = await self.relay_get() + data = 'on' if status is True else 'off' + + writer.write((data + '\r\n').encode('utf-8')) + try: + await writer.drain() + except ConnectionError: + break + + try: + writer.close() + except ConnectionError: + pass + + async def run_server(self): + host, port = self.addr + server = await asyncio.start_server(self.handle_client, host, port) + async with server: + logger.info('Server started.') + await server.serve_forever() diff --git a/src/home/sound/__init__.py b/src/home/sound/__init__.py new file mode 100644 index 0000000..43ddaff --- /dev/null +++ b/src/home/sound/__init__.py @@ -0,0 +1,8 @@ +from .node_client import SoundNodeClient +from .record import ( + RecordStatus, + RecordingNotFoundError, + Recorder, +) +from .storage import RecordStorage, RecordFile +from .record_client import RecordClient diff --git a/src/home/sound/amixer.py b/src/home/sound/amixer.py new file mode 100644 index 0000000..0ab2c64 --- /dev/null +++ b/src/home/sound/amixer.py @@ -0,0 +1,91 @@ +import subprocess + +from ..config import config +from threading import Lock +from typing import Union + + +_lock = Lock() +_default_step = 5 + + +def has_control(s: str) -> bool: + for control in config['amixer']['controls']: + if control['name'] == s: + return True + return False + + +def get_caps(s: str) -> list[str]: + for control in config['amixer']['controls']: + if control['name'] == s: + return control['caps'] + raise KeyError(f'control {s} not found') + + +def get_all() -> list: + controls = [] + for control in config['amixer']['controls']: + controls.append({ + 'name': control['name'], + 'info': get(control['name']), + 'caps': control['caps'] + }) + return controls + + +def get(control: str): + return call('get', control) + + +def mute(control): + return call('set', control, 'mute') + + +def unmute(control): + return call('set', control, 'unmute') + + +def cap(control): + return call('set', control, 'cap') + + +def nocap(control): + return call('set', control, 'nocap') + + +def _get_default_step() -> int: + if 'step' in config['amixer']: + return int(config['amixer']['step']) + + return _default_step + + +def incr(control, step=None): + if step is None: + step = _get_default_step() + return call('set', control, f'{step}%+') + + +def decr(control, step=None): + if step is None: + step = _get_default_step() + return call('set', control, f'{step}%-') + + +def call(*args, return_code=False) -> Union[int, str]: + with _lock: + result = subprocess.run([config['amixer']['bin'], *args], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if return_code: + return result.returncode + + if result.returncode != 0: + raise AmixerError(result.stderr.decode().strip()) + + return result.stdout.decode().strip() + + +class AmixerError(OSError): + pass diff --git a/src/home/sound/node_client.py b/src/home/sound/node_client.py new file mode 100644 index 0000000..7341208 --- /dev/null +++ b/src/home/sound/node_client.py @@ -0,0 +1,109 @@ +import requests +import logging +import shutil + +from ..util import Addr +from ..api.errors import ApiResponseError +from typing import Optional, Union +from .record import RecordFile + + +class SoundNodeClient: + def __init__(self, addr: Addr): + self.endpoint = f'http://{addr[0]}:{addr[1]}' + self.logger = logging.getLogger(self.__class__.__name__) + + def amixer_get_all(self): + return self._call('amixer/get-all/') + + def amixer_get(self, control: str): + return self._call(f'amixer/get/{control}/') + + def amixer_incr(self, control: str, step: Optional[int] = None): + params = {'step': step} if step is not None else None + return self._call(f'amixer/incr/{control}/', params=params) + + def amixer_decr(self, control: str, step: Optional[int] = None): + params = {'step': step} if step is not None else None + return self._call(f'amixer/decr/{control}/', params=params) + + def amixer_mute(self, control: str): + return self._call(f'amixer/mute/{control}/') + + def amixer_unmute(self, control: str): + return self._call(f'amixer/unmute/{control}/') + + def amixer_cap(self, control: str): + return self._call(f'amixer/cap/{control}/') + + def amixer_nocap(self, control: str): + return self._call(f'amixer/nocap/{control}/') + + def record(self, duration: int): + return self._call('record/', params={"duration": duration}) + + def record_info(self, record_id: int): + return self._call(f'record/info/{record_id}/') + + def record_forget(self, record_id: int): + return self._call(f'record/forget/{record_id}/') + + def record_download(self, record_id: int, output: str): + return self._call(f'record/download/{record_id}/', save_to=output) + + def storage_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: + r = self._call('storage/list/', params={'extended': int(extended)}) + files = r['files'] + if as_objects: + return self.record_list_from_serialized(files) + return files + + @staticmethod + def record_list_from_serialized(files: Union[list[str], list[dict]]): + new_files = [] + for f in files: + kwargs = {'remote': True} + if isinstance(f, dict): + name = f['filename'] + kwargs['remote_filesize'] = f['filesize'] + else: + name = f + item = RecordFile(name, **kwargs) + new_files.append(item) + return new_files + + def storage_delete(self, file_id: str): + return self._call('storage/delete/', params={'file_id': file_id}) + + def storage_download(self, file_id: str, output: str): + return self._call('storage/download/', params={'file_id': file_id}, save_to=output) + + def _call(self, + method: str, + params: dict = None, + save_to: Optional[str] = None): + + kwargs = {} + if isinstance(params, dict): + kwargs['params'] = params + if save_to: + kwargs['stream'] = True + + url = f'{self.endpoint}/{method}' + self.logger.debug(f'calling {url}, kwargs: {kwargs}') + + r = requests.get(url, **kwargs) + if r.status_code != 200: + response = r.json() + raise ApiResponseError(status_code=r.status_code, + error_type=response['error'], + error_message=response['message'] or None, + error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None) + + if save_to: + r.raise_for_status() + with open(save_to, 'wb') as f: + shutil.copyfileobj(r.raw, f) + return True + + return r.json()['response'] diff --git a/src/home/sound/record.py b/src/home/sound/record.py new file mode 100644 index 0000000..1ad8827 --- /dev/null +++ b/src/home/sound/record.py @@ -0,0 +1,400 @@ +import threading +import time +import subprocess +import signal +import os +import logging + +from enum import Enum, auto +from typing import Optional +from ..config import config +from ..util import find_child_processes +from .storage import RecordFile, RecordStorage + + +_history_item_timeout = 7200 +_history_cleanup_freq = 3600 + + +class RecordStatus(Enum): + WAITING = auto() + RECORDING = auto() + FINISHED = auto() + ERROR = auto() + + +class RecordHistoryItem: + id: int + request_time: float + start_time: float + stop_time: float + relations: list[int] + status: RecordStatus + error: Optional[Exception] + file: Optional[RecordFile] + creation_time: float + + def __init__(self, id): + self.id = id + self.request_time = 0 + self.start_time = 0 + self.stop_time = 0 + self.relations = [] + self.status = RecordStatus.WAITING + self.file = None + self.error = None + self.creation_time = time.time() + + def add_relation(self, related_id: int): + self.relations.append(related_id) + + def mark_started(self, start_time: float): + self.start_time = start_time + self.status = RecordStatus.RECORDING + + def mark_finished(self, end_time: float, file: RecordFile): + self.stop_time = end_time + self.file = file + self.status = RecordStatus.FINISHED + + def mark_failed(self, error: Exception): + self.status = RecordStatus.ERROR + self.error = error + + def as_dict(self) -> dict: + data = { + 'id': self.id, + 'request_time': self.request_time, + 'status': self.status.value, + 'relations': self.relations, + 'start_time': self.start_time, + 'stop_time': self.stop_time, + } + if self.error: + data['error'] = str(self.error) + if self.file: + data['file'] = self.file.__dict__() + return data + + +class RecordingNotFoundError(Exception): + pass + + +class RecordHistory: + history: dict[int, RecordHistoryItem] + + def __init__(self): + self.history = {} + self.logger = logging.getLogger(self.__class__.__name__) + + def add(self, record_id: int): + self.logger.debug(f'add: record_id={record_id}') + + r = RecordHistoryItem(record_id) + r.request_time = time.time() + + self.history[record_id] = r + + def delete(self, record_id: int): + self.logger.debug(f'delete: record_id={record_id}') + del self.history[record_id] + + def cleanup(self): + del_ids = [] + for rid, item in self.history.items(): + if item.creation_time < time.time()-_history_item_timeout: + del_ids.append(rid) + for rid in del_ids: + self.delete(rid) + + def __getitem__(self, key): + if key not in self.history: + raise RecordingNotFoundError() + + return self.history[key] + + def __setitem__(self, key, value): + raise NotImplementedError('setting history item this way is prohibited') + + def __contains__(self, key): + return key in self.history + + +class Recording: + start_time: float + stop_time: float + duration: int + record_id: int + arecord_pid: Optional[int] + process: Optional[subprocess.Popen] + + g_record_id = 1 + + def __init__(self): + self.start_time = 0 + self.stop_time = 0 + self.duration = 0 + self.process = None + self.arecord_pid = None + self.record_id = Recording.next_id() + self.logger = logging.getLogger(self.__class__.__name__) + + def is_started(self) -> bool: + return self.start_time > 0 and self.stop_time > 0 + + def is_waiting(self): + return self.duration > 0 + + def ask_for(self, duration) -> int: + overtime = 0 + orig_duration = duration + + if self.is_started(): + already_passed = time.time() - self.start_time + max_duration = Recorder.get_max_record_time() - already_passed + self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}') + else: + max_duration = Recorder.get_max_record_time() + + if duration > max_duration: + overtime = duration - max_duration + duration = max_duration + + self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}') + + self.duration += duration + if self.is_started(): + til_end = self.stop_time - time.time() + if til_end < 0: + til_end = 0 + + _prev_stop_time = self.stop_time + _to_add = duration - til_end + if _to_add < 0: + _to_add = 0 + + self.stop_time += _to_add + self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})') + + return overtime + + def start(self, output: str): + assert self.start_time == 0 and self.stop_time == 0, "already started?!" + assert self.process is None, "self.process is not None, what the hell?" + + cur = time.time() + self.start_time = cur + self.stop_time = cur + self.duration + + arecord = config['arecord']['bin'] + lame = config['lame']['bin'] + b = config['lame']['bitrate'] + + cmd = f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null' + self.logger.debug(f'start: running `{cmd}`') + self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True) + + sh_pid = self.process.pid + self.logger.debug(f'start: started, pid of shell is {sh_pid}') + + arecord_pid = self.find_arecord_pid(sh_pid) + if arecord_pid is not None: + self.arecord_pid = arecord_pid + self.logger.debug(f'start: pid of arecord is {arecord_pid}') + + def stop(self): + if self.process: + if self.arecord_pid is None: + self.arecord_pid = self.find_arecord_pid(self.process.pid) + + if self.arecord_pid is not None: + os.kill(self.arecord_pid, signal.SIGINT) + timeout = config['node']['process_wait_timeout'] + + self.logger.debug(f'stop: sent SIGINT to {self.arecord_pid}. now waiting up to {timeout} seconds...') + try: + self.process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + self.logger.warning(f'stop: wait({timeout}): timeout expired, calling terminate()') + self.process.terminate() + else: + self.logger.warning('stop: pid of arecord is unknown, calling terminate()') + self.process.terminate() + + rc = self.process.returncode + self.logger.debug(f'stop: rc={rc}') + + self.process = None + self.arecord_pid = 0 + + self.duration = 0 + self.start_time = 0 + self.stop_time = 0 + + def find_arecord_pid(self, sh_pid: int): + try: + children = find_child_processes(sh_pid) + except OSError as exc: + self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc)) + return None + + for child in children: + if 'arecord' in child.cmd: + return child.pid + + return None + + @staticmethod + def next_id() -> int: + cur_id = Recording.g_record_id + Recording.g_record_id += 1 + return cur_id + + def increment_id(self): + self.record_id = Recording.next_id() + + +class Recorder: + interrupted: bool + lock: threading.Lock + history_lock: threading.Lock + recording: Optional[Recording] + overtime: int + history: RecordHistory + next_history_cleanup_time: float + storage: RecordStorage + + def __init__(self, storage: RecordStorage): + self.storage = storage + self.recording = Recording() + self.interrupted = False + self.lock = threading.Lock() + self.history_lock = threading.Lock() + self.overtime = 0 + self.history = RecordHistory() + self.next_history_cleanup_time = 0 + self.logger = logging.getLogger(self.__class__.__name__) + + def start_thread(self): + t = threading.Thread(target=self.loop) + t.daemon = True + t.start() + + def loop(self) -> None: + tempname = os.path.join(self.storage.root, 'temp.mp3') + + while not self.interrupted: + cur = time.time() + stopped = False + cur_record_id = None + + if self.next_history_cleanup_time == 0: + self.next_history_cleanup_time = time.time() + _history_cleanup_freq + elif self.next_history_cleanup_time <= time.time(): + self.logger.debug('loop: calling history.cleanup()') + try: + self.history.cleanup() + except Exception as e: + self.logger.error('loop: error while history.cleanup(): ' + str(e)) + self.next_history_cleanup_time = time.time() + _history_cleanup_freq + + with self.lock: + cur_record_id = self.recording.record_id + # self.logger.debug(f'cur_record_id={cur_record_id}') + + if not self.recording.is_started(): + if self.recording.is_waiting(): + try: + if os.path.exists(tempname): + self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..') + try: + os.unlink(tempname) + except OSError as e: + self.logger.exception(e) + self.recording.start(tempname) + with self.history_lock: + self.history[cur_record_id].mark_started(self.recording.start_time) + except Exception as exc: + self.logger.exception(exc) + + # there should not be any errors, but still.. + try: + self.recording.stop() + except Exception as exc: + self.logger.exception(exc) + + with self.history_lock: + self.history[cur_record_id].mark_failed(exc) + + self.logger.debug(f'loop: start exc path: calling increment_id()') + self.recording.increment_id() + else: + if cur >= self.recording.stop_time: + try: + start_time = self.recording.start_time + stop_time = self.recording.stop_time + self.recording.stop() + + saved_name = self.storage.save(tempname, + record_id=cur_record_id, + start_time=int(start_time), + stop_time=int(stop_time)) + + with self.history_lock: + self.history[cur_record_id].mark_finished(stop_time, saved_name) + except Exception as exc: + self.logger.exception(exc) + with self.history_lock: + self.history[cur_record_id].mark_failed(exc) + finally: + self.logger.debug(f'loop: stop exc final path: calling increment_id()') + self.recording.increment_id() + + stopped = True + + if stopped and self.overtime > 0: + self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})') + _overtime = self.overtime + self.overtime = 0 + + related_id = self.record(_overtime) + self.logger.info(f'enqueued another record with id {related_id}') + + if cur_record_id is not None: + with self.history_lock: + self.history[cur_record_id].add_relation(related_id) + + time.sleep(0.2) + + def record(self, duration: int) -> int: + self.logger.debug(f'record: duration={duration}') + with self.lock: + overtime = self.recording.ask_for(duration) + self.logger.debug(f'overtime={overtime}') + + if overtime > self.overtime: + self.overtime = overtime + + if not self.recording.is_started(): + with self.history_lock: + self.history.add(self.recording.record_id) + + return self.recording.record_id + + def stop(self): + self.interrupted = True + + def get_info(self, record_id: int) -> RecordHistoryItem: + with self.history_lock: + return self.history[record_id] + + def forget(self, record_id: int): + with self.history_lock: + self.logger.info(f'forget: removing record {record_id} from history') + self.history.delete(record_id) + + @staticmethod + def get_max_record_time() -> int: + return config['node']['record_max_time'] + diff --git a/src/home/sound/record_client.py b/src/home/sound/record_client.py new file mode 100644 index 0000000..2744a8c --- /dev/null +++ b/src/home/sound/record_client.py @@ -0,0 +1,142 @@ +import time +import logging +import threading +import os.path + +from tempfile import gettempdir +from .record import RecordStatus +from .node_client import SoundNodeClient +from ..util import Addr +from typing import Optional, Callable + + +class RecordClient: + interrupted: bool + logger: logging.Logger + clients: dict[str, SoundNodeClient] + awaiting: dict[str, dict[int, Optional[dict]]] + error_handler: Optional[Callable] + finished_handler: Optional[Callable] + download_on_finish: bool + + def __init__(self, + nodes: dict[str, Addr], + error_handler: Optional[Callable] = None, + finished_handler: Optional[Callable] = None, + download_on_finish=False): + self.interrupted = False + self.logger = logging.getLogger(self.__class__.__name__) + self.clients = {} + self.awaiting = {} + self.download_on_finish = download_on_finish + + self.error_handler = error_handler + self.finished_handler = finished_handler + + self.awaiting_lock = threading.Lock() + + for node, addr in nodes.items(): + self.clients[node] = SoundNodeClient(addr) + self.awaiting[node] = {} + + try: + t = threading.Thread(target=self.loop) + t.daemon = True + t.start() + except (KeyboardInterrupt, SystemExit) as exc: + self.stop() + self.logger.exception(exc) + + def stop(self): + self.interrupted = True + + def loop(self): + while not self.interrupted: + # self.logger.debug('loop: tick') + + for node in self.awaiting.keys(): + with self.awaiting_lock: + record_ids = list(self.awaiting[node].keys()) + if not record_ids: + continue + + self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}') + + cl = self.getclient(node) + del_ids = [] + for rid in record_ids: + info = cl.record_info(rid) + + if info['relations']: + for relid in info['relations']: + self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True) + + status = RecordStatus(info['status']) + if status in (RecordStatus.FINISHED, RecordStatus.ERROR): + if status == RecordStatus.FINISHED: + if self.download_on_finish: + local_fn = self.download(node, rid, info['file']['fileid']) + else: + local_fn = None + self._report_finished(info, local_fn, self.awaiting[node][rid]) + else: + self._report_error(info, self.awaiting[node][rid]) + del_ids.append(rid) + self.logger.debug(f'record {rid}: status {status}') + + if del_ids: + self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list') + with self.awaiting_lock: + for del_id in del_ids: + del self.awaiting[node][del_id] + + time.sleep(5) + + self.logger.info('loop ended') + + def getclient(self, node: str): + return self.clients[node] + + def record(self, + node: str, + duration: int, + userdata: Optional[dict] = None) -> int: + self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}') + + cl = self.getclient(node) + record_id = cl.record(duration)['id'] + self.logger.debug(f'record: request sent, record_id={record_id}') + + self.wait_for_record(node, record_id, userdata) + return record_id + + def wait_for_record(self, + node: str, + record_id: int, + userdata: Optional[dict] = None, + is_relative=False): + with self.awaiting_lock: + if record_id not in self.awaiting[node]: + msg = f'wait_for_record: adding {record_id} to {node}' + if is_relative: + msg += ' (by relation)' + self.logger.debug(msg) + + self.awaiting[node][record_id] = userdata + + def download(self, node: str, record_id: int, fileid: str): + dst = os.path.join(gettempdir(), f'{node}_{fileid}.mp3') + cl = self.getclient(node) + cl.record_download(record_id, dst) + return dst + + def forget(self, node: str, rid: int): + self.getclient(node).record_forget(rid) + + def _report_finished(self, *args): + if self.finished_handler: + self.finished_handler(*args) + + def _report_error(self, *args): + if self.error_handler: + self.error_handler(*args) diff --git a/src/home/sound/storage.py b/src/home/sound/storage.py new file mode 100644 index 0000000..c61f6f6 --- /dev/null +++ b/src/home/sound/storage.py @@ -0,0 +1,155 @@ +import os +import re +import shutil +import logging + +from typing import Optional, Union +from datetime import datetime +from ..util import strgen + +logger = logging.getLogger(__name__) + + +class RecordFile: + start_time: Optional[datetime] + stop_time: Optional[datetime] + record_id: Optional[int] + name: str + file_id: Optional[str] + remote: bool + remote_filesize: int + storage_root: str + + human_date_dmt = '%d.%m.%y' + human_time_fmt = '%H:%M:%S' + + def __init__(self, filename: str, remote=False, remote_filesize=None, storage_root='/'): + self.name = filename + self.storage_root = storage_root + + self.remote = remote + self.remote_filesize = remote_filesize + + m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.mp3$', filename) + if m: + self.start_time = datetime.strptime(m.group(1), RecordStorage.time_fmt) + self.stop_time = datetime.strptime(m.group(2), RecordStorage.time_fmt) + self.record_id = int(m.group(3)) + self.file_id = (m.group(1) + '_' + m.group(2)).replace('-', '_') + else: + logger.warning(f'unexpected filename: {filename}') + self.start_time = None + self.stop_time = None + self.record_id = None + self.file_id = None + + @property + def path(self): + if self.remote: + return RuntimeError('remote recording, can\'t get real path') + + return os.path.realpath(os.path.join( + self.storage_root, self.name + )) + + @property + def start_humantime(self) -> str: + if self.start_time is None: + return '?' + fmt = f'{RecordFile.human_date_dmt} {RecordFile.human_time_fmt}' + return self.start_time.strftime(fmt) + + @property + def stop_humantime(self) -> str: + if self.stop_time is None: + return '?' + fmt = RecordFile.human_time_fmt + if self.start_time.date() != self.stop_time.date(): + fmt = f'{RecordFile.human_date_dmt} {fmt}' + return self.stop_time.strftime(fmt) + + @property + def start_unixtime(self) -> int: + if self.start_time is None: + return 0 + return int(self.start_time.timestamp()) + + @property + def stop_unixtime(self) -> int: + if self.stop_time is None: + return 0 + return int(self.stop_time.timestamp()) + + @property + def filesize(self): + if self.remote: + if self.remote_filesize is None: + raise RuntimeError('file is remote and remote_filesize is not set') + return self.remote_filesize + return os.path.getsize(self.path) + + def __dict__(self) -> dict: + return { + 'start_unixtime': self.start_unixtime, + 'stop_unixtime': self.stop_unixtime, + 'filename': self.name, + 'filesize': self.filesize, + 'fileid': self.file_id, + 'record_id': self.record_id or 0, + } + + +class RecordStorage: + time_fmt = '%d%m%y-%H%M%S' + + def __init__(self, root: str): + self.root = root + + def getfiles(self, as_objects=False) -> Union[list[str], list[RecordFile]]: + files = [] + for name in os.listdir(self.root): + path = os.path.join(self.root, name) + if os.path.isfile(path) and name.endswith('.mp3'): + files.append(name if not as_objects else RecordFile(name, storage_root=self.root)) + return files + + def find(self, file_id: str) -> Optional[RecordFile]: + for name in os.listdir(self.root): + if os.path.isfile(os.path.join(self.root, name)) and name.endswith('.mp3'): + item = RecordFile(name, storage_root=self.root) + if item.file_id == file_id: + return item + return None + + def purge(self): + files = self.getfiles() + if files: + logger = logging.getLogger(self.__name__) + for f in files: + try: + path = os.path.join(self.root, f) + logger.debug(f'purge: deleting {path}') + os.unlink(path) + except OSError as exc: + logger.exception(exc) + + def delete(self, file: RecordFile): + os.unlink(file.path) + + def save(self, + fn: str, + record_id: int, + start_time: int, + stop_time: int) -> RecordFile: + + start_time_s = datetime.fromtimestamp(start_time).strftime(self.time_fmt) + stop_time_s = datetime.fromtimestamp(stop_time).strftime(self.time_fmt) + + dst_fn = f'{start_time_s}_{stop_time_s}_id{record_id}' + if os.path.exists(os.path.join(self.root, dst_fn)): + dst_fn += strgen(4) + dst_fn += '.mp3' + dst_path = os.path.join(self.root, dst_fn) + + shutil.move(fn, dst_path) + return RecordFile(dst_fn, storage_root=self.root) diff --git a/src/home/soundsensor/__init__.py b/src/home/soundsensor/__init__.py new file mode 100644 index 0000000..30052f8 --- /dev/null +++ b/src/home/soundsensor/__init__.py @@ -0,0 +1,22 @@ +import importlib + +__all__ = [ + 'SoundSensorNode', + 'SoundSensorHitHandler', + 'SoundSensorServer', + 'SoundSensorServerGuardClient' +] + + +def __getattr__(name): + if name in __all__: + if name == 'SoundSensorNode': + file = 'node' + elif name == 'SoundSensorServerGuardClient': + file = 'server_client' + else: + file = 'server' + module = importlib.import_module(f'.{file}', __name__) + return getattr(module, name) + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/home/soundsensor/__init__.pyi b/src/home/soundsensor/__init__.pyi new file mode 100644 index 0000000..cb34972 --- /dev/null +++ b/src/home/soundsensor/__init__.pyi @@ -0,0 +1,8 @@ +from .server import ( + SoundSensorHitHandler as SoundSensorHitHandler, + SoundSensorServer as SoundSensorServer, +) +from .server_client import ( + SoundSensorServerGuardClient as SoundSensorServerGuardClient +) +from .node import SoundSensorNode as SoundSensorNode diff --git a/src/home/soundsensor/node.py b/src/home/soundsensor/node.py new file mode 100644 index 0000000..b4b8fbc --- /dev/null +++ b/src/home/soundsensor/node.py @@ -0,0 +1,73 @@ +import logging +import threading + +from typing import Optional +from time import sleep +from ..util import stringify, send_datagram, Addr + +from pyA20.gpio import gpio +from pyA20.gpio import port as gpioport + +logger = logging.getLogger(__name__) + + +class SoundSensorNode: + def __init__(self, + name: str, + pinname: str, + server_addr: Optional[Addr], + delay=0.005): + + if not hasattr(gpioport, pinname): + raise ValueError(f'invalid pin {pinname}') + + self.pin = getattr(gpioport, pinname) + self.name = name + self.delay = delay + + self.server_addr = server_addr + + self.hits = 0 + self.hitlock = threading.Lock() + + self.interrupted = False + + def run(self): + try: + t = threading.Thread(target=self.sensor_reader) + t.daemon = True + t.start() + + while True: + with self.hitlock: + hits = self.hits + self.hits = 0 + + if hits > 0: + try: + if self.server_addr is not None: + send_datagram(stringify([self.name, hits]), self.server_addr) + else: + logger.debug(f'server reporting disabled, skipping reporting {hits} hits') + except OSError as exc: + logger.exception(exc) + + sleep(1) + + except (KeyboardInterrupt, SystemExit) as e: + self.interrupted = True + logger.info(str(e)) + + def sensor_reader(self): + gpio.init() + gpio.setcfg(self.pin, gpio.INPUT) + gpio.pullup(self.pin, gpio.PULLUP) + + while not self.interrupted: + state = gpio.input(self.pin) + sleep(self.delay) + + if not state: + with self.hitlock: + logger.debug('got a hit') + self.hits += 1 diff --git a/src/home/soundsensor/server.py b/src/home/soundsensor/server.py new file mode 100644 index 0000000..490fc36 --- /dev/null +++ b/src/home/soundsensor/server.py @@ -0,0 +1,125 @@ +import asyncio +import json +import logging +import threading + +from ..config import config +from aiohttp import web +from aiohttp.web_exceptions import ( + HTTPNotFound +) + +from typing import Type +from ..util import Addr, stringify, format_tb + +logger = logging.getLogger(__name__) + + +class SoundSensorHitHandler(asyncio.DatagramProtocol): + def datagram_received(self, data, addr): + try: + data = json.loads(data) + except json.JSONDecodeError as e: + logger.error('failed to parse json datagram') + logger.exception(e) + return + + try: + name, hits = data + except (ValueError, IndexError) as e: + logger.error('failed to unpack data') + logger.exception(e) + return + + self.handler(name, hits) + + def handler(self, name: str, hits: int): + pass + + +class SoundSensorServer: + def __init__(self, + addr: Addr, + handler_impl: Type[SoundSensorHitHandler]): + self.addr = addr + self.impl = handler_impl + + self._recording_lock = threading.Lock() + self._recording_enabled = True + + if self.guard_control_enabled(): + if 'guard_recording_default' in config['server']: + self._recording_enabled = config['server']['guard_recording_default'] + + def guard_control_enabled(self) -> bool: + return 'guard_control' in config['server'] and config['server']['guard_control'] is True + + def set_recording(self, enabled: bool): + with self._recording_lock: + self._recording_enabled = enabled + + def is_recording_enabled(self) -> bool: + with self._recording_lock: + return self._recording_enabled + + def run(self): + if self.guard_control_enabled(): + t = threading.Thread(target=self.run_guard_server) + t.daemon = True + t.start() + + loop = asyncio.get_event_loop() + t = loop.create_datagram_endpoint(self.impl, local_addr=self.addr) + loop.run_until_complete(t) + loop.run_forever() + + def run_guard_server(self): + routes = web.RouteTableDef() + + def ok(data=None): + if data is None: + data = 1 + response = {'response': data} + return web.json_response(response, dumps=stringify) + + @web.middleware + async def errors_handler_middleware(request, handler): + try: + response = await handler(request) + return response + except HTTPNotFound: + return web.json_response({'error': 'not found'}, status=404) + except Exception as exc: + data = { + 'error': exc.__class__.__name__, + 'message': exc.message if hasattr(exc, 'message') else str(exc) + } + tb = format_tb(exc) + if tb: + data['stacktrace'] = tb + + return web.json_response(data, status=500) + + @routes.post('/guard/enable') + async def guard_enable(request): + self.set_recording(True) + return ok() + + @routes.post('/guard/disable') + async def guard_disable(request): + self.set_recording(False) + return ok() + + @routes.get('/guard/status') + async def guard_status(request): + return ok({'enabled': self.is_recording_enabled()}) + + asyncio.set_event_loop(asyncio.new_event_loop()) # need to create new event loop in new thread + app = web.Application() + app.add_routes(routes) + app.middlewares.append(errors_handler_middleware) + + web.run_app(app, + host=self.addr[0], + port=self.addr[1], + handle_signals=False) # handle_signals=True doesn't work in separate thread diff --git a/src/home/soundsensor/server_client.py b/src/home/soundsensor/server_client.py new file mode 100644 index 0000000..7eef996 --- /dev/null +++ b/src/home/soundsensor/server_client.py @@ -0,0 +1,38 @@ +import requests +import logging + +from ..util import Addr +from ..api.errors import ApiResponseError + + +class SoundSensorServerGuardClient: + def __init__(self, addr: Addr): + self.endpoint = f'http://{addr[0]}:{addr[1]}' + self.logger = logging.getLogger(self.__class__.__name__) + + def guard_enable(self): + return self._call('guard/enable', is_post=True) + + def guard_disable(self): + return self._call('guard/disable', is_post=True) + + def guard_status(self): + return self._call('guard/status') + + def _call(self, + method: str, + is_post=False): + + url = f'{self.endpoint}/{method}' + self.logger.debug(f'calling {url}') + + r = requests.get(url) if not is_post else requests.post(url) + + if r.status_code != 200: + response = r.json() + raise ApiResponseError(status_code=r.status_code, + error_type=response['error'], + error_message=response['message'] or None, + error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None) + + return r.json()['response'] diff --git a/src/home/util.py b/src/home/util.py new file mode 100644 index 0000000..2c43cb0 --- /dev/null +++ b/src/home/util.py @@ -0,0 +1,213 @@ +import json +import socket +import time +import requests +import subprocess +import traceback +import logging +import string +import random + +from .config import config +from datetime import datetime +from typing import Tuple, Optional + +Addr = Tuple[str, int] # network address type (host, port) + +logger = logging.getLogger(__name__) + + +# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks +def chunks(lst, n): + """Yield successive n-sized chunks from lst.""" + for i in range(0, len(lst), n): + yield lst[i:i + n] + + +def json_serial(obj): + """JSON serializer for datetime objects""" + if isinstance(obj, datetime): + return obj.timestamp() + raise TypeError("Type %s not serializable" % type(obj)) + + +def stringify(v) -> str: + return json.dumps(v, separators=(',', ':'), default=json_serial) + + +def ipv4_valid(ip: str) -> bool: + try: + socket.inet_aton(ip) + return True + except socket.error: + return False + + +def parse_addr(addr: str) -> Addr: + if addr.count(':') != 1: + raise ValueError('invalid host:port format') + + host, port = addr.split(':') + if not ipv4_valid(host): + raise ValueError('invalid ipv4 address') + + port = int(port) + if not 0 <= port <= 65535: + raise ValueError('invalid port') + + return host, port + + +def strgen(n: int): + return ''.join(random.choices(string.ascii_letters + string.digits, k=n)) + + +class MySimpleSocketClient: + host: str + port: int + + def __init__(self, host: str, port: int): + self.host = host + self.port = port + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect((self.host, self.port)) + self.sock.settimeout(5) + + def __del__(self): + self.sock.close() + + def write(self, line: str) -> None: + self.sock.sendall((line + '\r\n').encode()) + + def read(self) -> str: + buf = bytearray() + while True: + buf.extend(self.sock.recv(256)) + if b'\r\n' in buf: + break + + response = buf.decode().strip() + return response + + +def send_datagram(message: str, addr: Addr) -> None: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.sendto(message.encode(), addr) + + +def send_telegram(text: str, + parse_mode: str = None, + disable_web_page_preview: bool = False, + ): + data = { + 'chat_id': config['telegram']['chat_id'], + 'text': text + } + + if parse_mode is not None: + data['parse_mode'] = parse_mode + elif 'parse_mode' in config['telegram']: + data['parse_mode'] = config['telegram']['parse_mode'] + + if disable_web_page_preview or 'disable_web_page_preview' in config['telegram']: + data['disable_web_page_preview'] = 1 + + r = requests.post('https://api.telegram.org/bot%s/sendMessage' % config['telegram']['token'], data=data) + + if r.status_code != 200: + logger.error(r.text) + raise RuntimeError("telegram returned %d" % r.status_code) + + +def format_tb(exc) -> Optional[list[str]]: + tb = traceback.format_tb(exc.__traceback__) + if not tb: + return None + + tb = list(map(lambda s: s.strip(), tb)) + tb.reverse() + if tb[0][-1:] == ':': + tb[0] = tb[0][:-1] + + return tb + + +class ChildProcessInfo: + pid: int + cmd: str + + def __init__(self, + pid: int, + cmd: str): + self.pid = pid + self.cmd = cmd + + +def find_child_processes(ppid: int) -> list[ChildProcessInfo]: + p = subprocess.run(['pgrep', '-P', str(ppid), '--list-full'], capture_output=True) + if p.returncode != 0: + raise OSError(f'pgrep returned {p.returncode}') + + children = [] + + lines = p.stdout.decode().strip().split('\n') + for line in lines: + try: + space_idx = line.index(' ') + except ValueError as exc: + logger.exception(exc) + continue + + pid = int(line[0:space_idx]) + cmd = line[space_idx+1:] + + children.append(ChildProcessInfo(pid, cmd)) + + return children + + +class Stopwatch: + elapsed: float + time_started: Optional[float] + + def __init__(self): + self.elapsed = 0 + self.time_started = None + + def go(self): + if self.time_started is not None: + raise StopwatchError('stopwatch was already started') + + self.time_started = time.time() + + def pause(self): + if self.time_started is None: + raise StopwatchError('stopwatch was paused') + + self.elapsed += time.time() - self.time_started + self.time_started = None + + def get_elapsed_time(self): + elapsed = self.elapsed + if self.time_started is not None: + elapsed += time.time() - self.time_started + return elapsed + + def reset(self): + self.time_started = None + self.elapsed = 0 + + def is_paused(self): + return self.time_started is None + + +class StopwatchError(RuntimeError): + pass + + +def filesize_fmt(num, suffix="B") -> str: + for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: + if abs(num) < 1024.0: + return f"{num:3.1f} {unit}{suffix}" + num /= 1024.0 + return f"{num:.1f} Yi{suffix}"
\ No newline at end of file diff --git a/src/home/web_api/__init__.py b/src/home/web_api/__init__.py new file mode 100644 index 0000000..20655da --- /dev/null +++ b/src/home/web_api/__init__.py @@ -0,0 +1 @@ +from .web_api import get_app
\ No newline at end of file diff --git a/src/home/web_api/web_api.py b/src/home/web_api/web_api.py new file mode 100644 index 0000000..c75c031 --- /dev/null +++ b/src/home/web_api/web_api.py @@ -0,0 +1,213 @@ +import logging +import json +import os.path + +from datetime import datetime, timedelta +from typing import Optional + +from werkzeug.exceptions import HTTPException +from flask import Flask, request, Response + +from ..config import config, is_development_mode +from ..database import BotsDatabase, SensorsDatabase +from ..util import stringify, format_tb +from ..api.types import BotType, TemperatureSensorLocation, SoundSensorLocation +from ..sound import RecordStorage + +db: Optional[BotsDatabase] = None +sensors_db: Optional[SensorsDatabase] = None +app = Flask(__name__) +logger = logging.getLogger(__name__) + + +class AuthError(Exception): + def __init__(self, message: str): + super().__init__() + self.message = message + + +# api methods +# ----------- + +@app.route("/") +def hello(): + message = "nothing here, keep lurking" + if is_development_mode(): + message += ' (dev mode)' + return message + + +@app.route('/api/sensors/data/', methods=['GET']) +def sensors_data(): + hours = request.args.get('hours', type=int, default=1) + sensor = TemperatureSensorLocation(request.args.get('sensor', type=int)) + + if hours < 1 or hours > 24: + raise ValueError('invalid hours value') + + dt_to = datetime.now() + dt_from = dt_to - timedelta(hours=hours) + + data = sensors_db.get_temperature_recordings(sensor, (dt_from, dt_to)) + return ok(data) + + +@app.route('/api/sound_sensors/hits/', methods=['GET']) +def get_sound_sensors_hits(): + location = SoundSensorLocation(request.args.get('location', type=int)) + + after = request.args.get('after', type=int) + kwargs = {} + if after is None: + last = request.args.get('last', type=int) + if last is None: + raise ValueError('you must pass `after` or `last` params') + else: + if not 0 < last < 100: + raise ValueError('invalid last value: must be between 0 and 100') + kwargs['last'] = last + else: + kwargs['after'] = datetime.fromtimestamp(after) + + data = db.get_sound_hits(location, **kwargs) + return ok(data) + + +@app.route('/api/sound_sensors/hits/', methods=['POST']) +def post_sound_sensors_hits(): + hits = [] + for hit, count in json.loads(request.form.get('hits', type=str)): + if not hasattr(SoundSensorLocation, hit.upper()): + raise ValueError('invalid sensor location') + if count < 1: + raise ValueError(f'invalid count: {count}') + hits.append((SoundSensorLocation[hit.upper()], count)) + + db.add_sound_hits(hits, datetime.now()) + return ok() + + +@app.route('/api/logs/bot-request/', methods=['POST']) +def log_bot_request(): + user_id = request.form.get('user_id', type=int, default=0) + message = request.form.get('message', type=str, default='') + bot = BotType(request.form.get('bot', type=int)) + + # validate message + if message.strip() == '': + raise ValueError('message can\'t be empty') + + # add record to the database + db.add_request(bot, user_id, message) + + return ok() + + +@app.route('/api/logs/openwrt/', methods=['POST']) +def log_openwrt(): + logs = request.form.get('logs', type=str, default='') + + # validate it + logs = json.loads(logs) + assert type(logs) is list, "invalid json data (list expected)" + + lines = [] + for line in logs: + assert type(line) is list, "invalid line type (list expected)" + assert len(line) == 2, f"expected 2 items in line, got {len(line)}" + assert type(line[0]) is int, "invalid line[0] type (int expected)" + assert type(line[1]) is str, "invalid line[1] type (str expected)" + + lines.append(( + datetime.fromtimestamp(line[0]), + line[1] + )) + + db.add_openwrt_logs(lines) + return ok() + + +@app.route('/api/recordings/list/', methods=['GET']) +def recordings_list(): + extended = request.args.get('extended', type=bool, default=False) + node = request.args.get('node', type=str) + + root = os.path.join(config['recordings']['directory'], node) + if not os.path.isdir(root): + raise ValueError(f'invalid node {node}: no such directory') + + storage = RecordStorage(root) + files = storage.getfiles(as_objects=extended) + if extended: + files = list(map(lambda file: file.__dict__(), files)) + + return ok(files) + + +# internal functions +# ------------------ + +def ok(data=None) -> Response: + response = {'result': 'ok'} + if data is not None: + response['data'] = data + return Response(stringify(response), + mimetype='application/json') + + +def err(e) -> Response: + error = { + 'type': e.__class__.__name__, + 'message': e.message if hasattr(e, 'message') else str(e) + } + if is_development_mode(): + tb = format_tb(e) + if tb: + error['stacktrace'] = tb + data = { + 'result': 'error', + 'error': error + } + return Response(stringify(data), mimetype='application/json') + + +def get_token() -> Optional[str]: + name = 'X-Token' + if name in request.headers: + return request.headers[name] + + token = request.args.get('token', default='', type=str) + if token != '': + return token + + return None + + +@app.errorhandler(Exception) +def handle_exception(e): + if isinstance(e, HTTPException): + return e + return err(e), 500 + + +@app.before_request +def validate_token() -> None: + if request.path.startswith('/api/') and not is_development_mode(): + token = get_token() + if not token: + raise AuthError(f'token is missing') + + if token != config['api']['token']: + raise AuthError('invalid token') + + +def get_app(): + global db, sensors_db + + config.load('web_api') + app.config.from_mapping(**config['flask']) + + db = BotsDatabase() + sensors_db = SensorsDatabase() + + return app |