diff options
Diffstat (limited to 'bin')
36 files changed, 6531 insertions, 0 deletions
diff --git a/bin/__py_include.py b/bin/__py_include.py new file mode 100644 index 0000000..8f98830 --- /dev/null +++ b/bin/__py_include.py @@ -0,0 +1,9 @@ +import sys +import os.path + +for _name in ('include/py',): + sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..', _name) + ) + ])
\ No newline at end of file diff --git a/bin/camera_node.py b/bin/camera_node.py new file mode 100755 index 0000000..1485557 --- /dev/null +++ b/bin/camera_node.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +import asyncio +import time +import __py_include + +from homekit.config import config +from homekit.media import MediaNodeServer, ESP32CameraRecordStorage, CameraRecorder +from homekit.camera import CameraType, esp32 +from homekit.util import Addr +from homekit import http + + +# Implements HTTP API for a camera. +# --------------------------------- + +class ESP32CameraNodeServer(MediaNodeServer): + def __init__(self, web_addr: Addr, *args, **kwargs): + super().__init__(*args, **kwargs) + self.last_settings_sync = 0 + + self.web = esp32.WebClient(web_addr) + self.get('/capture/', self.capture) + + async def capture(self, req: http.Request): + await self.sync_settings_if_needed() + + try: + with_flash = int(req.query['with_flash']) + except KeyError: + with_flash = 0 + + if with_flash: + await self.web.setflash(True) + await asyncio.sleep(0.5) + + bytes = (await self.web.capture()).read() + + if with_flash: + await asyncio.sleep(0.5) + await self.web.setflash(False) + + res = http.StreamResponse() + res.content_type = 'image/jpeg' + res.content_length = len(bytes) + + await res.prepare(req) + await res.write(bytes) + await res.write_eof() + + return res + + async def do_record(self, request: http.Request): + await self.sync_settings_if_needed() + + # sync settings + return await super().do_record(request) + + async def sync_settings_if_needed(self): + if self.last_settings_sync != 0 and time.time() - self.last_settings_sync < 300: + return + changed = await self.web.syncsettings(config['camera']['settings']) + if changed: + self.logger.debug('sync_settings_if_needed: some settings were changed, sleeping for 0.4 sec') + await asyncio.sleep(0.4) + self.last_settings_sync = time.time() + + +if __name__ == '__main__': + config.load_app('camera_node') + + recorder_kwargs = {} + camera_type = CameraType(config['camera']['type']) + if camera_type == CameraType.ESP32: + recorder_kwargs['stream_addr'] = config.get_addr('camera.web_addr') # this is not a mistake, we don't use stream_addr for esp32-cam anymore + storage = ESP32CameraRecordStorage(config['node']['storage']) + else: + raise RuntimeError(f'unsupported camera type {camera_type}') + + recorder = CameraRecorder(storage=storage, + camera_type=camera_type, + **recorder_kwargs) + recorder.start_thread() + + server = ESP32CameraNodeServer( + recorder=recorder, + storage=storage, + web_addr=config.get_addr('camera.web_addr'), + addr=config.get_addr('node.listen')) + server.run() diff --git a/bin/electricity_calc.py b/bin/electricity_calc.py new file mode 100755 index 0000000..cff2327 --- /dev/null +++ b/bin/electricity_calc.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +import logging +import os +import sys +import inspect +import __py_include + +from homekit.config import config # do not remove this import! +from datetime import datetime, timedelta +from logging import Logger +from homekit.database import InverterDatabase +from argparse import ArgumentParser, ArgumentError +from typing import Optional + +_logger: Optional[Logger] = None +_progname = os.path.basename(__file__) +_is_verbose = False + +fmt_time = '%Y-%m-%d %H:%M:%S' +fmt_date = '%Y-%m-%d' + + +def method_usage() -> str: + # https://stackoverflow.com/questions/2654113/how-to-get-the-callers-method-name-in-the-called-method + curframe = inspect.currentframe() + calframe = inspect.getouterframes(curframe, 2) + return f'{_progname} {calframe[1][3]} [ARGS]' + + +def fmt_escape(s: str): + return s.replace('%', '%%') + + +def setup_logging(verbose: bool): + global _is_verbose + + logging_level = logging.INFO if not verbose else logging.DEBUG + logging.basicConfig(level=logging_level) + + _is_verbose = verbose + + +class SubParser: + def __init__(self, description: str, usage: str): + self.parser = ArgumentParser( + description=description, + usage=usage + ) + + def add_argument(self, *args, **kwargs): + self.parser.add_argument(*args, **kwargs) + + def parse_args(self): + self.add_argument('--verbose', '-V', action='store_true', + help='enable debug logs') + + args = self.parser.parse_args(sys.argv[2:]) + setup_logging(args.verbose) + + return args + + +def strptime_auto(s: str) -> datetime: + e = None + for fmt in (fmt_time, fmt_date): + try: + return datetime.strptime(s, fmt) + except ValueError as _e: + e = _e + raise e + + +def get_dt_from_to_arguments(parser): + parser.add_argument('--from', type=str, dest='date_from', required=True, + help=f'From date, format: {fmt_escape(fmt_time)} or {fmt_escape(fmt_date)}') + parser.add_argument('--to', type=str, dest='date_to', default='now', + help=f'To date, format: {fmt_escape(fmt_time)}, {fmt_escape(fmt_date)}, \'now\' or \'24h\'') + arg = parser.parse_args() + + dt_from = strptime_auto(arg.date_from) + + if arg.date_to == 'now': + dt_to = datetime.now() + elif arg.date_to == '24h': + dt_to = dt_from + timedelta(days=1) + else: + dt_to = strptime_auto(arg.date_to) + + return dt_from, dt_to + + +def print_intervals(intervals): + for interval in intervals: + start, end = interval + buf = f'{start.strftime(fmt_time)} .. ' + if end: + buf += f'{end.strftime(fmt_time)}' + else: + buf += 'now' + + print(buf) + + +class Electricity(): + def __init__(self): + global _logger + + methods = [func.replace('_', '-') + for func in dir(Electricity) + if callable(getattr(Electricity, func)) and not func.startswith('_') and func != 'query'] + + parser = ArgumentParser( + usage=f'{_progname} METHOD [ARGS]' + ) + parser.add_argument('method', choices=methods, + help='Method to run') + parser.add_argument('--verbose', '-V', action='store_true', + help='enable debug logs') + + argv = sys.argv[1:2] + for arg in ('-V', '--verbose'): + if arg in sys.argv: + argv.append(arg) + args = parser.parse_args(argv) + + setup_logging(args.verbose) + self.db = InverterDatabase() + + method = args.method.replace('-', '_') + getattr(self, method)() + + def get_grid_connected_intervals(self): + parser = SubParser('Returns datetime intervals when grid was connected', method_usage()) + dt_from, dt_to = get_dt_from_to_arguments(parser) + + intervals = self.db.get_grid_connected_intervals(dt_from, dt_to) + print_intervals(intervals) + + def get_grid_used_intervals(self): + parser = SubParser('Returns datetime intervals when power grid was actually used', method_usage()) + dt_from, dt_to = get_dt_from_to_arguments(parser) + + intervals = self.db.get_grid_used_intervals(dt_from, dt_to) + print_intervals(intervals) + + def get_grid_consumed_energy(self): + parser = SubParser('Returns sum of energy consumed from util grid', method_usage()) + dt_from, dt_to = get_dt_from_to_arguments(parser) + + wh = self.db.get_grid_consumed_energy(dt_from, dt_to) + print('%.2f' % wh,) + + def get_consumed_energy(self): + parser = SubParser('Returns total consumed energy', method_usage()) + dt_from, dt_to = get_dt_from_to_arguments(parser) + + wh = self.db.get_consumed_energy(dt_from, dt_to) + print('%.2f' % wh,) + + +if __name__ == '__main__': + try: + Electricity() + except Exception as e: + _logger.exception(e) + sys.exit(1) diff --git a/bin/esp32_capture.py b/bin/esp32_capture.py new file mode 100755 index 0000000..839114d --- /dev/null +++ b/bin/esp32_capture.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +import asyncio +import logging +import os.path +import __py_include + +from argparse import ArgumentParser +from homekit.camera.esp32 import WebClient +from homekit.util import Addr +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from datetime import datetime +from typing import Optional + +logger = logging.getLogger(__name__) +cam: Optional[WebClient] = None + + +class ESP32Capture: + def __init__(self, addr: Addr, interval: float, output_directory: str): + self.logger = logging.getLogger(self.__class__.__name__) + self.client = WebClient(addr) + self.output_directory = output_directory + self.interval = interval + + self.scheduler = AsyncIOScheduler() + self.scheduler.add_job(self.capture, 'interval', seconds=arg.interval) + self.scheduler.start() + + async def capture(self): + self.logger.debug('capture: start') + now = datetime.now() + filename = os.path.join( + self.output_directory, + now.strftime('%Y-%m-%d-%H:%M:%S.%f.jpg') + ) + if not await self.client.capture(filename): + self.logger.error('failed to capture') + self.logger.debug('capture: done') + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--addr', type=str, required=True) + parser.add_argument('--output-directory', type=str, required=True) + parser.add_argument('--interval', type=float, default=0.5) + parser.add_argument('--verbose', action='store_true') + arg = parser.parse_args() + + if arg.verbose: + logging.basicConfig(level=logging.DEBUG) + + loop = asyncio.get_event_loop() + + ESP32Capture(Addr.fromstring(arg.addr), arg.interval, arg.output_directory) + try: + loop.run_forever() + except KeyboardInterrupt: + pass diff --git a/bin/esp32cam_capture_diff_node.py b/bin/esp32cam_capture_diff_node.py new file mode 100755 index 0000000..d664c6d --- /dev/null +++ b/bin/esp32cam_capture_diff_node.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +import asyncio +import logging +import os.path +import tempfile +import __py_include +import homekit.telegram.aio as telegram + +from homekit.config import config +from homekit.camera.esp32 import WebClient +from homekit.util import Addr, send_datagram, stringify +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from typing import Optional + +logger = logging.getLogger(__name__) +cam: Optional[WebClient] = None + + +async def pyssim(fn1: str, fn2: str) -> float: + args = [config['pyssim']['bin']] + if 'width' in config['pyssim']: + args.extend(['--width', str(config['pyssim']['width'])]) + if 'height' in config['pyssim']: + args.extend(['--height', str(config['pyssim']['height'])]) + args.extend([fn1, fn2]) + proc = await asyncio.create_subprocess_exec(*args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) + stdout, stderr = await proc.communicate() + if proc.returncode != 0: + logger.error(f'pyssim({fn1}, {fn2}): pyssim returned {proc.returncode}, stderr: {stderr.decode().strip()}') + + return float(stdout.decode().strip()) + + +class ESP32CamCaptureDiffNode: + def __init__(self): + self.client = WebClient(Addr.fromstring(config['esp32cam_web_addr'])) + self.directory = tempfile.gettempdir() + self.nextpic = 1 + self.first = True + self.server_addr = Addr.fromstring(config['node']['server_addr']) + + self.scheduler = AsyncIOScheduler() + self.scheduler.add_job(self.capture, 'interval', seconds=config['node']['interval']) + self.scheduler.start() + + async def capture(self): + logger.debug('capture: start') + + filename = self.getfilename() + if not await self.client.capture(os.path.join(self.directory, filename)): + logger.error('failed to capture') + return + + self.nextpic = 1 if self.nextpic == 2 else 2 + if not self.first: + second_filename = os.path.join(self.directory, self.getfilename()) + score = await pyssim(filename, second_filename) + logger.debug(f'pyssim: score={score}') + if score < config['pyssim']['threshold']: + logger.info(f'score = {score}, informing central server') + send_datagram(stringify([config['node']['name'], 2]), self.server_addr) + + # send to telegram + if 'telegram' in config: + await telegram.send_message(f'pyssim: score={score}') + await telegram.send_photo(filename) + await telegram.send_photo(second_filename) + + self.first = False + + logger.debug('capture: done') + + def getfilename(self): + return os.path.join(self.directory, f'{self.nextpic}.jpg') + + +if __name__ == '__main__': + config.load_app('esp32cam_capture_diff_node') + + loop = asyncio.get_event_loop() + ESP32CamCaptureDiffNode() + try: + loop.run_forever() + except KeyboardInterrupt: + pass diff --git a/bin/gpiorelayd.py b/bin/gpiorelayd.py new file mode 100755 index 0000000..89ba78e --- /dev/null +++ b/bin/gpiorelayd.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 +import logging +import os +import sys +import __py_include + +from argparse import ArgumentParser +from homekit.util import Addr +from homekit.config import config +from homekit.relay.sunxi_h3_server import RelayServer + +logger = logging.getLogger(__name__) + + +if __name__ == '__main__': + if os.getegid() != 0: + sys.exit('Must be run as root.') + + parser = ArgumentParser() + parser.add_argument('--pin', type=str, required=True, + help='name of GPIO pin of Allwinner H3 sunxi board') + parser.add_argument('--listen', type=str, required=True, + help='address to listen to, in ip:port format') + + arg = config.load_app(no_config=True, parser=parser) + listen = Addr.fromstring(arg.listen) + + try: + RelayServer(pinname=arg.pin, addr=listen).run() + except KeyboardInterrupt: + logger.info('Exiting...') diff --git a/bin/inverter_bot.py b/bin/inverter_bot.py new file mode 100755 index 0000000..032f513 --- /dev/null +++ b/bin/inverter_bot.py @@ -0,0 +1,961 @@ +#!/usr/bin/env python3 +import logging +import re +import datetime +import json +import itertools +import sys +import asyncio +import __py_include + +from inverterd import Format, InverterError +from html import escape +from typing import Optional, Tuple, Union + +from homekit.util import chunks +from homekit.config import config, AppConfigUnit +from homekit.telegram import bot +from homekit.telegram.config import TelegramBotConfig, TelegramUserListType +from homekit.inverter import ( + wrapper_instance as inverter, + beautify_table, + InverterMonitor, +) +from homekit.inverter.types import ( + ChargingEvent, + ACPresentEvent, + BatteryState, + ACMode, + OutputSourcePriority +) +from homekit.database.inverter_time_formats import FormatDate +from homekit.api import WebApiClient +from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton + + +if __name__ != '__main__': + print(f'this script can not be imported as module', file=sys.stderr) + sys.exit(1) + + +db = None +LT = escape('<=') +flags_map = { + 'buzzer': 'BUZZ', + 'overload_bypass': 'OLBP', + 'escape_to_default_screen_after_1min_timeout': 'LCDE', + 'overload_restart': 'OLRS', + 'over_temp_restart': 'OTRS', + 'backlight_on': 'BLON', + 'alarm_on_on_primary_source_interrupt': 'ALRM', + 'fault_code_record': 'FTCR', +} +logger = logging.getLogger(__name__) + + +class InverterBotConfig(AppConfigUnit, TelegramBotConfig): + NAME = 'inverter_bot' + + @classmethod + def schema(cls) -> Optional[dict]: + acmode_item_schema = { + 'thresholds': { + 'type': 'list', + 'required': True, + 'schema': { + 'type': 'list', + 'min': 40, + 'max': 60 + }, + }, + 'initial_current': {'type': 'integer'} + } + + return { + **super(TelegramBotConfig).schema(), + 'ac_mode': { + 'type': 'dict', + 'required': True, + 'schema': { + 'generator': acmode_item_schema, + 'utilities': acmode_item_schema + } + }, + 'monitor': { + 'type': 'dict', + 'required': True, + 'schema': { + 'vlow': {'type': 'integer', 'required': True}, + 'vcrit': {'type': 'integer', 'required': True}, + 'gen_currents': {'type': 'list', 'schema': {'type': 'integer'}, 'required': True}, + 'gen_raise_intervals': {'type': 'list', 'schema': {'type': 'integer'}, 'required': True}, + 'gen_cur30_v_limit': {'type': 'float', 'required': True}, + 'gen_cur20_v_limit': {'type': 'float', 'required': True}, + 'gen_cur10_v_limit': {'type': 'float', 'required': True}, + 'gen_floating_v': {'type': 'integer', 'required': True}, + 'gen_floating_time_max': {'type': 'integer', 'required': True} + } + } + } + + +config.load_app(InverterBotConfig) + +bot.initialize() +bot.lang.ru( + socket="В розетке", + status='Статус', + generation='Генерация', + priority='Приоритет', + battery="АКБ", + load="Нагрузка", + generator="Генератор", + utilities="Столб", + consumption="Статистика потребления", + settings="Настройки", + done="Готово", + unexpected_callback_data="Ошибка: неверные данные", + invalid_input="Неверное значение", + invalid_mode="Invalid mode", + + flags_press_button='Нажмите кнопку для переключения настройки', + flags_fail='Не удалось установить настройку', + flags_invalid='Неизвестная настройка', + + # generation + gen_input_power='Зарядная мощность', + + # settings + settings_msg="Что вы хотите настроить?", + settings_osp='Приоритет питания нагрузки', + settings_ac_preset="Применить шаблон режима AC", + settings_bat_thresholds="Пороги заряда АКБ от AC", + settings_bat_cut_off_voltage="Порог отключения АКБ", + settings_ac_max_charging_current="Максимальный ток заряда от AC", + + settings_osp_msg="Установите приоритет:", + settings_osp_sub='Solar-Utility-Battery', + settings_osp_sbu='Solar-Battery-Utility', + + settings_select_bottom_threshold="Выберите нижний порог:", + settings_select_upper_threshold="Выберите верхний порог:", + settings_select_max_current='Выберите максимальный ток:', + settings_enter_cutoff_voltage=f'Введите напряжение V, где 40.0 {LT} V {LT} 48.0', + + # time and date + today='Сегодня', + yday1='Вчера', + yday2='Позавчера', + for_7days='За 7 дней', + for_30days='За 30 дней', + # to_select_interval='Выбрать интервал', + + # consumption + consumption_msg="Выберите тип:", + consumption_total="Домашние приборы", + consumption_grid="Со столба", + consumption_select_interval='Выберите период:', + consumption_request_sent="⏳ Запрос отправлен...", + + # status + charging_at=', ', + pd_charging='заряжается', + pd_discharging='разряжается', + pd_nothing='не используется', + + # flags + flag_buzzer='Звуковой сигнал', + flag_overload_bypass='Разрешить перегрузку', + flag_escape_to_default_screen_after_1min_timeout='Возврат на главный экран через 1 минуту', + flag_overload_restart='Перезапуск при перегрузке', + flag_over_temp_restart='Перезапуск при перегреве', + flag_backlight_on='Подсветка экрана', + flag_alarm_on_on_primary_source_interrupt='Сигнал при разрыве основного источника питания', + flag_fault_code_record='Запись кодов ошибок', + + # commands + setbatuv_v=f'напряжение, 40.0 {LT} V {LT} 48.0', + setgenct_cv=f'напряжение включения заряда, 44 {LT} CV {LT} 51', + setgenct_dv=f'напряжение отключения заряда, 48 {LT} DV {LT} 58', + setgencc_a='максимальный ток заряда, допустимые значения: %s', + + # monitor + chrg_evt_started='✅ Начали заряжать от генератора.', + chrg_evt_finished='✅ Зарядили. Генератор пора выключать.', + chrg_evt_disconnected='ℹ️ Генератор отключен.', + chrg_evt_current_changed='ℹ️ Ток заряда от генератора установлен в %d A.', + chrg_evt_not_charging='ℹ️ Генератор подключен, но не заряжает.', + chrg_evt_na_solar='⛔️ Генератор подключен, но аккумуляторы не заряжаются из-за подключенных панелей.', + chrg_evt_mostly_charged='✅ Аккумуляторы более-менее заряжены, генератор пора выключать.', + battery_level_changed='Уровень заряда АКБ: <b>%s %s</b> (<b>%0.1f V</b> при нагрузке <b>%d W</b>)', + error_message='<b>Ошибка:</b> %s.', + + util_chrg_evt_started='✅ Начали заряжать от столба.', + util_chrg_evt_stopped='ℹ️ Перестали заряжать от столба.', + util_chrg_evt_stopped_solar='ℹ️ Перестали заряжать от столба из-за подключения панелей.', + + util_connected='✅️ Столб подключён.', + util_disconnected='‼️ Столб отключён.', + + # other notifications + ac_mode_changed_notification='Пользователь <a href="tg://user?id=%d">%s</a> установил режим AC: <b>%s</b>.', + osp_changed_notification='Пользователь <a href="tg://user?id=%d">%s</a> установил приоритет источника питания нагрузки: <b>%s</b>.', + osp_auto_changed_notification='ℹ️ Бот установил приоритет источника питания нагрузки: <b>%s</b>. Причины: напряжение АКБ %.1f V, мощность заряда с панелей %d W.', + + bat_state_normal='Нормальный', + bat_state_low='Низкий', + bat_state_critical='Критический', +) + +bot.lang.en( + socket='AC output', + status='Status', + generation='Generation', + priority='Priority', + battery="Battery", + load="Load", + generator="Generator", + utilities="Utilities", + consumption="Consumption statistics", + settings="Settings", + done="Done", + unexpected_callback_data="Unexpected callback data", + select_priortiy="Select priority:", + invalid_input="Invalid input", + invalid_mode="Invalid mode", + + flags_press_button='Press a button to toggle a flag.', + flags_fail='Failed to toggle flag', + flags_invalid='Invalid flag', + + # settings + settings_msg='What do you want to configure?', + settings_osp='Output source priority', + settings_ac_preset="AC preset", + settings_bat_thresholds="Battery charging thresholds", + settings_bat_cut_off_voltage="Battery cut-off voltage", + settings_ac_max_charging_current="Max AC charging current", + + settings_osp_msg="Select priority:", + settings_osp_sub='Solar-Utility-Battery', + settings_osp_sbu='Solar-Battery-Utility', + + settings_select_bottom_threshold="Select bottom (lower) threshold:", + settings_select_upper_threshold="Select top (upper) threshold:", + settings_select_max_current='Select max current:', + settings_enter_cutoff_voltage=f'Enter voltage V (40.0 {LT} V {LT} 48.0):', + + # generation + gen_input_power='Input power', + + # time and date + today='Today', + yday1='Yesterday', + yday2='The day before yesterday', + for_7days='7 days', + for_30days='30 days', + # to_select_interval='Select interval', + + # consumption + consumption_msg="Select type:", + consumption_total="Home appliances", + consumption_grid="Consumed from grid", + consumption_select_interval='Select period:', + consumption_request_sent="⏳ Request sent...", + + # status + charging_at=' @ ', + pd_charging='charging', + pd_discharging='discharging', + pd_nothing='not used', + + # flags + flag_buzzer='Buzzer', + flag_overload_bypass='Overload bypass', + flag_escape_to_default_screen_after_1min_timeout='Reset to default LCD page after 1min timeout', + flag_overload_restart='Restart on overload', + flag_over_temp_restart='Restart on overtemp', + flag_backlight_on='LCD backlight', + flag_alarm_on_on_primary_source_interrupt='Beep on primary source interruption', + flag_fault_code_record='Fault code recording', + + # commands + setbatuv_v=f'floating point number, 40.0 {LT} V {LT} 48.0', + setgenct_cv=f'charging voltage, 44 {LT} CV {LT} 51', + setgenct_dv=f'discharging voltage, 48 {LT} DV {LT} 58', + setgencc_a='max charging current, allowed values: %s', + + # monitor + chrg_evt_started='✅ Started charging from AC.', + chrg_evt_finished='✅ Finished charging, it\'s time to stop the generator.', + chrg_evt_disconnected='ℹ️ AC disconnected.', + chrg_evt_current_changed='ℹ️ AC charging current set to %d A.', + chrg_evt_not_charging='ℹ️ AC connected but not charging.', + chrg_evt_na_solar='⛔️ AC connected, but battery won\'t be charged due to active solar power line.', + chrg_evt_mostly_charged='✅ The battery is mostly charged now. The generator can be turned off.', + battery_level_changed='Battery level: <b>%s</b> (<b>%0.1f V</b> under <b>%d W</b> load)', + error_message='<b>Error:</b> %s.', + + util_chrg_evt_started='✅ Started charging from utilities.', + util_chrg_evt_stopped='ℹ️ Stopped charging from utilities.', + util_chrg_evt_stopped_solar='ℹ️ Stopped charging from utilities because solar panels were connected.', + + util_connected='✅️ Utilities connected.', + util_disconnected='‼️ Utilities disconnected.', + + # other notifications + ac_mode_changed_notification='User <a href="tg://user?id=%d">%s</a> set AC mode to <b>%s</b>.', + osp_changed_notification='User <a href="tg://user?id=%d">%s</a> set output source priority: <b>%s</b>.', + osp_auto_changed_notification='Bot changed output source priority to <b>%s</b>. Reasons: battery voltage is %.1f V, solar input is %d W.', + + bat_state_normal='Normal', + bat_state_low='Low', + bat_state_critical='Critical', +) + + +def monitor_charging(event: ChargingEvent, **kwargs) -> None: + args = [] + is_util = False + if event == ChargingEvent.AC_CHARGING_STARTED: + key = 'started' + elif event == ChargingEvent.AC_CHARGING_FINISHED: + key = 'finished' + elif event == ChargingEvent.AC_DISCONNECTED: + key = 'disconnected' + elif event == ChargingEvent.AC_NOT_CHARGING: + key = 'not_charging' + elif event == ChargingEvent.AC_CURRENT_CHANGED: + key = 'current_changed' + args.append(kwargs['current']) + elif event == ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR: + key = 'na_solar' + elif event == ChargingEvent.AC_MOSTLY_CHARGED: + key = 'mostly_charged' + elif event == ChargingEvent.UTIL_CHARGING_STARTED: + key = 'started' + is_util = True + elif event == ChargingEvent.UTIL_CHARGING_STOPPED: + key = 'stopped' + is_util = True + elif event == ChargingEvent.UTIL_CHARGING_STOPPED_SOLAR: + key = 'stopped_solar' + is_util = True + else: + logger.error('unknown charging event:', event) + return + + key = f'chrg_evt_{key}' + if is_util: + key = f'util_{key}' + + asyncio.ensure_future( + bot.notify_all( + lambda lang: bot.lang.get(key, lang, *args) + ) + ) + + +def monitor_battery(state: BatteryState, v: float, load_watts: int) -> None: + if state == BatteryState.NORMAL: + emoji = '✅' + elif state == BatteryState.LOW: + emoji = '⚠️' + elif state == BatteryState.CRITICAL: + emoji = '‼️' + else: + logger.error('unknown battery state:', state) + return + + asyncio.ensure_future( + bot.notify_all( + lambda lang: bot.lang.get('battery_level_changed', lang, + emoji, bot.lang.get(f'bat_state_{state.name.lower()}', lang), v, load_watts) + ) + ) + + +def monitor_util(event: ACPresentEvent): + if event == ACPresentEvent.CONNECTED: + key = 'connected' + else: + key = 'disconnected' + key = f'util_{key}' + asyncio.ensure_future( + bot.notify_all( + lambda lang: bot.lang.get(key, lang) + ) + ) + + +def monitor_error(error: str) -> None: + asyncio.ensure_future( + bot.notify_all( + lambda lang: bot.lang.get('error_message', lang, error) + ) + ) + + +def osp_change_cb(new_osp: OutputSourcePriority, + solar_input: int, + v: float): + + setosp(new_osp) + + asyncio.ensure_future( + bot.notify_all( + lambda lang: bot.lang.get('osp_auto_changed_notification', lang, + bot.lang.get(f'settings_osp_{new_osp.value.lower()}', lang), v, solar_input), + ) + ) + + +@bot.handler(command='status') +async def full_status(ctx: bot.Context) -> None: + status = inverter.exec('get-status', format=Format.TABLE) + await ctx.reply(beautify_table(status)) + + +@bot.handler(command='config') +async def full_rated(ctx: bot.Context) -> None: + rated = inverter.exec('get-rated', format=Format.TABLE) + await ctx.reply(beautify_table(rated)) + + +@bot.handler(command='errors') +async def full_errors(ctx: bot.Context) -> None: + errors = inverter.exec('get-errors', format=Format.TABLE) + await ctx.reply(beautify_table(errors)) + + +@bot.handler(command='flags') +async def flags_handler(ctx: bot.Context) -> None: + flags = inverter.exec('get-flags')['data'] + text, markup = build_flags_keyboard(flags, ctx) + await ctx.reply(text, markup=markup) + + +def build_flags_keyboard(flags: dict, ctx: bot.Context) -> Tuple[str, InlineKeyboardMarkup]: + keyboard = [] + for k, v in flags.items(): + label = ('✅' if v else '❌') + ' ' + ctx.lang(f'flag_{k}') + proto_flag = flags_map[k] + keyboard.append([InlineKeyboardButton(label, callback_data=f'flag_{proto_flag}')]) + + return ctx.lang('flags_press_button'), InlineKeyboardMarkup(keyboard) + + +def getacmode() -> ACMode: + return ACMode(bot.db.get_param('ac_mode', default=ACMode.GENERATOR)) + + +def setacmode(mode: ACMode): + monitor.set_ac_mode(mode) + + cv, dv = config['ac_mode'][str(mode.value)]['thresholds'] + a = config['ac_mode'][str(mode.value)]['initial_current'] + + logger.debug(f'setacmode: mode={mode}, cv={cv}, dv={dv}, a={a}') + + inverter.exec('set-charge-thresholds', (cv, dv)) + inverter.exec('set-max-ac-charge-current', (0, a)) + + +def setosp(sp: OutputSourcePriority): + logger.debug(f'setosp: sp={sp}') + inverter.exec('set-output-source-priority', (sp.value,)) + monitor.notify_osp(sp) + + +class SettingsConversation(bot.conversation): + START, OSP, AC_PRESET, BAT_THRESHOLDS_1, BAT_THRESHOLDS_2, BAT_CUT_OFF_VOLTAGE, AC_MAX_CHARGING_CURRENT = range(7) + STATE_SEQS = [ + [START, OSP], + [START, AC_PRESET], + [START, BAT_THRESHOLDS_1, BAT_THRESHOLDS_2], + [START, BAT_CUT_OFF_VOLTAGE], + [START, AC_MAX_CHARGING_CURRENT] + ] + + START_BUTTONS = bot.lang.pfx('settings_', ['ac_preset', + 'ac_max_charging_current', + 'bat_thresholds', + 'bat_cut_off_voltage', + 'osp']) + OSP_BUTTONS = bot.lang.pfx('settings_osp_', [sp.value.lower() for sp in OutputSourcePriority]) + AC_PRESET_BUTTONS = [mode.value for mode in ACMode] + + RECHARGE_VOLTAGES = [44, 45, 46, 47, 48, 49, 50, 51] + REDISCHARGE_VOLTAGES = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58] + + @bot.conventer(START, message='settings') + async def start_enter(self, ctx: bot.Context): + buttons = list(chunks(list(self.START_BUTTONS), 2)) + buttons.reverse() + return await self.reply(ctx, self.START, ctx.lang('settings_msg'), buttons, + with_cancel=True) + + @bot.convinput(START, messages={ + 'settings_osp': OSP, + 'settings_ac_preset': AC_PRESET, + 'settings_bat_thresholds': BAT_THRESHOLDS_1, + 'settings_bat_cut_off_voltage': BAT_CUT_OFF_VOLTAGE, + 'settings_ac_max_charging_current': AC_MAX_CHARGING_CURRENT + }) + async def start_input(self, ctx: bot.Context): + pass + + @bot.conventer(OSP) + async def osp_enter(self, ctx: bot.Context): + return await self.reply(ctx, self.OSP, ctx.lang('settings_osp_msg'), self.OSP_BUTTONS, + with_back=True) + + @bot.convinput(OSP, messages=OSP_BUTTONS) + async def osp_input(self, ctx: bot.Context): + selected_sp = None + for sp in OutputSourcePriority: + if ctx.text == ctx.lang(f'settings_osp_{sp.value.lower()}'): + selected_sp = sp + break + + if selected_sp is None: + raise ValueError('invalid sp') + + # apply the mode + setosp(selected_sp) + + await asyncio.gather( + # reply to user + ctx.reply(ctx.lang('saved'), markup=bot.IgnoreMarkup()), + + # notify other users + bot.notify_all( + lambda lang: bot.lang.get('osp_changed_notification', lang, + ctx.user.id, ctx.user.name, + bot.lang.get(f'settings_osp_{selected_sp.value.lower()}', lang)), + exclude=(ctx.user_id,) + ) + ) + + return self.END + + @bot.conventer(AC_PRESET) + async def acpreset_enter(self, ctx: bot.Context): + return await self.reply(ctx, self.AC_PRESET, ctx.lang('settings_ac_preset_msg'), self.AC_PRESET_BUTTONS, + with_back=True) + + @bot.convinput(AC_PRESET, messages=AC_PRESET_BUTTONS) + async def acpreset_input(self, ctx: bot.Context): + if monitor.active_current is not None: + raise RuntimeError('generator charging program is active') + + if ctx.text == ctx.lang('utilities'): + newmode = ACMode.UTILITIES + elif ctx.text == ctx.lang('generator'): + newmode = ACMode.GENERATOR + else: + raise ValueError('invalid mode') + + # apply the mode + setacmode(newmode) + + # save + bot.db.set_param('ac_mode', str(newmode.value)) + + await asyncio.gather( + # reply to user + ctx.reply(ctx.lang('saved'), markup=bot.IgnoreMarkup()), + + # notify other users + bot.notify_all( + lambda lang: bot.lang.get('ac_mode_changed_notification', lang, + ctx.user.id, ctx.user.name, + bot.lang.get(str(newmode.value), lang)), + exclude=(ctx.user_id,) + ) + ) + + return self.END + + @bot.conventer(BAT_THRESHOLDS_1) + async def thresholds1_enter(self, ctx: bot.Context): + buttons = list(map(lambda v: f'{v} V', self.RECHARGE_VOLTAGES)) + buttons = chunks(buttons, 4) + return await self.reply(ctx, self.BAT_THRESHOLDS_1, ctx.lang('settings_select_bottom_threshold'), buttons, + with_back=True, buttons_lang_completed=True) + + @bot.convinput(BAT_THRESHOLDS_1, + messages=list(map(lambda n: f'{n} V', RECHARGE_VOLTAGES)), + messages_lang_completed=True) + async def thresholds1_input(self, ctx: bot.Context): + v = self._parse_voltage(ctx.text) + ctx.user_data['bat_thrsh_v1'] = v + return await self.invoke(self.BAT_THRESHOLDS_2, ctx) + + @bot.conventer(BAT_THRESHOLDS_2) + async def thresholds2_enter(self, ctx: bot.Context): + buttons = list(map(lambda v: f'{v} V', self.REDISCHARGE_VOLTAGES)) + buttons = chunks(buttons, 4) + return await self.reply(ctx, self.BAT_THRESHOLDS_2, ctx.lang('settings_select_upper_threshold'), buttons, + with_back=True, buttons_lang_completed=True) + + @bot.convinput(BAT_THRESHOLDS_2, + messages=list(map(lambda n: f'{n} V', REDISCHARGE_VOLTAGES)), + messages_lang_completed=True) + async def thresholds2_input(self, ctx: bot.Context): + v2 = v = self._parse_voltage(ctx.text) + v1 = ctx.user_data['bat_thrsh_v1'] + del ctx.user_data['bat_thrsh_v1'] + + response = inverter.exec('set-charge-thresholds', (v1, v2)) + await ctx.reply(ctx.lang('saved') if response['result'] == 'ok' else 'ERROR', + markup=bot.IgnoreMarkup()) + return self.END + + @bot.conventer(AC_MAX_CHARGING_CURRENT) + async def ac_max_enter(self, ctx: bot.Context): + buttons = self._get_allowed_ac_charge_amps() + buttons = map(lambda n: f'{n} A', buttons) + buttons = [list(buttons)] + return await self.reply(ctx, self.AC_MAX_CHARGING_CURRENT, ctx.lang('settings_select_max_current'), buttons, + with_back=True, buttons_lang_completed=True) + + @bot.convinput(AC_MAX_CHARGING_CURRENT, regex=r'^\d+ A$') + async def ac_max_input(self, ctx: bot.Context): + a = self._parse_amps(ctx.text) + allowed = self._get_allowed_ac_charge_amps() + if a not in allowed: + raise ValueError('input is not allowed') + + response = inverter.exec('set-max-ac-charge-current', (0, a)) + await ctx.reply(ctx.lang('saved') if response['result'] == 'ok' else 'ERROR', + markup=bot.IgnoreMarkup()) + return self.END + + @bot.conventer(BAT_CUT_OFF_VOLTAGE) + async def cutoff_enter(self, ctx: bot.Context): + return await self.reply(ctx, self.BAT_CUT_OFF_VOLTAGE, ctx.lang('settings_enter_cutoff_voltage'), None, + with_back=True) + + @bot.convinput(BAT_CUT_OFF_VOLTAGE, regex=r'^(\d{2}(\.\d{1})?)$') + async def cutoff_input(self, ctx: bot.Context): + v = float(ctx.text) + if 40.0 <= v <= 48.0: + response = inverter.exec('set-battery-cutoff-voltage', (v,)) + await ctx.reply(ctx.lang('saved') if response['result'] == 'ok' else 'ERROR', + markup=bot.IgnoreMarkup()) + else: + raise ValueError('invalid voltage') + + return self.END + + def _get_allowed_ac_charge_amps(self) -> list[int]: + l = inverter.exec('get-allowed-ac-charge-currents')['data'] + l = filter(lambda n: n <= 40, l) + return list(l) + + def _parse_voltage(self, s: str) -> int: + return int(re.match(r'^(\d{2}) V$', s).group(1)) + + def _parse_amps(self, s: str) -> int: + return int(re.match(r'^(\d{1,2}) A$', s).group(1)) + + +class ConsumptionConversation(bot.conversation): + START, TOTAL, GRID = range(3) + STATE_SEQS = [ + [START, TOTAL], + [START, GRID] + ] + + START_BUTTONS = bot.lang.pfx('consumption_', ['total', 'grid']) + INTERVAL_BUTTONS = [ + ['today'], + ['yday1'], + ['for_7days', 'for_30days'], + # ['to_select_interval'] + ] + INTERVAL_BUTTONS_FLAT = list(itertools.chain.from_iterable(INTERVAL_BUTTONS)) + + @bot.conventer(START, message='consumption') + async def start_enter(self, ctx: bot.Context): + return await self.reply(ctx, self.START, ctx.lang('consumption_msg'), [self.START_BUTTONS], + with_cancel=True) + + @bot.convinput(START, messages={ + 'consumption_total': TOTAL, + 'consumption_grid': GRID + }) + async def start_input(self, ctx: bot.Context): + pass + + @bot.conventer(TOTAL) + async def total_enter(self, ctx: bot.Context): + return await self._render_interval_btns(ctx, self.TOTAL) + + @bot.conventer(GRID) + async def grid_enter(self, ctx: bot.Context): + return await self._render_interval_btns(ctx, self.GRID) + + async def _render_interval_btns(self, ctx: bot.Context, state): + return await self.reply(ctx, state, ctx.lang('consumption_select_interval'), self.INTERVAL_BUTTONS, + with_back=True) + + @bot.convinput(TOTAL, messages=INTERVAL_BUTTONS_FLAT) + async def total_input(self, ctx: bot.Context): + return await self._render_interval_results(ctx, self.TOTAL) + + @bot.convinput(GRID, messages=INTERVAL_BUTTONS_FLAT) + async def grid_input(self, ctx: bot.Context): + return await self._render_interval_results(ctx, self.GRID) + + async def _render_interval_results(self, ctx: bot.Context, state): + # if ctx.text == ctx.lang('to_select_interval'): + # TODO + # pass + # + # else: + + now = datetime.datetime.now() + s_to = now.strftime(FormatDate) + + if ctx.text == ctx.lang('today'): + s_from = now.strftime(FormatDate) + s_to = 'now' + elif ctx.text == ctx.lang('yday1'): + s_from = (now - datetime.timedelta(days=1)).strftime(FormatDate) + elif ctx.text == ctx.lang('for_7days'): + s_from = (now - datetime.timedelta(days=7)).strftime(FormatDate) + elif ctx.text == ctx.lang('for_30days'): + s_from = (now - datetime.timedelta(days=30)).strftime(FormatDate) + + # markup = InlineKeyboardMarkup([ + # [InlineKeyboardButton(ctx.lang('please_wait'), callback_data='wait')] + # ]) + + message = await ctx.reply(ctx.lang('consumption_request_sent'), + markup=bot.IgnoreMarkup()) + + api = WebApiClient(timeout=60) + method = 'inverter_get_consumed_energy' if state == self.TOTAL else 'inverter_get_grid_consumed_energy' + + try: + wh = getattr(api, method)(s_from, s_to) + await bot.delete_message(message.chat_id, message.message_id) + await ctx.reply('%.2f Wh' % (wh,), + markup=bot.IgnoreMarkup()) + return self.END + except Exception as e: + await asyncio.gather( + bot.delete_message(message.chat_id, message.message_id), + ctx.reply_exc(e) + ) + +# other +# ----- + +@bot.handler(command='monstatus') +async def monstatus_handler(ctx: bot.Context) -> None: + msg = '' + st = monitor.dump_status() + for k, v in st.items(): + msg += k + ': ' + str(v) + '\n' + await ctx.reply(msg) + + +@bot.handler(command='monsetcur') +async def monsetcur_handler(ctx: bot.Context) -> None: + await ctx.reply('not implemented yet') + + +@bot.callbackhandler +async def button_callback(ctx: bot.Context) -> None: + query = ctx.callback_query + + if query.data.startswith('flag_'): + flag = query.data[5:] + found = False + json_key = None + for k, v in flags_map.items(): + if v == flag: + found = True + json_key = k + break + if not found: + await query.answer(ctx.lang('flags_invalid')) + return + + flags = inverter.exec('get-flags')['data'] + cur_flag_value = flags[json_key] + target_flag_value = '0' if cur_flag_value else '1' + + # set flag + response = inverter.exec('set-flag', (flag, target_flag_value)) + + # notify user + await query.answer(ctx.lang('done') if response['result'] == 'ok' else ctx.lang('flags_fail')) + + # edit message + flags[json_key] = not cur_flag_value + text, markup = build_flags_keyboard(flags, ctx) + await query.edit_message_text(text, reply_markup=markup) + + else: + await query.answer(ctx.lang('unexpected_callback_data')) + + +@bot.exceptionhandler +async def exception_handler(e: Exception, ctx: bot.Context) -> Optional[bool]: + if isinstance(e, InverterError): + try: + err = json.loads(str(e))['message'] + except json.decoder.JSONDecodeError: + err = str(e) + err = re.sub(r'((?:.*)?error:) (.*)', r'<b>\1</b> \2', err) + await ctx.reply(err, markup=bot.IgnoreMarkup()) + return True + + +@bot.handler(message='status') +async def status_handler(ctx: bot.Context) -> None: + gs = inverter.exec('get-status')['data'] + rated = inverter.exec('get-rated')['data'] + + # render response + power_direction = gs['battery_power_direction'].lower() + power_direction = re.sub(r'ge$', 'ging', power_direction) + + charging_rate = '' + chrg_at = ctx.lang('charging_at') + + if power_direction == 'charging': + charging_rate = f'{chrg_at}%s %s' % ( + gs['battery_charge_current']['value'], gs['battery_charge_current']['unit']) + pd_label = ctx.lang('pd_charging') + elif power_direction == 'discharging': + charging_rate = f'{chrg_at}%s %s' % ( + gs['battery_discharge_current']['value'], gs['battery_discharge_current']['unit']) + pd_label = ctx.lang('pd_discharging') + else: + pd_label = ctx.lang('pd_nothing') + + html = f'<b>{ctx.lang("battery")}:</b> %s %s' % (gs['battery_voltage']['value'], gs['battery_voltage']['unit']) + html += ' (%s%s)' % (pd_label, charging_rate) + + html += f'\n<b>{ctx.lang("load")}:</b> %s %s' % (gs['ac_output_active_power']['value'], gs['ac_output_active_power']['unit']) + html += ' (%s%%)' % (gs['output_load_percent']['value']) + + if gs['pv1_input_power']['value'] > 0: + html += f'\n<b>{ctx.lang("gen_input_power")}:</b> %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit']) + + if gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0: + ac_mode = getacmode() + html += f'\n<b>{ctx.lang(ac_mode.value)}:</b> %s %s' % (gs['grid_voltage']['value'], gs['grid_voltage']['unit']) + html += ', %s %s' % (gs['grid_freq']['value'], gs['grid_freq']['unit']) + + html += f'\n<b>{ctx.lang("socket")}</b>: %s %s, %s %s' % ( + gs['ac_output_voltage']['value'], gs['ac_output_voltage']['unit'], + gs['ac_output_freq']['value'], gs['ac_output_freq']['unit'] + ) + + html += f'\n<b>{ctx.lang("priority")}</b>: {rated["output_source_priority"]}' + + # send response + await ctx.reply(html) + + +@bot.handler(message='generation') +async def generation_handler(ctx: bot.Context) -> None: + today = datetime.date.today() + yday = today - datetime.timedelta(days=1) + yday2 = today - datetime.timedelta(days=2) + + gs = inverter.exec('get-status')['data'] + + gen_today = inverter.exec('get-day-generated', (today.year, today.month, today.day))['data'] + gen_yday = None + gen_yday2 = None + + if yday.month == today.month: + gen_yday = inverter.exec('get-day-generated', (yday.year, yday.month, yday.day))['data'] + + if yday2.month == today.month: + gen_yday2 = inverter.exec('get-day-generated', (yday2.year, yday2.month, yday2.day))['data'] + + # render response + html = f'<b>{ctx.lang("gen_input_power")}:</b> %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit']) + html += ' (%s %s)' % (gs['pv1_input_voltage']['value'], gs['pv1_input_voltage']['unit']) + + html += f'\n<b>{ctx.lang("today")}:</b> %s Wh' % (gen_today['wh']) + + if gen_yday is not None: + html += f'\n<b>{ctx.lang("yday1")}:</b> %s Wh' % (gen_yday['wh']) + + if gen_yday2 is not None: + html += f'\n<b>{ctx.lang("yday2")}:</b> %s Wh' % (gen_yday2['wh']) + + # send response + await ctx.reply(html) + + +@bot.defaultreplymarkup +def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + button = [ + [ctx.lang('status'), ctx.lang('generation')], + [ctx.lang('consumption')], + [ctx.lang('settings')] + ] + return ReplyKeyboardMarkup(button, one_time_keyboard=False) + + +class InverterStore(bot.BotDatabase): + SCHEMA = 2 + + def schema_init(self, version: int) -> None: + super().schema_init(version) + + if version < 2: + cursor = self.cursor() + cursor.execute("""CREATE TABLE IF NOT EXISTS params ( + id TEXT NOT NULL PRIMARY KEY, + value TEXT NOT NULL + )""") + cursor.execute("CREATE INDEX param_id_idx ON params (id)") + self.commit() + + def get_param(self, key: str, default=None): + cursor = self.cursor() + cursor.execute('SELECT value FROM params WHERE id=?', (key,)) + row = cursor.fetchone() + + return default if row is None else row[0] + + def set_param(self, key: str, value: Union[str, int, float]): + cursor = self.cursor() + cursor.execute('REPLACE INTO params (id, value) VALUES (?, ?)', (key, str(value))) + self.commit() + + +inverter.init(host=config['inverter']['ip'], port=config['inverter']['port']) + +bot.set_database(InverterStore()) + +bot.add_conversation(SettingsConversation(enable_back=True)) +bot.add_conversation(ConsumptionConversation(enable_back=True)) + +monitor = InverterMonitor() +monitor.set_charging_event_handler(monitor_charging) +monitor.set_battery_event_handler(monitor_battery) +monitor.set_util_event_handler(monitor_util) +monitor.set_error_handler(monitor_error) +monitor.set_osp_need_change_callback(osp_change_cb) + +setacmode(getacmode()) + +if not config.get('monitor.disabled'): + logging.info('starting monitor') + monitor.start() + +bot.run() + +monitor.stop() diff --git a/bin/inverter_mqtt_util.py b/bin/inverter_mqtt_util.py new file mode 100755 index 0000000..6003c62 --- /dev/null +++ b/bin/inverter_mqtt_util.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +import __py_include + +from argparse import ArgumentParser +from homekit.config import config +from homekit.mqtt import MqttWrapper, MqttNode + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('mode', type=str, choices=('sender', 'receiver'), nargs=1) + + config.load_app('inverter_mqtt_util', parser=parser) + arg = parser.parse_args() + mode = arg.mode[0] + + mqtt = MqttWrapper(client_id=f'inverter_mqtt_{mode}', + clean_session=mode != 'receiver') + node = MqttNode(node_id='inverter') + module_kwargs = {} + if mode == 'sender': + module_kwargs['status_poll_freq'] = int(config.app_config['poll_freq']) + module_kwargs['generation_poll_freq'] = int(config.app_config['generation_poll_freq']) + node.load_module('inverter', **module_kwargs) + mqtt.add_node(node) + + mqtt.connect_and_loop() diff --git a/bin/inverterd_emulator.py b/bin/inverterd_emulator.py new file mode 100755 index 0000000..371d955 --- /dev/null +++ b/bin/inverterd_emulator.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python3 +import logging +import __py_include + +from homekit.inverter.emulator import InverterEmulator + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + InverterEmulator(addr=('127.0.0.1', 8305)) diff --git a/bin/ipcam_capture.py b/bin/ipcam_capture.py new file mode 100755 index 0000000..5de14af --- /dev/null +++ b/bin/ipcam_capture.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +import __py_include +import sys +import os +import subprocess +import asyncio +import signal + +from typing import TextIO +from argparse import ArgumentParser +from socket import gethostname +from asyncio.streams import StreamReader +from homekit.config import LinuxBoardsConfig, config as homekit_config +from homekit.camera import IpcamConfig, CaptureType +from homekit.camera.util import get_hls_directory, get_hls_channel_name, get_recordings_path + +ipcam_config = IpcamConfig() +lbc_config = LinuxBoardsConfig() +channels = (1, 2) +tasks = [] +restart_delay = 3 +lock = asyncio.Lock() +worker_type: CaptureType + + +async def read_output(stream: StreamReader, + thread_name: str, + output: TextIO): + try: + while True: + line = await stream.readline() + if not line: + break + print(f"[{thread_name}] {line.decode().strip()}", file=output) + + except asyncio.LimitOverrunError: + print(f"[{thread_name}] Output limit exceeded.", file=output) + + except Exception as e: + print(f"[{thread_name}] Error occurred while reading output: {e}", file=sys.stderr) + + +async def run_ffmpeg(cam: int, channel: int): + prefix = get_hls_channel_name(cam, channel) + + if homekit_config.app_config.logging_is_verbose(): + debug_args = ['-v', '-info'] + else: + debug_args = ['-nostats', '-loglevel', 'error'] + + protocol = 'tcp' if ipcam_config.should_use_tcp_for_rtsp(cam) else 'udp' + user, pw = ipcam_config.get_rtsp_creds() + ip = ipcam_config.get_camera_ip(cam) + path = ipcam_config.get_camera_type(cam).get_channel_url(channel) + ext = ipcam_config.get_camera_container(cam) + ffmpeg_command = ['ffmpeg', *debug_args, + '-rtsp_transport', protocol, + '-i', f'rtsp://{user}:{pw}@{ip}:554{path}', + '-c', 'copy',] + + if worker_type == CaptureType.HLS: + ffmpeg_command.extend(['-bufsize', '1835k', + '-pix_fmt', 'yuv420p', + '-flags', '-global_header', + '-hls_time', '2', + '-hls_list_size', '3', + '-hls_flags', 'delete_segments', + os.path.join(get_hls_directory(cam, channel), 'live.m3u8')]) + + elif worker_type == CaptureType.RECORD: + ffmpeg_command.extend(['-f', 'segment', + '-strftime', '1', + '-segment_time', '00:10:00', + '-segment_atclocktime', '1', + os.path.join(get_recordings_path(cam), f'record_%Y-%m-%d-%H.%M.%S.{ext.value}')]) + + else: + raise ValueError(f'invalid worker type: {worker_type}') + + while True: + try: + process = await asyncio.create_subprocess_exec( + *ffmpeg_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + stdout_task = asyncio.create_task(read_output(process.stdout, prefix, sys.stdout)) + stderr_task = asyncio.create_task(read_output(process.stderr, prefix, sys.stderr)) + + await asyncio.gather(stdout_task, stderr_task) + + # check the return code of the process + if process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, ffmpeg_command) + + except (FileNotFoundError, PermissionError, subprocess.CalledProcessError) as e: + # an error occurred, print the error message + error_message = f"Error occurred in {prefix}: {e}" + print(error_message, file=sys.stderr) + + # sleep for 5 seconds before restarting the process + await asyncio.sleep(restart_delay) + + +async def run(): + kwargs = {} + if worker_type == CaptureType.RECORD: + kwargs['filter_by_server'] = gethostname() + for cam in ipcam_config.get_all_cam_names(**kwargs): + for channel in channels: + task = asyncio.create_task(run_ffmpeg(cam, channel)) + tasks.append(task) + + try: + await asyncio.gather(*tasks) + except KeyboardInterrupt: + print('KeyboardInterrupt: stopping processes...', file=sys.stderr) + for task in tasks: + task.cancel() + + # wait for subprocesses to terminate + await asyncio.gather(*tasks, return_exceptions=True) + + # send termination signal to all subprocesses + for task in tasks: + process = task.get_stack() + if process: + process.send_signal(signal.SIGTERM) + + +if __name__ == '__main__': + capture_types = [t.value for t in CaptureType] + parser = ArgumentParser() + parser.add_argument('type', type=str, metavar='CAPTURE_TYPE', choices=tuple(capture_types), + help='capture type (variants: '+', '.join(capture_types)+')') + + arg = homekit_config.load_app(no_config=True, parser=parser) + worker_type = CaptureType(arg['type']) + + asyncio.run(run()) diff --git a/bin/ipcam_motion_worker.sh b/bin/ipcam_motion_worker.sh new file mode 100755 index 0000000..603a407 --- /dev/null +++ b/bin/ipcam_motion_worker.sh @@ -0,0 +1,327 @@ +#!/bin/bash + +set -e + +DIR="$( cd "$( dirname "$(realpath "${BASH_SOURCE[0]}")" )" &>/dev/null && pwd )" +PROGNAME="$0" + +. "$DIR/../include/bash/include.bash" + +curl_opts="-s --connect-timeout 10 --retry 5 --max-time 180 --retry-delay 0 --retry-max-time 180" +allow_multiple= +fetch_limit=10 + +config= +config_camera= +is_remote= +api_url= + +dvr_scan_path="$HOME/.local/bin/dvr-scan" +fs_root="/var/ipcam_motion_fs" +fs_max_filesize=146800640 + +declare -A config=() + +usage() { + cat <<EOF +usage: $PROGNAME OPTIONS + +Options: + -v, -vx be verbose. + -v enables debug logs. + -vx does \`set -x\`, may be used to debug the script. + --allow-multiple don't check for another instance + --L, --fetch-limit default: $fetch_limit + --remote + --local + --dvr-scan-path default: $dvr_scan_path + --fs-root default: $fs_root + --fs-max-filesize default: $fs_max_filesize +EOF + exit 1 +} + +get_recordings_dir() { + local camera="$1" + curl $curl_opts "${api_url}/api/camera/list" \ + | jq ".response.\"${camera}\".recordings_path" | tr -d '"' +} + +# returns three words per line: +# filename filesize camera +get_recordings_list() { + curl $curl_opts "${api_url}/api/recordings?limit=${fetch_limit}" \ + | jq '.response.files[] | [.name, .size, .cam] | join(" ")' | tr -d '"' +} + +read_camera_motion_config() { + local camera="$1" + local dst=config + + if [ "$config_camera" != "$camera" ]; then + local n=0 + local failed= + local key + local value + + while read line; do + n=$(( n+1 )) + + # skip empty lines or comments + if [ -z "$line" ] || [[ "$line" =~ ^#.* ]]; then + continue + fi + + if [[ $line = *"="* ]]; then + key="${line%%=*}" + value="${line#*=}" + eval "$dst[$key]=\"$value\"" + else + echoerr "config: invalid line $n" + failed=1 + fi + done < <(curl $curl_opts "${api_url}/api/motion/params/${camera}") + + config_camera="$camera" + + [ -z "$failed" ] + else + debug "read_camera_motion_config: config for $camera already loaded" + fi +} + +dump_config() { + for key in min_event_length downscale_factor frame_skip threshold; do + debug "config[$key]=${config[$key]}" + done +} + +get_camera_roi_config() { + local camera="$1" + curl $curl_opts "${api_url}/api/motion/params/${camera}/roi" +} + +report_failure() { + local camera="$1" + local file="$2" + local message="$3" + + local response=$(curl $curl_opts -X POST "${api_url}/api/motion/fail/${camera}" \ + -F "filename=$file" \ + -F "message=$message") + + print_response_error "$response" "report_failure" +} + +report_timecodes() { + local camera="$1" + local file="$2" + local timecodes="$3" + + local response=$(curl $curl_opts -X POST "${api_url}/api/motion/done/${camera}" \ + -F "filename=$file" \ + -F "timecodes=$timecodes") + + print_response_error "$response" "report_timecodes" +} + +print_response_error() { + local resp="$1" + local sufx="$2" + + local error="$(echo "$resp" | jq '.error')" + local message + + if [ "$error" != "null" ]; then + message="$(echo "$resp" | jq '.message' | tr -d '"')" + error="$(echo "$error" | tr -d '"')" + + echoerr "$sufx: $error ($message)" + fi +} + +process_queue() { + local tc + local url + local words + local file + local size + local camera + local local_recs_dir + + if [ "$is_remote" = "1" ]; then + pushd "${fs_root}" >/dev/null || die "failed to change to ${fs_root}" + touch tmp || die "directory '${fs_root}' is not writable" + rm tmp + + [ -f "video.mp4" ] && { + echowarn "video.mp4 already exists in ${fs_root}, removing.." + rm "video.mp4" + } + fi + + while read line; do + words=($line) + file=${words[0]} + size=${words[1]} + camera=${words[2]} + + debug "next video: cam=$camera file=$file" + + read_camera_motion_config "$camera" +# dump_config + + if [ "$is_remote" = "0" ]; then + local_recs_dir="$(get_recordings_dir "$camera")" + + debug "[$camera] processing $file..." + + tc=$(do_motion "$camera" "${local_recs_dir}/$file") + debug "[$camera] $file: timecodes=$tc" + + report_timecodes "$camera" "$file" "$tc" + else + if (( size > fs_max_filesize )); then + echoerr "[$camera] won't download $file, size exceeds fs_max_filesize ($size > ${fs_max_filesize})" + report_failure "$camera" "$file" "too large file" + continue + fi + + url="${api_url}/api/recordings/${camera}/download/${file}" + debug "[$camera] downloading $url..." + + if ! download "$url" "video.mp4"; then + echoerr "[$camera] failed to download $file" + report_failure "$camera" "$file" "download error" + continue + fi + + tc=$(do_motion "$camera" "video.mp4") + debug "[$camera] $file: timecodes=$tc" + + report_timecodes "$camera" "$file" "$tc" + + rm "video.mp4" + fi + done < <(get_recordings_list) + + if [ "$is_remote" = "1" ]; then popd >/dev/null; fi +} + +do_motion() { + local camera="$1" + local input="$2" + local tc + + local timecodes=() + + time_start + while read line; do + if ! [[ "$line" =~ ^#.* ]]; then + tc="$(do_dvr_scan "$input" "$line")" + if [ -n "$tc" ]; then + timecodes+=("$tc") + fi + fi + done < <(get_camera_roi_config "$camera") + + debug "[$camera] do_motion: finished in $(time_elapsed)s" + + timecodes="$(echo "${timecodes[@]}" | sed 's/ */ /g' | xargs)" + timecodes="${timecodes// /,}" + + echo "$timecodes" +} + +dvr_scan() { + "${dvr_scan_path}" "$@" +} + +do_dvr_scan() { + local input="$1" + local args= + + if [ ! -z "$2" ]; then + args="-roi $2" + echoinfo "dvr_scan(${BOLD}${input}${RST}${CYAN}): roi=($2), mt=${config[threshold]}" + else + echoinfo "dvr_scan(${BOLD}${input}${RST}${CYAN}): no roi, mt=${config[threshold]}" + fi + + dvr_scan -q -i "$input" -so \ + --min-event-length ${config[min_event_length]} \ + -df ${config[downscale_factor]} \ + --frame-skip ${config[frame_skip]} \ + -t ${config[threshold]} $args | tail -1 +} + +[[ $# -lt 1 ]] && usage + +while [[ $# -gt 0 ]]; do + case $1 in + -L|--fetch-limit) + fetch_limit="$2" + shift; shift + ;; + + --allow-multiple) + allow_multiple=1 + shift + ;; + + --remote) + is_remote=1 + shift + ;; + + --local) + is_remote=0 + shift + ;; + + --dvr-scan-path) + dvr_scan_path="$2" + shift; shift + ;; + + --fs-root) + fs_root="$2" + shift; shift + ;; + + --fs-max-filesize) + fs_max_filesize="$2" + shift; shift + ;; + + --api-url) + api_url="$2" + shift; shift + ;; + + -v) + VERBOSE=1 + shift + ;; + + -vx) + VERBOSE=1 + set -x + shift + ;; + + *) + die "unrecognized argument '$1'" + exit 1 + ;; + esac +done + +if [ -z "$allow_multiple" ] && pidof -o %PPID -x "$(basename "${BASH_SOURCE[0]}")" >/dev/null; then + die "process already running" +fi + +[ -z "$is_remote" ] && die "either --remote or --local is required" +[ -z "$api_url" ] && die "--api-url is required" + +process_queue
\ No newline at end of file diff --git a/bin/ipcam_server.py b/bin/ipcam_server.py new file mode 100755 index 0000000..71d5ea1 --- /dev/null +++ b/bin/ipcam_server.py @@ -0,0 +1,544 @@ +#!/usr/bin/env python3 +import logging +import os +import asyncio +import time +import shutil +import __py_include + +import homekit.telegram.aio as telegram + +from socket import gethostname +from argparse import ArgumentParser +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from asyncio import Lock + +from homekit.config import config as homekit_config, LinuxBoardsConfig +from homekit.util import Addr +from homekit import http +from homekit.database.sqlite import SQLiteBase +from homekit.camera import util as camutil, IpcamConfig +from homekit.camera.types import ( + TimeFilterType, + TelegramLinkType, + VideoContainerType +) +from homekit.camera.util import ( + get_recordings_path, + get_motion_path, + is_valid_recording_name, + datetime_from_filename +) + +from typing import Optional, Union, List, Tuple +from datetime import datetime, timedelta +from functools import cmp_to_key + + +ipcam_config = IpcamConfig() +lbc_config = LinuxBoardsConfig() + + +# ipcam database +# -------------- + +class IpcamServerDatabase(SQLiteBase): + SCHEMA = 4 + + def __init__(self, path=None): + super().__init__(path=path) + + def schema_init(self, version: int) -> None: + cursor = self.cursor() + + if version < 1: + # timestamps + cursor.execute("""CREATE TABLE IF NOT EXISTS timestamps ( + camera INTEGER PRIMARY KEY, + fix_time INTEGER NOT NULL, + motion_time INTEGER NOT NULL + )""") + for cam in ipcam_config.get_all_cam_names_for_this_server(): + self.add_camera(cam) + + if version < 2: + # motion_failures + cursor.execute("""CREATE TABLE IF NOT EXISTS motion_failures ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + camera INTEGER NOT NULL, + filename TEXT NOT NULL + )""") + + if version < 3: + cursor.execute("ALTER TABLE motion_failures ADD COLUMN message TEXT NOT NULL DEFAULT ''") + + if version < 4: + cursor.execute("ALTER TABLE timestamps ADD COLUMN motion_start_time INTEGER NOT NULL DEFAULT 0") + cursor.execute("UPDATE timestamps SET motion_start_time=motion_time") + + self.commit() + + def add_camera(self, camera: int): + self.cursor().execute("INSERT INTO timestamps (camera, fix_time, motion_time) VALUES (?, ?, ?)", + (camera, 0, 0)) + self.commit() + + def add_motion_failure(self, + camera: int, + filename: str, + message: Optional[str]): + self.cursor().execute("INSERT INTO motion_failures (camera, filename, message) VALUES (?, ?, ?)", + (camera, filename, message or '')) + self.commit() + + def get_all_timestamps(self): + cur = self.cursor() + data = {} + + cur.execute("SELECT camera, fix_time, motion_time, motion_start_time FROM timestamps") + for cam, fix_time, motion_time, motion_start_time in cur.fetchall(): + data[int(cam)] = { + 'fix': int(fix_time), + 'motion': int(motion_time), + 'motion_start': int(motion_start_time) + } + + return data + + def set_timestamp(self, + camera: int, + time_type: TimeFilterType, + time: Union[int, datetime]): + cur = self.cursor() + if isinstance(time, datetime): + time = int(time.timestamp()) + cur.execute(f"UPDATE timestamps SET {time_type.value}_time=? WHERE camera=?", (time, camera)) + self.commit() + + def get_timestamp(self, + camera: int, + time_type: TimeFilterType) -> int: + cur = self.cursor() + cur.execute(f"SELECT {time_type.value}_time FROM timestamps WHERE camera=?", (camera,)) + return int(cur.fetchone()[0]) + + +# ipcam web api +# ------------- + +class IpcamWebServer(http.HTTPServer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.get('/api/recordings', self.get_motion_queue) + self.get('/api/recordings/{name}', self.get_camera_recordings) + self.get('/api/recordings/{name}/download/{file}', self.download_recording) + self.get('/api/camera/list', self.camlist) + self.get('/api/timestamp/{name}/{type}', self.get_timestamp) + self.get('/api/timestamp/all', self.get_all_timestamps) + + self.post('/api/debug/fix', self.debug_fix) + self.post('/api/debug/cleanup', self.debug_cleanup) + + self.post('/api/timestamp/{name}/{type}', self.set_timestamp) + + self.post('/api/motion/done/{name}', self.submit_motion) + self.post('/api/motion/fail/{name}', self.submit_motion_failure) + + # self.get('/api/motion/params/{name}', self.get_motion_params) + # self.get('/api/motion/params/{name}/roi', self.get_motion_roi_params) + + self.queue_lock = Lock() + + async def get_camera_recordings(self, req): + camera = int(req.match_info['name']) + try: + filter = TimeFilterType(req.query['filter']) + except KeyError: + filter = None + + try: + limit = int(req.query['limit']) + except KeyError: + limit = 0 + + files = get_recordings_files(camera, filter, limit) + if files: + time = datetime_from_filename(files[len(files)-1]['name']) + db.set_timestamp(camera, TimeFilterType.MOTION_START, time) + return self.ok({'files': files}) + + async def get_motion_queue(self, req): + try: + limit = int(req.query['limit']) + except KeyError: + limit = 0 + + async with self.queue_lock: + files = get_recordings_files(None, TimeFilterType.MOTION_START, limit) + if files: + times_by_cam = {} + for file in files: + time = datetime_from_filename(file['name']) + if file['cam'] not in times_by_cam or times_by_cam[file['cam']] < time: + times_by_cam[file['cam']] = time + for cam, time in times_by_cam.items(): + db.set_timestamp(cam, TimeFilterType.MOTION_START, time) + + return self.ok({'files': files}) + + async def download_recording(self, req: http.Request): + cam = int(req.match_info['name']) + file = req.match_info['file'] + + fullpath = os.path.join(get_recordings_path(cam), file) + if not os.path.isfile(fullpath): + raise ValueError(f'file "{fullpath}" does not exists') + + return http.FileResponse(fullpath) + + async def camlist(self, req: http.Request): + return self.ok(ipcam_config.get_all_cam_names_for_this_server()) + + async def submit_motion(self, req: http.Request): + data = await req.post() + + camera = int(req.match_info['name']) + timecodes = data['timecodes'] + filename = data['filename'] + + time = datetime_from_filename(filename) + + try: + if timecodes != '': + fragments = camutil.dvr_scan_timecodes(timecodes) + asyncio.ensure_future(process_fragments(camera, filename, fragments)) + + db.set_timestamp(camera, TimeFilterType.MOTION, time) + return self.ok() + + except camutil.DVRScanInvalidTimecodes as e: + db.add_motion_failure(camera, filename, str(e)) + db.set_timestamp(camera, TimeFilterType.MOTION, time) + return self.ok('invalid timecodes') + + async def submit_motion_failure(self, req: http.Request): + camera = int(req.match_info['name']) + + data = await req.post() + filename = data['filename'] + message = data['message'] + + db.add_motion_failure(camera, filename, message) + db.set_timestamp(camera, TimeFilterType.MOTION, datetime_from_filename(filename)) + + return self.ok() + + async def debug_fix(self, req: http.Request): + asyncio.ensure_future(fix_job()) + return self.ok() + + async def debug_cleanup(self, req: http.Request): + asyncio.ensure_future(cleanup_job()) + return self.ok() + + async def set_timestamp(self, req: http.Request): + cam, time_type, time = self._getset_timestamp_params(req, need_time=True) + db.set_timestamp(cam, time_type, time) + return self.ok() + + async def get_timestamp(self, req: http.Request): + cam, time_type = self._getset_timestamp_params(req) + return self.ok(db.get_timestamp(cam, time_type)) + + async def get_all_timestamps(self, req: http.Request): + return self.ok(db.get_all_timestamps()) + + # async def get_motion_params(self, req: http.Request): + # data = config['motion_params'][int(req.match_info['name'])] + # lines = [ + # f'threshold={data["threshold"]}', + # f'min_event_length=3s', + # f'frame_skip=2', + # f'downscale_factor=3', + # ] + # return self.plain('\n'.join(lines)+'\n') + # + # async def get_motion_roi_params(self, req: http.Request): + # data = config['motion_params'][int(req.match_info['name'])] + # return self.plain('\n'.join(data['roi'])+'\n') + + @staticmethod + def _getset_timestamp_params(req: http.Request, need_time=False): + values = [] + + cam = int(req.match_info['name']) + assert cam in ipcam_config.get_all_cam_names_for_this_server(), 'invalid camera' + + values.append(cam) + values.append(TimeFilterType(req.match_info['type'])) + + if need_time: + time = req.query['time'] + if time.startswith('record_'): + time = datetime_from_filename(time) + elif time.isnumeric(): + time = int(time) + else: + raise ValueError('invalid time') + values.append(time) + + return values + + +# other global stuff +# ------------------ + +def open_database(database_path: str): + global db + db = IpcamServerDatabase(database_path) + + # update cams list in database, if needed + stored_cams = db.get_all_timestamps().keys() + for cam in ipcam_config.get_all_cam_names_for_this_server(): + if cam not in stored_cams: + db.add_camera(cam) + + +def get_recordings_files(cam: Optional[int] = None, + time_filter_type: Optional[TimeFilterType] = None, + limit=0) -> List[dict]: + from_time = 0 + to_time = int(time.time()) + + cams = [cam] if cam is not None else ipcam_config.get_all_cam_names_for_this_server() + files = [] + for cam in cams: + if time_filter_type: + from_time = db.get_timestamp(cam, time_filter_type) + if time_filter_type in (TimeFilterType.MOTION, TimeFilterType.MOTION_START): + to_time = db.get_timestamp(cam, TimeFilterType.FIX) + + from_time = datetime.fromtimestamp(from_time) + to_time = datetime.fromtimestamp(to_time) + + recdir = get_recordings_path(cam) + cam_files = [{ + 'cam': cam, + 'name': file, + 'size': os.path.getsize(os.path.join(recdir, file))} + for file in os.listdir(recdir) + if is_valid_recording_name(file) and from_time < datetime_from_filename(file) <= to_time] + cam_files.sort(key=lambda file: file['name']) + + if cam_files: + last = cam_files[len(cam_files)-1] + fullpath = os.path.join(recdir, last['name']) + if camutil.has_handle(fullpath): + logger.debug(f'get_recordings_files: file {fullpath} has opened handle, ignoring it') + cam_files.pop() + files.extend(cam_files) + + if limit > 0: + files = files[:limit] + + return files + + +async def process_fragments(camera: int, + filename: str, + fragments: List[Tuple[int, int]]) -> None: + time = datetime_from_filename(filename) + + rec_dir = get_recordings_path(camera) + motion_dir = get_motion_path(camera) + if not os.path.exists(motion_dir): + os.mkdir(motion_dir) + + for fragment in fragments: + start, end = fragment + + start -= ipcam_config['motion_padding'] + end += ipcam_config['motion_padding'] + + if start < 0: + start = 0 + + duration = end - start + + dt1 = (time + timedelta(seconds=start)).strftime(datetime_format) + dt2 = (time + timedelta(seconds=end)).strftime(datetime_format) + + await camutil.ffmpeg_cut(input=os.path.join(rec_dir, filename), + output=os.path.join(motion_dir, f'{dt1}__{dt2}.mp4'), + start_pos=start, + duration=duration) + + if fragments and ipcam_config['motion_telegram']: + asyncio.ensure_future(motion_notify_tg(camera, filename, fragments)) + + +async def motion_notify_tg(camera: int, + filename: str, + fragments: List[Tuple[int, int]]): + dt_file = datetime_from_filename(filename) + fmt = '%H:%M:%S' + + text = f'Camera: <b>{camera}</b>\n' + text += f'Original file: <b>{filename}</b> ' + text += _tg_links(TelegramLinkType.ORIGINAL_FILE, camera, filename) + + for start, end in fragments: + start -= ipcam_config['motion_padding'] + end += ipcam_config['motion_padding'] + + if start < 0: + start = 0 + + duration = end - start + if duration < 0: + duration = 0 + + dt1 = dt_file + timedelta(seconds=start) + dt2 = dt_file + timedelta(seconds=end) + + text += f'\nFragment: <b>{duration}s</b>, {dt1.strftime(fmt)}-{dt2.strftime(fmt)} ' + text += _tg_links(TelegramLinkType.FRAGMENT, camera, f'{dt1.strftime(datetime_format)}__{dt2.strftime(datetime_format)}.mp4') + + await telegram.send_message(text) + + +def _tg_links(link_type: TelegramLinkType, + camera: int, + file: str) -> str: + links = [] + for link_name, link_template in ipcam_config[f'{link_type.value}_url_templates']: + link = link_template.replace('{camera}', str(camera)).replace('{file}', file) + links.append(f'<a href="{link}">{link_name}</a>') + return ' '.join(links) + + +async def fix_job() -> None: + global fix_job_running + logger.debug('fix_job: starting') + + if fix_job_running: + logger.error('fix_job: already running') + return + + try: + fix_job_running = True + for cam in ipcam_config.get_all_cam_names_for_this_server(): + files = get_recordings_files(cam, TimeFilterType.FIX) + if not files: + logger.debug(f'fix_job: no files for camera {cam}') + continue + + logger.debug(f'fix_job: got %d files for camera {cam}' % (len(files),)) + + for file in files: + fullpath = os.path.join(get_recordings_path(cam), file['name']) + await camutil.ffmpeg_recreate(fullpath) + timestamp = datetime_from_filename(file['name']) + if timestamp: + db.set_timestamp(cam, TimeFilterType.FIX, timestamp) + + finally: + fix_job_running = False + + +async def cleanup_job() -> None: + def compare(i1: str, i2: str) -> int: + dt1 = datetime_from_filename(i1) + dt2 = datetime_from_filename(i2) + + if dt1 < dt2: + return -1 + elif dt1 > dt2: + return 1 + else: + return 0 + + global cleanup_job_running + logger.debug('cleanup_job: starting') + + if cleanup_job_running: + logger.error('cleanup_job: already running') + return + + try: + cleanup_job_running = True + + gb = float(1 << 30) + disk_number = 0 + for storage in lbc_config.get_board_disks(gethostname()): + disk_number += 1 + if os.path.exists(storage['mountpoint']): + total, used, free = shutil.disk_usage(storage['mountpoint']) + free_gb = free // gb + if free_gb < ipcam_config['cleanup_min_gb']: + cleaned = 0 + files = [] + for cam in ipcam_config.get_all_cam_names_for_this_server(filter_by_disk=disk_number): + for _dir in (get_recordings_path(cam), get_motion_path(cam)): + files += list(map(lambda file: os.path.join(_dir, file), os.listdir(_dir))) + files = list(filter(lambda path: os.path.isfile(path) and path.endswith(tuple([f'.{t.value}' for t in VideoContainerType])), files)) + files.sort(key=cmp_to_key(compare)) + + for file in files: + size = os.stat(file).st_size + try: + os.unlink(file) + cleaned += size + except OSError as e: + logger.exception(e) + if (free + cleaned) // gb >= ipcam_config['cleanup_min_gb']: + break + else: + logger.error(f"cleanup_job: {storage['mountpoint']} not found") + finally: + cleanup_job_running = False + + +fix_job_running = False +cleanup_job_running = False + +datetime_format = '%Y-%m-%d-%H.%M.%S' +datetime_format_re = r'\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}.\d{2}' +db: Optional[IpcamServerDatabase] = None +server: Optional[IpcamWebServer] = None +logger = logging.getLogger(__name__) + + +# start of the program +# -------------------- + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--listen', type=str, required=True) + parser.add_argument('--database-path', type=str, required=True) + arg = homekit_config.load_app(no_config=True, parser=parser) + + open_database(arg.database_path) + + loop = asyncio.get_event_loop() + + try: + scheduler = AsyncIOScheduler(event_loop=loop) + if ipcam_config['fix_enabled']: + scheduler.add_job(fix_job, 'interval', + seconds=ipcam_config['fix_interval'], + misfire_grace_time=None) + + scheduler.add_job(cleanup_job, 'interval', + seconds=ipcam_config['cleanup_interval'], + misfire_grace_time=None) + scheduler.start() + except KeyError: + pass + + asyncio.ensure_future(fix_job()) + asyncio.ensure_future(cleanup_job()) + + server = IpcamWebServer(Addr.fromstring(arg.listen)) + server.run() diff --git a/bin/lugovaya_pump_mqtt_bot.py b/bin/lugovaya_pump_mqtt_bot.py new file mode 100755 index 0000000..85402d1 --- /dev/null +++ b/bin/lugovaya_pump_mqtt_bot.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +import datetime +import __py_include + +from enum import Enum +from typing import Optional +from telegram import ReplyKeyboardMarkup, User + +from homekit.config import config, AppConfigUnit +from homekit.telegram import bot +from homekit.telegram.config import TelegramBotConfig +from homekit.telegram._botutil import user_any_name +from homekit.mqtt import MqttNode, MqttPayload, MqttNodesConfig, MqttWrapper +from homekit.mqtt.module.relay import MqttRelayState, MqttRelayModule +from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload + + +class LugovayaPumpMqttBotConfig(TelegramBotConfig, AppConfigUnit): + NAME = 'lugovaya_pump_mqtt_bot' + + @classmethod + def schema(cls) -> Optional[dict]: + return { + **TelegramBotConfig.schema(), + 'relay_node_id': { + 'type': 'string', + 'required': True + }, + } + + @staticmethod + def custom_validator(data): + relay_node_names = MqttNodesConfig().get_nodes(filters=('relay',), only_names=True) + if data['relay_node_id'] not in relay_node_names: + raise ValueError('unknown relay node "%s"' % (data['relay_node_id'],)) + + +config.load_app(LugovayaPumpMqttBotConfig) + +bot.initialize() +bot.lang.ru( + start_message="Выберите команду на клавиатуре", + start_message_no_access="Доступ запрещён. Вы можете отправить заявку на получение доступа.", + unknown_command="Неизвестная команда", + send_access_request="Отправить заявку", + management="Админка", + + enable="Включить", + enabled="Включен ✅", + + disable="Выключить", + disabled="Выключен ❌", + + status="Статус", + status_updated=' (обновлено %s)', + + done="Готово 👌", + user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.', + user_action_on="включил", + user_action_off="выключил", + date_yday="вчера", + date_yyday="позавчера", + date_at="в" +) +bot.lang.en( + start_message="Select command on the keyboard", + start_message_no_access="You have no access.", + unknown_command="Unknown command", + send_access_request="Send request", + management="Admin options", + + enable="Turn ON", + enable_silently="Turn ON silently", + enabled="Turned ON ✅", + + disable="Turn OFF", + disable_silently="Turn OFF silently", + disabled="Turned OFF ❌", + + status="Status", + status_updated=' (updated %s)', + + done="Done 👌", + user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.', + user_action_on="ON", + user_action_off="OFF", + + date_yday="yesterday", + date_yyday="the day before yesterday", + date_at="at" +) + + +mqtt: MqttWrapper +relay_state = MqttRelayState() +relay_module: MqttRelayModule + + +class UserAction(Enum): + ON = 'on' + OFF = 'off' + + +# def on_mqtt_message(home_id, message: MqttPayload): +# if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): +# kwargs = dict(rssi=message.rssi, enabled=message.flags.state) +# if isinstance(message, InitialDiagnosticsPayload): +# kwargs['fw_version'] = message.fw_version +# relay_state.update(**kwargs) + + +async def notify(user: User, action: UserAction) -> None: + def text_getter(lang: str): + action_name = bot.lang.get(f'user_action_{action.value}', lang) + user_name = user_any_name(user) + return 'ℹ ' + bot.lang.get('user_action_notification', lang, + user.id, user_name, action_name) + + await bot.notify_all(text_getter, exclude=(user.id,)) + + +@bot.handler(message='enable') +async def enable_handler(ctx: bot.Context) -> None: + relay_module.switchpower(True) + await ctx.reply(ctx.lang('done')) + await notify(ctx.user, UserAction.ON) + + +@bot.handler(message='disable') +async def disable_handler(ctx: bot.Context) -> None: + relay_module.switchpower(False) + await ctx.reply(ctx.lang('done')) + await notify(ctx.user, UserAction.OFF) + + +@bot.handler(message='status') +async def status(ctx: bot.Context) -> None: + label = ctx.lang('enabled') if relay_state.enabled else ctx.lang('disabled') + if relay_state.ever_updated: + date_label = '' + today = datetime.date.today() + if today != relay_state.update_time.date(): + yday = today - datetime.timedelta(days=1) + yyday = today - datetime.timedelta(days=2) + if yday == relay_state.update_time.date(): + date_label = ctx.lang('date_yday') + elif yyday == relay_state.update_time.date(): + date_label = ctx.lang('date_yyday') + else: + date_label = relay_state.update_time.strftime('%d.%m.%Y') + date_label += ' ' + date_label += ctx.lang('date_at') + ' ' + date_label += relay_state.update_time.strftime('%H:%M') + label += ctx.lang('status_updated', date_label) + await ctx.reply(label) + + +async def start(ctx: bot.Context) -> None: + if ctx.user_id in config['bot']['users']: + await ctx.reply(ctx.lang('start_message')) + else: + buttons = [ + [ctx.lang('send_access_request')] + ] + await ctx.reply(ctx.lang('start_message_no_access'), + markup=ReplyKeyboardMarkup(buttons, one_time_keyboard=False)) + + +@bot.exceptionhandler +def exception_handler(e: Exception, ctx: bot.Context) -> bool: + return False + + +@bot.defaultreplymarkup +def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [ + [ + ctx.lang('enable'), + ctx.lang('disable') + ], + # [ctx.lang('status')] + ] + # if ctx.user_id in config['bot']['admin_users']: + # buttons.append([ctx.lang('management')]) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +node_data = MqttNodesConfig().get_node(config.app_config['relay_node_id']) + +mqtt = MqttWrapper(client_id='lugovaya_pump_mqtt_bot') +mqtt_node = MqttNode(node_id=config.app_config['relay_node_id'], + node_secret=node_data['password']) +module_kwargs = {} +try: + if node_data['relay']['legacy_topics']: + module_kwargs['legacy_topics'] = True +except KeyError: + pass +relay_module = mqtt_node.load_module('relay', **module_kwargs) +# mqtt_node.add_payload_callback(on_mqtt_message) +mqtt.add_node(mqtt_node) + +mqtt.connect_and_loop(loop_forever=False) + +bot.run(start_handler=start) + +mqtt.disconnect() diff --git a/bin/mqtt_node_util.py b/bin/mqtt_node_util.py new file mode 100755 index 0000000..cf451fd --- /dev/null +++ b/bin/mqtt_node_util.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +import os.path +import __py_include + +from time import sleep +from typing import Optional +from argparse import ArgumentParser, ArgumentError + +from homekit.config import config +from homekit.mqtt import MqttNode, MqttWrapper, get_mqtt_modules +from homekit.mqtt import MqttNodesConfig + +mqtt_node: Optional[MqttNode] = None +mqtt: Optional[MqttWrapper] = None + + +if __name__ == '__main__': + nodes_config = MqttNodesConfig() + + parser = ArgumentParser() + parser.add_argument('--node-id', type=str, required=True, choices=nodes_config.get_nodes(only_names=True)) + parser.add_argument('--modules', type=str, choices=get_mqtt_modules(), nargs='*', + help='mqtt modules to include') + parser.add_argument('--switch-relay', choices=[0, 1], type=int, + help='send relay state') + parser.add_argument('--legacy-relay', action='store_true') + parser.add_argument('--push-ota', type=str, metavar='OTA_FILENAME', + help='push OTA, receives path to firmware.bin') + + config.load_app(parser=parser, no_config=True) + arg = parser.parse_args() + + if arg.switch_relay is not None and 'relay' not in arg.modules: + raise ArgumentError(None, '--relay is only allowed when \'relay\' module included in --modules') + + mqtt = MqttWrapper(randomize_client_id=True, + client_id='mqtt_node_util') + mqtt_node = MqttNode(node_id=arg.node_id, + node_secret=nodes_config.get_node(arg.node_id)['password']) + + mqtt.add_node(mqtt_node) + + # must-have modules + ota_module = mqtt_node.load_module('ota') + mqtt_node.load_module('diagnostics') + + if arg.modules: + for m in arg.modules: + kwargs = {} + if m == 'relay' and arg.legacy_relay: + kwargs['legacy_topics'] = True + module_instance = mqtt_node.load_module(m, **kwargs) + if m == 'relay' and arg.switch_relay is not None: + module_instance.switchpower(arg.switch_relay == 1) + + try: + mqtt.connect_and_loop(loop_forever=False) + + if arg.push_ota: + if not os.path.exists(arg.push_ota): + raise OSError(f'--push-ota: file \"{arg.push_ota}\" does not exists') + ota_module.push_ota(arg.push_ota, 1) + + while True: + sleep(0.1) + + except KeyboardInterrupt: + mqtt.disconnect() diff --git a/bin/openwrt_log_analyzer.py b/bin/openwrt_log_analyzer.py new file mode 100755 index 0000000..5b14a2f --- /dev/null +++ b/bin/openwrt_log_analyzer.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +import __py_include +import homekit.telegram as telegram + +from homekit.telegram.config import TelegramChatsConfig +from homekit.util import validate_mac_address +from typing import Optional +from homekit.config import config, AppConfigUnit +from homekit.database import BotsDatabase, SimpleState + + +class OpenwrtLogAnalyzerConfig(AppConfigUnit): + @classmethod + def schema(cls) -> Optional[dict]: + return { + 'database_name': {'type': 'string', 'required': True}, + 'devices': { + 'type': 'dict', + 'keysrules': {'type': 'string'}, + 'valuesrules': { + 'type': 'string', + 'check_with': validate_mac_address + } + }, + 'limit': {'type': 'integer'}, + 'telegram_chat': {'type': 'string'}, + 'aps': { + 'type': 'list', + 'schema': {'type': 'integer'} + } + } + + @staticmethod + def custom_validator(data): + chats = TelegramChatsConfig() + if data['telegram_chat'] not in chats: + return ValueError(f'unknown telegram chat {data["telegram_chat"]}') + + +def main(mac: str, + title: str, + ap: int) -> int: + db = BotsDatabase() + + data = db.get_openwrt_logs(filter_text=mac, + min_id=state['last_id'], + access_point=ap, + limit=config['openwrt_log_analyzer']['limit']) + if not data: + return 0 + + max_id = 0 + for log in data: + if log.id > max_id: + max_id = log.id + + text = '\n'.join(map(lambda s: str(s), data)) + telegram.send_message(f'<b>{title} (AP #{ap})</b>\n\n' + text, config.app_config['telegram_chat']) + + return max_id + + +if __name__ == '__main__': + config.load_app(OpenwrtLogAnalyzerConfig) + for ap in config.app_config['aps']: + dbname = config.app_config['database_name'] + dbname = dbname.replace('.txt', f'-{ap}.txt') + + state = SimpleState(name=dbname, + default={'last_id': 0}) + + max_last_id = 0 + for name, mac in config['devices'].items(): + last_id = main(mac, title=name, ap=ap) + if last_id > max_last_id: + max_last_id = last_id + + if max_last_id: + state['last_id'] = max_last_id diff --git a/bin/openwrt_logger.py b/bin/openwrt_logger.py new file mode 100755 index 0000000..ec67542 --- /dev/null +++ b/bin/openwrt_logger.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 +import os +import __py_include + +from datetime import datetime +from typing import Tuple, List, Optional +from argparse import ArgumentParser +from homekit.config import config, AppConfigUnit +from homekit.database import SimpleState +from homekit.api import WebApiClient + + +class OpenwrtLoggerConfig(AppConfigUnit): + @classmethod + def schema(cls) -> Optional[dict]: + return dict( + database_name_template=dict(type='string', required=True) + ) + + +def parse_line(line: str) -> Tuple[int, str]: + space_pos = line.index(' ') + + date = line[:space_pos] + rest = line[space_pos+1:] + + return ( + int(datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z").timestamp()), + rest + ) + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--file', type=str, required=True, + help='openwrt log file') + parser.add_argument('--access-point', type=int, required=True, + help='access point number') + + arg = config.load_app(OpenwrtLoggerConfig, parser=parser) + + state = SimpleState(name=config.app_config['database_name_template'].replace('{ap}', str(arg.access_point)), + default=dict(seek=0, size=0)) + fsize = os.path.getsize(arg.file) + if fsize < state['size']: + state['seek'] = 0 + + with open(arg.file, 'r') as f: + if state['seek']: + # jump to the latest read position + f.seek(state['seek']) + + # read till the end of the file + content = f.read() + + # save new position + state['seek'] = f.tell() + state['size'] = fsize + + lines: List[Tuple[int, str]] = [] + + if content != '': + for line in content.strip().split('\n'): + if not line: + continue + + try: + lines.append(parse_line(line)) + except ValueError: + lines.append((0, line)) + + api = WebApiClient() + api.log_openwrt(lines, arg.access_point) diff --git a/bin/pio_build.py b/bin/pio_build.py new file mode 100644 index 0000000..539df44 --- /dev/null +++ b/bin/pio_build.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +import __py_include + +if __name__ == '__main__': + print('TODO')
\ No newline at end of file diff --git a/bin/pio_ini.py b/bin/pio_ini.py new file mode 100755 index 0000000..ee85732 --- /dev/null +++ b/bin/pio_ini.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +import os +import yaml +import re +import __py_include + +from argparse import ArgumentParser, ArgumentError +from homekit.pio import get_products, platformio_ini +from homekit.pio.exceptions import ProductConfigNotFoundError +from homekit.config import CONFIG_DIRECTORIES + + +def get_config(product: str) -> dict: + path = None + for directory in CONFIG_DIRECTORIES: + config_path = os.path.join(directory, 'pio', f'{product}.yaml') + if os.path.exists(config_path) and os.path.isfile(config_path): + path = config_path + break + if not path: + raise ProductConfigNotFoundError(f'pio/{product}.yaml not found') + with open(path, 'r') as f: + return yaml.safe_load(f) + + +def bsd_walk(product_config: dict, + f: callable): + try: + for define_name, define_extra_params in product_config['build_specific_defines'].items(): + define_name = re.sub(r'^CONFIG_', '', define_name) + kwargs = {} + if isinstance(define_extra_params, dict): + kwargs = define_extra_params + f(define_name, **kwargs) + except KeyError: + pass + + +# 'bsd' means 'build_specific_defines' +def bsd_parser(product_config: dict, + parser: ArgumentParser): + def f(define_name, **kwargs): + arg_kwargs = {} + define_name = define_name.lower().replace('_', '-') + + if 'type' in kwargs: + if kwargs['type'] in ('str', 'enum'): + arg_kwargs['type'] = str + if kwargs['type'] == 'enum' and 'list_config_key' in kwargs: + if not isinstance(product_config[kwargs['list_config_key']], list): + raise TypeError(f'product_config[{kwargs["list_config_key"]}] enum is not list') + if not product_config[kwargs['list_config_key']]: + raise ValueError(f'product_config[{kwargs["list_config_key"]}] enum cannot be empty') + arg_kwargs['choices'] = product_config[kwargs['list_config_key']] + if isinstance(product_config[kwargs['list_config_key']][0], int): + arg_kwargs['type'] = int + elif kwargs['type'] == 'int': + arg_kwargs['type'] = int + elif kwargs['type'] == 'bool': + arg_kwargs['action'] = 'store_true' + arg_kwargs['required'] = False + else: + raise TypeError(f'unsupported type {kwargs["type"]} for define {define_name}') + else: + arg_kwargs['action'] = 'store_true' + + if 'required' not in arg_kwargs: + arg_kwargs['required'] = True + parser.add_argument(f'--{define_name}', **arg_kwargs) + + bsd_walk(product_config, f) + + +def bsd_get(product_config: dict, + arg: object): + defines = {} + enums = [] + def f(define_name, **kwargs): + attr_name = define_name.lower() + attr_value = getattr(arg, attr_name) + if 'type' in kwargs: + if kwargs['type'] == 'enum': + enums.append(f'CONFIG_{define_name}') + defines[f'CONFIG_{define_name}'] = f'HOMEKIT_{attr_value.upper()}' + return + if kwargs['type'] == 'bool': + if attr_value is True: + defines[f'CONFIG_{define_name}'] = True + return + defines[f'CONFIG_{define_name}'] = str(attr_value) + bsd_walk(product_config, f) + return defines, enums + + +if __name__ == '__main__': + products = get_products() + + # first, get the product + product_parser = ArgumentParser(add_help=False) + product_parser.add_argument('--product', type=str, choices=products, required=True, + help='PIO product name') + arg, _ = product_parser.parse_known_args() + if not arg.product: + product = os.path.basename(os.path.realpath(os.getcwd())) + if product not in products: + raise ArgumentError(None, 'invalid product') + else: + product = arg.product + + product_config = get_config(product) + + # then everything else + parser = ArgumentParser(parents=[product_parser]) + parser.add_argument('--target', type=str, required=True, choices=product_config['targets'], + help='PIO build target') + parser.add_argument('--platform', default='espressif8266', type=str) + parser.add_argument('--framework', default='arduino', type=str) + parser.add_argument('--upload-port', default='/dev/ttyUSB0', type=str) + parser.add_argument('--monitor-speed', default=115200) + parser.add_argument('--debug', action='store_true') + parser.add_argument('--debug-network', action='store_true') + bsd_parser(product_config, parser) + arg = parser.parse_args() + + if arg.target not in product_config['targets']: + raise ArgumentError(None, f'target {arg.target} not found for product {product}') + + bsd, bsd_enums = bsd_get(product_config, arg) + + ini = platformio_ini(product_config=product_config, + target=arg.target, + build_specific_defines=bsd, + build_specific_defines_enums=bsd_enums, + platform=arg.platform, + framework=arg.framework, + upload_port=arg.upload_port, + monitor_speed=arg.monitor_speed, + debug=arg.debug, + debug_network=arg.debug_network) + print(ini) diff --git a/bin/polaris_kettle_bot.py b/bin/polaris_kettle_bot.py new file mode 100755 index 0000000..05c2aae --- /dev/null +++ b/bin/polaris_kettle_bot.py @@ -0,0 +1,743 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import __py_include +import logging +import locale +import queue +import time +import threading +import paho.mqtt.client as mqtt + +from homekit.telegram import bot +from homekit.mqtt import Mqtt +from homekit.config import config +from homekit.util import chunks +from syncleo import ( + Kettle, + PowerType, + DeviceListener, + IncomingMessageListener, + ConnectionStatusListener, + ConnectionStatus +) +import syncleo.protocol as kettle_proto +from typing import Optional, Tuple, List, Union +from collections import namedtuple +from functools import partial +from datetime import datetime +from abc import abstractmethod +from telegram.error import TelegramError +from telegram import ( + ReplyKeyboardMarkup, + InlineKeyboardMarkup, + InlineKeyboardButton, + Message +) +from telegram.ext import ( + CallbackQueryHandler, + MessageHandler, + CommandHandler +) + +logger = logging.getLogger(__name__) +config.load_app('polaris_kettle_bot') + +primary_choices = (70, 80, 90, 100) +all_choices = range( + config['kettle']['temp_min'], + config['kettle']['temp_max']+1, + config['kettle']['temp_step']) + +bot.initialize() +bot.lang.ru( + start_message="Выберите команду на клавиатуре:", + invalid_command="Неизвестная команда", + unexpected_callback_data="Ошибка: неверные данные", + disable="❌ Выключить", + server_error="Ошибка сервера", + back="🔙 Назад", + smth_went_wrong="😱 Что-то пошло не так", + + # /status + status_not_connected="😟 Связь с чайником не установлена", + status_on="🟢 Чайник <b>включён</b> (до <b>%d °C</b>)", + status_off="🔴 Чайник <b>выключен</b>", + status_current_temp="Сейчас: <b>%d °C</b>", + status_update_time="<i>Обновлено %s</i>", + status_update_time_fmt="%d %b в %H:%M:%S", + + # /temp + select_temperature="Выберите температуру:", + + # enable/disable + enabling="💤 Чайник включается...", + disabling="💤 Чайник выключается...", + enabled="🟢 Чайник <b>включён</b>.", + enabled_target="%s Цель: <b>%d °C</b>", + enabled_reached="✅ <b>Готово!</b> Чайник вскипел, температура <b>%d °C</b>.", + disabled="✅ Чайник <b>выключен</b>.", + please_wait="⏳ Ожидайте..." +) +bot.lang.en( + start_message="Select command on the keyboard:", + invalid_command="Unknown command", + unexpected_callback_data="Unexpected callback data", + disable="❌ Turn OFF", + server_error="Server error", + back="🔙 Back", + smth_went_wrong="😱 Something went wrong", + + # /status + status_not_connected="😟 No connection", + status_on="🟢 Turned <b>ON</b>! Target: <b>%d °C</b>", + status_off="🔴 Turned <b>OFF</b>", + status_current_temp="Now: <b>%d °C</b>", + status_update_time="<i>Updated on %s</i>", + status_update_time_fmt="%b %d, %Y at %H:%M:%S", + + # /temp + select_temperature="Select a temperature:", + + # enable/disable + enabling="💤 Turning on...", + disabling="💤 Turning off...", + enabled="🟢 The kettle is <b>turned ON</b>.", + enabled_target="%s Target: <b>%d °C</b>", + enabled_reached="✅ <b>Done</b>! The kettle has boiled, the temperature is <b>%d °C</b>.", + disabled="✅ The kettle is <b>turned OFF</b>.", + please_wait="⏳ Please wait..." +) + +kc: Optional[KettleController] = None +RenderedContent = Tuple[str, Optional[Union[InlineKeyboardMarkup, ReplyKeyboardMarkup]]] +tasks_lock = threading.Lock() + + +def run_tasks(tasks: queue.SimpleQueue, done: callable): + def next_task(r: Optional[kettle_proto.MessageResponse]): + if r is not None: + try: + assert r is not False, 'server error' + except AssertionError as exc: + logger.exception(exc) + tasks_lock.release() + return done(False) + + if not tasks.empty(): + task = tasks.get() + args = task[1:] + args.append(next_task) + f = getattr(kc.kettle, task[0]) + f(*args) + else: + tasks_lock.release() + return done(True) + + tasks_lock.acquire() + next_task(None) + + +def temperature_emoji(temp: int) -> str: + if temp > 90: + return '🔥' + elif temp >= 40: + return '♨️' + elif temp >= 35: + return '🌡' + else: + return '❄️' + + +class KettleInfoListener: + @abstractmethod + def info_updated(self, field: str): + pass + + +# class that holds data coming from the kettle over mqtt +class KettleInfo: + update_time: int + _mode: Optional[PowerType] + _temperature: Optional[int] + _target_temperature: Optional[int] + _update_listener: KettleInfoListener + + def __init__(self, update_listener: KettleInfoListener): + self.update_time = 0 + self._mode = None + self._temperature = None + self._target_temperature = None + self._update_listener = update_listener + + def _update(self, field: str): + self.update_time = int(time.time()) + if self._update_listener: + self._update_listener.info_updated(field) + + @property + def temperature(self) -> int: + return self._temperature + + @temperature.setter + def temperature(self, value: int): + self._temperature = value + self._update('temperature') + + @property + def mode(self) -> PowerType: + return self._mode + + @mode.setter + def mode(self, value: PowerType): + self._mode = value + self._update('mode') + + @property + def target_temperature(self) -> int: + return self._target_temperature + + @target_temperature.setter + def target_temperature(self, value: int): + self._target_temperature = value + self._update('target_temperature') + + +class KettleController(threading.Thread, + Mqtt, + DeviceListener, + IncomingMessageListener, + KettleInfoListener, + ConnectionStatusListener): + kettle: Kettle + info: KettleInfo + + _logger: logging.Logger + _stopped: bool + _restart_server_at: int + _lock: threading.Lock + _info_lock: threading.Lock + _accumulated_updates: dict + _info_flushed_time: float + _mqtt_root_topic: str + _muts: List[MessageUpdatingTarget] + + def __init__(self): + # basic setup + Mqtt.__init__(self, clean_session=False) + threading.Thread.__init__(self) + + self._logger = logging.getLogger(self.__class__.__name__) + + self.kettle = Kettle(mac=config['kettle']['mac'], + device_token=config['kettle']['token'], + read_timeout=config['kettle']['read_timeout']) + self.kettle_reconnect() + + # info + self.info = KettleInfo(update_listener=self) + self._accumulated_updates = {} + self._info_flushed_time = 0 + + # mqtt + self._mqtt_root_topic = '/polaris/6/'+config['kettle']['token']+'/#' + self.connect_and_loop(loop_forever=False) + + # thread loop related + self._stopped = False + # self._lock = threading.Lock() + self._info_lock = threading.Lock() + self._restart_server_at = 0 + + # bot + self._muts = [] + self._muts_lock = threading.Lock() + + self.start() + + def kettle_reconnect(self): + self.kettle.discover(wait=False, listener=self) + + def stop_all(self): + self.kettle.stop_all() + self._stopped = True + + def add_updating_message(self, mut: MessageUpdatingTarget): + with self._muts_lock: + for m in self._muts: + if m.user_id == m.user_id and m.user_did_turn_on() or m.user_did_turn_on() != mut.user_did_turn_on(): + m.delete() + self._muts.append(mut) + + # --------------------- + # threading.Thread impl + + def run(self): + while not self._stopped: + updates = [] + deletions = [] + forget = [] + + with self._muts_lock and self._info_lock: + if self._muts and self._accumulated_updates and (self._info_flushed_time == 0 or time.time() - self._info_flushed_time >= 1): + deletions = [] + + for mut in self._muts: + upd = mut.update( + mode=self.info.mode, + current_temp=self.info.temperature, + target_temp=self.info.target_temperature) + + if upd.finished or upd.delete: + forget.append(mut) + + if upd.delete: + deletions.append((mut, upd)) + + elif upd.changed: + updates.append((mut, upd)) + + self._info_flushed_time = time.time() + self._accumulated_updates = {} + + # edit messages + for mut, upd in updates: + self._logger.debug(f'loop: got update: {upd}') + try: + do_edit = True + if upd.finished: + # try to delete the old message and send a new one, to notify user more effectively + try: + bot.delete_message(upd.user_id, upd.message_id) + do_edit = False + except TelegramError as exc: + self._logger.error(f'loop: failed to delete old message (in order to send a new one)') + self._logger.exception(exc) + + if do_edit: + bot.edit_message_text(upd.user_id, upd.message_id, + text=upd.html, + reply_markup=upd.markup) + else: + bot.notify_user(upd.user_id, upd.html, reply_markup=upd.markup) + except TelegramError as exc: + if "Message can't be edited" in exc.message: + self._logger.warning("message can't be edited, adding it to forget list") + forget.append(upd) + + self._logger.error(f'loop: edit_message_text failed for update: {upd}') + self._logger.exception(exc) + + # delete messages + for mut, upd in deletions: + self._logger.debug(f'loop: got deletion: {upd}') + try: + bot.delete_message(upd.user_id, upd.message_id) + except TelegramError as exc: + self._logger.error(f'loop: delete_message failed for update: {upd}') + self._logger.exception(exc) + + # delete muts, if needed + if forget: + with self._muts_lock: + for mut in forget: + self._logger.debug(f'loop: removing mut {mut}') + self._muts.remove(mut) + + time.sleep(0.5) + + # ------------------- + # DeviceListener impl + + def device_updated(self): + self._logger.info(f'device updated: {self.kettle.device.si}') + self.kettle.start_server_if_needed(incoming_message_listener=self, + connection_status_listener=self) + + # ----------------------- + # KettleInfoListener impl + + def info_updated(self, field: str): + with self._info_lock: + newval = getattr(self.info, field) + self._logger.debug(f'info_updated: updated {field}, new value is {newval}') + self._accumulated_updates[field] = newval + + # ---------------------------- + # IncomingMessageListener impl + + def incoming_message(self, message: kettle_proto.Message) -> Optional[kettle_proto.Message]: + self._logger.info(f'incoming message: {message}') + + if isinstance(message, kettle_proto.ModeMessage): + self.info.mode = message.pt + elif isinstance(message, kettle_proto.CurrentTemperatureMessage): + self.info.temperature = message.current_temperature + elif isinstance(message, kettle_proto.TargetTemperatureMessage): + self.info.target_temperature = message.temperature + + return kettle_proto.AckMessage() + + # ----------------------------- + # ConnectionStatusListener impl + + def connection_status_updated(self, status: ConnectionStatus): + self._logger.info(f'connection status updated: {status}') + if status == ConnectionStatus.DISCONNECTED: + self.kettle.stop_all() + self.kettle_reconnect() + + # ------------- + # MQTTBase impl + + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + client.subscribe(self._mqtt_root_topic, qos=1) + self._logger.info(f'subscribed to {self._mqtt_root_topic}') + + def on_message(self, client: mqtt.Client, userdata, msg): + try: + topic = msg.topic[len(self._mqtt_root_topic)-2:] + pld = msg.payload.decode() + + self._logger.debug(f'mqtt: on message: topic={topic} pld={pld}') + + if topic == 'state/sensor/temperature': + self.info.temperature = int(float(pld)) + elif topic == 'state/mode': + self.info.mode = PowerType(int(pld)) + elif topic == 'state/temperature': + self.info.target_temperature = int(float(pld)) + + except Exception as e: + self._logger.exception(str(e)) + + +class Renderer: + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("settings")}</b>\n\n' + html += ctx.lang('select_place') + return html, None + + @classmethod + def status(cls, ctx: bot.Context, + connected: bool, + mode: PowerType, + current_temp: int, + target_temp: int, + update_time: int) -> RenderedContent: + if not connected: + return cls.not_connected(ctx) + else: + # power status + if mode != PowerType.OFF: + html = ctx.lang('status_on', target_temp) + else: + html = ctx.lang('status_off') + + # current temperature + html += '\n' + html += ctx.lang('status_current_temp', current_temp) + + # updated on + html += '\n' + html += cls.updated(ctx, update_time) + + return html, None + + @classmethod + def temp(cls, ctx: bot.Context, choices) -> RenderedContent: + buttons = [] + for chunk in chunks(choices, 5): + buttons.append([f'{temperature_emoji(n)} {n}' for n in chunk]) + buttons.append([ctx.lang('back')]) + return ctx.lang('select_temperature'), ReplyKeyboardMarkup(buttons) + + @classmethod + def turned_on(cls, ctx: bot.Context, + target_temp: int, + current_temp: int, + mode: PowerType, + update_time: Optional[int] = None, + reached=False, + no_keyboard=False) -> RenderedContent: + if mode == PowerType.OFF and not reached: + html = ctx.lang('enabling') + else: + if not reached: + html = ctx.lang('enabled') + + # target temperature + html += '\n' + html += ctx.lang('enabled_target', temperature_emoji(target_temp), target_temp) + + # current temperature + html += '\n' + html += temperature_emoji(current_temp) + ' ' + html += ctx.lang('status_current_temp', current_temp) + else: + html = ctx.lang('enabled_reached', current_temp) + + # updated on + if not reached and update_time is not None: + html += '\n' + html += cls.updated(ctx, update_time) + + return html, None if no_keyboard else cls.wait_buttons(ctx) + + @classmethod + def turned_off(cls, ctx: bot.Context, + mode: PowerType, + update_time: Optional[int] = None, + reached=False, + no_keyboard=False) -> RenderedContent: + if mode != PowerType.OFF: + html = ctx.lang('disabling') + else: + html = ctx.lang('disabled') + + # updated on + if not reached and update_time is not None: + html += '\n' + html += cls.updated(ctx, update_time) + + return html, None if no_keyboard else cls.wait_buttons(ctx) + + @classmethod + def not_connected(cls, ctx: bot.Context) -> RenderedContent: + return ctx.lang('status_not_connected'), None + + @classmethod + def smth_went_wrong(cls, ctx: bot.Context) -> RenderedContent: + html = ctx.lang('smth_went_wrong') + return html, None + + @classmethod + def updated(cls, ctx: bot.Context, update_time: int): + locale_bak = locale.getlocale(locale.LC_TIME) + locale.setlocale(locale.LC_TIME, 'ru_RU.UTF-8' if ctx.user_lang == 'ru' else 'en_US.UTF-8') + dt = datetime.fromtimestamp(update_time) + html = ctx.lang('status_update_time', dt.strftime(ctx.lang('status_update_time_fmt'))) + locale.setlocale(locale.LC_TIME, locale_bak) + return html + + @classmethod + def wait_buttons(cls, ctx: bot.Context): + return InlineKeyboardMarkup([ + [ + InlineKeyboardButton(ctx.lang('please_wait'), callback_data='wait') + ] + ]) + + +MUTUpdate = namedtuple('MUTUpdate', 'message_id, user_id, finished, changed, delete, html, markup') + + +class MessageUpdatingTarget: + ctx: bot.Context + message: Message + user_target_temp: Optional[int] + user_enabled_power_mode: PowerType + initial_power_mode: PowerType + need_to_delete: bool + rendered_content: Optional[RenderedContent] + + def __init__(self, + ctx: bot.Context, + message: Message, + user_enabled_power_mode: PowerType, + initial_power_mode: PowerType, + user_target_temp: Optional[int] = None): + self.ctx = ctx + self.message = message + self.initial_power_mode = initial_power_mode + self.user_enabled_power_mode = user_enabled_power_mode + self.ignore_pm = initial_power_mode is PowerType.OFF and self.user_did_turn_on() + self.user_target_temp = user_target_temp + self.need_to_delete = False + self.rendered_content = None + self.last_reported_temp = None + + def set_rendered_content(self, content: RenderedContent): + self.rendered_content = content + + def rendered_content_changed(self, content: RenderedContent) -> bool: + return content != self.rendered_content + + def update(self, + mode: PowerType, + current_temp: int, + target_temp: int) -> MUTUpdate: + + # determine whether status updating is finished + finished = False + reached = False + if self.ignore_pm: + if mode != PowerType.OFF: + self.ignore_pm = False + elif mode == PowerType.OFF: + reached = True + if self.user_did_turn_on(): + # when target is 100 degrees, this kettle sometimes turns off at 91, sometimes at 95, sometimes at 98. + # it's totally unpredictable, so in this case, we keep updating the message until it reaches at least 97 + # degrees, or if temperature started dropping. + if self.user_target_temp < 100 \ + or current_temp >= self.user_target_temp - 3 \ + or current_temp < self.last_reported_temp: + finished = True + else: + finished = True + + self.last_reported_temp = current_temp + + # render message + if self.user_did_turn_on(): + rc = Renderer.turned_on(self.ctx, + target_temp=target_temp, + current_temp=current_temp, + mode=mode, + reached=reached, + no_keyboard=finished) + else: + rc = Renderer.turned_off(self.ctx, + mode=mode, + reached=reached, + no_keyboard=finished) + + changed = self.rendered_content_changed(rc) + update = MUTUpdate(message_id=self.message.message_id, + user_id=self.ctx.user_id, + finished=finished, + changed=changed, + delete=self.need_to_delete, + html=rc[0], + markup=rc[1]) + if changed: + self.set_rendered_content(rc) + return update + + def user_did_turn_on(self) -> bool: + return self.user_enabled_power_mode in (PowerType.ON, PowerType.CUSTOM) + + def delete(self): + self.need_to_delete = True + + @property + def user_id(self) -> int: + return self.ctx.user_id + + +@bot.handler(command='status') +def status(ctx: bot.Context) -> None: + text, markup = Renderer.status(ctx, + connected=kc.kettle.is_connected(), + mode=kc.info.mode, + current_temp=kc.info.temperature, + target_temp=kc.info.target_temperature, + update_time=kc.info.update_time) + ctx.reply(text, markup=markup) + + +@bot.handler(command='temp') +def temp(ctx: bot.Context) -> None: + text, markup = Renderer.temp( + ctx, choices=all_choices) + ctx.reply(text, markup=markup) + + +def enable(temp: int, ctx: bot.Context) -> None: + if not kc.kettle.is_connected(): + text, markup = Renderer.not_connected(ctx) + ctx.reply(text, markup=markup) + return + + tasks = queue.SimpleQueue() + if temp == 100: + power_mode = PowerType.ON + else: + power_mode = PowerType.CUSTOM + tasks.put(['set_target_temperature', temp]) + tasks.put(['set_power', power_mode]) + + def done(ok: bool): + if not ok: + html, markup = Renderer.smth_went_wrong(ctx) + else: + html, markup = Renderer.turned_on(ctx, + target_temp=temp, + current_temp=kc.info.temperature, + mode=kc.info.mode) + message = ctx.reply(html, markup=markup) + logger.debug(f'ctx.reply returned message: {message}') + + if ok: + mut = MessageUpdatingTarget(ctx, message, + initial_power_mode=kc.info.mode, + user_enabled_power_mode=power_mode, + user_target_temp=temp) + mut.set_rendered_content((html, markup)) + kc.add_updating_message(mut) + + run_tasks(tasks, done) + + +@bot.handler(message='disable') +def disable(ctx: bot.Context): + if not kc.kettle.is_connected(): + text, markup = Renderer.not_connected(ctx) + ctx.reply(text, markup=markup) + return + + def done(ok: bool): + mode = kc.info.mode + if not ok: + html, markup = Renderer.smth_went_wrong(ctx) + else: + kw = {} + if mode == PowerType.OFF: + kw['reached'] = True + kw['no_keyboard'] = True + html, markup = Renderer.turned_off(ctx, mode=mode, **kw) + message = ctx.reply(html, markup=markup) + logger.debug(f'ctx.reply returned message: {message}') + + if ok and mode != PowerType.OFF: + mut = MessageUpdatingTarget(ctx, message, + initial_power_mode=mode, + user_enabled_power_mode=PowerType.OFF) + mut.set_rendered_content((html, markup)) + kc.add_updating_message(mut) + + tasks = queue.SimpleQueue() + tasks.put(['set_power', PowerType.OFF]) + run_tasks(tasks, done) + + +@bot.handler(message='back') +def back(ctx: bot.Context): + bot.start(ctx) + + +@bot.defaultreplymarkup +def defaultmarkup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [ + [f'{temperature_emoji(n)} {n}' for n in primary_choices], + [ctx.lang('disable')] + ] + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +if __name__ == '__main__': + for temp in primary_choices: + bot.handler(text=f'{temperature_emoji(temp)} {temp}')(partial(enable, temp)) + + for temp in all_choices: + bot.handler(text=f'{temperature_emoji(temp)} {temp}')(partial(enable, temp)) + + kc = KettleController() + + bot.run() + + # bot library handles signals, so when sigterm or something like that happens, we should stop all other threads here + kc.stop_all() diff --git a/bin/polaris_kettle_util.py b/bin/polaris_kettle_util.py new file mode 100755 index 0000000..4db0ed4 --- /dev/null +++ b/bin/polaris_kettle_util.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: BSD-3-Clause + +import logging +import sys +import paho.mqtt.client as mqtt +import __py_include + +from typing import Optional +from argparse import ArgumentParser +from queue import SimpleQueue +from homekit.mqtt import Mqtt +from homekit.config import config +from syncleo import ( + Kettle, + PowerType, + protocol as kettle_proto +) + +k: Optional[Kettle] = None +logger = logging.getLogger(__name__) +control_tasks = SimpleQueue() + + +class MqttServer(Mqtt): + def __init__(self): + super().__init__(clean_session=False) + + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + logger.info("subscribing to #") + client.subscribe('#', qos=1) + + def on_message(self, client: mqtt.Client, userdata, msg): + try: + print(msg.topic, msg.payload) + + except Exception as e: + logger.exception(str(e)) + + +def kettle_connection_established(response: kettle_proto.MessageResponse): + try: + assert isinstance(response, kettle_proto.AckMessage), f'ACK expected, but received: {response}' + except AssertionError: + k.stop_all() + return + + def next_task(response: kettle_proto.MessageResponse): + try: + assert response is not False, 'server error' + except AssertionError: + k.stop_all() + return + + if not control_tasks.empty(): + task = control_tasks.get() + f, args = task(k) + args.append(next_task) + f(*args) + else: + k.stop_all() + + next_task(response) + + +def main(): + tempmin = 30 + tempmax = 100 + tempstep = 5 + + parser = ArgumentParser() + parser.add_argument('-m', dest='mode', required=True, type=str, choices=('mqtt', 'control')) + parser.add_argument('--on', action='store_true') + parser.add_argument('--off', action='store_true') + parser.add_argument('-t', '--temperature', dest='temp', type=int, default=tempmax, + choices=range(tempmin, tempmax+tempstep, tempstep)) + + arg = config.load_app('polaris_kettle_util', use_cli=True, parser=parser) + + if arg.mode == 'mqtt': + server = MqttServer() + try: + server.connect_and_loop(loop_forever=True) + except KeyboardInterrupt: + pass + + elif arg.mode == 'control': + if arg.on and arg.off: + raise RuntimeError('--on and --off are mutually exclusive') + + if arg.off: + control_tasks.put(lambda k: (k.set_power, [PowerType.OFF])) + else: + if arg.temp == tempmax: + control_tasks.put(lambda k: (k.set_power, [PowerType.ON])) + else: + control_tasks.put(lambda k: (k.set_target_temperature, [arg.temp])) + control_tasks.put(lambda k: (k.set_power, [PowerType.CUSTOM])) + + k = Kettle(mac=config['kettle']['mac'], device_token=config['kettle']['token']) + info = k.discover() + if not info: + print('no device found.') + return 1 + + print('found service:', info) + k.start_server_if_needed(kettle_connection_established) + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/bin/pump_bot.py b/bin/pump_bot.py new file mode 100755 index 0000000..e00e844 --- /dev/null +++ b/bin/pump_bot.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 +import __py_include +import sys +import asyncio + +from enum import Enum +from typing import Optional, Union +from telegram import ReplyKeyboardMarkup, User +from time import time +from datetime import datetime + +from homekit.config import config, is_development_mode, AppConfigUnit +from homekit.telegram import bot +from homekit.telegram.config import TelegramBotConfig, TelegramUserListType +from homekit.telegram._botutil import user_any_name +from homekit.relay.sunxi_h3_client import RelayClient +from homekit.mqtt import MqttNode, MqttWrapper, MqttPayload, MqttNodesConfig, MqttModule +from homekit.mqtt.module.relay import MqttPowerStatusPayload, MqttRelayModule +from homekit.mqtt.module.temphum import MqttTemphumDataPayload +from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload + + +if __name__ != '__main__': + print(f'this script can not be imported as module', file=sys.stderr) + sys.exit(1) + + +mqtt_nodes_config = MqttNodesConfig() + + +class PumpBotUserListType(TelegramUserListType): + SILENT = 'silent_users' + + +class PumpBotConfig(AppConfigUnit, TelegramBotConfig): + NAME = 'pump_bot' + + @classmethod + def schema(cls) -> Optional[dict]: + return { + **super(TelegramBotConfig).schema(), + PumpBotUserListType.SILENT: TelegramBotConfig._userlist_schema(), + 'watering_relay_node': {'type': 'string'}, + 'pump_relay_addr': cls._addr_schema() + } + + @staticmethod + def custom_validator(data): + relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) + if data['watering_relay_node'] not in relay_node_names: + raise ValueError(f'unknown relay node "{data["watering_relay_node"]}"') + + +config.load_app(PumpBotConfig) + +mqtt: MqttWrapper +mqtt_node: MqttNode +mqtt_relay_module: Union[MqttRelayModule, MqttModule] + +time_format = '%d.%m.%Y, %H:%M:%S' + +watering_mcu_status = { + 'last_time': 0, + 'last_boot_time': 0, + 'relay_opened': False, + 'ambient_temp': 0.0, + 'ambient_rh': 0.0, +} + +bot.initialize() +bot.lang.ru( + start_message="Выберите команду на клавиатуре", + unknown_command="Неизвестная команда", + + enable="Включить", + enable_silently="Включить тихо", + enabled="Насос включен ✅", + + disable="Выключить", + disable_silently="Выключить тихо", + disabled="Насос выключен ❌", + + start_watering="Включить полив", + stop_watering="Отключить полив", + + status="Статус насоса", + watering_status="Статус полива", + + done="Готово 👌", + sent="Команда отправлена", + + user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.', + user_watering_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> полив.', + user_action_on="включил", + user_action_off="выключил", + user_action_watering_on="включил", + user_action_watering_off="выключил", +) +bot.lang.en( + start_message="Select command on the keyboard", + unknown_command="Unknown command", + + enable="Turn ON", + enable_silently="Turn ON silently", + enabled="The pump is turned ON ✅", + + disable="Turn OFF", + disable_silently="Turn OFF silently", + disabled="The pump is turned OFF ❌", + + start_watering="Start watering", + stop_watering="Stop watering", + + status="Pump status", + watering_status="Watering status", + + done="Done 👌", + sent="Request sent", + + user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.', + user_watering_notification='User <a href="tg://user?id=%d">%s</a> <b>%s</b> the watering.', + user_action_on="ON", + user_action_off="OFF", + user_action_watering_on="started", + user_action_watering_off="stopped", +) + + +class UserAction(Enum): + ON = 'on' + OFF = 'off' + WATERING_ON = 'watering_on' + WATERING_OFF = 'watering_off' + + +def get_relay() -> RelayClient: + relay = RelayClient(host=config.app_config['pump_relay_addr'].host, + port=config.app_config['pump_relay_addr'].port) + relay.connect() + return relay + + +async def on(ctx: bot.Context, silent=False) -> None: + get_relay().on() + futures = [ctx.reply(ctx.lang('done'))] + if not silent: + futures.append(notify(ctx.user, UserAction.ON)) + await asyncio.gather(*futures) + + +async def off(ctx: bot.Context, silent=False) -> None: + get_relay().off() + futures = [ctx.reply(ctx.lang('done'))] + if not silent: + futures.append(notify(ctx.user, UserAction.OFF)) + await asyncio.gather(*futures) + + +async def watering_on(ctx: bot.Context) -> None: + mqtt_relay_module.switchpower(True) + await asyncio.gather( + ctx.reply(ctx.lang('sent')), + notify(ctx.user, UserAction.WATERING_ON) + ) + + +async def watering_off(ctx: bot.Context) -> None: + mqtt_relay_module.switchpower(False) + await asyncio.gather( + ctx.reply(ctx.lang('sent')), + notify(ctx.user, UserAction.WATERING_OFF) + ) + + +async def notify(user: User, action: UserAction) -> None: + notification_key = 'user_watering_notification' if action in (UserAction.WATERING_ON, UserAction.WATERING_OFF) else 'user_action_notification' + + def text_getter(lang: str): + action_name = bot.lang.get(f'user_action_{action.value}', lang) + user_name = user_any_name(user) + return 'ℹ ' + bot.lang.get(notification_key, lang, + user.id, user_name, action_name) + + await bot.notify_all(text_getter, exclude=(user.id,)) + + +@bot.handler(message='enable') +async def enable_handler(ctx: bot.Context) -> None: + await on(ctx) + + +@bot.handler(message='enable_silently') +async def enable_s_handler(ctx: bot.Context) -> None: + await on(ctx, True) + + +@bot.handler(message='disable') +async def disable_handler(ctx: bot.Context) -> None: + await off(ctx) + + +@bot.handler(message='start_watering') +async def start_watering(ctx: bot.Context) -> None: + await watering_on(ctx) + + +@bot.handler(message='stop_watering') +async def stop_watering(ctx: bot.Context) -> None: + await watering_off(ctx) + + +@bot.handler(message='disable_silently') +async def disable_s_handler(ctx: bot.Context) -> None: + await off(ctx, True) + + +@bot.handler(message='status') +async def status(ctx: bot.Context) -> None: + await ctx.reply( + ctx.lang('enabled') if get_relay().status() == 'on' else ctx.lang('disabled') + ) + + +def _get_timestamp_as_string(timestamp: int) -> str: + if timestamp != 0: + return datetime.fromtimestamp(timestamp).strftime(time_format) + else: + return 'unknown' + + +@bot.handler(message='watering_status') +async def watering_status(ctx: bot.Context) -> None: + buf = '' + if 0 < watering_mcu_status["last_time"] < time()-1800: + buf += '<b>WARNING! long time no reports from mcu! maybe something\'s wrong</b>\n' + buf += f'last report time: <b>{_get_timestamp_as_string(watering_mcu_status["last_time"])}</b>\n' + if watering_mcu_status["last_boot_time"] != 0: + buf += f'boot time: <b>{_get_timestamp_as_string(watering_mcu_status["last_boot_time"])}</b>\n' + buf += 'relay opened: <b>' + ('yes' if watering_mcu_status['relay_opened'] else 'no') + '</b>\n' + buf += f'ambient temp & humidity: <b>{watering_mcu_status["ambient_temp"]} °C, {watering_mcu_status["ambient_rh"]}%</b>' + await ctx.reply(buf) + + +@bot.defaultreplymarkup +def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [] + if ctx.user_id in config.app_config.get_user_ids(PumpBotUserListType.SILENT): + buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')]) + buttons.append([ctx.lang('enable'), ctx.lang('disable'), ctx.lang('status')],) + buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering'), ctx.lang('watering_status')]) + + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +def mqtt_payload_callback(mqtt_node: MqttNode, payload: MqttPayload): + global watering_mcu_status + + types_the_node_can_send = ( + InitialDiagnosticsPayload, + DiagnosticsPayload, + MqttTemphumDataPayload, + MqttPowerStatusPayload + ) + for cl in types_the_node_can_send: + if isinstance(payload, cl): + watering_mcu_status['last_time'] = int(time()) + break + + if isinstance(payload, InitialDiagnosticsPayload): + watering_mcu_status['last_boot_time'] = int(time()) + + elif isinstance(payload, MqttTemphumDataPayload): + watering_mcu_status['ambient_temp'] = payload.temp + watering_mcu_status['ambient_rh'] = payload.rh + + elif isinstance(payload, MqttPowerStatusPayload): + watering_mcu_status['relay_opened'] = payload.opened + + +mqtt = MqttWrapper(client_id='pump_bot') +mqtt_node = MqttNode(node_id=config.app_config['watering_relay_node']) +if is_development_mode(): + mqtt_node.load_module('diagnostics') + +mqtt_node.load_module('temphum') +mqtt_relay_module = mqtt_node.load_module('relay') + +mqtt_node.add_payload_callback(mqtt_payload_callback) + +mqtt.connect_and_loop(loop_forever=False) + +bot.run() + +try: + mqtt.disconnect() +except: + pass diff --git a/bin/pump_mqtt_bot.py b/bin/pump_mqtt_bot.py new file mode 100755 index 0000000..aea1451 --- /dev/null +++ b/bin/pump_mqtt_bot.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 +import datetime +import __py_include + +from enum import Enum +from typing import Optional +from telegram import ReplyKeyboardMarkup, User + +from homekit.config import config +from homekit.telegram import bot +from homekit.telegram._botutil import user_any_name +from homekit.mqtt import MqttNode, MqttPayload +from homekit.mqtt.module.relay import MqttRelayState +from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload + + +config.load_app('pump_mqtt_bot') + +bot.initialize() +bot.lang.ru( + start_message="Выберите команду на клавиатуре", + start_message_no_access="Доступ запрещён. Вы можете отправить заявку на получение доступа.", + unknown_command="Неизвестная команда", + send_access_request="Отправить заявку", + management="Админка", + + enable="Включить", + enabled="Включен ✅", + + disable="Выключить", + disabled="Выключен ❌", + + status="Статус", + status_updated=' (обновлено %s)', + + done="Готово 👌", + user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.', + user_action_on="включил", + user_action_off="выключил", + date_yday="вчера", + date_yyday="позавчера", + date_at="в" +) +bot.lang.en( + start_message="Select command on the keyboard", + start_message_no_access="You have no access.", + unknown_command="Unknown command", + send_access_request="Send request", + management="Admin options", + + enable="Turn ON", + enable_silently="Turn ON silently", + enabled="Turned ON ✅", + + disable="Turn OFF", + disable_silently="Turn OFF silently", + disabled="Turned OFF ❌", + + status="Status", + status_updated=' (updated %s)', + + done="Done 👌", + user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.', + user_action_on="ON", + user_action_off="OFF", + + date_yday="yesterday", + date_yyday="the day before yesterday", + date_at="at" +) + + +mqtt: Optional[MqttNode] = None +relay_state = MqttRelayState() + + +class UserAction(Enum): + ON = 'on' + OFF = 'off' + + +def on_mqtt_message(home_id, message: MqttPayload): + if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): + kwargs = dict(rssi=message.rssi, enabled=message.flags.state) + if isinstance(message, InitialDiagnosticsPayload): + kwargs['fw_version'] = message.fw_version + relay_state.update(**kwargs) + + +def notify(user: User, action: UserAction) -> None: + def text_getter(lang: str): + action_name = bot.lang.get(f'user_action_{action.value}', lang) + user_name = user_any_name(user) + return 'ℹ ' + bot.lang.get('user_action_notification', lang, + user.id, user_name, action_name) + + bot.notify_all(text_getter, exclude=(user.id,)) + + +@bot.handler(message='enable') +def enable_handler(ctx: bot.Context) -> None: + mqtt.set_power(config['mqtt']['home_id'], True) + ctx.reply(ctx.lang('done')) + notify(ctx.user, UserAction.ON) + + +@bot.handler(message='disable') +def disable_handler(ctx: bot.Context) -> None: + mqtt.set_power(config['mqtt']['home_id'], False) + ctx.reply(ctx.lang('done')) + notify(ctx.user, UserAction.OFF) + + +@bot.handler(message='status') +def status(ctx: bot.Context) -> None: + label = ctx.lang('enabled') if relay_state.enabled else ctx.lang('disabled') + if relay_state.ever_updated: + date_label = '' + today = datetime.date.today() + if today != relay_state.update_time.date(): + yday = today - datetime.timedelta(days=1) + yyday = today - datetime.timedelta(days=2) + if yday == relay_state.update_time.date(): + date_label = ctx.lang('date_yday') + elif yyday == relay_state.update_time.date(): + date_label = ctx.lang('date_yyday') + else: + date_label = relay_state.update_time.strftime('%d.%m.%Y') + date_label += ' ' + date_label += ctx.lang('date_at') + ' ' + date_label += relay_state.update_time.strftime('%H:%M') + label += ctx.lang('status_updated', date_label) + ctx.reply(label) + + +def start(ctx: bot.Context) -> None: + if ctx.user_id in config['bot']['users'] or ctx.user_id in config['bot']['admin_users']: + ctx.reply(ctx.lang('start_message')) + else: + buttons = [ + [ctx.lang('send_access_request')] + ] + ctx.reply(ctx.lang('start_message_no_access'), markup=ReplyKeyboardMarkup(buttons, one_time_keyboard=False)) + + +@bot.exceptionhandler +def exception_handler(e: Exception, ctx: bot.Context) -> bool: + return False + + +@bot.defaultreplymarkup +def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [[ctx.lang('enable'), ctx.lang('disable')], [ctx.lang('status')]] + if ctx.user_id in config['bot']['admin_users']: + buttons.append([ctx.lang('management')]) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +if __name__ == '__main__': + mqtt = MqttRelay(devices=MqttEspDevice(id=config['mqtt']['home_id'], + secret=config['mqtt']['home_secret'])) + mqtt.set_message_callback(on_mqtt_message) + mqtt.connect_and_loop(loop_forever=False) + + # bot.enable_logging(BotType.PUMP_MQTT) + bot.run(start_handler=start) + + mqtt.disconnect() diff --git a/bin/relay_mqtt_bot.py b/bin/relay_mqtt_bot.py new file mode 100755 index 0000000..3ad0a9b --- /dev/null +++ b/bin/relay_mqtt_bot.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +import sys +import __py_include + +from enum import Enum +from typing import Optional, Union +from telegram import ReplyKeyboardMarkup +from functools import partial + +from homekit.config import config, AppConfigUnit, Translation +from homekit.telegram import bot +from homekit.telegram.config import TelegramBotConfig +from homekit.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule, MqttNodesConfig +from homekit.mqtt.module.relay import MqttRelayModule, MqttRelayState +from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload + + +if __name__ != '__main__': + print(f'this script can not be imported as module', file=sys.stderr) + sys.exit(1) + + +mqtt_nodes_config = MqttNodesConfig() + + +class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig): + NAME = 'relay_mqtt_bot' + + _strings: Translation + + def __init__(self): + super().__init__() + self._strings = Translation('mqtt_nodes') + + @classmethod + def schema(cls) -> Optional[dict]: + return { + **super(TelegramBotConfig).schema(), + 'relay_nodes': { + 'type': 'list', + 'required': True, + 'schema': { + 'type': 'string' + } + }, + } + + @staticmethod + def custom_validator(data): + relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) + for node in data['relay_nodes']: + if node not in relay_node_names: + raise ValueError(f'unknown relay node "{node}"') + + def get_relay_name_translated(self, lang: str, relay_name: str) -> str: + return self._strings.get(lang)[relay_name]['relay'] + + +config.load_app(RelayMqttBotConfig) + +bot.initialize() +bot.lang.ru( + start_message="Выберите команду на клавиатуре", + unknown_command="Неизвестная команда", + done="Готово 👌", +) +bot.lang.en( + start_message="Select command on the keyboard", + unknown_command="Unknown command", + done="Done 👌", +) + + +type_emojis = { + 'lamp': '💡' +} +status_emoji = { + 'on': '✅', + 'off': '❌' +} + + +mqtt: MqttWrapper +relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {} +relay_states: dict[str, MqttRelayState] = {} + + +class UserAction(Enum): + ON = 'on' + OFF = 'off' + + +def on_mqtt_message(node: MqttNode, + message: MqttPayload): + if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): + kwargs = dict(rssi=message.rssi, enabled=message.flags.state) + if isinstance(message, InitialDiagnosticsPayload): + kwargs['fw_version'] = message.fw_version + if node.id not in relay_states: + relay_states[node.id] = MqttRelayState() + relay_states[node.id].update(**kwargs) + + +async def enable_handler(node_id: str, ctx: bot.Context) -> None: + relay_nodes[node_id].switchpower(True) + await ctx.reply(ctx.lang('done')) + + +async def disable_handler(node_id: str, ctx: bot.Context) -> None: + relay_nodes[node_id].switchpower(False) + await ctx.reply(ctx.lang('done')) + + +async def start(ctx: bot.Context) -> None: + await ctx.reply(ctx.lang('start_message')) + + +@bot.exceptionhandler +async def exception_handler(e: Exception, ctx: bot.Context) -> bool: + return False + + +@bot.defaultreplymarkup +def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [] + for node_id in config.app_config['relay_nodes']: + node_data = mqtt_nodes_config.get_node(node_id) + type_emoji = type_emojis[node_data['relay']['device_type']] + row = [f'{type_emoji}{status_emoji[i.value]} {config.app_config.get_relay_name_translated(ctx.user_lang, node_id)}' + for i in UserAction] + buttons.append(row) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +devices = [] +mqtt = MqttWrapper(client_id='relay_mqtt_bot') +for node_id in config.app_config['relay_nodes']: + node_data = mqtt_nodes_config.get_node(node_id) + mqtt_node = MqttNode(node_id=node_id, + node_secret=node_data['password']) + module_kwargs = {} + try: + if node_data['relay']['legacy_topics']: + module_kwargs['legacy_topics'] = True + except KeyError: + pass + relay_nodes[node_id] = mqtt_node.load_module('relay', **module_kwargs) + mqtt_node.add_payload_callback(on_mqtt_message) + mqtt.add_node(mqtt_node) + + type_emoji = type_emojis[node_data['relay']['device_type']] + + for action in UserAction: + messages = [] + for _lang in Translation.LANGUAGES: + _label = config.app_config.get_relay_name_translated(_lang, node_id) + messages.append(f'{type_emoji}{status_emoji[action.value]} {_label}') + bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, node_id)) + +mqtt.connect_and_loop(loop_forever=False) + +bot.run(start_handler=start) + +mqtt.disconnect() diff --git a/bin/relay_mqtt_http_proxy.py b/bin/relay_mqtt_http_proxy.py new file mode 100755 index 0000000..23938e1 --- /dev/null +++ b/bin/relay_mqtt_http_proxy.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +import logging +import __py_include + +from homekit import http +from homekit.config import config, AppConfigUnit +from homekit.mqtt import MqttPayload, MqttWrapper, MqttNode, MqttModule, MqttNodesConfig +from homekit.mqtt.module.relay import MqttRelayState, MqttRelayModule, MqttPowerStatusPayload +from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload +from typing import Optional, Union + + +logger = logging.getLogger(__name__) +mqtt: Optional[MqttWrapper] = None +mqtt_nodes: dict[str, MqttNode] = {} +relay_modules: dict[str, Union[MqttRelayModule, MqttModule]] = {} +relay_states: dict[str, MqttRelayState] = {} + +mqtt_nodes_config = MqttNodesConfig() + + +class RelayMqttHttpProxyConfig(AppConfigUnit): + NAME = 'relay_mqtt_http_proxy' + + @classmethod + def schema(cls) -> Optional[dict]: + return { + 'relay_nodes': { + 'type': 'list', + 'required': True, + 'schema': { + 'type': 'string' + } + }, + 'listen_addr': cls._addr_schema(required=True) + } + + @staticmethod + def custom_validator(data): + relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) + for node in data['relay_nodes']: + if node not in relay_node_names: + raise ValueError(f'unknown relay node "{node}"') + + +def on_mqtt_message(node: MqttNode, + message: MqttPayload): + try: + is_legacy = mqtt_nodes_config[node.id]['relay']['legacy_topics'] + logger.debug(f'on_mqtt_message: relay {node.id} uses legacy topic names') + except KeyError: + is_legacy = False + kwargs = {} + + if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): + kwargs['rssi'] = message.rssi + if is_legacy: + kwargs['enabled'] = message.flags.state + + if not is_legacy and isinstance(message, MqttPowerStatusPayload): + kwargs['enabled'] = message.opened + + if len(kwargs): + logger.debug(f'on_mqtt_message: {node.id}: going to update relay state: {str(kwargs)}') + if node.id not in relay_states: + relay_states[node.id] = MqttRelayState() + relay_states[node.id].update(**kwargs) + + +class RelayMqttHttpProxy(http.HTTPServer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.get('/relay/{id}/on', self.relay_on) + self.get('/relay/{id}/off', self.relay_off) + self.get('/relay/{id}/toggle', self.relay_toggle) + + async def _relay_on_off(self, + enable: Optional[bool], + req: http.Request): + node_id = req.match_info['id'] + node_secret = req.query['secret'] + + node = mqtt_nodes[node_id] + relay_module = relay_modules[node_id] + + if enable is None: + if node_id in relay_states and relay_states[node_id].ever_updated: + cur_state = relay_states[node_id].enabled + else: + cur_state = False + enable = not cur_state + + node.secret = node_secret + relay_module.switchpower(enable) + return self.ok() + + async def relay_on(self, req: http.Request): + return await self._relay_on_off(True, req) + + async def relay_off(self, req: http.Request): + return await self._relay_on_off(False, req) + + async def relay_toggle(self, req: http.Request): + return await self._relay_on_off(None, req) + + +if __name__ == '__main__': + config.load_app(RelayMqttHttpProxyConfig) + + mqtt = MqttWrapper(client_id='relay_mqtt_http_proxy', + randomize_client_id=True) + for node_id in config.app_config['relay_nodes']: + node_data = mqtt_nodes_config.get_node(node_id) + mqtt_node = MqttNode(node_id=node_id) + module_kwargs = {} + try: + if node_data['relay']['legacy_topics']: + module_kwargs['legacy_topics'] = True + except KeyError: + pass + relay_modules[node_id] = mqtt_node.load_module('relay', **module_kwargs) + if 'legacy_topics' in module_kwargs: + mqtt_node.load_module('diagnostics') + mqtt_node.add_payload_callback(on_mqtt_message) + mqtt.add_node(mqtt_node) + mqtt_nodes[node_id] = mqtt_node + + mqtt.connect_and_loop(loop_forever=False) + + proxy = RelayMqttHttpProxy(config.app_config['listen_addr']) + try: + proxy.run() + except KeyboardInterrupt: + mqtt.disconnect() diff --git a/bin/sensors_bot.py b/bin/sensors_bot.py new file mode 100755 index 0000000..43932e1 --- /dev/null +++ b/bin/sensors_bot.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +import json +import socket +import logging +import re +import gc +import __py_include + +from io import BytesIO +from typing import Optional + +import matplotlib.pyplot as plt +import matplotlib.dates as mdates +import matplotlib.ticker as mticker + +from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton + +from homekit.config import config +from homekit.telegram import bot +from homekit.util import chunks, MySimpleSocketClient +from homekit.api import WebApiClient +from homekit.api.types import ( + TemperatureSensorLocation +) + +config.load_app('sensors_bot') +bot.initialize() + +bot.lang.ru( + start_message="Выберите датчик на клавиатуре", + unknown_command="Неизвестная команда", + temperature="Температура", + humidity="Влажность", + plot_3h="График за 3 часа", + plot_6h="График за 6 часов", + plot_12h="График за 12 часов", + plot_24h="График за 24 часа", + unexpected_callback_data="Ошибка: неверные данные", + loading="Загрузка...", + n_hrs="график за %d ч." +) +bot.lang.en( + start_message="Select the sensor on the keyboard", + unknown_command="Unknown command", + temperature="Temperature", + humidity="Relative humidity", + plot_3h="Graph for 3 hours", + plot_6h="Graph for 6 hours", + plot_12h="Graph for 12 hours", + plot_24h="Graph for 24 hours", + unexpected_callback_data="Unexpected callback data", + loading="Loading...", + n_hrs="graph for %d hours" +) + +plt.rcParams['font.size'] = 7 +logger = logging.getLogger(__name__) +plot_hours = [3, 6, 12, 24] + + +_sensor_names = [] +for k, v in config['sensors'].items(): + _sensor_names.append(k) + bot.lang.set({k: v['label_ru']}, 'ru') + bot.lang.set({k: v['label_en']}, 'en') + + +@bot.handler(messages=_sensor_names, argument='message_key') +def read_sensor(sensor: str, ctx: bot.Context) -> None: + host = config['sensors'][sensor]['ip'] + port = config['sensors'][sensor]['port'] + + try: + client = MySimpleSocketClient(host, port) + client.write('read') + data = json.loads(client.read()) + except (socket.timeout, socket.error) as error: + return ctx.reply_exc(error) + + temp = round(data['temp'], 2) + humidity = round(data['humidity'], 2) + + text = ctx.lang('temperature') + f': <b>{temp} °C</b>\n' + text += ctx.lang('humidity') + f': <b>{humidity}%</b>' + + buttons = list(map( + lambda h: InlineKeyboardButton(ctx.lang(f'plot_{h}h'), callback_data=f'plot/{sensor}/{h}'), + plot_hours + )) + ctx.reply(text, markup=InlineKeyboardMarkup(chunks(buttons, 2))) + + +@bot.callbackhandler(callback='*') +def callback_handler(ctx: bot.Context) -> None: + query = ctx.callback_query + + sensors_variants = '|'.join(config['sensors'].keys()) + hour_variants = '|'.join(list(map( + lambda n: str(n), + plot_hours + ))) + + match = re.match(rf'plot/({sensors_variants})/({hour_variants})', query.data) + if not match: + query.answer(ctx.lang('unexpected_callback_data')) + return + + query.answer(ctx.lang('loading')) + + # retrieve data + sensor = TemperatureSensorLocation[match.group(1).upper()] + hours = int(match.group(2)) + + api = WebApiClient(timeout=20) + data = api.get_sensors_data(sensor, hours) + + title = ctx.lang(sensor.name.lower()) + ' (' + ctx.lang('n_hrs', hours) + ')' + plot = draw_plot(data, title, + ctx.lang('temperature'), + ctx.lang('humidity')) + bot.send_photo(ctx.user_id, photo=plot) + + gc.collect() + + +def draw_plot(data, + title: str, + label_temp: str, + label_hum: str) -> BytesIO: + tempval = [] + humval = [] + dates = [] + for date, temp, humidity in data: + dates.append(date) + tempval.append(temp) + humval.append(humidity) + + fig, axs = plt.subplots(2, 1) + df = mdates.DateFormatter('%H:%M') + + axs[0].set_title(label_temp) + axs[0].plot(dates, tempval) + axs[0].xaxis.set_major_formatter(df) + axs[0].yaxis.set_major_formatter(mticker.FormatStrFormatter('%2.2f °C')) + + fig.suptitle(title, fontsize=10) + + axs[1].set_title(label_hum) + axs[1].plot(dates, humval) + axs[1].xaxis.set_major_formatter(df) + axs[1].yaxis.set_major_formatter(mticker.FormatStrFormatter('%2.1f %%')) + + fig.autofmt_xdate() + + # should be called after all axes have been added + fig.tight_layout() + + buf = BytesIO() + fig.savefig(buf, format='png', dpi=160) + buf.seek(0) + + plt.clf() + plt.close('all') + + return buf + + +@bot.defaultreplymarkup +def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [] + for k in config['sensors'].keys(): + buttons.append(ctx.lang(k)) + buttons = chunks(buttons, 2) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +if __name__ == '__main__': + bot.run() diff --git a/bin/sound_bot.py b/bin/sound_bot.py new file mode 100755 index 0000000..fa22ba7 --- /dev/null +++ b/bin/sound_bot.py @@ -0,0 +1,888 @@ +#!/usr/bin/env python3 +import logging +import os +import tempfile +import __py_include + +from enum import Enum +from datetime import datetime, timedelta +from html import escape +from typing import Optional, List, Dict, Tuple + +from homekit.config import config +from homekit.api import WebApiClient +from homekit.api.types import SoundSensorLocation +from homekit.api.errors import ApiResponseError +from homekit.media import SoundNodeClient, SoundRecordClient, SoundRecordFile, CameraNodeClient +from homekit.soundsensor import SoundSensorServerGuardClient +from homekit.util import Addr, chunks, filesize_fmt + +from homekit.telegram import bot + +from telegram.error import TelegramError +from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton, User + +from PIL import Image + +config.load_app('sound_bot') + +nodes = {} +for nodename, nodecfg in config['nodes'].items(): + nodes[nodename] = Addr.fromstring(nodecfg['addr']) + +bot.initialize() +bot.lang.ru( + start_message="Выберите команду на клавиатуре", + unknown_command="Неизвестная команда", + unexpected_callback_data="Ошибка: неверные данные", + settings="Настройки микшера", + record="Запись", + loading="Загрузка...", + select_place="Выберите место:", + invalid_location="Неверное место", + invalid_interval="Неверная длительность", + unsupported_action="Неподдерживаемое действие", + # select_control="Выберите контрол для изменения настроек:", + control_state="Состояние контрола %s", + incr="громкость +", + decr="громкость -", + back="◀️ Назад", + n_min="%d мин.", + n_sec="%d сек.", + select_interval="Выберите длительность:", + place="Место", + beginning="Начало", + end="Конец", + record_result="Результат записи", + record_started='Запись запущена!', + record_error="Ошибка записи", + files="Локальные файлы", + remote_files="Файлы на сервере", + file_line="— Запись с <b>%s</b> до <b>%s</b> <i>(%s)</i>", + access_denied="Доступ запрещён", + + guard_disable="Снять с охраны", + guard_enable="Поставить на охрану", + guard_status="Статус охраны", + guard_user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> %s.', + guard_user_action_enable="включил охрану ✅", + guard_user_action_disable="выключил охрану ❌", + guard_status_enabled="Включена ✅", + guard_status_disabled="Выключена ❌", + + done="Готово 👌", + + sound_sensors="Датчики звука", + sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.", + sound_sensors_no_24h_data="За последние 24 часа данных нет.", + sound_sensors_show_anything="Показать, что есть", + + cameras="Камеры", + select_option="Выберите опцию", + w_flash="Со вспышкой", + wo_flash="Без вспышки", +) + +bot.lang.en( + start_message="Select command on the keyboard", + unknown_command="Unknown command", + settings="Mixer settings", + record="Record", + unexpected_callback_data="Unexpected callback data", + loading="Loading...", + select_place="Select place:", + invalid_location="Invalid place", + invalid_interval="Invalid duration", + unsupported_action="Unsupported action", + # select_control="Select control to adjust its parameters:", + control_state="%s control state", + incr="vol +", + decr="vol -", + back="◀️ Back", + n_min="%d min.", + n_sec="%d s.", + select_interval="Select duration:", + place="Place", + beginning="Started", + end="Ended", + record_result="Result", + record_started='Recording started!', + record_error="Recording error", + files="Local files", + remote_files="Remote files", + file_line="— From <b>%s</b> to <b>%s</b> <i>(%s)</i>", + access_denied="Access denied", + + guard_disable="Disable guard", + guard_enable="Enable guard", + guard_status="Guard status", + guard_user_action_notification='User <a href="tg://user?id=%d">%s</a> %s.', + guard_user_action_enable="turned the guard ON ✅", + guard_user_action_disable="turn the guard OFF ❌", + guard_status_enabled="Active ✅", + guard_status_disabled="Disabled ❌", + done="Done 👌", + + sound_sensors="Sound sensors", + sound_sensors_info="Here you can get information about last sound sensors hits.", + sound_sensors_no_24h_data="No data for the last 24 hours.", + sound_sensors_show_anything="Show me at least something", + + cameras="Cameras", + select_option="Select option", + w_flash="With flash", + wo_flash="Without flash", +) + +logger = logging.getLogger(__name__) +RenderedContent = Tuple[str, Optional[InlineKeyboardMarkup]] +record_client: Optional[SoundRecordClient] = None +node_client_links: Dict[str, SoundNodeClient] = {} +cam_client_links: Dict[str, CameraNodeClient] = {} + + +def node_client(node: str) -> SoundNodeClient: + if node not in node_client_links: + node_client_links[node] = SoundNodeClient(Addr.fromstring(config['nodes'][node]['addr'])) + return node_client_links[node] + + +def camera_client(cam: str) -> CameraNodeClient: + if cam not in node_client_links: + cam_client_links[cam] = CameraNodeClient(Addr.fromstring(config['cameras'][cam]['addr'])) + return cam_client_links[cam] + + +def node_exists(node: str) -> bool: + return node in config['nodes'] + + +def camera_exists(name: str) -> bool: + return name in config['cameras'] + + +def camera_settings(name: str) -> Optional[dict]: + try: + return config['cameras'][name]['settings'] + except KeyError: + return None + + +def have_cameras() -> bool: + return 'cameras' in config and config['cameras'] + + +def sound_sensor_exists(node: str) -> bool: + return node in config['sound_sensors'] + + +def interval_defined(interval: int) -> bool: + return interval in config['bot']['record_intervals'] + + +def callback_unpack(ctx: bot.Context) -> List[str]: + return ctx.callback_query.data[3:].split('/') + + +def manual_recording_allowed(user_id: int) -> bool: + return 'manual_record_allowlist' not in config['bot'] or user_id in config['bot']['manual_record_allowlist'] + + +def guard_client() -> SoundSensorServerGuardClient: + return SoundSensorServerGuardClient(Addr.fromstring(config['bot']['guard_server'])) + + +# message renderers +# ----------------- + +class Renderer: + @classmethod + def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup: + buttons = [] + for node, nodeconfig in config['nodes'].items(): + buttons.append([InlineKeyboardButton(nodeconfig['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{node}')]) + return InlineKeyboardMarkup(buttons) + + @classmethod + def back_button(cls, + ctx: bot.Context, + buttons: list, + callback_data: str): + buttons.append([ + InlineKeyboardButton(ctx.lang('back'), callback_data=callback_data) + ]) + + +class SettingsRenderer(Renderer): + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("settings")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='s0') + + @classmethod + def node(cls, ctx: bot.Context, + controls: List[dict]) -> RenderedContent: + node, = callback_unpack(ctx) + + html = [] + buttons = [] + for control in controls: + html.append(f'<b>{control["name"]}</b>\n{escape(control["info"])}') + buttons.append([ + InlineKeyboardButton(control['name'], callback_data=f's1/{node}/{control["name"]}') + ]) + + html = "\n\n".join(html) + cls.back_button(ctx, buttons, callback_data='s0') + + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def control(cls, ctx: bot.Context, data) -> RenderedContent: + node, control, *rest = callback_unpack(ctx) + + html = '<b>' + ctx.lang('control_state', control) + '</b>\n\n' + html += escape(data['info']) + buttons = [] + callback_prefix = f's2/{node}/{control}' + for cap in data['caps']: + if cap == 'mute': + muted = 'dB] [off]' in data['info'] + act = 'unmute' if muted else 'mute' + buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')]) + + elif cap == 'cap': + cap_dis = 'Capture [off]' in data['info'] + act = 'cap' if cap_dis else 'nocap' + buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')]) + + elif cap == 'volume': + buttons.append( + list(map(lambda s: InlineKeyboardButton(ctx.lang(s), callback_data=f'{callback_prefix}/{s}'), + ['decr', 'incr'])) + ) + + cls.back_button(ctx, buttons, callback_data=f's0/{node}') + + return html, InlineKeyboardMarkup(buttons) + + +class RecordRenderer(Renderer): + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("record")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='r0') + + @classmethod + def node(cls, ctx: bot.Context, durations: List[int]) -> RenderedContent: + node, = callback_unpack(ctx) + + html = ctx.lang('select_interval') + + buttons = [] + for s in durations: + if s >= 60: + m = int(s / 60) + label = ctx.lang('n_min', m) + else: + label = ctx.lang('n_sec', s) + buttons.append(InlineKeyboardButton(label, callback_data=f'r1/{node}/{s}')) + buttons = list(chunks(buttons, 3)) + cls.back_button(ctx, buttons, callback_data=f'r0') + + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent: + node, *rest = callback_unpack(ctx) + + place = config['nodes'][node]['label'][ctx.user_lang] + + html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})' + return html, None + + @classmethod + def record_done(cls, info: dict, node: str, uid: int) -> str: + ulang = bot.db.get_user_lang(uid) + + def lang(key, *args): + return bot.lang.get(key, ulang, *args) + + rid = info['id'] + fmt = '%d.%m.%y %H:%M:%S' + start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt) + stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt) + + place = config['nodes'][node]['label'][ulang] + + html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n' + html += f'<b>{lang("beginning")}</b>: {start_time}\n' + html += f'<b>{lang("end")}</b>: {stop_time}' + + return html + + @classmethod + def record_error(cls, info: dict, node: str, uid: int) -> str: + ulang = bot.db.get_user_lang(uid) + + def lang(key, *args): + return bot.lang.get(key, ulang, *args) + + place = config['nodes'][node]['label'][ulang] + rid = info['id'] + + html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})' + if 'error' in info: + html += '\n'+str(info['error']) + + return html + + +class FilesRenderer(Renderer): + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("files")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='f0') + + @classmethod + def filelist(cls, ctx: bot.Context, files: List[SoundRecordFile]) -> RenderedContent: + node, = callback_unpack(ctx) + + html_files = map(lambda file: cls.file(ctx, file, node), files) + html = '\n\n'.join(html_files) + + buttons = [] + cls.back_button(ctx, buttons, callback_data='f0') + + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def file(cls, ctx: bot.Context, file: SoundRecordFile, node: str) -> str: + html = ctx.lang('file_line', file.start_humantime, file.stop_humantime, filesize_fmt(file.filesize)) + if file.file_id is not None: + html += f'/audio_{node}_{file.file_id}' + return html + + +class RemoteFilesRenderer(FilesRenderer): + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("remote_files")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='g0') + + +class SoundSensorRenderer(Renderer): + @classmethod + def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup: + buttons = [] + for sensor, sensor_label in config['sound_sensors'].items(): + buttons.append( + [InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')]) + return InlineKeyboardMarkup(buttons) + + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'{ctx.lang("sound_sensors_info")}\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='S0') + + @classmethod + def hits(cls, ctx: bot.Context, data, is_last=False) -> RenderedContent: + node, = callback_unpack(ctx) + buttons = [] + + if not data: + html = ctx.lang('sound_sensors_no_24h_data') + if not is_last: + buttons.append([InlineKeyboardButton(ctx.lang('sound_sensors_show_anything'), callback_data=f'S1/{node}')]) + else: + html = '' + prev_date = None + for item in data: + item_date = item['time'].strftime('%d.%m.%y') + if prev_date is None or prev_date != item_date: + if html != '': + html += '\n\n' + html += f'<b>{item_date}</b>' + prev_date = item_date + html += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})' + cls.back_button(ctx, buttons, callback_data='S0') + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def hits_plain(cls, ctx: bot.Context, data, is_last=False) -> bytes: + node, = callback_unpack(ctx) + + text = '' + prev_date = None + for item in data: + item_date = item['time'].strftime('%d.%m.%y') + if prev_date is None or prev_date != item_date: + if text != '': + text += '\n\n' + text += item_date + prev_date = item_date + text += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})' + + return text.encode() + + +class CamerasRenderer(Renderer): + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("cameras")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='c0') + + @classmethod + def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup: + buttons = [] + for camera_name, camera_data in config['cameras'].items(): + buttons.append( + [InlineKeyboardButton(camera_data['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{camera_name}')]) + return InlineKeyboardMarkup(buttons) + + @classmethod + def camera(cls, ctx: bot.Context, flash_available: bool) -> RenderedContent: + node, = callback_unpack(ctx) + + html = ctx.lang('select_option') + + buttons = [] + if flash_available: + buttons.append(InlineKeyboardButton(ctx.lang('w_flash'), callback_data=f'c1/{node}/1')) + buttons.append(InlineKeyboardButton(ctx.lang('wo_flash'), callback_data=f'c1/{node}/0')) + + cls.back_button(ctx, [buttons], callback_data=f'c0') + + return html, InlineKeyboardMarkup([buttons]) + # + # @classmethod + # def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent: + # node, *rest = callback_unpack(ctx) + # + # place = config['nodes'][node]['label'][ctx.user_lang] + # + # html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})' + # return html, None + # + # @classmethod + # def record_done(cls, info: dict, node: str, uid: int) -> str: + # ulang = bot.db.get_user_lang(uid) + # + # def lang(key, *args): + # return bot.lang.get(key, ulang, *args) + # + # rid = info['id'] + # fmt = '%d.%m.%y %H:%M:%S' + # start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt) + # stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt) + # + # place = config['nodes'][node]['label'][ulang] + # + # html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n' + # html += f'<b>{lang("beginning")}</b>: {start_time}\n' + # html += f'<b>{lang("end")}</b>: {stop_time}' + # + # return html + # + # @classmethod + # def record_error(cls, info: dict, node: str, uid: int) -> str: + # ulang = bot.db.get_user_lang(uid) + # + # def lang(key, *args): + # return bot.lang.get(key, ulang, *args) + # + # place = config['nodes'][node]['label'][ulang] + # rid = info['id'] + # + # html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})' + # if 'error' in info: + # html += '\n'+str(info['error']) + # + # return html + + +# cameras handlers +# ---------------- + +@bot.handler(message='cameras', callback=r'^c0$') +def cameras(ctx: bot.Context): + """ List of cameras """ + + text, markup = CamerasRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +@bot.callbackhandler(callback=r'^c0/.*') +def camera_options(ctx: bot.Context) -> None: + """ List of options (with/without flash etc) """ + + cam, = callback_unpack(ctx) + if not camera_exists(cam): + ctx.answer(ctx.lang('invalid_location')) + return + + ctx.answer() + flash_available = 'flash_available' in config['cameras'][cam] and config['cameras'][cam]['flash_available'] is True + + text, markup = CamerasRenderer.camera(ctx, flash_available) + ctx.edit(text, markup) + + +@bot.callbackhandler(callback=r'^c1/.*') +def camera_capture(ctx: bot.Context) -> None: + """ Cheese """ + + cam, flash = callback_unpack(ctx) + flash = int(flash) + if not camera_exists(cam): + ctx.answer(ctx.lang('invalid_location')) + return + + ctx.answer() + + client = camera_client(cam) + fd = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') + fd.close() + + client.capture(fd.name, with_flash=bool(flash)) + logger.debug(f'captured photo ({cam}), saved to {fd.name}') + + camera_config = config['cameras'][cam] + if 'rotate' in camera_config: + im = Image.open(fd.name) + im.rotate(camera_config['rotate'], expand=True) + # im.show() + im.save(fd.name) + logger.debug(f"rotated image {camera_config['rotate']} degrees") + + try: + with open(fd.name, 'rb') as f: + bot.send_photo(ctx.user_id, photo=f) + except TelegramError as exc: + logger.exception(exc) + + try: + os.unlink(fd.name) + except OSError as exc: + logger.exception(exc) + + +# settings handlers +# ----------------- + +@bot.handler(message='settings', callback=r'^s0$') +def settings(ctx: bot.Context): + """ List of nodes """ + + text, markup = SettingsRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +@bot.callbackhandler(callback=r'^s0/.*') +def settings_place(ctx: bot.Context): + """ List of controls """ + + node, = callback_unpack(ctx) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + + cl = node_client(node) + controls = cl.amixer_get_all() + + ctx.answer() + + text, markup = SettingsRenderer.node(ctx, controls) + ctx.edit(text, markup) + + +@bot.callbackhandler(callback=r'^s1/.*') +def settings_place_control(ctx: bot.Context): + """ List of available tunes for control """ + + node, control = callback_unpack(ctx) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + + cl = node_client(node) + control_data = cl.amixer_get(control) + + ctx.answer() + + text, markup = SettingsRenderer.control(ctx, control_data) + ctx.edit(text, markup) + + +@bot.callbackhandler(callback=r'^s2/.*') +def settings_place_control_action(ctx: bot.Context): + """ Tuning """ + + node, control, action = callback_unpack(ctx) + if not node_exists(node): + return + + cl = node_client(node) + if not hasattr(cl, f'amixer_{action}'): + ctx.answer(ctx.lang('invalid_action')) + return + + func = getattr(cl, f'amixer_{action}') + control_data = func(control) + + ctx.answer() + + text, markup = SettingsRenderer.control(ctx, control_data) + ctx.edit(text, markup) + + +# recording handlers +# ------------------ + +@bot.handler(message='record', callback=r'^r0$') +def record(ctx: bot.Context): + """ List of nodes """ + + if not manual_recording_allowed(ctx.user_id): + return ctx.reply(ctx.lang('access_denied')) + + text, markup = RecordRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +@bot.callbackhandler(callback=r'^r0/.*') +def record_place(ctx: bot.Context): + """ List of available intervals """ + + node, = callback_unpack(ctx) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + + ctx.answer() + + text, markup = RecordRenderer.node(ctx, config['bot']['record_intervals']) + ctx.edit(text, markup) + + +@bot.callbackhandler(callback=r'^r1/.*') +def record_place_interval(ctx: bot.Context): + """ Do record! """ + + node, interval = callback_unpack(ctx) + interval = int(interval) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + if not interval_defined(interval): + ctx.answer(ctx.lang('invalid_interval')) + return + + try: + record_id = record_client.record(node, interval, {'user_id': ctx.user_id, 'node': node}) + except ApiResponseError as e: + ctx.answer(e.error_message) + logger.error(e) + return + + ctx.answer() + + html, markup = RecordRenderer.record_started(ctx, record_id) + ctx.edit(html, markup) + + +# sound sensor handlers +# --------------------- + +@bot.handler(message='sound_sensors', callback=r'^S0$') +def sound_sensors(ctx: bot.Context): + """ List of places """ + + text, markup = SoundSensorRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +@bot.callbackhandler(callback=r'^S0/.*') +def sound_sensors_last_24h(ctx: bot.Context): + """ Last 24h log """ + + node, = callback_unpack(ctx) + if not sound_sensor_exists(node): + ctx.answer(ctx.lang('invalid location')) + return + + ctx.answer() + + cl = WebApiClient() + data = cl.get_sound_sensor_hits(location=SoundSensorLocation[node.upper()], + after=datetime.now() - timedelta(hours=24)) + + text, markup = SoundSensorRenderer.hits(ctx, data) + if len(text) > 4096: + plain = SoundSensorRenderer.hits_plain(ctx, data) + bot.send_file(ctx.user_id, document=plain, filename='data.txt') + else: + ctx.edit(text, markup=markup) + + +@bot.callbackhandler(callback=r'^S1/.*') +def sound_sensors_last_anything(ctx: bot.Context): + """ Last _something_ """ + + node, = callback_unpack(ctx) + if not sound_sensor_exists(node): + ctx.answer(ctx.lang('invalid location')) + return + + ctx.answer() + + cl = WebApiClient() + data = cl.get_last_sound_sensor_hits(location=SoundSensorLocation[node.upper()], + last=20) + + text, markup = SoundSensorRenderer.hits(ctx, data, is_last=True) + if len(text) > 4096: + plain = SoundSensorRenderer.hits_plain(ctx, data) + bot.send_file(ctx.user_id, document=plain, filename='data.txt') + else: + ctx.edit(text, markup=markup) + + +# guard enable/disable handlers +# ----------------------------- + +class GuardUserAction(Enum): + ENABLE = 'enable' + DISABLE = 'disable' + + +if 'guard_server' in config['bot']: + @bot.handler(message='guard_status') + def guard_status(ctx: bot.Context): + guard = guard_client() + resp = guard.guard_status() + + key = 'enabled' if resp['enabled'] is True else 'disabled' + ctx.reply(ctx.lang(f'guard_status_{key}')) + + + @bot.handler(message='guard_enable') + def guard_enable(ctx: bot.Context): + guard = guard_client() + guard.guard_enable() + ctx.reply(ctx.lang('done')) + + _guard_notify(ctx.user, GuardUserAction.ENABLE) + + + @bot.handler(message='guard_disable') + def guard_disable(ctx: bot.Context): + guard = guard_client() + guard.guard_disable() + ctx.reply(ctx.lang('done')) + + _guard_notify(ctx.user, GuardUserAction.DISABLE) + + + def _guard_notify(user: User, action: GuardUserAction): + def text_getter(lang: str): + action_name = bot.lang.get(f'guard_user_action_{action.value}', lang) + user_name = bot.user_any_name(user) + return 'ℹ ' + bot.lang.get('guard_user_action_notification', lang, + user.id, user_name, action_name) + + bot.notify_all(text_getter, exclude=(user.id,)) + + +@bot.defaultreplymarkup +def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [ + [ctx.lang('record'), ctx.lang('settings')], + # [ctx.lang('files'), ctx.lang('remote_files')], + ] + if 'guard_server' in config['bot']: + buttons.append([ + ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status') + ]) + buttons.append([ctx.lang('sound_sensors')]) + if have_cameras(): + buttons.append([ctx.lang('cameras')]) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +# record client callbacks +# ----------------------- + +def record_onerror(info: dict, userdata: dict): + uid = userdata['user_id'] + node = userdata['node'] + + html = RecordRenderer.record_error(info, node, uid) + try: + bot.notify_user(userdata['user_id'], html) + except TelegramError as exc: + logger.exception(exc) + finally: + record_client.forget(node, info['id']) + + +def record_onfinished(info: dict, fn: str, userdata: dict): + logger.info('record finished: ' + str(info)) + + uid = userdata['user_id'] + node = userdata['node'] + + html = RecordRenderer.record_done(info, node, uid) + bot.notify_user(uid, html) + + try: + # sending audiofile to telegram + with open(fn, 'rb') as f: + bot.send_audio(uid, audio=f, filename='audio.mp3') + + # deleting temp file + try: + os.unlink(fn) + except OSError as exc: + logger.exception(exc) + bot.notify_user(uid, exc) + + # remove the recording from sound_node's history + record_client.forget(node, info['id']) + + # remove file from storage + # node_client(node).storage_delete(info['file']['fileid']) + except Exception as e: + logger.exception(e) + + +if __name__ == '__main__': + record_client = SoundRecordClient(nodes, + error_handler=record_onerror, + finished_handler=record_onfinished, + download_on_finish=True) + + bot.run() + record_client.stop() diff --git a/bin/sound_node.py b/bin/sound_node.py new file mode 100755 index 0000000..90e6997 --- /dev/null +++ b/bin/sound_node.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +import os +import __py_include + +from typing import Optional + +from homekit.config import config +from homekit.audio import amixer +from homekit.media import MediaNodeServer, SoundRecordStorage, SoundRecorder +from homekit import http + + +# This script must be run as root as it runs arecord. +# Implements HTTP API for amixer and arecord. +# ------------------------------------------- + +def _amixer_control_response(control): + info = amixer.get(control) + caps = amixer.get_caps(control) + return http.ok({ + 'caps': caps, + 'info': info + }) + + +class SoundNodeServer(MediaNodeServer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.get('/amixer/get-all/', self.amixer_get_all) + self.get('/amixer/get/{control}/', self.amixer_get) + self.get('/amixer/{op:mute|unmute|cap|nocap}/{control}/', self.amixer_set) + self.get('/amixer/{op:incr|decr}/{control}/', self.amixer_volume) + + async def amixer_get_all(self, request: http.Request): + controls_info = amixer.get_all() + return self.ok(controls_info) + + async def amixer_get(self, request: http.Request): + control = request.match_info['control'] + if not amixer.has_control(control): + raise ValueError(f'invalid control: {control}') + + return _amixer_control_response(control) + + async def amixer_set(self, request: http.Request): + op = request.match_info['op'] + control = request.match_info['control'] + if not amixer.has_control(control): + raise ValueError(f'invalid control: {control}') + + f = getattr(amixer, op) + f(control) + + return _amixer_control_response(control) + + async def amixer_volume(self, request: http.Request): + op = request.match_info['op'] + control = request.match_info['control'] + if not amixer.has_control(control): + raise ValueError(f'invalid control: {control}') + + def get_step() -> Optional[int]: + if 'step' in request.query: + step = int(request.query['step']) + if not 1 <= step <= 50: + raise ValueError('invalid step value') + return step + return None + + f = getattr(amixer, op) + f(control, step=get_step()) + + return _amixer_control_response(control) + + +if __name__ == '__main__': + if not os.getegid() == 0: + raise RuntimeError("Must be run as root.") + + config.load_app('sound_node') + + storage = SoundRecordStorage(config['node']['storage']) + + recorder = SoundRecorder(storage=storage) + recorder.start_thread() + + server = SoundNodeServer(recorder=recorder, + storage=storage, + addr=config.get_addr('node.listen')) + server.run() diff --git a/bin/sound_sensor_node.py b/bin/sound_sensor_node.py new file mode 100755 index 0000000..39c3905 --- /dev/null +++ b/bin/sound_sensor_node.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +import logging +import os +import sys +import __py_include + +from homekit.config import config +from homekit.util import Addr +from homekit.soundsensor import SoundSensorNode + +logger = logging.getLogger(__name__) + + +if __name__ == '__main__': + if not os.getegid() == 0: + sys.exit('Must be run as root.') + + config.load_app('sound_sensor_node') + + kwargs = {} + if 'delay' in config['node']: + kwargs['delay'] = config['node']['delay'] + + if 'server_addr' in config['node']: + server_addr = Addr.fromstring(config['node']['server_addr']) + else: + server_addr = None + + node = SoundSensorNode(name=config['node']['name'], + pinname=config['node']['pin'], + threshold=config['node']['threshold'] if 'threshold' in config['node'] else 1, + server_addr=server_addr, + **kwargs) + node.run() diff --git a/bin/sound_sensor_server.py b/bin/sound_sensor_server.py new file mode 100755 index 0000000..fd7ff5a --- /dev/null +++ b/bin/sound_sensor_server.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +import logging +import threading +import __py_include + +from time import sleep +from typing import Optional, List, Dict, Tuple +from functools import partial +from homekit.config import config +from homekit.util import Addr +from homekit.api import WebApiClient, RequestParams +from homekit.api.types import SoundSensorLocation +from homekit.soundsensor import SoundSensorServer, SoundSensorHitHandler +from homekit.media import MediaNodeType, SoundRecordClient, CameraRecordClient, RecordClient + +interrupted = False +logger = logging.getLogger(__name__) +server: SoundSensorServer + + +def get_related_nodes(node_type: MediaNodeType, + sensor_name: str) -> List[str]: + try: + if sensor_name not in config[f'sensor_to_{node_type.name.lower()}_nodes_relations']: + raise ValueError(f'unexpected sensor name {sensor_name}') + return config[f'sensor_to_{node_type.name.lower()}_nodes_relations'][sensor_name] + except KeyError: + return [] + + +def get_node_config(node_type: MediaNodeType, + name: str) -> Optional[dict]: + if name in config[f'{node_type.name.lower()}_nodes']: + cfg = config[f'{node_type.name.lower()}_nodes'][name] + if 'min_hits' not in cfg: + cfg['min_hits'] = 1 + return cfg + else: + return None + + +class HitCounter: + def __init__(self): + self.sensors = {} + self.lock = threading.Lock() + self._reset_sensors() + + def _reset_sensors(self): + for loc in SoundSensorLocation: + self.sensors[loc.name.lower()] = 0 + + def add(self, name: str, hits: int): + if name not in self.sensors: + raise ValueError(f'sensor {name} not found') + + with self.lock: + self.sensors[name] += hits + + def get_all(self) -> List[Tuple[str, int]]: + vals = [] + with self.lock: + for name, hits in self.sensors.items(): + if hits > 0: + vals.append((name, hits)) + self._reset_sensors() + return vals + + +class HitHandler(SoundSensorHitHandler): + def handler(self, name: str, hits: int): + if not hasattr(SoundSensorLocation, name.upper()): + logger.error(f'invalid sensor name: {name}') + return + + should_continue = False + for node_type in MediaNodeType: + try: + nodes = get_related_nodes(node_type, name) + except ValueError: + logger.error(f'config for {node_type.name.lower()} node {name} not found') + return + + for node in nodes: + node_config = get_node_config(node_type, node) + if node_config is None: + logger.error(f'config for {node_type.name.lower()} node {node} not found') + continue + if hits < node_config['min_hits']: + continue + should_continue = True + + if not should_continue: + return + + hc.add(name, hits) + + if not server.is_recording_enabled(): + return + for node_type in MediaNodeType: + try: + nodes = get_related_nodes(node_type, name) + for node in nodes: + node_config = get_node_config(node_type, node) + if node_config is None: + logger.error(f'node config for {node_type.name.lower()} node {node} not found') + continue + + durations = node_config['durations'] + dur = durations[1] if hits > node_config['min_hits'] else durations[0] + record_clients[node_type].record(node, dur*60, {'node': node}) + + except ValueError as exc: + logger.exception(exc) + + +def hits_sender(): + while not interrupted: + all_hits = hc.get_all() + if all_hits: + api.add_sound_sensor_hits(all_hits) + sleep(5) + + +api: Optional[WebApiClient] = None +hc: Optional[HitCounter] = None +record_clients: Dict[MediaNodeType, RecordClient] = {} + + +# record callbacks +# ---------------- + +def record_error(type: MediaNodeType, + info: dict, + userdata: dict): + node = userdata['node'] + logger.error('recording ' + str(dict) + f' from {type.name.lower()} node ' + node + ' failed') + + record_clients[type].forget(node, info['id']) + + +def record_finished(type: MediaNodeType, + info: dict, + fn: str, + userdata: dict): + logger.debug(f'{type.name.lower()} record finished: ' + str(info)) + + # audio could have been requested by other user (telegram bot, for example) + # so we shouldn't 'forget' it here + + # node = userdata['node'] + # record.forget(node, info['id']) + + +# api client callbacks +# -------------------- + +def api_error_handler(exc, name, req: RequestParams): + logger.error(f'api call ({name}, params={req.params}) failed, exception below') + logger.exception(exc) + + +if __name__ == '__main__': + config.load_app('sound_sensor_server') + + hc = HitCounter() + api = WebApiClient(timeout=(10, 60)) + api.enable_async(error_handler=api_error_handler) + + t = threading.Thread(target=hits_sender) + t.daemon = True + t.start() + + sound_nodes = {} + if 'sound_nodes' in config: + for nodename, nodecfg in config['sound_nodes'].items(): + sound_nodes[nodename] = Addr.fromstring(nodecfg['addr']) + + camera_nodes = {} + if 'camera_nodes' in config: + for nodename, nodecfg in config['camera_nodes'].items(): + camera_nodes[nodename] = Addr.fromstring(nodecfg['addr']) + + if sound_nodes: + record_clients[MediaNodeType.SOUND] = SoundRecordClient(sound_nodes, + error_handler=partial(record_error, MediaNodeType.SOUND), + finished_handler=partial(record_finished, MediaNodeType.SOUND)) + + if camera_nodes: + record_clients[MediaNodeType.CAMERA] = CameraRecordClient(camera_nodes, + error_handler=partial(record_error, MediaNodeType.CAMERA), + finished_handler=partial(record_finished, MediaNodeType.CAMERA)) + + try: + server = SoundSensorServer(config.get_addr('server.listen'), HitHandler) + server.run() + except KeyboardInterrupt: + interrupted = True + for c in record_clients.values(): + c.stop() + logging.info('keyboard interrupt, exiting...') diff --git a/bin/ssh_tunnels_config_util.py b/bin/ssh_tunnels_config_util.py new file mode 100755 index 0000000..d08a4f4 --- /dev/null +++ b/bin/ssh_tunnels_config_util.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +import __py_include +from homekit.config import config + +if __name__ == '__main__': + config.load_app('ssh_tunnels_config_util') + + network_prefix = config['network'] + hostnames = [] + + for k, v in config.app_config.get().items(): + if type(v) is not dict: + continue + hostnames.append(k) + + for host in hostnames: + buf = [] + i = 0 + for tun_host in hostnames: + http_bind_port = config['http_bind_base'] + config[host]['bind_slot'] * 10 + i + ssh_bind_port = config['ssh_bind_base'] + config[host]['bind_slot'] * 10 + i + + if tun_host == host: + target_host = '127.0.0.1' + else: + target_host = f'{network_prefix}.{config[tun_host]["ipv4"]}' + + buf.append(f'-R 127.0.0.1:{http_bind_port}:{target_host}:{config[tun_host]["http_port"]}') + buf.append(f'-R 127.0.0.1:{ssh_bind_port}:{target_host}:{config[tun_host]["ssh_port"]}') + + i += 1 + + print(host) + print(' '.join(buf)) + print() diff --git a/bin/temphum_mqtt_node.py b/bin/temphum_mqtt_node.py new file mode 100755 index 0000000..9ea436d --- /dev/null +++ b/bin/temphum_mqtt_node.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +import asyncio +import json +import logging +import __py_include + +from typing import Optional + +from homekit.config import config +from homekit.temphum import SensorType, BaseSensor +from homekit.temphum.i2c import create_sensor + +logger = logging.getLogger(__name__) +sensor: Optional[BaseSensor] = None +lock = asyncio.Lock() +delay = 0.01 + + +async def get_measurements(): + async with lock: + await asyncio.sleep(delay) + + temp = sensor.temperature() + rh = sensor.humidity() + + return rh, temp + + +async def handle_client(reader, writer): + request = None + while request != 'quit': + try: + request = await reader.read(255) + if request == b'\x04': + break + request = request.decode('utf-8').strip() + except Exception: + break + + if request == 'read': + try: + rh, temp = await asyncio.wait_for(get_measurements(), timeout=3) + data = dict(humidity=rh, temp=temp) + except asyncio.TimeoutError as e: + logger.exception(e) + data = dict(error='i2c call timed out') + else: + data = dict(error='invalid request') + + writer.write((json.dumps(data) + '\r\n').encode('utf-8')) + try: + await writer.drain() + except ConnectionResetError: + pass + + writer.close() + + +async def run_server(host, port): + server = await asyncio.start_server(handle_client, host, port) + async with server: + logger.info('Server started.') + await server.serve_forever() + + +if __name__ == '__main__': + config.load_app() + + if 'measure_delay' in config['sensor']: + delay = float(config['sensor']['measure_delay']) + + sensor = create_sensor(SensorType(config['sensor']['type']), + int(config['sensor']['bus'])) + + try: + host, port = config.get_addr('server.listen') + asyncio.run(run_server(host, port)) + except KeyboardInterrupt: + logging.info('Exiting...') diff --git a/bin/temphum_mqtt_receiver.py b/bin/temphum_mqtt_receiver.py new file mode 100755 index 0000000..d0a378e --- /dev/null +++ b/bin/temphum_mqtt_receiver.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +import paho.mqtt.client as mqtt +import re +import __py_include + +from homekit.config import config +from homekit.mqtt import MqttWrapper, MqttNode + + +class MqttServer(Mqtt): + def __init__(self): + super().__init__(clean_session=False) + self.database = SensorsDatabase() + + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + self._logger.info("subscribing to hk/#") + client.subscribe('hk/#', qos=1) + + def on_message(self, client: mqtt.Client, userdata, msg): + super().on_message(client, userdata, msg) + try: + variants = '|'.join([s.name.lower() for s in TemperatureSensorLocation]) + match = re.match(rf'hk/(\d+)/si7021/({variants})', msg.topic) + if not match: + return + + # FIXME string home_id must be supported + home_id = int(match.group(1)) + sensor = get_sensor_type(match.group(2)) + + payload = Temperature.unpack(msg.payload) + self.database.add_temperature(home_id, payload.time, sensor, + temp=int(payload.temp*100), + rh=int(payload.rh*100)) + except Exception as e: + self._logger.exception(str(e)) + + +if __name__ == '__main__': + config.load_app('temphum_mqtt_receiver') + + mqtt = MqttWrapper(clean_session=False) + node = MqttNode(node_id='+') + node.load_module('temphum', write_to_database=True) + mqtt.add_node(node) + + mqtt.connect_and_loop()
\ No newline at end of file diff --git a/bin/temphum_nodes_util.py b/bin/temphum_nodes_util.py new file mode 100755 index 0000000..aa46494 --- /dev/null +++ b/bin/temphum_nodes_util.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +import __py_include + +from homekit.mqtt.temphum import MqttTempHumNodes + +if __name__ == '__main__': + max_name_len = 0 + for node in MqttTempHumNodes: + if len(node.name) > max_name_len: + max_name_len = len(node.name) + + values = [] + for node in MqttTempHumNodes: + hash = node.hash() + if hash in values: + raise ValueError(f'collision detected: {hash}') + values.append(values) + print(' '*(max_name_len-len(node.name)), end='') + print(f'{node.name}: {hash}') diff --git a/bin/temphum_smbus_util.py b/bin/temphum_smbus_util.py new file mode 100755 index 0000000..1cfaa84 --- /dev/null +++ b/bin/temphum_smbus_util.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +import __py_include + +from argparse import ArgumentParser +from homekit.temphum import SensorType +from homekit.temphum.i2c import create_sensor + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('-t', '--type', choices=[item.value for item in SensorType], + required=True, + help='Sensor type') + parser.add_argument('-b', '--bus', type=int, default=0, + help='I2C bus number') + arg = parser.parse_args() + + sensor = create_sensor(SensorType(arg.type), arg.bus) + temp = sensor.temperature() + hum = sensor.humidity() + + print(f'temperature: {temp}') + print(f'rel. humidity: {hum}') diff --git a/bin/temphumd.py b/bin/temphumd.py new file mode 100755 index 0000000..9ea436d --- /dev/null +++ b/bin/temphumd.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +import asyncio +import json +import logging +import __py_include + +from typing import Optional + +from homekit.config import config +from homekit.temphum import SensorType, BaseSensor +from homekit.temphum.i2c import create_sensor + +logger = logging.getLogger(__name__) +sensor: Optional[BaseSensor] = None +lock = asyncio.Lock() +delay = 0.01 + + +async def get_measurements(): + async with lock: + await asyncio.sleep(delay) + + temp = sensor.temperature() + rh = sensor.humidity() + + return rh, temp + + +async def handle_client(reader, writer): + request = None + while request != 'quit': + try: + request = await reader.read(255) + if request == b'\x04': + break + request = request.decode('utf-8').strip() + except Exception: + break + + if request == 'read': + try: + rh, temp = await asyncio.wait_for(get_measurements(), timeout=3) + data = dict(humidity=rh, temp=temp) + except asyncio.TimeoutError as e: + logger.exception(e) + data = dict(error='i2c call timed out') + else: + data = dict(error='invalid request') + + writer.write((json.dumps(data) + '\r\n').encode('utf-8')) + try: + await writer.drain() + except ConnectionResetError: + pass + + writer.close() + + +async def run_server(host, port): + server = await asyncio.start_server(handle_client, host, port) + async with server: + logger.info('Server started.') + await server.serve_forever() + + +if __name__ == '__main__': + config.load_app() + + if 'measure_delay' in config['sensor']: + delay = float(config['sensor']['measure_delay']) + + sensor = create_sensor(SensorType(config['sensor']['type']), + int(config['sensor']['bus'])) + + try: + host, port = config.get_addr('server.listen') + asyncio.run(run_server(host, port)) + except KeyboardInterrupt: + logging.info('Exiting...') diff --git a/bin/web_api.py b/bin/web_api.py new file mode 100755 index 0000000..d221838 --- /dev/null +++ b/bin/web_api.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +import asyncio +import json +import os +import __py_include + +from datetime import datetime, timedelta + +from aiohttp import web +from homekit import http +from homekit.config import config, is_development_mode +from homekit.database import BotsDatabase, SensorsDatabase, InverterDatabase +from homekit.database.inverter_time_formats import * +from homekit.api.types import TemperatureSensorLocation, SoundSensorLocation +from homekit.media import SoundRecordStorage + + +def strptime_auto(s: str) -> datetime: + e = None + for fmt in (FormatTime, FormatDate): + try: + return datetime.strptime(s, fmt) + except ValueError as _e: + e = _e + raise e + + +class AuthError(Exception): + def __init__(self, message: str): + super().__init__() + self.message = message + + +class WebAPIServer(http.HTTPServer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.app.middlewares.append(self.validate_auth) + + self.get('/', self.get_index) + self.get('/sensors/data/', self.GET_sensors_data) + self.get('/sound_sensors/hits/', self.GET_sound_sensors_hits) + self.post('/sound_sensors/hits/', self.POST_sound_sensors_hits) + + self.post('/log/openwrt/', self.POST_openwrt_log) + + self.get('/inverter/consumed_energy/', self.GET_consumed_energy) + self.get('/inverter/grid_consumed_energy/', self.GET_grid_consumed_energy) + + self.get('/recordings/list/', self.GET_recordings_list) + + @staticmethod + @web.middleware + async def validate_auth(req: http.Request, handler): + def get_token() -> str: + name = 'X-Token' + if name in req.headers: + return req.headers[name] + + return req.query['token'] + + try: + token = get_token() + except KeyError: + raise AuthError('no token') + + if token != config['api']['token']: + raise AuthError('invalid token') + + return await handler(req) + + @staticmethod + async def get_index(req: http.Request): + message = "nothing here, keep lurking" + if is_development_mode(): + message += ' (dev mode)' + return http.Response(text=message, content_type='text/plain') + + async def GET_sensors_data(self, req: http.Request): + try: + hours = int(req.query['hours']) + if hours < 1 or hours > 24: + raise ValueError('invalid hours value') + except KeyError: + hours = 1 + + sensor = TemperatureSensorLocation(int(req.query['sensor'])) + + dt_to = datetime.now() + dt_from = dt_to - timedelta(hours=hours) + + db = SensorsDatabase() + data = db.get_temperature_recordings(sensor, (dt_from, dt_to)) + return self.ok(data) + + async def GET_sound_sensors_hits(self, req: http.Request): + location = SoundSensorLocation(int(req.query['location'])) + + after = int(req.query['after']) + kwargs = {} + if after is None: + last = int(req.query['last']) + if last is None: + raise ValueError('you must pass `after` or `last` params') + else: + if not 0 < last < 100: + raise ValueError('invalid last value: must be between 0 and 100') + kwargs['last'] = last + else: + kwargs['after'] = datetime.fromtimestamp(after) + + data = BotsDatabase().get_sound_hits(location, **kwargs) + return self.ok(data) + + async def POST_sound_sensors_hits(self, req: http.Request): + hits = [] + data = await req.post() + for hit, count in json.loads(data['hits']): + if not hasattr(SoundSensorLocation, hit.upper()): + raise ValueError('invalid sensor location') + if count < 1: + raise ValueError(f'invalid count: {count}') + hits.append((SoundSensorLocation[hit.upper()], count)) + + BotsDatabase().add_sound_hits(hits, datetime.now()) + return self.ok() + + async def POST_openwrt_log(self, req: http.Request): + data = await req.post() + + try: + logs = data['logs'] + ap = int(data['ap']) + except KeyError: + logs = '' + ap = 0 + + # validate it + logs = json.loads(logs) + assert type(logs) is list, "invalid json data (list expected)" + + lines = [] + for line in logs: + assert type(line) is list, "invalid line type (list expected)" + assert len(line) == 2, f"expected 2 items in line, got {len(line)}" + assert type(line[0]) is int, "invalid line[0] type (int expected)" + assert type(line[1]) is str, "invalid line[1] type (str expected)" + + lines.append(( + datetime.fromtimestamp(line[0]), + line[1] + )) + + BotsDatabase().add_openwrt_logs(lines, ap) + return self.ok() + + async def GET_recordings_list(self, req: http.Request): + data = await req.post() + + try: + extended = bool(int(data['extended'])) + except KeyError: + extended = False + + node = data['node'] + + root = os.path.join(config['recordings']['directory'], node) + if not os.path.isdir(root): + raise ValueError(f'invalid node {node}: no such directory') + + storage = SoundRecordStorage(root) + files = storage.getfiles(as_objects=extended) + if extended: + files = list(map(lambda file: file.__dict__(), files)) + + return self.ok(files) + + @staticmethod + def _get_inverter_from_to(req: http.Request): + s_from = req.query['from'] + s_to = req.query['to'] + + dt_from = strptime_auto(s_from) + + if s_to == 'now': + dt_to = datetime.now() + else: + dt_to = strptime_auto(s_to) + + return dt_from, dt_to + + async def GET_consumed_energy(self, req: http.Request): + dt_from, dt_to = self._get_inverter_from_to(req) + wh = InverterDatabase().get_consumed_energy(dt_from, dt_to) + return self.ok(wh) + + async def GET_grid_consumed_energy(self, req: http.Request): + dt_from, dt_to = self._get_inverter_from_to(req) + wh = InverterDatabase().get_grid_consumed_energy(dt_from, dt_to) + return self.ok(wh) + + +# start of the program +# -------------------- + +if __name__ == '__main__': + _app_name = 'web_api' + if is_development_mode(): + _app_name += '_dev' + config.load_app(_app_name) + + loop = asyncio.get_event_loop() + + server = WebAPIServer(config.get_addr('server.listen')) + server.run() |