From e3d3d6b76010a6dd5c417f017339bec17fb07887 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 14 Jun 2022 02:44:43 +0300 Subject: media: refactor sound_node, introduce camera_node --- doc/camera_node.md | 5 + doc/sensors_bot.md | 6 + doc/sound_bot.md | 86 +++++-- doc/sound_sensor_server.md | 33 +++ requirements.txt | 1 + src/camera_node.py | 88 +++++++ src/esp32_capture.py | 57 +++++ src/home/api/web_api_client.py | 4 +- src/home/audio/__init__.py | 0 src/home/audio/amixer.py | 91 +++++++ src/home/camera/__init__.py | 1 + src/home/camera/esp32.py | 140 ++++++----- src/home/camera/types.py | 5 + src/home/http/__init__.py | 2 +- src/home/media/__init__.py | 21 ++ src/home/media/__init__.pyi | 27 +++ src/home/media/node_client.py | 120 ++++++++++ src/home/media/node_server.py | 86 +++++++ src/home/media/record.py | 453 +++++++++++++++++++++++++++++++++++ src/home/media/record_client.py | 166 +++++++++++++ src/home/media/storage.py | 197 +++++++++++++++ src/home/media/types.py | 13 + src/home/sound/__init__.py | 8 - src/home/sound/amixer.py | 91 ------- src/home/sound/node_client.py | 109 --------- src/home/sound/record.py | 400 ------------------------------- src/home/sound/record_client.py | 142 ----------- src/home/sound/storage.py | 155 ------------ src/home/web_api/web_api.py | 4 +- src/openwrt_log_analyzer.py | 0 src/sound_bot.py | 40 ++-- src/sound_node.py | 192 ++++----------- src/sound_sensor_server.py | 129 +++++----- src/test/__init__.py | 0 src/test/test.py | 7 - src/test/test_amixer.py | 79 ------ src/test/test_api.py | 11 - src/test/test_esp32_cam.py | 56 ----- src/test/test_inverter_monitor.py | 376 ----------------------------- src/test/test_record_upload.py | 88 ------- src/test/test_send_fake_sound_hit.py | 25 -- src/test/test_sensors_plot.py | 0 src/test/test_sound_node_client.py | 19 -- src/test/test_sound_server_api.py | 66 ----- src/test/test_stopwatch.py | 16 -- systemd/camera_node.service | 13 + systemd/camera_node@.service | 13 + test/__init__.py | 0 test/test.py | 7 + test/test_amixer.py | 79 ++++++ test/test_api.py | 11 + test/test_esp32_cam.py | 56 +++++ test/test_inverter_monitor.py | 376 +++++++++++++++++++++++++++++ test/test_record_upload.py | 88 +++++++ test/test_send_fake_sound_hit.py | 25 ++ test/test_sensors_plot.py | 0 test/test_sound_node_client.py | 19 ++ test/test_sound_server_api.py | 66 +++++ test/test_stopwatch.py | 16 ++ 59 files changed, 2411 insertions(+), 1973 deletions(-) create mode 100644 doc/camera_node.md create mode 100644 doc/sound_sensor_server.md create mode 100755 src/camera_node.py create mode 100755 src/esp32_capture.py create mode 100644 src/home/audio/__init__.py create mode 100644 src/home/audio/amixer.py create mode 100644 src/home/camera/types.py create mode 100644 src/home/media/__init__.py create mode 100644 src/home/media/__init__.pyi create mode 100644 src/home/media/node_client.py create mode 100644 src/home/media/node_server.py create mode 100644 src/home/media/record.py create mode 100644 src/home/media/record_client.py create mode 100644 src/home/media/storage.py create mode 100644 src/home/media/types.py delete mode 100644 src/home/sound/__init__.py delete mode 100644 src/home/sound/amixer.py delete mode 100644 src/home/sound/node_client.py delete mode 100644 src/home/sound/record.py delete mode 100644 src/home/sound/record_client.py delete mode 100644 src/home/sound/storage.py mode change 100644 => 100755 src/openwrt_log_analyzer.py delete mode 100644 src/test/__init__.py delete mode 100755 src/test/test.py delete mode 100755 src/test/test_amixer.py delete mode 100755 src/test/test_api.py delete mode 100755 src/test/test_esp32_cam.py delete mode 100755 src/test/test_inverter_monitor.py delete mode 100755 src/test/test_record_upload.py delete mode 100755 src/test/test_send_fake_sound_hit.py delete mode 100755 src/test/test_sensors_plot.py delete mode 100755 src/test/test_sound_node_client.py delete mode 100755 src/test/test_sound_server_api.py delete mode 100755 src/test/test_stopwatch.py create mode 100644 systemd/camera_node.service create mode 100644 systemd/camera_node@.service create mode 100644 test/__init__.py create mode 100755 test/test.py create mode 100755 test/test_amixer.py create mode 100755 test/test_api.py create mode 100755 test/test_esp32_cam.py create mode 100755 test/test_inverter_monitor.py create mode 100755 test/test_record_upload.py create mode 100755 test/test_send_fake_sound_hit.py create mode 100755 test/test_sensors_plot.py create mode 100755 test/test_sound_node_client.py create mode 100755 test/test_sound_server_api.py create mode 100755 test/test_stopwatch.py diff --git a/doc/camera_node.md b/doc/camera_node.md new file mode 100644 index 0000000..5f2d8d8 --- /dev/null +++ b/doc/camera_node.md @@ -0,0 +1,5 @@ +## Configuration + +``` + +``` \ No newline at end of file diff --git a/doc/sensors_bot.md b/doc/sensors_bot.md index 9f1c008..c6dba42 100644 --- a/doc/sensors_bot.md +++ b/doc/sensors_bot.md @@ -30,4 +30,10 @@ label_en = "There" [logging] verbose = false +``` + +## Dependencies + +``` +apt install python3-matplotlib ``` \ No newline at end of file diff --git a/doc/sound_bot.md b/doc/sound_bot.md index f273e7c..93241be 100644 --- a/doc/sound_bot.md +++ b/doc/sound_bot.md @@ -2,35 +2,73 @@ ## Configuration example -```toml -[bot] -token = "..." -users = [1, 2] -manual_record_allowlist = [ 1 ] -notify_users = [ 1, 2 ] +```yaml +bot: + token: "..." + users: [1, 2] + manual_record_allowlist: [ 1 ] + notify_users: [ 1, 2 ] + + record_intervals: [15, 30, 60, 180, 300, 600] + guard_server: "1.2.3.4:8311" + +api: + token: "..." + host: "..." -record_intervals = [15, 30, 60, 180, 300, 600] -guard_server = "1.2.3.4:8311" +nodes: + name1: + addr: "1.2.3.4:8313" + label: + ru: "название 1" + en: "name 1" + name2: + addr: "1.2.3.5:8313" + label: + ru: "название2" + en: "name 2" -[api] -token = "..." -host = "..." +sound_sensors: + name1: + ru: "название 1" + en: "name 1" + name2: + ru: "название 2" + en: "name 2" -[nodes] -name1.addr = '1.2.3.4:8313' -name1.label = { ru="название 1", en="name 1" } +cameras: + cam1: + label: + ru: "название 1" + en: "name 1" + type: esp32 + addr: "1.2.3.4:80" + settings: + framesize: 9 + vflip: true + hmirror: true + lenc: true + wpc: true + bpc: false + raw_gma: false + agc: true + gainceiling: 5 + quality: 10 + awb_gain: false + awb: true + aec_dsp: true + aec: true + flash: true -name2.addr = '1.2.3.5:8313' -name2.label = { ru="название2", en="name 2" } +logging: + verbose: false + default_fmt: true -[sound_sensors] -name1 = { ru="название 1", en="name 1" } -name2 = { ru="название 2", en="name 2" } +``` -[cameras] -name1 = { ru="название 1", en="name 1", type="esp32", addr="1.2.3.4:80", settings = {framesize=9, vflip=true, hmirror=true, lenc=true, wpc=true, bpc=false, raw_gma=false, agc=true, gainceiling=5, quality=10, awb_gain=false, awb=true, aec_dsp=true, aec=true} } -[logging] -verbose = false -default_fmt = true +## Dependencies + +``` +apt install python3-pil ``` \ No newline at end of file diff --git a/doc/sound_sensor_server.md b/doc/sound_sensor_server.md new file mode 100644 index 0000000..9543562 --- /dev/null +++ b/doc/sound_sensor_server.md @@ -0,0 +1,33 @@ +## Configuration + +``` +[server] +listen = "0.0.0.0:8311" +guard_control = true +guard_recording_default = false + +[sensor_to_sound_nodes_relations] +big_house = ['bh1', 'bh2'] +john = ['john'] + +[sensor_to_camera_nodes_relations] +big_house = ['bh'] +john = ['john'] + +[sound_nodes] +bh1 = { addr = '192.168.1.2:8313', durations = [7, 30] } +bh2 = { addr = '192.168.1.3:8313', durations = [10, 60] } +john = { addr = '192.168.1.4:8313', durations = [10, 60] } + +[camera_nodes] +bh = { addr = '192.168.1.2:8314', durations = [7, 30] } +john = { addr = '192.168.1.4:8314', durations = [10, 60] } + +[api] +token = "..." +host = "..." + +[logging] +verbose = false +default_fmt = true +``` \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 9782e86..53c3cd0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ pytz~=2021.3 PyYAML~=6.0 apscheduler~=3.9.1 psutil~=5.9.1 +aioshutil~=1.1 # following can be installed from debian repositories # matplotlib~=3.5.0 diff --git a/src/camera_node.py b/src/camera_node.py new file mode 100755 index 0000000..206361a --- /dev/null +++ b/src/camera_node.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +import asyncio +import time + +from home.config import config +from home.media import MediaNodeServer, CameraRecordStorage, CameraRecorder +from home.camera import CameraType, esp32 +from home.util import parse_addr, Addr +from home import http + + +# Implements HTTP API for a camera. +# --------------------------------- + +class ESP32CameraNodeServer(MediaNodeServer): + def __init__(self, web_addr: Addr, *args, **kwargs): + super().__init__(*args, **kwargs) + self.last_settings_sync = 0 + + self.web = esp32.WebClient(web_addr) + self.get('/capture/', self.capture) + + async def capture(self, req: http.Request): + await self.sync_settings_if_needed() + + try: + with_flash = int(req.query['with_flash']) + except KeyError: + with_flash = 0 + + if with_flash: + await self.web.setflash(True) + await asyncio.sleep(0.2) + + bytes = (await self.web.capture()).read() + + if with_flash: + await self.web.setflash(False) + + res = http.StreamResponse() + res.content_type = 'image/jpeg' + res.content_length = len(bytes) + + await res.prepare(req) + await res.write(bytes) + await res.write_eof() + + return res + + async def do_record(self, request: http.Request): + await self.sync_settings_if_needed() + + # sync settings + return super().do_record(request) + + async def sync_settings_if_needed(self): + if self.last_settings_sync != 0 and time.time() - self.last_settings_sync < 300: + return + changed = await self.web.syncsettings(config['camera']['settings']) + if changed: + self.logger.debug('sync_settings_if_needed: some settings were changed, sleeping for 0.4 sec') + await asyncio.sleep(0.4) + self.last_settings_sync = time.time() + + +if __name__ == '__main__': + config.load('camera_node') + + storage = CameraRecordStorage(config['node']['storage']) + + recorder_kwargs = {} + camera_type = CameraType(config['camera']['type']) + if camera_type == CameraType.ESP32: + recorder_kwargs['stream_addr'] = parse_addr(config['camera']['stream_addr']) + else: + raise RuntimeError(f'unsupported camera type {camera_type}') + + recorder = CameraRecorder(storage=storage, + camera_type=camera_type, + **recorder_kwargs) + recorder.start_thread() + + server = ESP32CameraNodeServer( + recorder=recorder, + storage=storage, + web_addr=parse_addr(config['camera']['web_addr']), + addr=parse_addr(config['node']['listen'])) + server.run() diff --git a/src/esp32_capture.py b/src/esp32_capture.py new file mode 100755 index 0000000..4a9ce10 --- /dev/null +++ b/src/esp32_capture.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +import asyncio +import logging +import os.path + +from argparse import ArgumentParser +from home.camera.esp32 import WebClient +from home.util import parse_addr, Addr +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from datetime import datetime +from typing import Optional + +logger = logging.getLogger(__name__) +cam: Optional[WebClient] = None + + +class ESP32Capture: + def __init__(self, addr: Addr, interval: float, output_directory: str): + self.logger = logging.getLogger(self.__class__.__name__) + self.client = WebClient(addr) + self.output_directory = output_directory + self.interval = interval + + self.scheduler = AsyncIOScheduler() + self.scheduler.add_job(self.capture, 'interval', seconds=arg.interval) + self.scheduler.start() + + async def capture(self): + self.logger.debug('capture: start') + now = datetime.now() + filename = os.path.join( + self.output_directory, + now.strftime('%Y-%m-%d-%H:%M:%S.%f.jpg') + ) + if not await self.client.capture(filename): + self.logger.error('failed to capture') + self.logger.debug('capture: done') + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--addr', type=str, required=True) + parser.add_argument('--output-directory', type=str, required=True) + parser.add_argument('--interval', type=float, default=0.5) + parser.add_argument('--verbose', action='store_true') + arg = parser.parse_args() + + if arg.verbose: + logging.basicConfig(level=logging.DEBUG) + + loop = asyncio.get_event_loop() + + ESP32Capture(parse_addr(arg.addr), arg.interval, arg.output_directory) + try: + loop.run_forever() + except KeyboardInterrupt: + pass diff --git a/src/home/api/web_api_client.py b/src/home/api/web_api_client.py index e3b0988..34d080c 100644 --- a/src/home/api/web_api_client.py +++ b/src/home/api/web_api_client.py @@ -13,7 +13,7 @@ from .errors import ApiResponseError from .types import * from ..config import config from ..util import stringify -from ..sound import RecordFile, SoundNodeClient +from ..media import RecordFile, MediaNodeClient logger = logging.getLogger(__name__) @@ -103,7 +103,7 @@ class WebAPIClient: def recordings_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: files = self._get('recordings/list/', {'extended': int(extended)})['data'] if as_objects: - return SoundNodeClient.record_list_from_serialized(files) + return MediaNodeClient.record_list_from_serialized(files) return files def _process_sound_sensor_hits_data(self, data: list[dict]) -> list[dict]: diff --git a/src/home/audio/__init__.py b/src/home/audio/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/home/audio/amixer.py b/src/home/audio/amixer.py new file mode 100644 index 0000000..0ab2c64 --- /dev/null +++ b/src/home/audio/amixer.py @@ -0,0 +1,91 @@ +import subprocess + +from ..config import config +from threading import Lock +from typing import Union + + +_lock = Lock() +_default_step = 5 + + +def has_control(s: str) -> bool: + for control in config['amixer']['controls']: + if control['name'] == s: + return True + return False + + +def get_caps(s: str) -> list[str]: + for control in config['amixer']['controls']: + if control['name'] == s: + return control['caps'] + raise KeyError(f'control {s} not found') + + +def get_all() -> list: + controls = [] + for control in config['amixer']['controls']: + controls.append({ + 'name': control['name'], + 'info': get(control['name']), + 'caps': control['caps'] + }) + return controls + + +def get(control: str): + return call('get', control) + + +def mute(control): + return call('set', control, 'mute') + + +def unmute(control): + return call('set', control, 'unmute') + + +def cap(control): + return call('set', control, 'cap') + + +def nocap(control): + return call('set', control, 'nocap') + + +def _get_default_step() -> int: + if 'step' in config['amixer']: + return int(config['amixer']['step']) + + return _default_step + + +def incr(control, step=None): + if step is None: + step = _get_default_step() + return call('set', control, f'{step}%+') + + +def decr(control, step=None): + if step is None: + step = _get_default_step() + return call('set', control, f'{step}%-') + + +def call(*args, return_code=False) -> Union[int, str]: + with _lock: + result = subprocess.run([config['amixer']['bin'], *args], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if return_code: + return result.returncode + + if result.returncode != 0: + raise AmixerError(result.stderr.decode().strip()) + + return result.stdout.decode().strip() + + +class AmixerError(OSError): + pass diff --git a/src/home/camera/__init__.py b/src/home/camera/__init__.py index e69de29..626930b 100644 --- a/src/home/camera/__init__.py +++ b/src/home/camera/__init__.py @@ -0,0 +1 @@ +from .types import CameraType \ No newline at end of file diff --git a/src/home/camera/esp32.py b/src/home/camera/esp32.py index 246022b..fe6de0e 100644 --- a/src/home/camera/esp32.py +++ b/src/home/camera/esp32.py @@ -1,10 +1,12 @@ import logging -import shutil import requests import json +import asyncio +import aioshutil +from io import BytesIO +from functools import partial from typing import Union, Optional -from time import sleep from enum import Enum from ..api.errors import ApiResponseError from ..util import Addr @@ -41,14 +43,15 @@ def _assert_bounds(n: int, min: int, max: int): class WebClient: - def __init__(self, addr: Addr): + def __init__(self, + addr: Addr): self.endpoint = f'http://{addr[0]}:{addr[1]}' self.logger = logging.getLogger(self.__class__.__name__) self.delay = 0 self.isfirstrequest = True - def syncsettings(self, settings) -> bool: - status = self.getstatus() + async def syncsettings(self, settings) -> bool: + status = await self.getstatus() self.logger.debug(f'syncsettings: status={status}') changed_anything = False @@ -82,7 +85,7 @@ class WebClient: func = getattr(self, f'set{name}') self.logger.debug(f'syncsettings: calling set{name}({value})') - func(value) + await func(value) changed_anything = True except AttributeError as exc: @@ -94,99 +97,106 @@ class WebClient: def setdelay(self, delay: int): self.delay = delay - def capture(self, save_to: str): - self._call('capture', save_to=save_to) + async def capture(self, output: Optional[str] = None) -> Union[BytesIO, bool]: + kw = {} + if output: + kw['save_to'] = output + else: + kw['as_bytes'] = True + return await self._call('capture', **kw) - def getstatus(self): - return json.loads(self._call('status')) + async def getstatus(self): + return json.loads(await self._call('status')) - def setflash(self, enable: bool): - self._control('flash', int(enable)) + async def setflash(self, enable: bool): + await self._control('flash', int(enable)) - def setframesize(self, fs: Union[int, FrameSize]): + async def setframesize(self, fs: Union[int, FrameSize]): if type(fs) is int: fs = FrameSize(fs) - self._control('framesize', fs.value) + await self._control('framesize', fs.value) - def sethmirror(self, enable: bool): - self._control('hmirror', int(enable)) + async def sethmirror(self, enable: bool): + await self._control('hmirror', int(enable)) - def setvflip(self, enable: bool): - self._control('vflip', int(enable)) + async def setvflip(self, enable: bool): + await self._control('vflip', int(enable)) - def setawb(self, enable: bool): - self._control('awb', int(enable)) + async def setawb(self, enable: bool): + await self._control('awb', int(enable)) - def setawbgain(self, enable: bool): - self._control('awb_gain', int(enable)) + async def setawbgain(self, enable: bool): + await self._control('awb_gain', int(enable)) - def setwbmode(self, mode: WBMode): - self._control('wb_mode', mode.value) + async def setwbmode(self, mode: WBMode): + await self._control('wb_mode', mode.value) - def setaecsensor(self, enable: bool): - self._control('aec', int(enable)) + async def setaecsensor(self, enable: bool): + await self._control('aec', int(enable)) - def setaecdsp(self, enable: bool): - self._control('aec2', int(enable)) + async def setaecdsp(self, enable: bool): + await self._control('aec2', int(enable)) - def setagc(self, enable: bool): - self._control('agc', int(enable)) + async def setagc(self, enable: bool): + await self._control('agc', int(enable)) - def setagcgain(self, gain: int): + async def setagcgain(self, gain: int): _assert_bounds(gain, 1, 31) - self._control('agc_gain', gain) + await self._control('agc_gain', gain) - def setgainceiling(self, gainceiling: int): + async def setgainceiling(self, gainceiling: int): _assert_bounds(gainceiling, 2, 128) - self._control('gainceiling', gainceiling) + await self._control('gainceiling', gainceiling) - def setbpc(self, enable: bool): - self._control('bpc', int(enable)) + async def setbpc(self, enable: bool): + await self._control('bpc', int(enable)) - def setwpc(self, enable: bool): - self._control('wpc', int(enable)) + async def setwpc(self, enable: bool): + await self._control('wpc', int(enable)) - def setrawgma(self, enable: bool): - self._control('raw_gma', int(enable)) + async def setrawgma(self, enable: bool): + await self._control('raw_gma', int(enable)) - def setlenscorrection(self, enable: bool): - self._control('lenc', int(enable)) + async def setlenscorrection(self, enable: bool): + await self._control('lenc', int(enable)) - def setdcw(self, enable: bool): - self._control('dcw', int(enable)) + async def setdcw(self, enable: bool): + await self._control('dcw', int(enable)) - def setcolorbar(self, enable: bool): - self._control('colorbar', int(enable)) + async def setcolorbar(self, enable: bool): + await self._control('colorbar', int(enable)) - def setquality(self, q: int): + async def setquality(self, q: int): _assert_bounds(q, 4, 63) - self._control('quality', q) + await self._control('quality', q) - def setbrightness(self, brightness: int): + async def setbrightness(self, brightness: int): _assert_bounds(brightness, -2, -2) - self._control('brightness', brightness) + await self._control('brightness', brightness) - def setcontrast(self, contrast: int): + async def setcontrast(self, contrast: int): _assert_bounds(contrast, -2, 2) - self._control('contrast', contrast) + await self._control('contrast', contrast) - def setsaturation(self, saturation: int): + async def setsaturation(self, saturation: int): _assert_bounds(saturation, -2, 2) - self._control('saturation', saturation) + await self._control('saturation', saturation) - def _control(self, var: str, value: Union[int, str]): - self._call('control', params={'var': var, 'val': value}) + async def _control(self, var: str, value: Union[int, str]): + return await self._call('control', params={'var': var, 'val': value}) - def _call(self, - method: str, - params: Optional[dict] = None, - save_to: Optional[str] = None): + async def _call(self, + method: str, + params: Optional[dict] = None, + save_to: Optional[str] = None, + as_bytes=False) -> Union[str, bool, BytesIO]: + loop = asyncio.get_event_loop() if not self.isfirstrequest and self.delay > 0: sleeptime = self.delay / 1000 self.logger.debug(f'sleeping for {sleeptime}') - sleep(sleeptime) + await asyncio.sleep(sleeptime) self.isfirstrequest = False @@ -199,14 +209,18 @@ class WebClient: if save_to: kwargs['stream'] = True - r = requests.get(url, **kwargs) + r = await loop.run_in_executor(None, + partial(requests.get, url, **kwargs)) if r.status_code != 200: raise ApiResponseError(status_code=r.status_code) + if as_bytes: + return BytesIO(r.content) + if save_to: r.raise_for_status() with open(save_to, 'wb') as f: - shutil.copyfileobj(r.raw, f) + await aioshutil.copyfileobj(r.raw, f) return True return r.text diff --git a/src/home/camera/types.py b/src/home/camera/types.py new file mode 100644 index 0000000..de59022 --- /dev/null +++ b/src/home/camera/types.py @@ -0,0 +1,5 @@ +from enum import Enum + + +class CameraType(Enum): + ESP32 = 'esp32' diff --git a/src/home/http/__init__.py b/src/home/http/__init__.py index 2597457..963e13c 100644 --- a/src/home/http/__init__.py +++ b/src/home/http/__init__.py @@ -1,2 +1,2 @@ from .http import serve, ok, routes, HTTPServer -from aiohttp.web import FileResponse, Request +from aiohttp.web import FileResponse, StreamResponse, Request \ No newline at end of file diff --git a/src/home/media/__init__.py b/src/home/media/__init__.py new file mode 100644 index 0000000..e8268cf --- /dev/null +++ b/src/home/media/__init__.py @@ -0,0 +1,21 @@ +import importlib +import itertools + +__map__ = { + 'types': ['MediaNodeType'], + 'record_client': ['SoundRecordClient', 'CameraRecordClient', 'RecordClient'], + 'node_server': ['MediaNodeServer'], + 'node_client': ['SoundNodeClient', 'CameraNodeClient', 'MediaNodeClient'], + 'storage': ['SoundRecordStorage', 'CameraRecordStorage', 'SoundRecordFile', 'CameraRecordFile', 'RecordFile'], + 'record': ['SoundRecorder', 'CameraRecorder'] +} + +__all__ = list(itertools.chain(*__map__.values())) + +def __getattr__(name): + if name in __all__: + for file, names in __map__.items(): + if name in names: + module = importlib.import_module(f'.{file}', __name__) + return getattr(module, name) + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/home/media/__init__.pyi b/src/home/media/__init__.pyi new file mode 100644 index 0000000..0e85cde --- /dev/null +++ b/src/home/media/__init__.pyi @@ -0,0 +1,27 @@ +from .types import ( + MediaNodeType as MediaNodeType +) +from .record_client import ( + SoundRecordClient as SoundRecordClient, + CameraRecordClient as CameraRecordClient, + RecordClient as RecordClient +) +from .node_server import ( + MediaNodeServer as MediaNodeServer +) +from .node_client import ( + SoundNodeClient as SoundNodeClient, + CameraNodeClient as CameraNodeClient, + MediaNodeClient as MediaNodeClient +) +from .storage import ( + SoundRecordStorage as SoundRecordStorage, + CameraRecordStorage as CameraRecordStorage, + SoundRecordFile as SoundRecordFile, + CameraRecordFile as CameraRecordFile, + RecordFile as RecordFile +) +from .record import ( + SoundRecorder as SoundRecorder, + CameraRecorder as CameraRecorder +) \ No newline at end of file diff --git a/src/home/media/node_client.py b/src/home/media/node_client.py new file mode 100644 index 0000000..18f0779 --- /dev/null +++ b/src/home/media/node_client.py @@ -0,0 +1,120 @@ +import requests +import shutil +import logging + +from typing import Optional, Union +from .storage import RecordFile +from ..util import Addr +from ..camera.types import CameraType +from ..api.errors import ApiResponseError + + +class MediaNodeClient: + def __init__(self, addr: Addr): + self.endpoint = f'http://{addr[0]}:{addr[1]}' + self.logger = logging.getLogger(self.__class__.__name__) + + def record(self, duration: int): + return self._call('record/', params={"duration": duration}) + + def record_info(self, record_id: int): + return self._call(f'record/info/{record_id}/') + + def record_forget(self, record_id: int): + return self._call(f'record/forget/{record_id}/') + + def record_download(self, record_id: int, output: str): + return self._call(f'record/download/{record_id}/', save_to=output) + + def storage_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: + r = self._call('storage/list/', params={'extended': int(extended)}) + files = r['files'] + if as_objects: + return self.record_list_from_serialized(files) + return files + + @staticmethod + def record_list_from_serialized(files: Union[list[str], list[dict]]): + new_files = [] + for f in files: + kwargs = {'remote': True} + if isinstance(f, dict): + name = f['filename'] + kwargs['remote_filesize'] = f['filesize'] + else: + name = f + item = RecordFile.create(name, **kwargs) + new_files.append(item) + return new_files + + def storage_delete(self, file_id: str): + return self._call('storage/delete/', params={'file_id': file_id}) + + def storage_download(self, file_id: str, output: str): + return self._call('storage/download/', params={'file_id': file_id}, save_to=output) + + def _call(self, + method: str, + params: dict = None, + save_to: Optional[str] = None): + kwargs = {} + if isinstance(params, dict): + kwargs['params'] = params + if save_to: + kwargs['stream'] = True + + url = f'{self.endpoint}/{method}' + self.logger.debug(f'calling {url}, kwargs: {kwargs}') + + r = requests.get(url, **kwargs) + if r.status_code != 200: + response = r.json() + raise ApiResponseError(status_code=r.status_code, + error_type=response['error'], + error_message=response['message'] or None, + error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None) + + if save_to: + r.raise_for_status() + with open(save_to, 'wb') as f: + shutil.copyfileobj(r.raw, f) + return True + + return r.json()['response'] + + +class SoundNodeClient(MediaNodeClient): + def amixer_get_all(self): + return self._call('amixer/get-all/') + + def amixer_get(self, control: str): + return self._call(f'amixer/get/{control}/') + + def amixer_incr(self, control: str, step: Optional[int] = None): + params = {'step': step} if step is not None else None + return self._call(f'amixer/incr/{control}/', params=params) + + def amixer_decr(self, control: str, step: Optional[int] = None): + params = {'step': step} if step is not None else None + return self._call(f'amixer/decr/{control}/', params=params) + + def amixer_mute(self, control: str): + return self._call(f'amixer/mute/{control}/') + + def amixer_unmute(self, control: str): + return self._call(f'amixer/unmute/{control}/') + + def amixer_cap(self, control: str): + return self._call(f'amixer/cap/{control}/') + + def amixer_nocap(self, control: str): + return self._call(f'amixer/nocap/{control}/') + + +class CameraNodeClient(MediaNodeClient): + def capture(self, + save_to: str, + with_flash: bool = False): + return self._call('capture/', + {'with_flash': int(with_flash)}, + save_to=save_to) diff --git a/src/home/media/node_server.py b/src/home/media/node_server.py new file mode 100644 index 0000000..5d0803c --- /dev/null +++ b/src/home/media/node_server.py @@ -0,0 +1,86 @@ +from .. import http +from .record import Recorder +from .types import RecordStatus +from .storage import RecordStorage + + +class MediaNodeServer(http.HTTPServer): + recorder: Recorder + storage: RecordStorage + + def __init__(self, + recorder: Recorder, + storage: RecordStorage, + *args, **kwargs): + super().__init__(*args, **kwargs) + + self.recorder = recorder + self.storage = storage + + self.get('/record/', self.do_record) + self.get('/record/info/{id}/', self.record_info) + self.get('/record/forget/{id}/', self.record_forget) + self.get('/record/download/{id}/', self.record_download) + + self.get('/storage/list/', self.storage_list) + self.get('/storage/delete/', self.storage_delete) + self.get('/storage/download/', self.storage_download) + + async def do_record(self, request: http.Request): + duration = int(request.query['duration']) + max = Recorder.get_max_record_time()*15 + if not 0 < duration <= max: + raise ValueError(f'invalid duration: max duration is {max}') + + record_id = self.recorder.record(duration) + return http.ok({'id': record_id}) + + async def record_info(self, request: http.Request): + record_id = int(request.match_info['id']) + info = self.recorder.get_info(record_id) + return http.ok(info.as_dict()) + + async def record_forget(self, request: http.Request): + record_id = int(request.match_info['id']) + + info = self.recorder.get_info(record_id) + assert info.status in (RecordStatus.FINISHED, RecordStatus.ERROR), f"can't forget: record status is {info.status}" + + self.recorder.forget(record_id) + return http.ok() + + async def record_download(self, request: http.Request): + record_id = int(request.match_info['id']) + + info = self.recorder.get_info(record_id) + assert info.status == RecordStatus.FINISHED, f"record status is {info.status}" + + return http.FileResponse(info.file.path) + + async def storage_list(self, request: http.Request): + extended = 'extended' in request.query and int(request.query['extended']) == 1 + + files = self.storage.getfiles(as_objects=extended) + if extended: + files = list(map(lambda file: file.__dict__(), files)) + + return http.ok({ + 'files': files + }) + + async def storage_delete(self, request: http.Request): + file_id = request.query['file_id'] + file = self.storage.find(file_id) + if not file: + raise ValueError(f'file {file} not found') + + self.storage.delete(file) + return http.ok() + + async def storage_download(self, request): + file_id = request.query['file_id'] + file = self.storage.find(file_id) + if not file: + raise ValueError(f'file {file} not found') + + return http.FileResponse(file.path) diff --git a/src/home/media/record.py b/src/home/media/record.py new file mode 100644 index 0000000..d3abfbb --- /dev/null +++ b/src/home/media/record.py @@ -0,0 +1,453 @@ +import os +import threading +import logging +import time +import subprocess +import signal + +from typing import Optional +from ..util import find_child_processes, Addr +from ..config import config +from .storage import RecordFile, RecordStorage +from .types import RecordStatus +from ..camera.types import CameraType + + +_history_item_timeout = 7200 +_history_cleanup_freq = 3600 + + +class RecordHistoryItem: + id: int + request_time: float + start_time: float + stop_time: float + relations: list[int] + status: RecordStatus + error: Optional[Exception] + file: Optional[RecordFile] + creation_time: float + + def __init__(self, id): + self.id = id + self.request_time = 0 + self.start_time = 0 + self.stop_time = 0 + self.relations = [] + self.status = RecordStatus.WAITING + self.file = None + self.error = None + self.creation_time = time.time() + + def add_relation(self, related_id: int): + self.relations.append(related_id) + + def mark_started(self, start_time: float): + self.start_time = start_time + self.status = RecordStatus.RECORDING + + def mark_finished(self, end_time: float, file: RecordFile): + self.stop_time = end_time + self.file = file + self.status = RecordStatus.FINISHED + + def mark_failed(self, error: Exception): + self.status = RecordStatus.ERROR + self.error = error + + def as_dict(self) -> dict: + data = { + 'id': self.id, + 'request_time': self.request_time, + 'status': self.status.value, + 'relations': self.relations, + 'start_time': self.start_time, + 'stop_time': self.stop_time, + } + if self.error: + data['error'] = str(self.error) + if self.file: + data['file'] = self.file.__dict__() + return data + + +class RecordingNotFoundError(Exception): + pass + + +class RecordHistory: + history: dict[int, RecordHistoryItem] + + def __init__(self): + self.history = {} + self.logger = logging.getLogger(self.__class__.__name__) + + def add(self, record_id: int): + self.logger.debug(f'add: record_id={record_id}') + + r = RecordHistoryItem(record_id) + r.request_time = time.time() + + self.history[record_id] = r + + def delete(self, record_id: int): + self.logger.debug(f'delete: record_id={record_id}') + del self.history[record_id] + + def cleanup(self): + del_ids = [] + for rid, item in self.history.items(): + if item.creation_time < time.time()-_history_item_timeout: + del_ids.append(rid) + for rid in del_ids: + self.delete(rid) + + def __getitem__(self, key): + if key not in self.history: + raise RecordingNotFoundError() + + return self.history[key] + + def __setitem__(self, key, value): + raise NotImplementedError('setting history item this way is prohibited') + + def __contains__(self, key): + return key in self.history + + +class Recording: + RECORDER_PROGRAM = None + + start_time: float + stop_time: float + duration: int + record_id: int + recorder_program_pid: Optional[int] + process: Optional[subprocess.Popen] + + g_record_id = 1 + + def __init__(self): + if self.RECORDER_PROGRAM is None: + raise RuntimeError('this is abstract class') + + self.start_time = 0 + self.stop_time = 0 + self.duration = 0 + self.process = None + self.recorder_program_pid = None + self.record_id = Recording.next_id() + self.logger = logging.getLogger(self.__class__.__name__) + + def is_started(self) -> bool: + return self.start_time > 0 and self.stop_time > 0 + + def is_waiting(self): + return self.duration > 0 + + def ask_for(self, duration) -> int: + overtime = 0 + orig_duration = duration + + if self.is_started(): + already_passed = time.time() - self.start_time + max_duration = Recorder.get_max_record_time() - already_passed + self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}') + else: + max_duration = Recorder.get_max_record_time() + + if duration > max_duration: + overtime = duration - max_duration + duration = max_duration + + self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}') + + self.duration += duration + if self.is_started(): + til_end = self.stop_time - time.time() + if til_end < 0: + til_end = 0 + + _prev_stop_time = self.stop_time + _to_add = duration - til_end + if _to_add < 0: + _to_add = 0 + + self.stop_time += _to_add + self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})') + + return overtime + + def start(self, output: str): + assert self.start_time == 0 and self.stop_time == 0, "already started?!" + assert self.process is None, "self.process is not None, what the hell?" + + cur = time.time() + self.start_time = cur + self.stop_time = cur + self.duration + + cmd = self.get_command(output) + self.logger.debug(f'start: running `{cmd}`') + self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True) + + sh_pid = self.process.pid + self.logger.debug(f'start: started, pid of shell is {sh_pid}') + + pid = self.find_recorder_program_pid(sh_pid) + if pid is not None: + self.recorder_program_pid = pid + self.logger.debug(f'start: pid of {self.RECORDER_PROGRAM} is {pid}') + + def get_command(self, output: str) -> str: + pass + + def stop(self): + if self.process: + if self.recorder_program_pid is None: + self.recorder_program_pid = self.find_recorder_program_pid(self.process.pid) + + if self.recorder_program_pid is not None: + os.kill(self.recorder_program_pid, signal.SIGINT) + timeout = config['node']['process_wait_timeout'] + + self.logger.debug(f'stop: sent SIGINT to {self.recorder_program_pid}. now waiting up to {timeout} seconds...') + try: + self.process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + self.logger.warning(f'stop: wait({timeout}): timeout expired, calling terminate()') + self.process.terminate() + else: + self.logger.warning(f'stop: pid of {self.RECORDER_PROGRAM} is unknown, calling terminate()') + self.process.terminate() + + rc = self.process.returncode + self.logger.debug(f'stop: rc={rc}') + + self.process = None + self.recorder_program_pid = 0 + + self.duration = 0 + self.start_time = 0 + self.stop_time = 0 + + def find_recorder_program_pid(self, sh_pid: int): + try: + children = find_child_processes(sh_pid) + except OSError as exc: + self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc)) + return None + + for child in children: + if self.RECORDER_PROGRAM in child.cmd: + return child.pid + + return None + + @staticmethod + def next_id() -> int: + cur_id = Recording.g_record_id + Recording.g_record_id += 1 + return cur_id + + def increment_id(self): + self.record_id = Recording.next_id() + + +class Recorder: + TEMP_NAME = None + + interrupted: bool + lock: threading.Lock + history_lock: threading.Lock + recording: Optional[Recording] + overtime: int + history: RecordHistory + next_history_cleanup_time: float + storage: RecordStorage + + def __init__(self, + storage: RecordStorage, + recording: Recording): + if self.TEMP_NAME is None: + raise RuntimeError('this is abstract class') + + self.storage = storage + self.recording = recording + self.interrupted = False + self.lock = threading.Lock() + self.history_lock = threading.Lock() + self.overtime = 0 + self.history = RecordHistory() + self.next_history_cleanup_time = 0 + self.logger = logging.getLogger(self.__class__.__name__) + + def start_thread(self): + t = threading.Thread(target=self.loop) + t.daemon = True + t.start() + + def loop(self) -> None: + tempname = os.path.join(self.storage.root, self.TEMP_NAME) + + while not self.interrupted: + cur = time.time() + stopped = False + cur_record_id = None + + if self.next_history_cleanup_time == 0: + self.next_history_cleanup_time = time.time() + _history_cleanup_freq + elif self.next_history_cleanup_time <= time.time(): + self.logger.debug('loop: calling history.cleanup()') + try: + self.history.cleanup() + except Exception as e: + self.logger.error('loop: error while history.cleanup(): ' + str(e)) + self.next_history_cleanup_time = time.time() + _history_cleanup_freq + + with self.lock: + cur_record_id = self.recording.record_id + # self.logger.debug(f'cur_record_id={cur_record_id}') + + if not self.recording.is_started(): + if self.recording.is_waiting(): + try: + if os.path.exists(tempname): + self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..') + try: + os.unlink(tempname) + except OSError as e: + self.logger.exception(e) + self.recording.start(tempname) + with self.history_lock: + self.history[cur_record_id].mark_started(self.recording.start_time) + except Exception as exc: + self.logger.exception(exc) + + # there should not be any errors, but still.. + try: + self.recording.stop() + except Exception as exc: + self.logger.exception(exc) + + with self.history_lock: + self.history[cur_record_id].mark_failed(exc) + + self.logger.debug(f'loop: start exc path: calling increment_id()') + self.recording.increment_id() + else: + if cur >= self.recording.stop_time: + try: + start_time = self.recording.start_time + stop_time = self.recording.stop_time + self.recording.stop() + + saved_name = self.storage.save(tempname, + record_id=cur_record_id, + start_time=int(start_time), + stop_time=int(stop_time)) + + with self.history_lock: + self.history[cur_record_id].mark_finished(stop_time, saved_name) + except Exception as exc: + self.logger.exception(exc) + with self.history_lock: + self.history[cur_record_id].mark_failed(exc) + finally: + self.logger.debug(f'loop: stop exc final path: calling increment_id()') + self.recording.increment_id() + + stopped = True + + if stopped and self.overtime > 0: + self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})') + _overtime = self.overtime + self.overtime = 0 + + related_id = self.record(_overtime) + self.logger.info(f'enqueued another record with id {related_id}') + + if cur_record_id is not None: + with self.history_lock: + self.history[cur_record_id].add_relation(related_id) + + time.sleep(0.2) + + def record(self, duration: int) -> int: + self.logger.debug(f'record: duration={duration}') + with self.lock: + overtime = self.recording.ask_for(duration) + self.logger.debug(f'overtime={overtime}') + + if overtime > self.overtime: + self.overtime = overtime + + if not self.recording.is_started(): + with self.history_lock: + self.history.add(self.recording.record_id) + + return self.recording.record_id + + def stop(self): + self.interrupted = True + + def get_info(self, record_id: int) -> RecordHistoryItem: + with self.history_lock: + return self.history[record_id] + + def forget(self, record_id: int): + with self.history_lock: + self.logger.info(f'forget: removing record {record_id} from history') + self.history.delete(record_id) + + @staticmethod + def get_max_record_time() -> int: + return config['node']['record_max_time'] + + +class SoundRecorder(Recorder): + TEMP_NAME = 'temp.mp3' + + def __init__(self, *args, **kwargs): + super().__init__(recording=SoundRecording(), + *args, **kwargs) + + +class CameraRecorder(Recorder): + TEMP_NAME = 'temp.mp4' + + def __init__(self, + camera_type: CameraType, + *args, **kwargs): + if camera_type == CameraType.ESP32: + recording = ESP32CameraRecording(stream_addr=kwargs['stream_addr']) + del kwargs['stream_addr'] + else: + raise RuntimeError(f'unsupported camera type {camera_type}') + + super().__init__(recording=recording, + *args, **kwargs) + + +class SoundRecording(Recording): + RECORDER_PROGRAM = 'arecord' + + def get_command(self, output: str) -> str: + arecord = config['arecord']['bin'] + lame = config['lame']['bin'] + b = config['lame']['bitrate'] + + return f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null' + + +class ESP32CameraRecording(Recording): + RECORDER_PROGRAM = 'esp32_capture.py' + + stream_addr: Addr + + def __init__(self, stream_addr: Addr): + super().__init__() + self.stream_addr = stream_addr + + def get_command(self, output: str) -> str: + bin = config['esp32_capture']['bin'] + return f'{bin} --addr {self.stream_addr[0]}:{self.stream_addr[1]} --output-directory {output} >/dev/null 2>/dev/null' \ No newline at end of file diff --git a/src/home/media/record_client.py b/src/home/media/record_client.py new file mode 100644 index 0000000..f264155 --- /dev/null +++ b/src/home/media/record_client.py @@ -0,0 +1,166 @@ +import time +import logging +import threading +import os.path + +from tempfile import gettempdir +from .record import RecordStatus +from .node_client import SoundNodeClient, MediaNodeClient, CameraNodeClient +from ..util import Addr +from typing import Optional, Callable + + +class RecordClient: + DOWNLOAD_EXTENSION = None + + interrupted: bool + logger: logging.Logger + clients: dict[str, MediaNodeClient] + awaiting: dict[str, dict[int, Optional[dict]]] + error_handler: Optional[Callable] + finished_handler: Optional[Callable] + download_on_finish: bool + + def __init__(self, + nodes: dict[str, Addr], + error_handler: Optional[Callable] = None, + finished_handler: Optional[Callable] = None, + download_on_finish=False): + if self.DOWNLOAD_EXTENSION is None: + raise RuntimeError('this is abstract class') + + self.interrupted = False + self.logger = logging.getLogger(self.__class__.__name__) + self.clients = {} + self.awaiting = {} + + self.download_on_finish = download_on_finish + self.error_handler = error_handler + self.finished_handler = finished_handler + + self.awaiting_lock = threading.Lock() + + self.make_clients(nodes) + + try: + t = threading.Thread(target=self.loop) + t.daemon = True + t.start() + except (KeyboardInterrupt, SystemExit) as exc: + self.stop() + self.logger.exception(exc) + + def make_clients(self, nodes: dict[str, Addr]): + pass + + def stop(self): + self.interrupted = True + + def loop(self): + while not self.interrupted: + for node in self.awaiting.keys(): + with self.awaiting_lock: + record_ids = list(self.awaiting[node].keys()) + if not record_ids: + continue + + self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}') + + cl = self.getclient(node) + del_ids = [] + for rid in record_ids: + info = cl.record_info(rid) + + if info['relations']: + for relid in info['relations']: + self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True) + + status = RecordStatus(info['status']) + if status in (RecordStatus.FINISHED, RecordStatus.ERROR): + if status == RecordStatus.FINISHED: + if self.download_on_finish: + local_fn = self.download(node, rid, info['file']['fileid']) + else: + local_fn = None + self._report_finished(info, local_fn, self.awaiting[node][rid]) + else: + self._report_error(info, self.awaiting[node][rid]) + del_ids.append(rid) + self.logger.debug(f'record {rid}: status {status}') + + if del_ids: + self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list') + with self.awaiting_lock: + for del_id in del_ids: + del self.awaiting[node][del_id] + + time.sleep(5) + + self.logger.info('loop ended') + + def getclient(self, node: str): + return self.clients[node] + + def record(self, + node: str, + duration: int, + userdata: Optional[dict] = None) -> int: + self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}') + + cl = self.getclient(node) + record_id = cl.record(duration)['id'] + self.logger.debug(f'record: request sent, record_id={record_id}') + + self.wait_for_record(node, record_id, userdata) + return record_id + + def wait_for_record(self, + node: str, + record_id: int, + userdata: Optional[dict] = None, + is_relative=False): + with self.awaiting_lock: + if record_id not in self.awaiting[node]: + msg = f'wait_for_record: adding {record_id} to {node}' + if is_relative: + msg += ' (by relation)' + self.logger.debug(msg) + + self.awaiting[node][record_id] = userdata + + def download(self, node: str, record_id: int, fileid: str): + dst = os.path.join(gettempdir(), f'{node}_{fileid}.{self.DOWNLOAD_EXTENSION}') + cl = self.getclient(node) + cl.record_download(record_id, dst) + return dst + + def forget(self, node: str, rid: int): + self.getclient(node).record_forget(rid) + + def _report_finished(self, *args): + if self.finished_handler: + self.finished_handler(*args) + + def _report_error(self, *args): + if self.error_handler: + self.error_handler(*args) + + +class SoundRecordClient(RecordClient): + DOWNLOAD_EXTENSION = 'mp3' + # clients: dict[str, SoundNodeClient] + + def make_clients(self, nodes: dict[str, Addr]): + for node, addr in nodes.items(): + self.clients[node] = SoundNodeClient(addr) + self.awaiting[node] = {} + + +class CameraRecordClient(RecordClient): + DOWNLOAD_EXTENSION = 'mp4' + # clients: dict[str, CameraNodeClient] + + def make_clients(self, nodes: dict[str, Addr]): + for node, addr in nodes.items(): + self.clients[node] = CameraNodeClient(addr) + self.awaiting[node] = {} \ No newline at end of file diff --git a/src/home/media/storage.py b/src/home/media/storage.py new file mode 100644 index 0000000..880b899 --- /dev/null +++ b/src/home/media/storage.py @@ -0,0 +1,197 @@ +import os +import re +import shutil +import logging + +from typing import Optional, Union +from datetime import datetime +from ..util import strgen + +logger = logging.getLogger(__name__) + + +# record file +# ----------- + +class RecordFile: + EXTENSION = None + + start_time: Optional[datetime] + stop_time: Optional[datetime] + record_id: Optional[int] + name: str + file_id: Optional[str] + remote: bool + remote_filesize: int + storage_root: str + + human_date_dmt = '%d.%m.%y' + human_time_fmt = '%H:%M:%S' + + @staticmethod + def create(filename: str, *args, **kwargs): + if filename.endswith(f'.{SoundRecordFile.EXTENSION}'): + return SoundRecordFile(filename, *args, **kwargs) + elif filename.endswith(f'.{CameraRecordFile.EXTENSION}'): + return CameraRecordFile(filename, *args, **kwargs) + else: + raise RuntimeError(f'unsupported file extension: {filename}') + + def __init__(self, filename: str, remote=False, remote_filesize=None, storage_root='/'): + if self.EXTENSION is None: + raise RuntimeError('this is abstract class') + + self.name = filename + self.storage_root = storage_root + + self.remote = remote + self.remote_filesize = remote_filesize + + m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.'+self.EXTENSION+'$', filename) + if m: + self.start_time = datetime.strptime(m.group(1), RecordStorage.time_fmt) + self.stop_time = datetime.strptime(m.group(2), RecordStorage.time_fmt) + self.record_id = int(m.group(3)) + self.file_id = (m.group(1) + '_' + m.group(2)).replace('-', '_') + else: + logger.warning(f'unexpected filename: {filename}') + self.start_time = None + self.stop_time = None + self.record_id = None + self.file_id = None + + @property + def path(self): + if self.remote: + return RuntimeError('remote recording, can\'t get real path') + + return os.path.realpath(os.path.join( + self.storage_root, self.name + )) + + @property + def start_humantime(self) -> str: + if self.start_time is None: + return '?' + fmt = f'{RecordFile.human_date_dmt} {RecordFile.human_time_fmt}' + return self.start_time.strftime(fmt) + + @property + def stop_humantime(self) -> str: + if self.stop_time is None: + return '?' + fmt = RecordFile.human_time_fmt + if self.start_time.date() != self.stop_time.date(): + fmt = f'{RecordFile.human_date_dmt} {fmt}' + return self.stop_time.strftime(fmt) + + @property + def start_unixtime(self) -> int: + if self.start_time is None: + return 0 + return int(self.start_time.timestamp()) + + @property + def stop_unixtime(self) -> int: + if self.stop_time is None: + return 0 + return int(self.stop_time.timestamp()) + + @property + def filesize(self): + if self.remote: + if self.remote_filesize is None: + raise RuntimeError('file is remote and remote_filesize is not set') + return self.remote_filesize + return os.path.getsize(self.path) + + def __dict__(self) -> dict: + return { + 'start_unixtime': self.start_unixtime, + 'stop_unixtime': self.stop_unixtime, + 'filename': self.name, + 'filesize': self.filesize, + 'fileid': self.file_id, + 'record_id': self.record_id or 0, + } + + +class SoundRecordFile(RecordFile): + EXTENSION = 'mp3' + + +class CameraRecordFile(RecordFile): + EXTENSION = 'mp4' + + +# record storage +# -------------- + +class RecordStorage: + EXTENSION = None + + time_fmt = '%d%m%y-%H%M%S' + + def __init__(self, root: str): + if self.EXTENSION is None: + raise RuntimeError('this is abstract class') + + self.root = root + + def getfiles(self, as_objects=False) -> Union[list[str], list[RecordFile]]: + files = [] + for name in os.listdir(self.root): + path = os.path.join(self.root, name) + if os.path.isfile(path) and name.endswith(f'.{self.EXTENSION}'): + files.append(name if not as_objects else RecordFile.create(name, storage_root=self.root)) + return files + + def find(self, file_id: str) -> Optional[RecordFile]: + for name in os.listdir(self.root): + if os.path.isfile(os.path.join(self.root, name)) and name.endswith(f'.{self.EXTENSION}'): + item = RecordFile.create(name, storage_root=self.root) + if item.file_id == file_id: + return item + return None + + def purge(self): + files = self.getfiles() + if files: + logger = logging.getLogger(self.__name__) + for f in files: + try: + path = os.path.join(self.root, f) + logger.debug(f'purge: deleting {path}') + os.unlink(path) + except OSError as exc: + logger.exception(exc) + + def delete(self, file: RecordFile): + os.unlink(file.path) + + def save(self, + fn: str, + record_id: int, + start_time: int, + stop_time: int) -> RecordFile: + + start_time_s = datetime.fromtimestamp(start_time).strftime(self.time_fmt) + stop_time_s = datetime.fromtimestamp(stop_time).strftime(self.time_fmt) + + dst_fn = f'{start_time_s}_{stop_time_s}_id{record_id}' + if os.path.exists(os.path.join(self.root, dst_fn)): + dst_fn += strgen(4) + dst_fn += f'.{self.EXTENSION}' + dst_path = os.path.join(self.root, dst_fn) + + shutil.move(fn, dst_path) + return RecordFile.create(dst_fn, storage_root=self.root) + + +class SoundRecordStorage(RecordStorage): + EXTENSION = 'mp3' + + +class CameraRecordStorage(RecordStorage): + EXTENSION = 'mp4' + diff --git a/src/home/media/types.py b/src/home/media/types.py new file mode 100644 index 0000000..acbc291 --- /dev/null +++ b/src/home/media/types.py @@ -0,0 +1,13 @@ +from enum import Enum, auto + + +class MediaNodeType(Enum): + SOUND = auto() + CAMERA = auto() + + +class RecordStatus(Enum): + WAITING = auto() + RECORDING = auto() + FINISHED = auto() + ERROR = auto() diff --git a/src/home/sound/__init__.py b/src/home/sound/__init__.py deleted file mode 100644 index 43ddaff..0000000 --- a/src/home/sound/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -from .node_client import SoundNodeClient -from .record import ( - RecordStatus, - RecordingNotFoundError, - Recorder, -) -from .storage import RecordStorage, RecordFile -from .record_client import RecordClient diff --git a/src/home/sound/amixer.py b/src/home/sound/amixer.py deleted file mode 100644 index 0ab2c64..0000000 --- a/src/home/sound/amixer.py +++ /dev/null @@ -1,91 +0,0 @@ -import subprocess - -from ..config import config -from threading import Lock -from typing import Union - - -_lock = Lock() -_default_step = 5 - - -def has_control(s: str) -> bool: - for control in config['amixer']['controls']: - if control['name'] == s: - return True - return False - - -def get_caps(s: str) -> list[str]: - for control in config['amixer']['controls']: - if control['name'] == s: - return control['caps'] - raise KeyError(f'control {s} not found') - - -def get_all() -> list: - controls = [] - for control in config['amixer']['controls']: - controls.append({ - 'name': control['name'], - 'info': get(control['name']), - 'caps': control['caps'] - }) - return controls - - -def get(control: str): - return call('get', control) - - -def mute(control): - return call('set', control, 'mute') - - -def unmute(control): - return call('set', control, 'unmute') - - -def cap(control): - return call('set', control, 'cap') - - -def nocap(control): - return call('set', control, 'nocap') - - -def _get_default_step() -> int: - if 'step' in config['amixer']: - return int(config['amixer']['step']) - - return _default_step - - -def incr(control, step=None): - if step is None: - step = _get_default_step() - return call('set', control, f'{step}%+') - - -def decr(control, step=None): - if step is None: - step = _get_default_step() - return call('set', control, f'{step}%-') - - -def call(*args, return_code=False) -> Union[int, str]: - with _lock: - result = subprocess.run([config['amixer']['bin'], *args], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - if return_code: - return result.returncode - - if result.returncode != 0: - raise AmixerError(result.stderr.decode().strip()) - - return result.stdout.decode().strip() - - -class AmixerError(OSError): - pass diff --git a/src/home/sound/node_client.py b/src/home/sound/node_client.py deleted file mode 100644 index 7341208..0000000 --- a/src/home/sound/node_client.py +++ /dev/null @@ -1,109 +0,0 @@ -import requests -import logging -import shutil - -from ..util import Addr -from ..api.errors import ApiResponseError -from typing import Optional, Union -from .record import RecordFile - - -class SoundNodeClient: - def __init__(self, addr: Addr): - self.endpoint = f'http://{addr[0]}:{addr[1]}' - self.logger = logging.getLogger(self.__class__.__name__) - - def amixer_get_all(self): - return self._call('amixer/get-all/') - - def amixer_get(self, control: str): - return self._call(f'amixer/get/{control}/') - - def amixer_incr(self, control: str, step: Optional[int] = None): - params = {'step': step} if step is not None else None - return self._call(f'amixer/incr/{control}/', params=params) - - def amixer_decr(self, control: str, step: Optional[int] = None): - params = {'step': step} if step is not None else None - return self._call(f'amixer/decr/{control}/', params=params) - - def amixer_mute(self, control: str): - return self._call(f'amixer/mute/{control}/') - - def amixer_unmute(self, control: str): - return self._call(f'amixer/unmute/{control}/') - - def amixer_cap(self, control: str): - return self._call(f'amixer/cap/{control}/') - - def amixer_nocap(self, control: str): - return self._call(f'amixer/nocap/{control}/') - - def record(self, duration: int): - return self._call('record/', params={"duration": duration}) - - def record_info(self, record_id: int): - return self._call(f'record/info/{record_id}/') - - def record_forget(self, record_id: int): - return self._call(f'record/forget/{record_id}/') - - def record_download(self, record_id: int, output: str): - return self._call(f'record/download/{record_id}/', save_to=output) - - def storage_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: - r = self._call('storage/list/', params={'extended': int(extended)}) - files = r['files'] - if as_objects: - return self.record_list_from_serialized(files) - return files - - @staticmethod - def record_list_from_serialized(files: Union[list[str], list[dict]]): - new_files = [] - for f in files: - kwargs = {'remote': True} - if isinstance(f, dict): - name = f['filename'] - kwargs['remote_filesize'] = f['filesize'] - else: - name = f - item = RecordFile(name, **kwargs) - new_files.append(item) - return new_files - - def storage_delete(self, file_id: str): - return self._call('storage/delete/', params={'file_id': file_id}) - - def storage_download(self, file_id: str, output: str): - return self._call('storage/download/', params={'file_id': file_id}, save_to=output) - - def _call(self, - method: str, - params: dict = None, - save_to: Optional[str] = None): - - kwargs = {} - if isinstance(params, dict): - kwargs['params'] = params - if save_to: - kwargs['stream'] = True - - url = f'{self.endpoint}/{method}' - self.logger.debug(f'calling {url}, kwargs: {kwargs}') - - r = requests.get(url, **kwargs) - if r.status_code != 200: - response = r.json() - raise ApiResponseError(status_code=r.status_code, - error_type=response['error'], - error_message=response['message'] or None, - error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None) - - if save_to: - r.raise_for_status() - with open(save_to, 'wb') as f: - shutil.copyfileobj(r.raw, f) - return True - - return r.json()['response'] diff --git a/src/home/sound/record.py b/src/home/sound/record.py deleted file mode 100644 index 1ad8827..0000000 --- a/src/home/sound/record.py +++ /dev/null @@ -1,400 +0,0 @@ -import threading -import time -import subprocess -import signal -import os -import logging - -from enum import Enum, auto -from typing import Optional -from ..config import config -from ..util import find_child_processes -from .storage import RecordFile, RecordStorage - - -_history_item_timeout = 7200 -_history_cleanup_freq = 3600 - - -class RecordStatus(Enum): - WAITING = auto() - RECORDING = auto() - FINISHED = auto() - ERROR = auto() - - -class RecordHistoryItem: - id: int - request_time: float - start_time: float - stop_time: float - relations: list[int] - status: RecordStatus - error: Optional[Exception] - file: Optional[RecordFile] - creation_time: float - - def __init__(self, id): - self.id = id - self.request_time = 0 - self.start_time = 0 - self.stop_time = 0 - self.relations = [] - self.status = RecordStatus.WAITING - self.file = None - self.error = None - self.creation_time = time.time() - - def add_relation(self, related_id: int): - self.relations.append(related_id) - - def mark_started(self, start_time: float): - self.start_time = start_time - self.status = RecordStatus.RECORDING - - def mark_finished(self, end_time: float, file: RecordFile): - self.stop_time = end_time - self.file = file - self.status = RecordStatus.FINISHED - - def mark_failed(self, error: Exception): - self.status = RecordStatus.ERROR - self.error = error - - def as_dict(self) -> dict: - data = { - 'id': self.id, - 'request_time': self.request_time, - 'status': self.status.value, - 'relations': self.relations, - 'start_time': self.start_time, - 'stop_time': self.stop_time, - } - if self.error: - data['error'] = str(self.error) - if self.file: - data['file'] = self.file.__dict__() - return data - - -class RecordingNotFoundError(Exception): - pass - - -class RecordHistory: - history: dict[int, RecordHistoryItem] - - def __init__(self): - self.history = {} - self.logger = logging.getLogger(self.__class__.__name__) - - def add(self, record_id: int): - self.logger.debug(f'add: record_id={record_id}') - - r = RecordHistoryItem(record_id) - r.request_time = time.time() - - self.history[record_id] = r - - def delete(self, record_id: int): - self.logger.debug(f'delete: record_id={record_id}') - del self.history[record_id] - - def cleanup(self): - del_ids = [] - for rid, item in self.history.items(): - if item.creation_time < time.time()-_history_item_timeout: - del_ids.append(rid) - for rid in del_ids: - self.delete(rid) - - def __getitem__(self, key): - if key not in self.history: - raise RecordingNotFoundError() - - return self.history[key] - - def __setitem__(self, key, value): - raise NotImplementedError('setting history item this way is prohibited') - - def __contains__(self, key): - return key in self.history - - -class Recording: - start_time: float - stop_time: float - duration: int - record_id: int - arecord_pid: Optional[int] - process: Optional[subprocess.Popen] - - g_record_id = 1 - - def __init__(self): - self.start_time = 0 - self.stop_time = 0 - self.duration = 0 - self.process = None - self.arecord_pid = None - self.record_id = Recording.next_id() - self.logger = logging.getLogger(self.__class__.__name__) - - def is_started(self) -> bool: - return self.start_time > 0 and self.stop_time > 0 - - def is_waiting(self): - return self.duration > 0 - - def ask_for(self, duration) -> int: - overtime = 0 - orig_duration = duration - - if self.is_started(): - already_passed = time.time() - self.start_time - max_duration = Recorder.get_max_record_time() - already_passed - self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}') - else: - max_duration = Recorder.get_max_record_time() - - if duration > max_duration: - overtime = duration - max_duration - duration = max_duration - - self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}') - - self.duration += duration - if self.is_started(): - til_end = self.stop_time - time.time() - if til_end < 0: - til_end = 0 - - _prev_stop_time = self.stop_time - _to_add = duration - til_end - if _to_add < 0: - _to_add = 0 - - self.stop_time += _to_add - self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})') - - return overtime - - def start(self, output: str): - assert self.start_time == 0 and self.stop_time == 0, "already started?!" - assert self.process is None, "self.process is not None, what the hell?" - - cur = time.time() - self.start_time = cur - self.stop_time = cur + self.duration - - arecord = config['arecord']['bin'] - lame = config['lame']['bin'] - b = config['lame']['bitrate'] - - cmd = f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null' - self.logger.debug(f'start: running `{cmd}`') - self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True) - - sh_pid = self.process.pid - self.logger.debug(f'start: started, pid of shell is {sh_pid}') - - arecord_pid = self.find_arecord_pid(sh_pid) - if arecord_pid is not None: - self.arecord_pid = arecord_pid - self.logger.debug(f'start: pid of arecord is {arecord_pid}') - - def stop(self): - if self.process: - if self.arecord_pid is None: - self.arecord_pid = self.find_arecord_pid(self.process.pid) - - if self.arecord_pid is not None: - os.kill(self.arecord_pid, signal.SIGINT) - timeout = config['node']['process_wait_timeout'] - - self.logger.debug(f'stop: sent SIGINT to {self.arecord_pid}. now waiting up to {timeout} seconds...') - try: - self.process.wait(timeout=timeout) - except subprocess.TimeoutExpired: - self.logger.warning(f'stop: wait({timeout}): timeout expired, calling terminate()') - self.process.terminate() - else: - self.logger.warning('stop: pid of arecord is unknown, calling terminate()') - self.process.terminate() - - rc = self.process.returncode - self.logger.debug(f'stop: rc={rc}') - - self.process = None - self.arecord_pid = 0 - - self.duration = 0 - self.start_time = 0 - self.stop_time = 0 - - def find_arecord_pid(self, sh_pid: int): - try: - children = find_child_processes(sh_pid) - except OSError as exc: - self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc)) - return None - - for child in children: - if 'arecord' in child.cmd: - return child.pid - - return None - - @staticmethod - def next_id() -> int: - cur_id = Recording.g_record_id - Recording.g_record_id += 1 - return cur_id - - def increment_id(self): - self.record_id = Recording.next_id() - - -class Recorder: - interrupted: bool - lock: threading.Lock - history_lock: threading.Lock - recording: Optional[Recording] - overtime: int - history: RecordHistory - next_history_cleanup_time: float - storage: RecordStorage - - def __init__(self, storage: RecordStorage): - self.storage = storage - self.recording = Recording() - self.interrupted = False - self.lock = threading.Lock() - self.history_lock = threading.Lock() - self.overtime = 0 - self.history = RecordHistory() - self.next_history_cleanup_time = 0 - self.logger = logging.getLogger(self.__class__.__name__) - - def start_thread(self): - t = threading.Thread(target=self.loop) - t.daemon = True - t.start() - - def loop(self) -> None: - tempname = os.path.join(self.storage.root, 'temp.mp3') - - while not self.interrupted: - cur = time.time() - stopped = False - cur_record_id = None - - if self.next_history_cleanup_time == 0: - self.next_history_cleanup_time = time.time() + _history_cleanup_freq - elif self.next_history_cleanup_time <= time.time(): - self.logger.debug('loop: calling history.cleanup()') - try: - self.history.cleanup() - except Exception as e: - self.logger.error('loop: error while history.cleanup(): ' + str(e)) - self.next_history_cleanup_time = time.time() + _history_cleanup_freq - - with self.lock: - cur_record_id = self.recording.record_id - # self.logger.debug(f'cur_record_id={cur_record_id}') - - if not self.recording.is_started(): - if self.recording.is_waiting(): - try: - if os.path.exists(tempname): - self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..') - try: - os.unlink(tempname) - except OSError as e: - self.logger.exception(e) - self.recording.start(tempname) - with self.history_lock: - self.history[cur_record_id].mark_started(self.recording.start_time) - except Exception as exc: - self.logger.exception(exc) - - # there should not be any errors, but still.. - try: - self.recording.stop() - except Exception as exc: - self.logger.exception(exc) - - with self.history_lock: - self.history[cur_record_id].mark_failed(exc) - - self.logger.debug(f'loop: start exc path: calling increment_id()') - self.recording.increment_id() - else: - if cur >= self.recording.stop_time: - try: - start_time = self.recording.start_time - stop_time = self.recording.stop_time - self.recording.stop() - - saved_name = self.storage.save(tempname, - record_id=cur_record_id, - start_time=int(start_time), - stop_time=int(stop_time)) - - with self.history_lock: - self.history[cur_record_id].mark_finished(stop_time, saved_name) - except Exception as exc: - self.logger.exception(exc) - with self.history_lock: - self.history[cur_record_id].mark_failed(exc) - finally: - self.logger.debug(f'loop: stop exc final path: calling increment_id()') - self.recording.increment_id() - - stopped = True - - if stopped and self.overtime > 0: - self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})') - _overtime = self.overtime - self.overtime = 0 - - related_id = self.record(_overtime) - self.logger.info(f'enqueued another record with id {related_id}') - - if cur_record_id is not None: - with self.history_lock: - self.history[cur_record_id].add_relation(related_id) - - time.sleep(0.2) - - def record(self, duration: int) -> int: - self.logger.debug(f'record: duration={duration}') - with self.lock: - overtime = self.recording.ask_for(duration) - self.logger.debug(f'overtime={overtime}') - - if overtime > self.overtime: - self.overtime = overtime - - if not self.recording.is_started(): - with self.history_lock: - self.history.add(self.recording.record_id) - - return self.recording.record_id - - def stop(self): - self.interrupted = True - - def get_info(self, record_id: int) -> RecordHistoryItem: - with self.history_lock: - return self.history[record_id] - - def forget(self, record_id: int): - with self.history_lock: - self.logger.info(f'forget: removing record {record_id} from history') - self.history.delete(record_id) - - @staticmethod - def get_max_record_time() -> int: - return config['node']['record_max_time'] - diff --git a/src/home/sound/record_client.py b/src/home/sound/record_client.py deleted file mode 100644 index 2744a8c..0000000 --- a/src/home/sound/record_client.py +++ /dev/null @@ -1,142 +0,0 @@ -import time -import logging -import threading -import os.path - -from tempfile import gettempdir -from .record import RecordStatus -from .node_client import SoundNodeClient -from ..util import Addr -from typing import Optional, Callable - - -class RecordClient: - interrupted: bool - logger: logging.Logger - clients: dict[str, SoundNodeClient] - awaiting: dict[str, dict[int, Optional[dict]]] - error_handler: Optional[Callable] - finished_handler: Optional[Callable] - download_on_finish: bool - - def __init__(self, - nodes: dict[str, Addr], - error_handler: Optional[Callable] = None, - finished_handler: Optional[Callable] = None, - download_on_finish=False): - self.interrupted = False - self.logger = logging.getLogger(self.__class__.__name__) - self.clients = {} - self.awaiting = {} - self.download_on_finish = download_on_finish - - self.error_handler = error_handler - self.finished_handler = finished_handler - - self.awaiting_lock = threading.Lock() - - for node, addr in nodes.items(): - self.clients[node] = SoundNodeClient(addr) - self.awaiting[node] = {} - - try: - t = threading.Thread(target=self.loop) - t.daemon = True - t.start() - except (KeyboardInterrupt, SystemExit) as exc: - self.stop() - self.logger.exception(exc) - - def stop(self): - self.interrupted = True - - def loop(self): - while not self.interrupted: - # self.logger.debug('loop: tick') - - for node in self.awaiting.keys(): - with self.awaiting_lock: - record_ids = list(self.awaiting[node].keys()) - if not record_ids: - continue - - self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}') - - cl = self.getclient(node) - del_ids = [] - for rid in record_ids: - info = cl.record_info(rid) - - if info['relations']: - for relid in info['relations']: - self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True) - - status = RecordStatus(info['status']) - if status in (RecordStatus.FINISHED, RecordStatus.ERROR): - if status == RecordStatus.FINISHED: - if self.download_on_finish: - local_fn = self.download(node, rid, info['file']['fileid']) - else: - local_fn = None - self._report_finished(info, local_fn, self.awaiting[node][rid]) - else: - self._report_error(info, self.awaiting[node][rid]) - del_ids.append(rid) - self.logger.debug(f'record {rid}: status {status}') - - if del_ids: - self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list') - with self.awaiting_lock: - for del_id in del_ids: - del self.awaiting[node][del_id] - - time.sleep(5) - - self.logger.info('loop ended') - - def getclient(self, node: str): - return self.clients[node] - - def record(self, - node: str, - duration: int, - userdata: Optional[dict] = None) -> int: - self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}') - - cl = self.getclient(node) - record_id = cl.record(duration)['id'] - self.logger.debug(f'record: request sent, record_id={record_id}') - - self.wait_for_record(node, record_id, userdata) - return record_id - - def wait_for_record(self, - node: str, - record_id: int, - userdata: Optional[dict] = None, - is_relative=False): - with self.awaiting_lock: - if record_id not in self.awaiting[node]: - msg = f'wait_for_record: adding {record_id} to {node}' - if is_relative: - msg += ' (by relation)' - self.logger.debug(msg) - - self.awaiting[node][record_id] = userdata - - def download(self, node: str, record_id: int, fileid: str): - dst = os.path.join(gettempdir(), f'{node}_{fileid}.mp3') - cl = self.getclient(node) - cl.record_download(record_id, dst) - return dst - - def forget(self, node: str, rid: int): - self.getclient(node).record_forget(rid) - - def _report_finished(self, *args): - if self.finished_handler: - self.finished_handler(*args) - - def _report_error(self, *args): - if self.error_handler: - self.error_handler(*args) diff --git a/src/home/sound/storage.py b/src/home/sound/storage.py deleted file mode 100644 index c61f6f6..0000000 --- a/src/home/sound/storage.py +++ /dev/null @@ -1,155 +0,0 @@ -import os -import re -import shutil -import logging - -from typing import Optional, Union -from datetime import datetime -from ..util import strgen - -logger = logging.getLogger(__name__) - - -class RecordFile: - start_time: Optional[datetime] - stop_time: Optional[datetime] - record_id: Optional[int] - name: str - file_id: Optional[str] - remote: bool - remote_filesize: int - storage_root: str - - human_date_dmt = '%d.%m.%y' - human_time_fmt = '%H:%M:%S' - - def __init__(self, filename: str, remote=False, remote_filesize=None, storage_root='/'): - self.name = filename - self.storage_root = storage_root - - self.remote = remote - self.remote_filesize = remote_filesize - - m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.mp3$', filename) - if m: - self.start_time = datetime.strptime(m.group(1), RecordStorage.time_fmt) - self.stop_time = datetime.strptime(m.group(2), RecordStorage.time_fmt) - self.record_id = int(m.group(3)) - self.file_id = (m.group(1) + '_' + m.group(2)).replace('-', '_') - else: - logger.warning(f'unexpected filename: {filename}') - self.start_time = None - self.stop_time = None - self.record_id = None - self.file_id = None - - @property - def path(self): - if self.remote: - return RuntimeError('remote recording, can\'t get real path') - - return os.path.realpath(os.path.join( - self.storage_root, self.name - )) - - @property - def start_humantime(self) -> str: - if self.start_time is None: - return '?' - fmt = f'{RecordFile.human_date_dmt} {RecordFile.human_time_fmt}' - return self.start_time.strftime(fmt) - - @property - def stop_humantime(self) -> str: - if self.stop_time is None: - return '?' - fmt = RecordFile.human_time_fmt - if self.start_time.date() != self.stop_time.date(): - fmt = f'{RecordFile.human_date_dmt} {fmt}' - return self.stop_time.strftime(fmt) - - @property - def start_unixtime(self) -> int: - if self.start_time is None: - return 0 - return int(self.start_time.timestamp()) - - @property - def stop_unixtime(self) -> int: - if self.stop_time is None: - return 0 - return int(self.stop_time.timestamp()) - - @property - def filesize(self): - if self.remote: - if self.remote_filesize is None: - raise RuntimeError('file is remote and remote_filesize is not set') - return self.remote_filesize - return os.path.getsize(self.path) - - def __dict__(self) -> dict: - return { - 'start_unixtime': self.start_unixtime, - 'stop_unixtime': self.stop_unixtime, - 'filename': self.name, - 'filesize': self.filesize, - 'fileid': self.file_id, - 'record_id': self.record_id or 0, - } - - -class RecordStorage: - time_fmt = '%d%m%y-%H%M%S' - - def __init__(self, root: str): - self.root = root - - def getfiles(self, as_objects=False) -> Union[list[str], list[RecordFile]]: - files = [] - for name in os.listdir(self.root): - path = os.path.join(self.root, name) - if os.path.isfile(path) and name.endswith('.mp3'): - files.append(name if not as_objects else RecordFile(name, storage_root=self.root)) - return files - - def find(self, file_id: str) -> Optional[RecordFile]: - for name in os.listdir(self.root): - if os.path.isfile(os.path.join(self.root, name)) and name.endswith('.mp3'): - item = RecordFile(name, storage_root=self.root) - if item.file_id == file_id: - return item - return None - - def purge(self): - files = self.getfiles() - if files: - logger = logging.getLogger(self.__name__) - for f in files: - try: - path = os.path.join(self.root, f) - logger.debug(f'purge: deleting {path}') - os.unlink(path) - except OSError as exc: - logger.exception(exc) - - def delete(self, file: RecordFile): - os.unlink(file.path) - - def save(self, - fn: str, - record_id: int, - start_time: int, - stop_time: int) -> RecordFile: - - start_time_s = datetime.fromtimestamp(start_time).strftime(self.time_fmt) - stop_time_s = datetime.fromtimestamp(stop_time).strftime(self.time_fmt) - - dst_fn = f'{start_time_s}_{stop_time_s}_id{record_id}' - if os.path.exists(os.path.join(self.root, dst_fn)): - dst_fn += strgen(4) - dst_fn += '.mp3' - dst_path = os.path.join(self.root, dst_fn) - - shutil.move(fn, dst_path) - return RecordFile(dst_fn, storage_root=self.root) diff --git a/src/home/web_api/web_api.py b/src/home/web_api/web_api.py index c75c031..6b8c54e 100644 --- a/src/home/web_api/web_api.py +++ b/src/home/web_api/web_api.py @@ -12,7 +12,7 @@ from ..config import config, is_development_mode from ..database import BotsDatabase, SensorsDatabase from ..util import stringify, format_tb from ..api.types import BotType, TemperatureSensorLocation, SoundSensorLocation -from ..sound import RecordStorage +from ..media import SoundRecordStorage db: Optional[BotsDatabase] = None sensors_db: Optional[SensorsDatabase] = None @@ -136,7 +136,7 @@ def recordings_list(): if not os.path.isdir(root): raise ValueError(f'invalid node {node}: no such directory') - storage = RecordStorage(root) + storage = SoundRecordStorage(root) files = storage.getfiles(as_objects=extended) if extended: files = list(map(lambda file: file.__dict__(), files)) diff --git a/src/openwrt_log_analyzer.py b/src/openwrt_log_analyzer.py old mode 100644 new mode 100755 diff --git a/src/sound_bot.py b/src/sound_bot.py index 70d89a8..2503893 100755 --- a/src/sound_bot.py +++ b/src/sound_bot.py @@ -8,16 +8,15 @@ from enum import Enum from datetime import datetime, timedelta from html import escape from typing import Optional + from home.config import config from home.bot import Wrapper, Context, text_filter, user_any_name -from home.api.types import BotType +from home.api import WebAPIClient +from home.api.types import SoundSensorLocation, BotType from home.api.errors import ApiResponseError -from home.sound import SoundNodeClient, RecordClient, RecordFile +from home.media import SoundNodeClient, SoundRecordClient, SoundRecordFile, CameraNodeClient from home.soundsensor import SoundSensorServerGuardClient -from home.camera import esp32 from home.util import parse_addr, chunks, filesize_fmt -from home.api import WebAPIClient -from home.api.types import SoundSensorLocation from telegram.error import TelegramError from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton, User @@ -30,10 +29,10 @@ from PIL import Image logger = logging.getLogger(__name__) RenderedContent = tuple[str, Optional[InlineKeyboardMarkup]] -record_client: Optional[RecordClient] = None +record_client: Optional[SoundRecordClient] = None bot: Optional[Wrapper] = None node_client_links: dict[str, SoundNodeClient] = {} -cam_client_links: dict[str, esp32.WebClient] = {} +cam_client_links: dict[str, CameraNodeClient] = {} def node_client(node: str) -> SoundNodeClient: @@ -42,9 +41,9 @@ def node_client(node: str) -> SoundNodeClient: return node_client_links[node] -def camera_client(cam: str) -> esp32.WebClient: +def camera_client(cam: str) -> CameraNodeClient: if cam not in node_client_links: - cam_client_links[cam] = esp32.WebClient(parse_addr(config['cameras'][cam]['addr'])) + cam_client_links[cam] = CameraNodeClient(parse_addr(config['cameras'][cam]['addr'])) return cam_client_links[cam] @@ -243,7 +242,7 @@ class FilesRenderer(Renderer): return html, cls.places_markup(ctx, callback_prefix='f0') @classmethod - def filelist(cls, ctx: Context, files: list[RecordFile]) -> RenderedContent: + def filelist(cls, ctx: Context, files: list[SoundRecordFile]) -> RenderedContent: node, = callback_unpack(ctx) html_files = map(lambda file: cls.file(ctx, file, node), files) @@ -255,7 +254,7 @@ class FilesRenderer(Renderer): return html, InlineKeyboardMarkup(buttons) @classmethod - def file(cls, ctx: Context, file: RecordFile, node: str) -> str: + def file(cls, ctx: Context, file: SoundRecordFile, node: str) -> str: html = ctx.lang('file_line', file.start_humantime, file.stop_humantime, filesize_fmt(file.filesize)) if file.file_id is not None: html += f'/audio_{node}_{file.file_id}' @@ -437,23 +436,12 @@ def camera_capture(ctx: Context) -> None: ctx.answer() client = camera_client(cam) - if client.syncsettings(camera_settings(cam)) is True: - logger.debug('some settings were changed, sleeping for 0.4 sec') - time.sleep(0.4) - - client.setflash(True if flash else False) - time.sleep(0.2) - fd = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') fd.close() client.capture(fd.name) logger.debug(f'captured photo ({cam}), saved to {fd.name}') - # disable flash led - if flash: - client.setflash(False) - camera_config = config['cameras'][cam] if 'rotate' in camera_config: im = Image.open(fd.name) @@ -972,10 +960,10 @@ if __name__ == '__main__': for nodename, nodecfg in config['nodes'].items(): nodes[nodename] = parse_addr(nodecfg['addr']) - record_client = RecordClient(nodes, - error_handler=record_onerror, - finished_handler=record_onfinished, - download_on_finish=True) + record_client = SoundRecordClient(nodes, + error_handler=record_onerror, + finished_handler=record_onfinished, + download_on_finish=True) bot = SoundBot() if 'api' in config: diff --git a/src/sound_node.py b/src/sound_node.py index a96a098..7d8ba1a 100755 --- a/src/sound_node.py +++ b/src/sound_node.py @@ -3,110 +3,16 @@ import os from typing import Optional -from home.config import config from home.util import parse_addr -from home.sound import ( - amixer, - Recorder, - RecordStatus, - RecordStorage -) +from home.config import config +from home.audio import amixer +from home.media import MediaNodeServer, SoundRecordStorage, SoundRecorder from home import http -""" -This script must be run as root as it runs arecord. - -This script implements HTTP API for amixer and arecord. -""" - - -# some global variables -# --------------------- - -recorder: Optional[Recorder] -routes = http.routes() -storage: Optional[RecordStorage] - - -# recording methods -# ----------------- - -@routes.get('/record/') -async def do_record(request): - duration = int(request.query['duration']) - max = Recorder.get_max_record_time()*15 - if not 0 < duration <= max: - raise ValueError(f'invalid duration: max duration is {max}') - - record_id = recorder.record(duration) - return http.ok({'id': record_id}) - - -@routes.get('/record/info/{id}/') -async def record_info(request): - record_id = int(request.match_info['id']) - info = recorder.get_info(record_id) - return http.ok(info.as_dict()) - - -@routes.get('/record/forget/{id}/') -async def record_forget(request): - record_id = int(request.match_info['id']) - - info = recorder.get_info(record_id) - assert info.status in (RecordStatus.FINISHED, RecordStatus.ERROR), f"can't forget: record status is {info.status}" - - recorder.forget(record_id) - return http.ok() - - -@routes.get('/record/download/{id}/') -async def record_download(request): - record_id = int(request.match_info['id']) - - info = recorder.get_info(record_id) - assert info.status == RecordStatus.FINISHED, f"record status is {info.status}" - - return http.FileResponse(info.file.path) - - -@routes.get('/storage/list/') -async def storage_list(request): - extended = 'extended' in request.query and int(request.query['extended']) == 1 - - files = storage.getfiles(as_objects=extended) - if extended: - files = list(map(lambda file: file.__dict__(), files)) - - return http.ok({ - 'files': files - }) - - -@routes.get('/storage/delete/') -async def storage_delete(request): - file_id = request.query['file_id'] - file = storage.find(file_id) - if not file: - raise ValueError(f'file {file} not found') - - storage.delete(file) - return http.ok() - - -@routes.get('/storage/download/') -async def storage_download(request): - file_id = request.query['file_id'] - file = storage.find(file_id) - if not file: - raise ValueError(f'file {file} not found') - - return http.FileResponse(file.path) - - -# ALSA mixer methods -# ------------------ +# This script must be run as root as it runs arecord. +# Implements HTTP API for amixer and arecord. +# ------------------------------------------- def _amixer_control_response(control): info = amixer.get(control) @@ -117,57 +23,56 @@ def _amixer_control_response(control): }) -@routes.get('/amixer/get-all/') -async def amixer_get_all(request): - controls_info = amixer.get_all() - return http.ok(controls_info) - - -@routes.get('/amixer/get/{control}/') -async def amixer_get(request): - control = request.match_info['control'] - if not amixer.has_control(control): - raise ValueError(f'invalid control: {control}') +class SoundNodeServer(MediaNodeServer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) - return _amixer_control_response(control) + self.get('/amixer/get-all/', self.amixer_get_all) + self.get('/amixer/get/{control}/', self.amixer_get) + self.get('/amixer/{op:mute|unmute|cap|nocap}/{control}/', self.amixer_set) + self.get('/amixer/{op:incr|decr}/{control}/', self.amixer_volume) + async def amixer_get_all(self, request: http.Request): + controls_info = amixer.get_all() + return self.ok(controls_info) -@routes.get('/amixer/{op:mute|unmute|cap|nocap}/{control}/') -async def amixer_set(request): - op = request.match_info['op'] - control = request.match_info['control'] - if not amixer.has_control(control): - raise ValueError(f'invalid control: {control}') + async def amixer_get(self, request: http.Request): + control = request.match_info['control'] + if not amixer.has_control(control): + raise ValueError(f'invalid control: {control}') - f = getattr(amixer, op) - f(control) + return _amixer_control_response(control) - return _amixer_control_response(control) + async def amixer_set(self, request: http.Request): + op = request.match_info['op'] + control = request.match_info['control'] + if not amixer.has_control(control): + raise ValueError(f'invalid control: {control}') + f = getattr(amixer, op) + f(control) -@routes.get('/amixer/{op:incr|decr}/{control}/') -async def amixer_volume(request): - op = request.match_info['op'] - control = request.match_info['control'] - if not amixer.has_control(control): - raise ValueError(f'invalid control: {control}') + return _amixer_control_response(control) - def get_step() -> Optional[int]: - if 'step' in request.query: - step = int(request.query['step']) - if not 1 <= step <= 50: - raise ValueError('invalid step value') - return step - return None + async def amixer_volume(self, request: http.Request): + op = request.match_info['op'] + control = request.match_info['control'] + if not amixer.has_control(control): + raise ValueError(f'invalid control: {control}') - f = getattr(amixer, op) - f(control, step=get_step()) + def get_step() -> Optional[int]: + if 'step' in request.query: + step = int(request.query['step']) + if not 1 <= step <= 50: + raise ValueError('invalid step value') + return step + return None - return _amixer_control_response(control) + f = getattr(amixer, op) + f(control, step=get_step()) + return _amixer_control_response(control) -# entry point -# ----------- if __name__ == '__main__': if not os.getegid() == 0: @@ -175,9 +80,12 @@ if __name__ == '__main__': config.load('sound_node') - storage = RecordStorage(config['node']['storage']) + storage = SoundRecordStorage(config['node']['storage']) - recorder = Recorder(storage=storage) + recorder = SoundRecorder(storage=storage) recorder.start_thread() - http.serve(parse_addr(config['node']['listen']), routes) + server = SoundNodeServer(recorder=recorder, + storage=storage, + addr=parse_addr(config['node']['listen'])) + server.run() diff --git a/src/sound_sensor_server.py b/src/sound_sensor_server.py index 20d7f0a..0303d6d 100755 --- a/src/sound_sensor_server.py +++ b/src/sound_sensor_server.py @@ -1,31 +1,33 @@ #!/usr/bin/env python3 import logging import threading -import os from time import sleep from typing import Optional +from functools import partial from home.config import config from home.util import parse_addr from home.api import WebAPIClient, RequestParams from home.api.types import SoundSensorLocation from home.soundsensor import SoundSensorServer, SoundSensorHitHandler -from home.sound import RecordClient +from home.media import MediaNodeType, SoundRecordClient, CameraRecordClient, RecordClient interrupted = False logger = logging.getLogger(__name__) server: SoundSensorServer -def get_related_sound_nodes(sensor_name: str) -> list[str]: - if sensor_name not in config['sensor_to_sound_nodes_relations']: +def get_related_nodes(node_type: MediaNodeType, + sensor_name: str) -> list[str]: + if sensor_name not in config[f'sensor_to_{node_type.name.lower()}_nodes_relations']: raise ValueError(f'unexpected sensor name {sensor_name}') - return config['sensor_to_sound_nodes_relations'][sensor_name] + return config[f'sensor_to_{node_type.name.lower()}_nodes_relations'][sensor_name] -def get_sound_node_config(name: str) -> Optional[dict]: - if name in config['sound_nodes']: - cfg = config['sound_nodes'][name] +def get_node_config(node_type: MediaNodeType, + name: str) -> Optional[dict]: + if name in config[f'{node_type.name.lower()}_nodes']: + cfg = config[f'{node_type.name.lower()}_nodes'][name] if 'min_hits' not in cfg: cfg['min_hits'] = 1 return cfg @@ -66,38 +68,43 @@ class HitHandler(SoundSensorHitHandler): logger.error(f'invalid sensor name: {name}') return - try: - nodes = get_related_sound_nodes(name) - except ValueError: - logger.error(f'config for node {name} not found') - return - should_continue = False - for node in nodes: - node_config = get_sound_node_config(node) - if node_config is None: - logger.error(f'config for node {node} not found') - continue - if hits < node_config['min_hits']: - continue - should_continue = True + for node_type in MediaNodeType: + try: + nodes = get_related_nodes(node_type, name) + except ValueError: + logger.error(f'config for {node_type.name.lower()} node {name} not found') + return + + for node in nodes: + node_config = get_node_config(node_type, node) + if node_config is None: + logger.error(f'config for {node_type.name.lower()} node {node} not found') + continue + if hits < node_config['min_hits']: + continue + should_continue = True if not should_continue: return hc.add(name, hits) - if server.is_recording_enabled(): + if not server.is_recording_enabled(): + return + for node_type in MediaNodeType: try: + nodes = get_related_nodes(node_type, name) for node in nodes: - node_config = get_sound_node_config(node) + node_config = get_node_config(node_type, node) if node_config is None: - logger.error(f'node config for {node} not found') + logger.error(f'node config for {node_type.name.lower()} node {node} not found') continue durations = node_config['durations'] dur = durations[1] if hits > node_config['min_hits'] else durations[0] - record.record(node, dur*60, {'node': node}) + record_clients[node_type].record(node, dur*60, {'node': node}) + except ValueError as exc: logger.exception(exc) @@ -112,22 +119,26 @@ def hits_sender(): api: Optional[WebAPIClient] = None hc: Optional[HitCounter] = None -record: Optional[RecordClient] = None +record_clients: dict[MediaNodeType, RecordClient] = {} # record callbacks - # ---------------- -def record_error(info: dict, userdata: dict): +def record_error(type: MediaNodeType, + info: dict, + userdata: dict): node = userdata['node'] - logger.error('recording ' + str(dict) + ' from node ' + node + ' failed') + logger.error('recording ' + str(dict) + f' from {type.name.lower()} node ' + node + ' failed') - record.forget(node, info['id']) + record_clients[type].forget(node, info['id']) -def record_finished(info: dict, fn: str, userdata: dict): - logger.debug('record finished: ' + str(info)) +def record_finished(type: MediaNodeType, + info: dict, + fn: str, + userdata: dict): + logger.debug(f'{type.name.lower()} record finished: ' + str(info)) # audio could have been requested by other user (telegram bot, for example) # so we shouldn't 'forget' it here @@ -140,30 +151,8 @@ def record_finished(info: dict, fn: str, userdata: dict): # -------------------- def api_error_handler(exc, name, req: RequestParams): - if name == 'upload_recording': - logger.error('failed to upload recording, exception below') - logger.exception(exc) - - else: - logger.error(f'api call ({name}, params={req.params}) failed, exception below') - logger.exception(exc) - - -def api_success_handler(response, name, req: RequestParams): - if name == 'upload_recording': - node = req.params['node'] - rid = req.params['record_id'] - - logger.debug(f'successfully uploaded recording (node={node}, record_id={rid}), api response:' + str(response)) - - # deleting temp file - try: - os.unlink(req.files['file']) - except OSError as exc: - logger.error(f'error while deleting temp file:') - logger.exception(exc) - - record.forget(node, rid) + logger.error(f'api call ({name}, params={req.params}) failed, exception below') + logger.exception(exc) if __name__ == '__main__': @@ -171,25 +160,35 @@ if __name__ == '__main__': hc = HitCounter() api = WebAPIClient(timeout=(10, 60)) - api.enable_async(error_handler=api_error_handler, - success_handler=api_success_handler) + api.enable_async(error_handler=api_error_handler) t = threading.Thread(target=hits_sender) t.daemon = True t.start() - nodes = {} + sound_nodes = {} for nodename, nodecfg in config['sound_nodes'].items(): - nodes[nodename] = parse_addr(nodecfg['addr']) + sound_nodes[nodename] = parse_addr(nodecfg['addr']) + + camera_nodes = {} + for nodename, nodecfg in config['camera_nodes'].items(): + camera_nodes[nodename] = parse_addr(nodecfg['addr']) + + if sound_nodes: + record_clients[MediaNodeType.SOUND] = SoundRecordClient(sound_nodes, + error_handler=partial(record_error, MediaNodeType.SOUND), + finished_handler=partial(record_finished, MediaNodeType.SOUND)) - record = RecordClient(nodes, - error_handler=record_error, - finished_handler=record_finished) + if camera_nodes: + record_clients[MediaNodeType.CAMERA] = CameraRecordClient(camera_nodes, + error_handler=partial(record_error, MediaNodeType.CAMERA), + finished_handler=partial(record_finished, MediaNodeType.CAMERA)) try: server = SoundSensorServer(parse_addr(config['server']['listen']), HitHandler) server.run() except KeyboardInterrupt: interrupted = True - record.stop() + for c in record_clients.values(): + c.stop() logging.info('keyboard interrupt, exiting...') diff --git a/src/test/__init__.py b/src/test/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/test/test.py b/src/test/test.py deleted file mode 100755 index 7ea37e6..0000000 --- a/src/test/test.py +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python -from home.relay import RelayClient - - -if __name__ == '__main__': - c = RelayClient() - print(c, c._host) \ No newline at end of file diff --git a/src/test/test_amixer.py b/src/test/test_amixer.py deleted file mode 100755 index ac96881..0000000 --- a/src/test/test_amixer.py +++ /dev/null @@ -1,79 +0,0 @@ -#!/usr/bin/env python3 -import sys, os.path -sys.path.extend([ - os.path.realpath(os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..')), -]) - -from argparse import ArgumentParser -from src.home.config import config -from src.home.sound import amixer - - -def validate_control(input: str): - for control in config['amixer']['controls']: - if control['name'] == input: - return - raise ValueError(f'invalid control name: {input}') - - -if __name__ == '__main__': - parser = ArgumentParser() - parser.add_argument('--get-all', action='store_true') - parser.add_argument('--mute', type=str) - parser.add_argument('--unmute', type=str) - parser.add_argument('--cap', type=str) - parser.add_argument('--nocap', type=str) - parser.add_argument('--get', type=str) - parser.add_argument('--incr', type=str) - parser.add_argument('--decr', type=str) - # parser.add_argument('--dump-config', action='store_true') - - args = config.load('test_amixer', parser=parser) - - # if args.dump_config: - # print(config.data) - # sys.exit() - - if args.get_all: - for control in amixer.get_all(): - print(f'control = {control["name"]}') - for line in control['info'].split('\n'): - print(f' {line}') - print() - sys.exit() - - if args.get: - info = amixer.get(args.get) - print(info) - sys.exit() - - for action in ['incr', 'decr']: - if hasattr(args, action): - control = getattr(args, action) - if control is None: - continue - - print(f'attempting to {action} {control}') - validate_control(control) - func = getattr(amixer, action) - try: - func(control, step=5) - except amixer.AmixerError as e: - print('error: ' + str(e)) - sys.exit() - - for action in ['mute', 'unmute', 'cap', 'nocap']: - if hasattr(args, action): - control = getattr(args, action) - if control is None: - continue - - print(f"attempting to {action} {control}") - - validate_control(control) - func = getattr(amixer, action) - try: - func(control) - except amixer.AmixerError as e: - print('error: ' + str(e)) - sys.exit() diff --git a/src/test/test_api.py b/src/test/test_api.py deleted file mode 100755 index 959b2b3..0000000 --- a/src/test/test_api.py +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env python3 -from home.api import WebAPIClient -from home.api.types import BotType -from home.config import config - - -if __name__ == '__main__': - config.load('test_api') - - api = WebAPIClient() - print(api.log_bot_request(BotType.ADMIN, 1, "test_api.py")) diff --git a/src/test/test_esp32_cam.py b/src/test/test_esp32_cam.py deleted file mode 100755 index 883b6f0..0000000 --- a/src/test/test_esp32_cam.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python3 -import sys -import os.path -sys.path.extend([ - os.path.realpath( - os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..') - ) -]) - -from pprint import pprint -from argparse import ArgumentParser -from time import sleep -from src.home.util import parse_addr -from src.home.camera import esp32 -from src.home.config import config - -if __name__ == '__main__': - parser = ArgumentParser() - parser.add_argument('--addr', type=str, required=True, - help='camera server address, in host:port format') - parser.add_argument('--status', action='store_true', - help='print status and exit') - - arg = config.load(False, parser=parser) - cam = esp32.WebClient(addr=parse_addr(arg.addr)) - - if arg.status: - status = cam.getstatus() - pprint(status) - sys.exit(0) - - if cam.syncsettings(dict( - vflip=True, - hmirror=True, - framesize=esp32.FrameSize.SVGA_800x600, - lenc=True, - wpc=False, - bpc=False, - raw_gma=False, - agc=True, - gainceiling=5, - quality=10, - awb_gain=False, - awb=True, - aec_dsp=True, - aec=True - )) is True: - print('some settings were changed, sleeping for 0.5 sec') - sleep(0.5) - - # cam.setdelay(200) - - cam.setflash(True) - sleep(0.2) - cam.capture('/tmp/capture.jpg') - cam.setflash(False) diff --git a/src/test/test_inverter_monitor.py b/src/test/test_inverter_monitor.py deleted file mode 100755 index d9b63d3..0000000 --- a/src/test/test_inverter_monitor.py +++ /dev/null @@ -1,376 +0,0 @@ -#!/usr/bin/env python3 -import cmd -import time -import logging -import socket -import sys -import threading -import os.path -sys.path.extend([ - os.path.realpath( - os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..') - ) -]) - -from enum import Enum, auto -from typing import Optional -from src.home.util import stringify -from src.home.config import config -from src.home.inverter import ( - wrapper_instance as inverter, - - InverterMonitor, - ChargingEvent, - BatteryState, - BatteryPowerDirection, -) - - -def monitor_charging(event: ChargingEvent, **kwargs) -> None: - msg = 'event: ' + event.name - if event == ChargingEvent.AC_CURRENT_CHANGED: - msg += f' (current={kwargs["current"]})' - evt_logger.info(msg) - - -def monitor_battery(state: BatteryState, v: float, load_watts: int) -> None: - evt_logger.info(f'bat: {state.name}, v: {v}, load_watts: {load_watts}') - - -def monitor_error(error: str) -> None: - evt_logger.warning('error: ' + error) - - -class InverterTestShell(cmd.Cmd): - intro = 'Welcome to the test shell. Type help or ? to list commands.\n' - prompt = '(test) ' - file = None - - def do_connect_ac(self, arg): - server.connect_ac() - - def do_disconnect_ac(self, arg): - server.disconnect_ac() - - def do_pd_charge(self, arg): - server.set_pd(BatteryPowerDirection.CHARGING) - - def do_pd_nothing(self, arg): - server.set_pd(BatteryPowerDirection.DO_NOTHING) - - def do_pd_discharge(self, arg): - server.set_pd(BatteryPowerDirection.DISCHARGING) - - -class ChargerMode(Enum): - NONE = auto() - CHARGING = auto() - - -class ChargerEmulator(threading.Thread): - def __init__(self): - super().__init__() - self.setName('ChargerEmulator') - - self.logger = logging.getLogger('charger') - self.interrupted = False - self.mode = ChargerMode.NONE - - self.pd = None - self.ac_connected = False - self.mppt_connected = False - - def run(self): - while not self.interrupted: - if self.pd == BatteryPowerDirection.CHARGING\ - and self.ac_connected\ - and not self.mppt_connected: - - v = server._get_voltage() + 0.02 - self.logger.info('incrementing voltage') - server.set_voltage(v) - - time.sleep(2) - - def stop(self): - self.interrupted = True - - def setmode(self, mode: ChargerMode): - self.mode = mode - - def ac_changed(self, connected: bool): - self.ac_connected = connected - - def mppt_changed(self, connected: bool): - self.mppt_connected = connected - - def current_changed(self, amps): - # FIXME - # this method is not being called and voltage is not changing] - # when current changes - v = None - if amps == 2: - v = 49 - elif amps == 10: - v = 51 - elif amps == 20: - v = 52.5 - elif amps == 30: - v = 53.5 - elif amps == 40: - v = 54.5 - if v is not None: - self.logger.info(f'setting voltage {v}') - server.set_voltage(v) - - def pd_changed(self, pd: BatteryPowerDirection): - self.pd = pd - - -class InverterEmulator(threading.Thread): - def __init__(self, host: str, port: int): - super().__init__() - self.setName('InverterEmulatorServer') - self.lock = threading.Lock() - - self.status = {"grid_voltage": {"unit": "V", "value": 0.0}, - "grid_freq": {"unit": "Hz", "value": 0.0}, - "ac_output_voltage": {"unit": "V", "value": 230.0}, - "ac_output_freq": {"unit": "Hz", "value": 50.0}, - "ac_output_apparent_power": {"unit": "VA", "value": 92}, - "ac_output_active_power": {"unit": "Wh", "value": 30}, - "output_load_percent": {"unit": "%", "value": 1}, - "battery_voltage": {"unit": "V", "value": 48.4}, - "battery_voltage_scc": {"unit": "V", "value": 0.0}, - "battery_voltage_scc2": {"unit": "V", "value": 0.0}, - "battery_discharging_current": {"unit": "A", "value": 0}, - "battery_charging_current": {"unit": "A", "value": 0}, - "battery_capacity": {"unit": "%", "value": 62}, - "inverter_heat_sink_temp": {"unit": "°C", "value": 8}, - "mppt1_charger_temp": {"unit": "°C", "value": 0}, - "mppt2_charger_temp": {"unit": "°C", "value": 0}, - "pv1_input_power": {"unit": "Wh", "value": 0}, - "pv2_input_power": {"unit": "Wh", "value": 0}, - "pv1_input_voltage": {"unit": "V", "value": 0.0}, - "pv2_input_voltage": {"unit": "V", "value": 0.0}, - "configuration_status": "Default", - "mppt1_charger_status": "Abnormal", - "mppt2_charger_status": "Abnormal", - "load_connected": "Connected", - "battery_power_direction": "Discharge", - "dc_ac_power_direction": "DC/AC", - "line_power_direction": "Do nothing", - "local_parallel_id": 0} - self.rated = {"ac_input_rating_voltage": {"unit": "V", "value": 230.0}, - "ac_input_rating_current": {"unit": "A", "value": 21.7}, - "ac_output_rating_voltage": {"unit": "V", "value": 230.0}, - "ac_output_rating_freq": {"unit": "Hz", "value": 50.0}, - "ac_output_rating_current": {"unit": "A", "value": 21.7}, - "ac_output_rating_apparent_power": {"unit": "VA", "value": 5000}, - "ac_output_rating_active_power": {"unit": "Wh", "value": 5000}, - "battery_rating_voltage": {"unit": "V", "value": 48.0}, - "battery_recharge_voltage": {"unit": "V", "value": 51.0}, - "battery_redischarge_voltage": {"unit": "V", "value": 58.0}, - "battery_under_voltage": {"unit": "V", "value": 42.0}, - "battery_bulk_voltage": {"unit": "V", "value": 57.6}, - "battery_float_voltage": {"unit": "V", "value": 54.0}, - "battery_type": "User", - "max_charging_current": {"unit": "A", "value": 60}, - "max_ac_charging_current": {"unit": "A", "value": 10}, - "input_voltage_range": "Appliance", - "output_source_priority": "Parallel output", - "charge_source_priority": "Solar-and-Utility", - "parallel_max_num": 6, - "machine_type": "Off-Grid-Tie", - "topology": "Transformer-less", - "output_model_setting": "Single module", - "solar_power_priority": "Load-Battery-Utility", - "mppt": "2"} - - self.host = host - self.port = port - self.interrupted = False - self.logger = logging.getLogger('srv') - - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.sock.bind((self.host, self.port)) - - def run(self): - self.sock.listen(5) - - while not self.interrupted: - conn, address = self.sock.accept() - - alive = True - while alive: - try: - buf = conn.recv(2048) - message = buf.decode().strip() - except OSError as exc: - self.logger.error('failed to recv()') - self.logger.exception(exc) - - alive = False - - try: - conn.close() - except: - pass - - continue # exit the loop - - self.logger.log(0, f'< {message}') - - if message.strip() == '': - continue - - if message == 'format json': - # self.logger.info(f'got {message}') - self.reply_ok(conn) - - elif message.startswith('exec '): - command = message[5:].split() - args = command[1:] - command = command[0] - - if command == 'get-allowed-ac-charging-currents': - self.reply_ok(conn, [2, 10, 20, 30, 40, 50, 60]) - elif command == 'get-status': - self.reply_ok(conn, self._get_status()) - elif command == 'get-rated': - self.reply_ok(conn, self._get_rated()) - elif command == 'set-max-ac-charging-current': - self.set_ac_current(args[1]) - self.reply_ok(conn, 1) - else: - raise ValueError('unsupported command: ' + command) - else: - raise ValueError('unexpected request: ' + message) - - def reply_ok(self, connection, data=None): - buf = 'ok' + '\r\n' - if data: - if not isinstance(data, str): - data = stringify({'result': 'ok', 'data': data}) - buf += data + '\r\n' - buf += '\r\n' - self.logger.log(0, f'> {buf.strip()}') - connection.sendall(buf.encode()) - - def _get_status(self) -> dict: - with self.lock: - return self.status - - def _get_rated(self) -> dict: - with self.lock: - return self.rated - - def _get_voltage(self) -> float: - with self.lock: - return self.status['battery_voltage']['value'] - - def stop(self): - self.interrupted = True - self.sock.close() - - def connect_ac(self): - with self.lock: - self.status['grid_voltage']['value'] = 230 - self.status['grid_freq']['value'] = 50 - charger.ac_changed(True) - - def disconnect_ac(self): - with self.lock: - self.status['grid_voltage']['value'] = 0 - self.status['grid_freq']['value'] = 0 - #self.status['battery_voltage']['value'] = 48.4 # revert to initial value - charger.ac_changed(False) - - def connect_mppt(self): - with self.lock: - self.status['pv1_input_power']['value'] = 1 - self.status['pv1_input_voltage']['value'] = 50 - self.status['mppt1_charger_status'] = 'Charging' - charger.mppt_changed(True) - - def disconnect_mppt(self): - with self.lock: - self.status['pv1_input_power']['value'] = 0 - self.status['pv1_input_voltage']['value'] = 0 - self.status['mppt1_charger_status'] = 'Abnormal' - charger.mppt_changed(False) - - def set_voltage(self, v: float): - with self.lock: - self.status['battery_voltage']['value'] = v - - def set_ac_current(self, amps): - with self.lock: - self.rated['max_ac_charging_current']['value'] = amps - charger.current_changed(amps) - - def set_pd(self, pd: BatteryPowerDirection): - if pd == BatteryPowerDirection.CHARGING: - val = 'Charge' - elif pd == BatteryPowerDirection.DISCHARGING: - val = 'Discharge' - else: - val = 'Do nothing' - with self.lock: - self.status['battery_power_direction'] = val - charger.pd_changed(pd) - - -logger = logging.getLogger(__name__) -evt_logger = logging.getLogger('evt') -server: Optional[InverterEmulator] = None -charger: Optional[ChargerEmulator] = None - - -def main(): - global server, charger - - # start fake inverterd server - try: - server = InverterEmulator(host=config['inverter']['host'], - port=config['inverter']['port']) - server.start() - except OSError as e: - logger.error('failed to start server') - logger.exception(e) - return - logger.info('server started') - - # start charger thread - charger = ChargerEmulator() - charger.start() - - # init inverterd wrapper - inverter.init(host=config['inverter']['host'], - port=config['inverter']['port']) - - # start monitor - mon = InverterMonitor() - mon.set_charging_event_handler(monitor_charging) - mon.set_battery_event_handler(monitor_battery) - mon.set_error_handler(monitor_error) - mon.start() - logger.info('monitor started') - - try: - InverterTestShell().cmdloop() - - server.join() - mon.join() - charger.join() - - except KeyboardInterrupt: - server.stop() - mon.stop() - charger.stop() - - -if __name__ == '__main__': - config.load('test_inverter_monitor') - main() diff --git a/src/test/test_record_upload.py b/src/test/test_record_upload.py deleted file mode 100755 index 54ff06f..0000000 --- a/src/test/test_record_upload.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env python3 -import logging -import sys -import os.path -sys.path.extend([ - os.path.realpath( - os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..') - ) -]) - -import time - -from src.home.api import WebAPIClient, RequestParams -from src.home.config import config -from src.home.sound import RecordClient -from src.home.util import parse_addr - -logger = logging.getLogger(__name__) - - -# record callbacks -# ---------------- - -def record_error(info: dict, userdata: dict): - node = userdata['node'] - # TODO - - -def record_finished(info: dict, fn: str, userdata: dict): - logger.info('record finished: ' + str(info)) - - node = userdata['node'] - api.upload_recording(fn, node, info['id'], int(info['start_time']), int(info['stop_time'])) - - -# api client callbacks -# -------------------- - -def api_error_handler(exc, name, req: RequestParams): - if name == 'upload_recording': - logger.error('failed to upload recording, exception below') - logger.exception(exc) - - else: - logger.error(f'api call ({name}, params={req.params}) failed, exception below') - logger.exception(exc) - - -def api_success_handler(response, name, req: RequestParams): - if name == 'upload_recording': - node = req.params['node'] - rid = req.params['record_id'] - - logger.debug(f'successfully uploaded recording (node={node}, record_id={rid}), api response:' + str(response)) - - # deleting temp file - try: - os.unlink(req.files['file']) - except OSError as exc: - logger.error(f'error while deleting temp file:') - logger.exception(exc) - - record.forget(node, rid) - - -if __name__ == '__main__': - config.load('test_record_upload') - - nodes = {} - for name, addr in config['nodes'].items(): - nodes[name] = parse_addr(addr) - record = RecordClient(nodes, - error_handler=record_error, - finished_handler=record_finished, - download_on_finish=True) - - api = WebAPIClient() - api.enable_async(error_handler=api_error_handler, - success_handler=api_success_handler) - - record_id = record.record('localhost', 3, {'node': 'localhost'}) - print(f'record_id: {record_id}') - - while True: - try: - time.sleep(0.1) - except (KeyboardInterrupt, SystemExit): - break \ No newline at end of file diff --git a/src/test/test_send_fake_sound_hit.py b/src/test/test_send_fake_sound_hit.py deleted file mode 100755 index af6b7eb..0000000 --- a/src/test/test_send_fake_sound_hit.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python3 -import sys -import os.path -sys.path.extend([ - os.path.realpath( - os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..') - ) -]) - -from argparse import ArgumentParser -from src.home.util import send_datagram, stringify, parse_addr - - -if __name__ == '__main__': - parser = ArgumentParser() - parser.add_argument('--name', type=str, required=True, - help='node name, like `diana`') - parser.add_argument('--hits', type=int, required=True, - help='hits count') - parser.add_argument('--server', type=str, required=True, - help='center server addr in host:port format') - - args = parser.parse_args() - - send_datagram(stringify([args.name, args.hits]), parse_addr(args.server)) diff --git a/src/test/test_sensors_plot.py b/src/test/test_sensors_plot.py deleted file mode 100755 index e69de29..0000000 diff --git a/src/test/test_sound_node_client.py b/src/test/test_sound_node_client.py deleted file mode 100755 index 795165a..0000000 --- a/src/test/test_sound_node_client.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python3 -import sys, os.path -sys.path.extend([ - os.path.realpath(os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..')), -]) - -from src.home.api.errors import ApiResponseError -from src.home.sound import SoundNodeClient - - -if __name__ == '__main__': - client = SoundNodeClient(('127.0.0.1', 8313)) - print(client.amixer_get_all()) - - try: - client.amixer_get('invalidname') - except ApiResponseError as exc: - print(exc) - diff --git a/src/test/test_sound_server_api.py b/src/test/test_sound_server_api.py deleted file mode 100755 index 568ea7e..0000000 --- a/src/test/test_sound_server_api.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -import sys -import os.path -sys.path.extend([ - os.path.realpath( - os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..') - ) -]) -import threading - -from time import sleep -from src.home.config import config -from src.home.api import WebAPIClient -from src.home.api.types import SoundSensorLocation - -interrupted = False - - -class HitCounter: - def __init__(self): - self.sensors = {} - self.lock = threading.Lock() - self._reset_sensors() - - def _reset_sensors(self): - for loc in SoundSensorLocation: - self.sensors[loc.name.lower()] = 0 - - def add(self, name: str, hits: int): - if name not in self.sensors: - raise ValueError(f'sensor {name} not found') - - with self.lock: - self.sensors[name] += hits - - def get_all(self) -> list[tuple[str, int]]: - vals = [] - with self.lock: - for name, hits in self.sensors.items(): - if hits > 0: - vals.append((name, hits)) - self._reset_sensors() - return vals - - -def hits_sender(): - while True: - try: - all_hits = hc.get_all() - if all_hits: - api.add_sound_sensor_hits(all_hits) - sleep(5) - except (KeyboardInterrupt, SystemExit): - return - - -if __name__ == '__main__': - config.load('test_api') - - hc = HitCounter() - api = WebAPIClient() - - hc.add('spb1', 1) - # hc.add('big_house', 123) - - hits_sender() diff --git a/src/test/test_stopwatch.py b/src/test/test_stopwatch.py deleted file mode 100755 index 6ff2c0e..0000000 --- a/src/test/test_stopwatch.py +++ /dev/null @@ -1,16 +0,0 @@ -from home.util import Stopwatch, StopwatchError -from time import sleep - - -if __name__ == '__main__': - s = Stopwatch() - s.go() - sleep(2) - s.pause() - s.go() - sleep(1) - print(s.get_elapsed_time()) - sleep(1) - print(s.get_elapsed_time()) - s.pause() - print(s.get_elapsed_time()) diff --git a/systemd/camera_node.service b/systemd/camera_node.service new file mode 100644 index 0000000..0de3cc1 --- /dev/null +++ b/systemd/camera_node.service @@ -0,0 +1,13 @@ +[Unit] +Description=HomeKit Camera Node +After=network-online.target + +[Service] +User=user +Group=user +Restart=on-failure +ExecStart=/home/user/homekit/src/camera_node.py +WorkingDirectory=/home/user + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/systemd/camera_node@.service b/systemd/camera_node@.service new file mode 100644 index 0000000..414881e --- /dev/null +++ b/systemd/camera_node@.service @@ -0,0 +1,13 @@ +[Unit] +Description=HomeKit Camera Node +After=network-online.target + +[Service] +User=user +Group=user +Restart=on-failure +ExecStart=/home/user/homekit/src/camera_node.py --config /home/user/.config/camera_node.%i.yaml +WorkingDirectory=/home/user + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/test.py b/test/test.py new file mode 100755 index 0000000..7ea37e6 --- /dev/null +++ b/test/test.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +from home.relay import RelayClient + + +if __name__ == '__main__': + c = RelayClient() + print(c, c._host) \ No newline at end of file diff --git a/test/test_amixer.py b/test/test_amixer.py new file mode 100755 index 0000000..c8bd546 --- /dev/null +++ b/test/test_amixer.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +import sys, os.path +sys.path.extend([ + os.path.realpath(os.path.join(os.path.dirname(os.path.join(__file__)), '..')), +]) + +from argparse import ArgumentParser +from src.home.config import config +from src.home.audio import amixer + + +def validate_control(input: str): + for control in config['amixer']['controls']: + if control['name'] == input: + return + raise ValueError(f'invalid control name: {input}') + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--get-all', action='store_true') + parser.add_argument('--mute', type=str) + parser.add_argument('--unmute', type=str) + parser.add_argument('--cap', type=str) + parser.add_argument('--nocap', type=str) + parser.add_argument('--get', type=str) + parser.add_argument('--incr', type=str) + parser.add_argument('--decr', type=str) + # parser.add_argument('--dump-config', action='store_true') + + args = config.load('test_amixer', parser=parser) + + # if args.dump_config: + # print(config.data) + # sys.exit() + + if args.get_all: + for control in amixer.get_all(): + print(f'control = {control["name"]}') + for line in control['info'].split('\n'): + print(f' {line}') + print() + sys.exit() + + if args.get: + info = amixer.get(args.get) + print(info) + sys.exit() + + for action in ['incr', 'decr']: + if hasattr(args, action): + control = getattr(args, action) + if control is None: + continue + + print(f'attempting to {action} {control}') + validate_control(control) + func = getattr(amixer, action) + try: + func(control, step=5) + except amixer.AmixerError as e: + print('error: ' + str(e)) + sys.exit() + + for action in ['mute', 'unmute', 'cap', 'nocap']: + if hasattr(args, action): + control = getattr(args, action) + if control is None: + continue + + print(f"attempting to {action} {control}") + + validate_control(control) + func = getattr(amixer, action) + try: + func(control) + except amixer.AmixerError as e: + print('error: ' + str(e)) + sys.exit() diff --git a/test/test_api.py b/test/test_api.py new file mode 100755 index 0000000..959b2b3 --- /dev/null +++ b/test/test_api.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +from home.api import WebAPIClient +from home.api.types import BotType +from home.config import config + + +if __name__ == '__main__': + config.load('test_api') + + api = WebAPIClient() + print(api.log_bot_request(BotType.ADMIN, 1, "test_api.py")) diff --git a/test/test_esp32_cam.py b/test/test_esp32_cam.py new file mode 100755 index 0000000..27ce379 --- /dev/null +++ b/test/test_esp32_cam.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +import sys +import os.path +sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..') + ) +]) + +from pprint import pprint +from argparse import ArgumentParser +from time import sleep +from src.home.util import parse_addr +from src.home.camera import esp32 +from src.home.config import config + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--addr', type=str, required=True, + help='camera server address, in host:port format') + parser.add_argument('--status', action='store_true', + help='print status and exit') + + arg = config.load(False, parser=parser) + cam = esp32.WebClient(addr=parse_addr(arg.addr)) + + if arg.status: + status = cam.getstatus() + pprint(status) + sys.exit(0) + + if cam.syncsettings(dict( + vflip=True, + hmirror=True, + framesize=esp32.FrameSize.SVGA_800x600, + lenc=True, + wpc=False, + bpc=False, + raw_gma=False, + agc=True, + gainceiling=5, + quality=10, + awb_gain=False, + awb=True, + aec_dsp=True, + aec=True + )) is True: + print('some settings were changed, sleeping for 0.5 sec') + sleep(0.5) + + # cam.setdelay(200) + + cam.setflash(True) + sleep(0.2) + cam.capture('/tmp/capture.jpg') + cam.setflash(False) diff --git a/test/test_inverter_monitor.py b/test/test_inverter_monitor.py new file mode 100755 index 0000000..cbf1b82 --- /dev/null +++ b/test/test_inverter_monitor.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +import cmd +import time +import logging +import socket +import sys +import threading +import os.path +sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..') + ) +]) + +from enum import Enum, auto +from typing import Optional +from src.home.util import stringify +from src.home.config import config +from src.home.inverter import ( + wrapper_instance as inverter, + + InverterMonitor, + ChargingEvent, + BatteryState, + BatteryPowerDirection, +) + + +def monitor_charging(event: ChargingEvent, **kwargs) -> None: + msg = 'event: ' + event.name + if event == ChargingEvent.AC_CURRENT_CHANGED: + msg += f' (current={kwargs["current"]})' + evt_logger.info(msg) + + +def monitor_battery(state: BatteryState, v: float, load_watts: int) -> None: + evt_logger.info(f'bat: {state.name}, v: {v}, load_watts: {load_watts}') + + +def monitor_error(error: str) -> None: + evt_logger.warning('error: ' + error) + + +class InverterTestShell(cmd.Cmd): + intro = 'Welcome to the test shell. Type help or ? to list commands.\n' + prompt = '(test) ' + file = None + + def do_connect_ac(self, arg): + server.connect_ac() + + def do_disconnect_ac(self, arg): + server.disconnect_ac() + + def do_pd_charge(self, arg): + server.set_pd(BatteryPowerDirection.CHARGING) + + def do_pd_nothing(self, arg): + server.set_pd(BatteryPowerDirection.DO_NOTHING) + + def do_pd_discharge(self, arg): + server.set_pd(BatteryPowerDirection.DISCHARGING) + + +class ChargerMode(Enum): + NONE = auto() + CHARGING = auto() + + +class ChargerEmulator(threading.Thread): + def __init__(self): + super().__init__() + self.setName('ChargerEmulator') + + self.logger = logging.getLogger('charger') + self.interrupted = False + self.mode = ChargerMode.NONE + + self.pd = None + self.ac_connected = False + self.mppt_connected = False + + def run(self): + while not self.interrupted: + if self.pd == BatteryPowerDirection.CHARGING\ + and self.ac_connected\ + and not self.mppt_connected: + + v = server._get_voltage() + 0.02 + self.logger.info('incrementing voltage') + server.set_voltage(v) + + time.sleep(2) + + def stop(self): + self.interrupted = True + + def setmode(self, mode: ChargerMode): + self.mode = mode + + def ac_changed(self, connected: bool): + self.ac_connected = connected + + def mppt_changed(self, connected: bool): + self.mppt_connected = connected + + def current_changed(self, amps): + # FIXME + # this method is not being called and voltage is not changing] + # when current changes + v = None + if amps == 2: + v = 49 + elif amps == 10: + v = 51 + elif amps == 20: + v = 52.5 + elif amps == 30: + v = 53.5 + elif amps == 40: + v = 54.5 + if v is not None: + self.logger.info(f'setting voltage {v}') + server.set_voltage(v) + + def pd_changed(self, pd: BatteryPowerDirection): + self.pd = pd + + +class InverterEmulator(threading.Thread): + def __init__(self, host: str, port: int): + super().__init__() + self.setName('InverterEmulatorServer') + self.lock = threading.Lock() + + self.status = {"grid_voltage": {"unit": "V", "value": 0.0}, + "grid_freq": {"unit": "Hz", "value": 0.0}, + "ac_output_voltage": {"unit": "V", "value": 230.0}, + "ac_output_freq": {"unit": "Hz", "value": 50.0}, + "ac_output_apparent_power": {"unit": "VA", "value": 92}, + "ac_output_active_power": {"unit": "Wh", "value": 30}, + "output_load_percent": {"unit": "%", "value": 1}, + "battery_voltage": {"unit": "V", "value": 48.4}, + "battery_voltage_scc": {"unit": "V", "value": 0.0}, + "battery_voltage_scc2": {"unit": "V", "value": 0.0}, + "battery_discharging_current": {"unit": "A", "value": 0}, + "battery_charging_current": {"unit": "A", "value": 0}, + "battery_capacity": {"unit": "%", "value": 62}, + "inverter_heat_sink_temp": {"unit": "°C", "value": 8}, + "mppt1_charger_temp": {"unit": "°C", "value": 0}, + "mppt2_charger_temp": {"unit": "°C", "value": 0}, + "pv1_input_power": {"unit": "Wh", "value": 0}, + "pv2_input_power": {"unit": "Wh", "value": 0}, + "pv1_input_voltage": {"unit": "V", "value": 0.0}, + "pv2_input_voltage": {"unit": "V", "value": 0.0}, + "configuration_status": "Default", + "mppt1_charger_status": "Abnormal", + "mppt2_charger_status": "Abnormal", + "load_connected": "Connected", + "battery_power_direction": "Discharge", + "dc_ac_power_direction": "DC/AC", + "line_power_direction": "Do nothing", + "local_parallel_id": 0} + self.rated = {"ac_input_rating_voltage": {"unit": "V", "value": 230.0}, + "ac_input_rating_current": {"unit": "A", "value": 21.7}, + "ac_output_rating_voltage": {"unit": "V", "value": 230.0}, + "ac_output_rating_freq": {"unit": "Hz", "value": 50.0}, + "ac_output_rating_current": {"unit": "A", "value": 21.7}, + "ac_output_rating_apparent_power": {"unit": "VA", "value": 5000}, + "ac_output_rating_active_power": {"unit": "Wh", "value": 5000}, + "battery_rating_voltage": {"unit": "V", "value": 48.0}, + "battery_recharge_voltage": {"unit": "V", "value": 51.0}, + "battery_redischarge_voltage": {"unit": "V", "value": 58.0}, + "battery_under_voltage": {"unit": "V", "value": 42.0}, + "battery_bulk_voltage": {"unit": "V", "value": 57.6}, + "battery_float_voltage": {"unit": "V", "value": 54.0}, + "battery_type": "User", + "max_charging_current": {"unit": "A", "value": 60}, + "max_ac_charging_current": {"unit": "A", "value": 10}, + "input_voltage_range": "Appliance", + "output_source_priority": "Parallel output", + "charge_source_priority": "Solar-and-Utility", + "parallel_max_num": 6, + "machine_type": "Off-Grid-Tie", + "topology": "Transformer-less", + "output_model_setting": "Single module", + "solar_power_priority": "Load-Battery-Utility", + "mppt": "2"} + + self.host = host + self.port = port + self.interrupted = False + self.logger = logging.getLogger('srv') + + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind((self.host, self.port)) + + def run(self): + self.sock.listen(5) + + while not self.interrupted: + conn, address = self.sock.accept() + + alive = True + while alive: + try: + buf = conn.recv(2048) + message = buf.decode().strip() + except OSError as exc: + self.logger.error('failed to recv()') + self.logger.exception(exc) + + alive = False + + try: + conn.close() + except: + pass + + continue # exit the loop + + self.logger.log(0, f'< {message}') + + if message.strip() == '': + continue + + if message == 'format json': + # self.logger.info(f'got {message}') + self.reply_ok(conn) + + elif message.startswith('exec '): + command = message[5:].split() + args = command[1:] + command = command[0] + + if command == 'get-allowed-ac-charging-currents': + self.reply_ok(conn, [2, 10, 20, 30, 40, 50, 60]) + elif command == 'get-status': + self.reply_ok(conn, self._get_status()) + elif command == 'get-rated': + self.reply_ok(conn, self._get_rated()) + elif command == 'set-max-ac-charging-current': + self.set_ac_current(args[1]) + self.reply_ok(conn, 1) + else: + raise ValueError('unsupported command: ' + command) + else: + raise ValueError('unexpected request: ' + message) + + def reply_ok(self, connection, data=None): + buf = 'ok' + '\r\n' + if data: + if not isinstance(data, str): + data = stringify({'result': 'ok', 'data': data}) + buf += data + '\r\n' + buf += '\r\n' + self.logger.log(0, f'> {buf.strip()}') + connection.sendall(buf.encode()) + + def _get_status(self) -> dict: + with self.lock: + return self.status + + def _get_rated(self) -> dict: + with self.lock: + return self.rated + + def _get_voltage(self) -> float: + with self.lock: + return self.status['battery_voltage']['value'] + + def stop(self): + self.interrupted = True + self.sock.close() + + def connect_ac(self): + with self.lock: + self.status['grid_voltage']['value'] = 230 + self.status['grid_freq']['value'] = 50 + charger.ac_changed(True) + + def disconnect_ac(self): + with self.lock: + self.status['grid_voltage']['value'] = 0 + self.status['grid_freq']['value'] = 0 + #self.status['battery_voltage']['value'] = 48.4 # revert to initial value + charger.ac_changed(False) + + def connect_mppt(self): + with self.lock: + self.status['pv1_input_power']['value'] = 1 + self.status['pv1_input_voltage']['value'] = 50 + self.status['mppt1_charger_status'] = 'Charging' + charger.mppt_changed(True) + + def disconnect_mppt(self): + with self.lock: + self.status['pv1_input_power']['value'] = 0 + self.status['pv1_input_voltage']['value'] = 0 + self.status['mppt1_charger_status'] = 'Abnormal' + charger.mppt_changed(False) + + def set_voltage(self, v: float): + with self.lock: + self.status['battery_voltage']['value'] = v + + def set_ac_current(self, amps): + with self.lock: + self.rated['max_ac_charging_current']['value'] = amps + charger.current_changed(amps) + + def set_pd(self, pd: BatteryPowerDirection): + if pd == BatteryPowerDirection.CHARGING: + val = 'Charge' + elif pd == BatteryPowerDirection.DISCHARGING: + val = 'Discharge' + else: + val = 'Do nothing' + with self.lock: + self.status['battery_power_direction'] = val + charger.pd_changed(pd) + + +logger = logging.getLogger(__name__) +evt_logger = logging.getLogger('evt') +server: Optional[InverterEmulator] = None +charger: Optional[ChargerEmulator] = None + + +def main(): + global server, charger + + # start fake inverterd server + try: + server = InverterEmulator(host=config['inverter']['host'], + port=config['inverter']['port']) + server.start() + except OSError as e: + logger.error('failed to start server') + logger.exception(e) + return + logger.info('server started') + + # start charger thread + charger = ChargerEmulator() + charger.start() + + # init inverterd wrapper + inverter.init(host=config['inverter']['host'], + port=config['inverter']['port']) + + # start monitor + mon = InverterMonitor() + mon.set_charging_event_handler(monitor_charging) + mon.set_battery_event_handler(monitor_battery) + mon.set_error_handler(monitor_error) + mon.start() + logger.info('monitor started') + + try: + InverterTestShell().cmdloop() + + server.join() + mon.join() + charger.join() + + except KeyboardInterrupt: + server.stop() + mon.stop() + charger.stop() + + +if __name__ == '__main__': + config.load('test_inverter_monitor') + main() diff --git a/test/test_record_upload.py b/test/test_record_upload.py new file mode 100755 index 0000000..a0c3faf --- /dev/null +++ b/test/test_record_upload.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +import logging +import sys +import os.path +sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..') + ) +]) + +import time + +from src.home.api import WebAPIClient, RequestParams +from src.home.config import config +from src.home.media import SoundRecordClient +from src.home.util import parse_addr + +logger = logging.getLogger(__name__) + + +# record callbacks +# ---------------- + +def record_error(info: dict, userdata: dict): + node = userdata['node'] + # TODO + + +def record_finished(info: dict, fn: str, userdata: dict): + logger.info('record finished: ' + str(info)) + + node = userdata['node'] + api.upload_recording(fn, node, info['id'], int(info['start_time']), int(info['stop_time'])) + + +# api client callbacks +# -------------------- + +def api_error_handler(exc, name, req: RequestParams): + if name == 'upload_recording': + logger.error('failed to upload recording, exception below') + logger.exception(exc) + + else: + logger.error(f'api call ({name}, params={req.params}) failed, exception below') + logger.exception(exc) + + +def api_success_handler(response, name, req: RequestParams): + if name == 'upload_recording': + node = req.params['node'] + rid = req.params['record_id'] + + logger.debug(f'successfully uploaded recording (node={node}, record_id={rid}), api response:' + str(response)) + + # deleting temp file + try: + os.unlink(req.files['file']) + except OSError as exc: + logger.error(f'error while deleting temp file:') + logger.exception(exc) + + record.forget(node, rid) + + +if __name__ == '__main__': + config.load('test_record_upload') + + nodes = {} + for name, addr in config['nodes'].items(): + nodes[name] = parse_addr(addr) + record = SoundRecordClient(nodes, + error_handler=record_error, + finished_handler=record_finished, + download_on_finish=True) + + api = WebAPIClient() + api.enable_async(error_handler=api_error_handler, + success_handler=api_success_handler) + + record_id = record.record('localhost', 3, {'node': 'localhost'}) + print(f'record_id: {record_id}') + + while True: + try: + time.sleep(0.1) + except (KeyboardInterrupt, SystemExit): + break \ No newline at end of file diff --git a/test/test_send_fake_sound_hit.py b/test/test_send_fake_sound_hit.py new file mode 100755 index 0000000..9660c45 --- /dev/null +++ b/test/test_send_fake_sound_hit.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +import sys +import os.path +sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..') + ) +]) + +from argparse import ArgumentParser +from src.home.util import send_datagram, stringify, parse_addr + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--name', type=str, required=True, + help='node name, like `diana`') + parser.add_argument('--hits', type=int, required=True, + help='hits count') + parser.add_argument('--server', type=str, required=True, + help='center server addr in host:port format') + + args = parser.parse_args() + + send_datagram(stringify([args.name, args.hits]), parse_addr(args.server)) diff --git a/test/test_sensors_plot.py b/test/test_sensors_plot.py new file mode 100755 index 0000000..e69de29 diff --git a/test/test_sound_node_client.py b/test/test_sound_node_client.py new file mode 100755 index 0000000..16feb78 --- /dev/null +++ b/test/test_sound_node_client.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +import sys, os.path +sys.path.extend([ + os.path.realpath(os.path.join(os.path.dirname(os.path.join(__file__)), '..')), +]) + +from src.home.api.errors import ApiResponseError +from src.home.media import SoundNodeClient + + +if __name__ == '__main__': + client = SoundNodeClient(('127.0.0.1', 8313)) + print(client.amixer_get_all()) + + try: + client.amixer_get('invalidname') + except ApiResponseError as exc: + print(exc) + diff --git a/test/test_sound_server_api.py b/test/test_sound_server_api.py new file mode 100755 index 0000000..1b4eb8b --- /dev/null +++ b/test/test_sound_server_api.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +import sys +import os.path +sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..') + ) +]) +import threading + +from time import sleep +from src.home.config import config +from src.home.api import WebAPIClient +from src.home.api.types import SoundSensorLocation + +interrupted = False + + +class HitCounter: + def __init__(self): + self.sensors = {} + self.lock = threading.Lock() + self._reset_sensors() + + def _reset_sensors(self): + for loc in SoundSensorLocation: + self.sensors[loc.name.lower()] = 0 + + def add(self, name: str, hits: int): + if name not in self.sensors: + raise ValueError(f'sensor {name} not found') + + with self.lock: + self.sensors[name] += hits + + def get_all(self) -> list[tuple[str, int]]: + vals = [] + with self.lock: + for name, hits in self.sensors.items(): + if hits > 0: + vals.append((name, hits)) + self._reset_sensors() + return vals + + +def hits_sender(): + while True: + try: + all_hits = hc.get_all() + if all_hits: + api.add_sound_sensor_hits(all_hits) + sleep(5) + except (KeyboardInterrupt, SystemExit): + return + + +if __name__ == '__main__': + config.load('test_api') + + hc = HitCounter() + api = WebAPIClient() + + hc.add('spb1', 1) + # hc.add('big_house', 123) + + hits_sender() diff --git a/test/test_stopwatch.py b/test/test_stopwatch.py new file mode 100755 index 0000000..6ff2c0e --- /dev/null +++ b/test/test_stopwatch.py @@ -0,0 +1,16 @@ +from home.util import Stopwatch, StopwatchError +from time import sleep + + +if __name__ == '__main__': + s = Stopwatch() + s.go() + sleep(2) + s.pause() + s.go() + sleep(1) + print(s.get_elapsed_time()) + sleep(1) + print(s.get_elapsed_time()) + s.pause() + print(s.get_elapsed_time()) -- cgit v1.2.3