From e3d3d6b76010a6dd5c417f017339bec17fb07887 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 14 Jun 2022 02:44:43 +0300 Subject: media: refactor sound_node, introduce camera_node --- src/home/api/web_api_client.py | 4 +- src/home/audio/__init__.py | 0 src/home/audio/amixer.py | 91 ++++++++ src/home/camera/__init__.py | 1 + src/home/camera/esp32.py | 140 +++++++------ src/home/camera/types.py | 5 + src/home/http/__init__.py | 2 +- src/home/media/__init__.py | 21 ++ src/home/media/__init__.pyi | 27 +++ src/home/media/node_client.py | 120 +++++++++++ src/home/media/node_server.py | 86 ++++++++ src/home/media/record.py | 453 ++++++++++++++++++++++++++++++++++++++++ src/home/media/record_client.py | 166 +++++++++++++++ src/home/media/storage.py | 197 +++++++++++++++++ src/home/media/types.py | 13 ++ src/home/sound/__init__.py | 8 - src/home/sound/amixer.py | 91 -------- src/home/sound/node_client.py | 109 ---------- src/home/sound/record.py | 400 ----------------------------------- src/home/sound/record_client.py | 142 ------------- src/home/sound/storage.py | 155 -------------- src/home/web_api/web_api.py | 4 +- 22 files changed, 1262 insertions(+), 973 deletions(-) create mode 100644 src/home/audio/__init__.py create mode 100644 src/home/audio/amixer.py create mode 100644 src/home/camera/types.py create mode 100644 src/home/media/__init__.py create mode 100644 src/home/media/__init__.pyi create mode 100644 src/home/media/node_client.py create mode 100644 src/home/media/node_server.py create mode 100644 src/home/media/record.py create mode 100644 src/home/media/record_client.py create mode 100644 src/home/media/storage.py create mode 100644 src/home/media/types.py delete mode 100644 src/home/sound/__init__.py delete mode 100644 src/home/sound/amixer.py delete mode 100644 src/home/sound/node_client.py delete mode 100644 src/home/sound/record.py delete mode 100644 src/home/sound/record_client.py delete mode 100644 src/home/sound/storage.py (limited to 'src/home') diff --git a/src/home/api/web_api_client.py b/src/home/api/web_api_client.py index e3b0988..34d080c 100644 --- a/src/home/api/web_api_client.py +++ b/src/home/api/web_api_client.py @@ -13,7 +13,7 @@ from .errors import ApiResponseError from .types import * from ..config import config from ..util import stringify -from ..sound import RecordFile, SoundNodeClient +from ..media import RecordFile, MediaNodeClient logger = logging.getLogger(__name__) @@ -103,7 +103,7 @@ class WebAPIClient: def recordings_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: files = self._get('recordings/list/', {'extended': int(extended)})['data'] if as_objects: - return SoundNodeClient.record_list_from_serialized(files) + return MediaNodeClient.record_list_from_serialized(files) return files def _process_sound_sensor_hits_data(self, data: list[dict]) -> list[dict]: diff --git a/src/home/audio/__init__.py b/src/home/audio/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/home/audio/amixer.py b/src/home/audio/amixer.py new file mode 100644 index 0000000..0ab2c64 --- /dev/null +++ b/src/home/audio/amixer.py @@ -0,0 +1,91 @@ +import subprocess + +from ..config import config +from threading import Lock +from typing import Union + + +_lock = Lock() +_default_step = 5 + + +def has_control(s: str) -> bool: + for control in config['amixer']['controls']: + if control['name'] == s: + return True + return False + + +def get_caps(s: str) -> list[str]: + for control in config['amixer']['controls']: + if control['name'] == s: + return control['caps'] + raise KeyError(f'control {s} not found') + + +def get_all() -> list: + controls = [] + for control in config['amixer']['controls']: + controls.append({ + 'name': control['name'], + 'info': get(control['name']), + 'caps': control['caps'] + }) + return controls + + +def get(control: str): + return call('get', control) + + +def mute(control): + return call('set', control, 'mute') + + +def unmute(control): + return call('set', control, 'unmute') + + +def cap(control): + return call('set', control, 'cap') + + +def nocap(control): + return call('set', control, 'nocap') + + +def _get_default_step() -> int: + if 'step' in config['amixer']: + return int(config['amixer']['step']) + + return _default_step + + +def incr(control, step=None): + if step is None: + step = _get_default_step() + return call('set', control, f'{step}%+') + + +def decr(control, step=None): + if step is None: + step = _get_default_step() + return call('set', control, f'{step}%-') + + +def call(*args, return_code=False) -> Union[int, str]: + with _lock: + result = subprocess.run([config['amixer']['bin'], *args], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if return_code: + return result.returncode + + if result.returncode != 0: + raise AmixerError(result.stderr.decode().strip()) + + return result.stdout.decode().strip() + + +class AmixerError(OSError): + pass diff --git a/src/home/camera/__init__.py b/src/home/camera/__init__.py index e69de29..626930b 100644 --- a/src/home/camera/__init__.py +++ b/src/home/camera/__init__.py @@ -0,0 +1 @@ +from .types import CameraType \ No newline at end of file diff --git a/src/home/camera/esp32.py b/src/home/camera/esp32.py index 246022b..fe6de0e 100644 --- a/src/home/camera/esp32.py +++ b/src/home/camera/esp32.py @@ -1,10 +1,12 @@ import logging -import shutil import requests import json +import asyncio +import aioshutil +from io import BytesIO +from functools import partial from typing import Union, Optional -from time import sleep from enum import Enum from ..api.errors import ApiResponseError from ..util import Addr @@ -41,14 +43,15 @@ def _assert_bounds(n: int, min: int, max: int): class WebClient: - def __init__(self, addr: Addr): + def __init__(self, + addr: Addr): self.endpoint = f'http://{addr[0]}:{addr[1]}' self.logger = logging.getLogger(self.__class__.__name__) self.delay = 0 self.isfirstrequest = True - def syncsettings(self, settings) -> bool: - status = self.getstatus() + async def syncsettings(self, settings) -> bool: + status = await self.getstatus() self.logger.debug(f'syncsettings: status={status}') changed_anything = False @@ -82,7 +85,7 @@ class WebClient: func = getattr(self, f'set{name}') self.logger.debug(f'syncsettings: calling set{name}({value})') - func(value) + await func(value) changed_anything = True except AttributeError as exc: @@ -94,99 +97,106 @@ class WebClient: def setdelay(self, delay: int): self.delay = delay - def capture(self, save_to: str): - self._call('capture', save_to=save_to) + async def capture(self, output: Optional[str] = None) -> Union[BytesIO, bool]: + kw = {} + if output: + kw['save_to'] = output + else: + kw['as_bytes'] = True + return await self._call('capture', **kw) - def getstatus(self): - return json.loads(self._call('status')) + async def getstatus(self): + return json.loads(await self._call('status')) - def setflash(self, enable: bool): - self._control('flash', int(enable)) + async def setflash(self, enable: bool): + await self._control('flash', int(enable)) - def setframesize(self, fs: Union[int, FrameSize]): + async def setframesize(self, fs: Union[int, FrameSize]): if type(fs) is int: fs = FrameSize(fs) - self._control('framesize', fs.value) + await self._control('framesize', fs.value) - def sethmirror(self, enable: bool): - self._control('hmirror', int(enable)) + async def sethmirror(self, enable: bool): + await self._control('hmirror', int(enable)) - def setvflip(self, enable: bool): - self._control('vflip', int(enable)) + async def setvflip(self, enable: bool): + await self._control('vflip', int(enable)) - def setawb(self, enable: bool): - self._control('awb', int(enable)) + async def setawb(self, enable: bool): + await self._control('awb', int(enable)) - def setawbgain(self, enable: bool): - self._control('awb_gain', int(enable)) + async def setawbgain(self, enable: bool): + await self._control('awb_gain', int(enable)) - def setwbmode(self, mode: WBMode): - self._control('wb_mode', mode.value) + async def setwbmode(self, mode: WBMode): + await self._control('wb_mode', mode.value) - def setaecsensor(self, enable: bool): - self._control('aec', int(enable)) + async def setaecsensor(self, enable: bool): + await self._control('aec', int(enable)) - def setaecdsp(self, enable: bool): - self._control('aec2', int(enable)) + async def setaecdsp(self, enable: bool): + await self._control('aec2', int(enable)) - def setagc(self, enable: bool): - self._control('agc', int(enable)) + async def setagc(self, enable: bool): + await self._control('agc', int(enable)) - def setagcgain(self, gain: int): + async def setagcgain(self, gain: int): _assert_bounds(gain, 1, 31) - self._control('agc_gain', gain) + await self._control('agc_gain', gain) - def setgainceiling(self, gainceiling: int): + async def setgainceiling(self, gainceiling: int): _assert_bounds(gainceiling, 2, 128) - self._control('gainceiling', gainceiling) + await self._control('gainceiling', gainceiling) - def setbpc(self, enable: bool): - self._control('bpc', int(enable)) + async def setbpc(self, enable: bool): + await self._control('bpc', int(enable)) - def setwpc(self, enable: bool): - self._control('wpc', int(enable)) + async def setwpc(self, enable: bool): + await self._control('wpc', int(enable)) - def setrawgma(self, enable: bool): - self._control('raw_gma', int(enable)) + async def setrawgma(self, enable: bool): + await self._control('raw_gma', int(enable)) - def setlenscorrection(self, enable: bool): - self._control('lenc', int(enable)) + async def setlenscorrection(self, enable: bool): + await self._control('lenc', int(enable)) - def setdcw(self, enable: bool): - self._control('dcw', int(enable)) + async def setdcw(self, enable: bool): + await self._control('dcw', int(enable)) - def setcolorbar(self, enable: bool): - self._control('colorbar', int(enable)) + async def setcolorbar(self, enable: bool): + await self._control('colorbar', int(enable)) - def setquality(self, q: int): + async def setquality(self, q: int): _assert_bounds(q, 4, 63) - self._control('quality', q) + await self._control('quality', q) - def setbrightness(self, brightness: int): + async def setbrightness(self, brightness: int): _assert_bounds(brightness, -2, -2) - self._control('brightness', brightness) + await self._control('brightness', brightness) - def setcontrast(self, contrast: int): + async def setcontrast(self, contrast: int): _assert_bounds(contrast, -2, 2) - self._control('contrast', contrast) + await self._control('contrast', contrast) - def setsaturation(self, saturation: int): + async def setsaturation(self, saturation: int): _assert_bounds(saturation, -2, 2) - self._control('saturation', saturation) + await self._control('saturation', saturation) - def _control(self, var: str, value: Union[int, str]): - self._call('control', params={'var': var, 'val': value}) + async def _control(self, var: str, value: Union[int, str]): + return await self._call('control', params={'var': var, 'val': value}) - def _call(self, - method: str, - params: Optional[dict] = None, - save_to: Optional[str] = None): + async def _call(self, + method: str, + params: Optional[dict] = None, + save_to: Optional[str] = None, + as_bytes=False) -> Union[str, bool, BytesIO]: + loop = asyncio.get_event_loop() if not self.isfirstrequest and self.delay > 0: sleeptime = self.delay / 1000 self.logger.debug(f'sleeping for {sleeptime}') - sleep(sleeptime) + await asyncio.sleep(sleeptime) self.isfirstrequest = False @@ -199,14 +209,18 @@ class WebClient: if save_to: kwargs['stream'] = True - r = requests.get(url, **kwargs) + r = await loop.run_in_executor(None, + partial(requests.get, url, **kwargs)) if r.status_code != 200: raise ApiResponseError(status_code=r.status_code) + if as_bytes: + return BytesIO(r.content) + if save_to: r.raise_for_status() with open(save_to, 'wb') as f: - shutil.copyfileobj(r.raw, f) + await aioshutil.copyfileobj(r.raw, f) return True return r.text diff --git a/src/home/camera/types.py b/src/home/camera/types.py new file mode 100644 index 0000000..de59022 --- /dev/null +++ b/src/home/camera/types.py @@ -0,0 +1,5 @@ +from enum import Enum + + +class CameraType(Enum): + ESP32 = 'esp32' diff --git a/src/home/http/__init__.py b/src/home/http/__init__.py index 2597457..963e13c 100644 --- a/src/home/http/__init__.py +++ b/src/home/http/__init__.py @@ -1,2 +1,2 @@ from .http import serve, ok, routes, HTTPServer -from aiohttp.web import FileResponse, Request +from aiohttp.web import FileResponse, StreamResponse, Request \ No newline at end of file diff --git a/src/home/media/__init__.py b/src/home/media/__init__.py new file mode 100644 index 0000000..e8268cf --- /dev/null +++ b/src/home/media/__init__.py @@ -0,0 +1,21 @@ +import importlib +import itertools + +__map__ = { + 'types': ['MediaNodeType'], + 'record_client': ['SoundRecordClient', 'CameraRecordClient', 'RecordClient'], + 'node_server': ['MediaNodeServer'], + 'node_client': ['SoundNodeClient', 'CameraNodeClient', 'MediaNodeClient'], + 'storage': ['SoundRecordStorage', 'CameraRecordStorage', 'SoundRecordFile', 'CameraRecordFile', 'RecordFile'], + 'record': ['SoundRecorder', 'CameraRecorder'] +} + +__all__ = list(itertools.chain(*__map__.values())) + +def __getattr__(name): + if name in __all__: + for file, names in __map__.items(): + if name in names: + module = importlib.import_module(f'.{file}', __name__) + return getattr(module, name) + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/home/media/__init__.pyi b/src/home/media/__init__.pyi new file mode 100644 index 0000000..0e85cde --- /dev/null +++ b/src/home/media/__init__.pyi @@ -0,0 +1,27 @@ +from .types import ( + MediaNodeType as MediaNodeType +) +from .record_client import ( + SoundRecordClient as SoundRecordClient, + CameraRecordClient as CameraRecordClient, + RecordClient as RecordClient +) +from .node_server import ( + MediaNodeServer as MediaNodeServer +) +from .node_client import ( + SoundNodeClient as SoundNodeClient, + CameraNodeClient as CameraNodeClient, + MediaNodeClient as MediaNodeClient +) +from .storage import ( + SoundRecordStorage as SoundRecordStorage, + CameraRecordStorage as CameraRecordStorage, + SoundRecordFile as SoundRecordFile, + CameraRecordFile as CameraRecordFile, + RecordFile as RecordFile +) +from .record import ( + SoundRecorder as SoundRecorder, + CameraRecorder as CameraRecorder +) \ No newline at end of file diff --git a/src/home/media/node_client.py b/src/home/media/node_client.py new file mode 100644 index 0000000..18f0779 --- /dev/null +++ b/src/home/media/node_client.py @@ -0,0 +1,120 @@ +import requests +import shutil +import logging + +from typing import Optional, Union +from .storage import RecordFile +from ..util import Addr +from ..camera.types import CameraType +from ..api.errors import ApiResponseError + + +class MediaNodeClient: + def __init__(self, addr: Addr): + self.endpoint = f'http://{addr[0]}:{addr[1]}' + self.logger = logging.getLogger(self.__class__.__name__) + + def record(self, duration: int): + return self._call('record/', params={"duration": duration}) + + def record_info(self, record_id: int): + return self._call(f'record/info/{record_id}/') + + def record_forget(self, record_id: int): + return self._call(f'record/forget/{record_id}/') + + def record_download(self, record_id: int, output: str): + return self._call(f'record/download/{record_id}/', save_to=output) + + def storage_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: + r = self._call('storage/list/', params={'extended': int(extended)}) + files = r['files'] + if as_objects: + return self.record_list_from_serialized(files) + return files + + @staticmethod + def record_list_from_serialized(files: Union[list[str], list[dict]]): + new_files = [] + for f in files: + kwargs = {'remote': True} + if isinstance(f, dict): + name = f['filename'] + kwargs['remote_filesize'] = f['filesize'] + else: + name = f + item = RecordFile.create(name, **kwargs) + new_files.append(item) + return new_files + + def storage_delete(self, file_id: str): + return self._call('storage/delete/', params={'file_id': file_id}) + + def storage_download(self, file_id: str, output: str): + return self._call('storage/download/', params={'file_id': file_id}, save_to=output) + + def _call(self, + method: str, + params: dict = None, + save_to: Optional[str] = None): + kwargs = {} + if isinstance(params, dict): + kwargs['params'] = params + if save_to: + kwargs['stream'] = True + + url = f'{self.endpoint}/{method}' + self.logger.debug(f'calling {url}, kwargs: {kwargs}') + + r = requests.get(url, **kwargs) + if r.status_code != 200: + response = r.json() + raise ApiResponseError(status_code=r.status_code, + error_type=response['error'], + error_message=response['message'] or None, + error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None) + + if save_to: + r.raise_for_status() + with open(save_to, 'wb') as f: + shutil.copyfileobj(r.raw, f) + return True + + return r.json()['response'] + + +class SoundNodeClient(MediaNodeClient): + def amixer_get_all(self): + return self._call('amixer/get-all/') + + def amixer_get(self, control: str): + return self._call(f'amixer/get/{control}/') + + def amixer_incr(self, control: str, step: Optional[int] = None): + params = {'step': step} if step is not None else None + return self._call(f'amixer/incr/{control}/', params=params) + + def amixer_decr(self, control: str, step: Optional[int] = None): + params = {'step': step} if step is not None else None + return self._call(f'amixer/decr/{control}/', params=params) + + def amixer_mute(self, control: str): + return self._call(f'amixer/mute/{control}/') + + def amixer_unmute(self, control: str): + return self._call(f'amixer/unmute/{control}/') + + def amixer_cap(self, control: str): + return self._call(f'amixer/cap/{control}/') + + def amixer_nocap(self, control: str): + return self._call(f'amixer/nocap/{control}/') + + +class CameraNodeClient(MediaNodeClient): + def capture(self, + save_to: str, + with_flash: bool = False): + return self._call('capture/', + {'with_flash': int(with_flash)}, + save_to=save_to) diff --git a/src/home/media/node_server.py b/src/home/media/node_server.py new file mode 100644 index 0000000..5d0803c --- /dev/null +++ b/src/home/media/node_server.py @@ -0,0 +1,86 @@ +from .. import http +from .record import Recorder +from .types import RecordStatus +from .storage import RecordStorage + + +class MediaNodeServer(http.HTTPServer): + recorder: Recorder + storage: RecordStorage + + def __init__(self, + recorder: Recorder, + storage: RecordStorage, + *args, **kwargs): + super().__init__(*args, **kwargs) + + self.recorder = recorder + self.storage = storage + + self.get('/record/', self.do_record) + self.get('/record/info/{id}/', self.record_info) + self.get('/record/forget/{id}/', self.record_forget) + self.get('/record/download/{id}/', self.record_download) + + self.get('/storage/list/', self.storage_list) + self.get('/storage/delete/', self.storage_delete) + self.get('/storage/download/', self.storage_download) + + async def do_record(self, request: http.Request): + duration = int(request.query['duration']) + max = Recorder.get_max_record_time()*15 + if not 0 < duration <= max: + raise ValueError(f'invalid duration: max duration is {max}') + + record_id = self.recorder.record(duration) + return http.ok({'id': record_id}) + + async def record_info(self, request: http.Request): + record_id = int(request.match_info['id']) + info = self.recorder.get_info(record_id) + return http.ok(info.as_dict()) + + async def record_forget(self, request: http.Request): + record_id = int(request.match_info['id']) + + info = self.recorder.get_info(record_id) + assert info.status in (RecordStatus.FINISHED, RecordStatus.ERROR), f"can't forget: record status is {info.status}" + + self.recorder.forget(record_id) + return http.ok() + + async def record_download(self, request: http.Request): + record_id = int(request.match_info['id']) + + info = self.recorder.get_info(record_id) + assert info.status == RecordStatus.FINISHED, f"record status is {info.status}" + + return http.FileResponse(info.file.path) + + async def storage_list(self, request: http.Request): + extended = 'extended' in request.query and int(request.query['extended']) == 1 + + files = self.storage.getfiles(as_objects=extended) + if extended: + files = list(map(lambda file: file.__dict__(), files)) + + return http.ok({ + 'files': files + }) + + async def storage_delete(self, request: http.Request): + file_id = request.query['file_id'] + file = self.storage.find(file_id) + if not file: + raise ValueError(f'file {file} not found') + + self.storage.delete(file) + return http.ok() + + async def storage_download(self, request): + file_id = request.query['file_id'] + file = self.storage.find(file_id) + if not file: + raise ValueError(f'file {file} not found') + + return http.FileResponse(file.path) diff --git a/src/home/media/record.py b/src/home/media/record.py new file mode 100644 index 0000000..d3abfbb --- /dev/null +++ b/src/home/media/record.py @@ -0,0 +1,453 @@ +import os +import threading +import logging +import time +import subprocess +import signal + +from typing import Optional +from ..util import find_child_processes, Addr +from ..config import config +from .storage import RecordFile, RecordStorage +from .types import RecordStatus +from ..camera.types import CameraType + + +_history_item_timeout = 7200 +_history_cleanup_freq = 3600 + + +class RecordHistoryItem: + id: int + request_time: float + start_time: float + stop_time: float + relations: list[int] + status: RecordStatus + error: Optional[Exception] + file: Optional[RecordFile] + creation_time: float + + def __init__(self, id): + self.id = id + self.request_time = 0 + self.start_time = 0 + self.stop_time = 0 + self.relations = [] + self.status = RecordStatus.WAITING + self.file = None + self.error = None + self.creation_time = time.time() + + def add_relation(self, related_id: int): + self.relations.append(related_id) + + def mark_started(self, start_time: float): + self.start_time = start_time + self.status = RecordStatus.RECORDING + + def mark_finished(self, end_time: float, file: RecordFile): + self.stop_time = end_time + self.file = file + self.status = RecordStatus.FINISHED + + def mark_failed(self, error: Exception): + self.status = RecordStatus.ERROR + self.error = error + + def as_dict(self) -> dict: + data = { + 'id': self.id, + 'request_time': self.request_time, + 'status': self.status.value, + 'relations': self.relations, + 'start_time': self.start_time, + 'stop_time': self.stop_time, + } + if self.error: + data['error'] = str(self.error) + if self.file: + data['file'] = self.file.__dict__() + return data + + +class RecordingNotFoundError(Exception): + pass + + +class RecordHistory: + history: dict[int, RecordHistoryItem] + + def __init__(self): + self.history = {} + self.logger = logging.getLogger(self.__class__.__name__) + + def add(self, record_id: int): + self.logger.debug(f'add: record_id={record_id}') + + r = RecordHistoryItem(record_id) + r.request_time = time.time() + + self.history[record_id] = r + + def delete(self, record_id: int): + self.logger.debug(f'delete: record_id={record_id}') + del self.history[record_id] + + def cleanup(self): + del_ids = [] + for rid, item in self.history.items(): + if item.creation_time < time.time()-_history_item_timeout: + del_ids.append(rid) + for rid in del_ids: + self.delete(rid) + + def __getitem__(self, key): + if key not in self.history: + raise RecordingNotFoundError() + + return self.history[key] + + def __setitem__(self, key, value): + raise NotImplementedError('setting history item this way is prohibited') + + def __contains__(self, key): + return key in self.history + + +class Recording: + RECORDER_PROGRAM = None + + start_time: float + stop_time: float + duration: int + record_id: int + recorder_program_pid: Optional[int] + process: Optional[subprocess.Popen] + + g_record_id = 1 + + def __init__(self): + if self.RECORDER_PROGRAM is None: + raise RuntimeError('this is abstract class') + + self.start_time = 0 + self.stop_time = 0 + self.duration = 0 + self.process = None + self.recorder_program_pid = None + self.record_id = Recording.next_id() + self.logger = logging.getLogger(self.__class__.__name__) + + def is_started(self) -> bool: + return self.start_time > 0 and self.stop_time > 0 + + def is_waiting(self): + return self.duration > 0 + + def ask_for(self, duration) -> int: + overtime = 0 + orig_duration = duration + + if self.is_started(): + already_passed = time.time() - self.start_time + max_duration = Recorder.get_max_record_time() - already_passed + self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}') + else: + max_duration = Recorder.get_max_record_time() + + if duration > max_duration: + overtime = duration - max_duration + duration = max_duration + + self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}') + + self.duration += duration + if self.is_started(): + til_end = self.stop_time - time.time() + if til_end < 0: + til_end = 0 + + _prev_stop_time = self.stop_time + _to_add = duration - til_end + if _to_add < 0: + _to_add = 0 + + self.stop_time += _to_add + self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})') + + return overtime + + def start(self, output: str): + assert self.start_time == 0 and self.stop_time == 0, "already started?!" + assert self.process is None, "self.process is not None, what the hell?" + + cur = time.time() + self.start_time = cur + self.stop_time = cur + self.duration + + cmd = self.get_command(output) + self.logger.debug(f'start: running `{cmd}`') + self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True) + + sh_pid = self.process.pid + self.logger.debug(f'start: started, pid of shell is {sh_pid}') + + pid = self.find_recorder_program_pid(sh_pid) + if pid is not None: + self.recorder_program_pid = pid + self.logger.debug(f'start: pid of {self.RECORDER_PROGRAM} is {pid}') + + def get_command(self, output: str) -> str: + pass + + def stop(self): + if self.process: + if self.recorder_program_pid is None: + self.recorder_program_pid = self.find_recorder_program_pid(self.process.pid) + + if self.recorder_program_pid is not None: + os.kill(self.recorder_program_pid, signal.SIGINT) + timeout = config['node']['process_wait_timeout'] + + self.logger.debug(f'stop: sent SIGINT to {self.recorder_program_pid}. now waiting up to {timeout} seconds...') + try: + self.process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + self.logger.warning(f'stop: wait({timeout}): timeout expired, calling terminate()') + self.process.terminate() + else: + self.logger.warning(f'stop: pid of {self.RECORDER_PROGRAM} is unknown, calling terminate()') + self.process.terminate() + + rc = self.process.returncode + self.logger.debug(f'stop: rc={rc}') + + self.process = None + self.recorder_program_pid = 0 + + self.duration = 0 + self.start_time = 0 + self.stop_time = 0 + + def find_recorder_program_pid(self, sh_pid: int): + try: + children = find_child_processes(sh_pid) + except OSError as exc: + self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc)) + return None + + for child in children: + if self.RECORDER_PROGRAM in child.cmd: + return child.pid + + return None + + @staticmethod + def next_id() -> int: + cur_id = Recording.g_record_id + Recording.g_record_id += 1 + return cur_id + + def increment_id(self): + self.record_id = Recording.next_id() + + +class Recorder: + TEMP_NAME = None + + interrupted: bool + lock: threading.Lock + history_lock: threading.Lock + recording: Optional[Recording] + overtime: int + history: RecordHistory + next_history_cleanup_time: float + storage: RecordStorage + + def __init__(self, + storage: RecordStorage, + recording: Recording): + if self.TEMP_NAME is None: + raise RuntimeError('this is abstract class') + + self.storage = storage + self.recording = recording + self.interrupted = False + self.lock = threading.Lock() + self.history_lock = threading.Lock() + self.overtime = 0 + self.history = RecordHistory() + self.next_history_cleanup_time = 0 + self.logger = logging.getLogger(self.__class__.__name__) + + def start_thread(self): + t = threading.Thread(target=self.loop) + t.daemon = True + t.start() + + def loop(self) -> None: + tempname = os.path.join(self.storage.root, self.TEMP_NAME) + + while not self.interrupted: + cur = time.time() + stopped = False + cur_record_id = None + + if self.next_history_cleanup_time == 0: + self.next_history_cleanup_time = time.time() + _history_cleanup_freq + elif self.next_history_cleanup_time <= time.time(): + self.logger.debug('loop: calling history.cleanup()') + try: + self.history.cleanup() + except Exception as e: + self.logger.error('loop: error while history.cleanup(): ' + str(e)) + self.next_history_cleanup_time = time.time() + _history_cleanup_freq + + with self.lock: + cur_record_id = self.recording.record_id + # self.logger.debug(f'cur_record_id={cur_record_id}') + + if not self.recording.is_started(): + if self.recording.is_waiting(): + try: + if os.path.exists(tempname): + self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..') + try: + os.unlink(tempname) + except OSError as e: + self.logger.exception(e) + self.recording.start(tempname) + with self.history_lock: + self.history[cur_record_id].mark_started(self.recording.start_time) + except Exception as exc: + self.logger.exception(exc) + + # there should not be any errors, but still.. + try: + self.recording.stop() + except Exception as exc: + self.logger.exception(exc) + + with self.history_lock: + self.history[cur_record_id].mark_failed(exc) + + self.logger.debug(f'loop: start exc path: calling increment_id()') + self.recording.increment_id() + else: + if cur >= self.recording.stop_time: + try: + start_time = self.recording.start_time + stop_time = self.recording.stop_time + self.recording.stop() + + saved_name = self.storage.save(tempname, + record_id=cur_record_id, + start_time=int(start_time), + stop_time=int(stop_time)) + + with self.history_lock: + self.history[cur_record_id].mark_finished(stop_time, saved_name) + except Exception as exc: + self.logger.exception(exc) + with self.history_lock: + self.history[cur_record_id].mark_failed(exc) + finally: + self.logger.debug(f'loop: stop exc final path: calling increment_id()') + self.recording.increment_id() + + stopped = True + + if stopped and self.overtime > 0: + self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})') + _overtime = self.overtime + self.overtime = 0 + + related_id = self.record(_overtime) + self.logger.info(f'enqueued another record with id {related_id}') + + if cur_record_id is not None: + with self.history_lock: + self.history[cur_record_id].add_relation(related_id) + + time.sleep(0.2) + + def record(self, duration: int) -> int: + self.logger.debug(f'record: duration={duration}') + with self.lock: + overtime = self.recording.ask_for(duration) + self.logger.debug(f'overtime={overtime}') + + if overtime > self.overtime: + self.overtime = overtime + + if not self.recording.is_started(): + with self.history_lock: + self.history.add(self.recording.record_id) + + return self.recording.record_id + + def stop(self): + self.interrupted = True + + def get_info(self, record_id: int) -> RecordHistoryItem: + with self.history_lock: + return self.history[record_id] + + def forget(self, record_id: int): + with self.history_lock: + self.logger.info(f'forget: removing record {record_id} from history') + self.history.delete(record_id) + + @staticmethod + def get_max_record_time() -> int: + return config['node']['record_max_time'] + + +class SoundRecorder(Recorder): + TEMP_NAME = 'temp.mp3' + + def __init__(self, *args, **kwargs): + super().__init__(recording=SoundRecording(), + *args, **kwargs) + + +class CameraRecorder(Recorder): + TEMP_NAME = 'temp.mp4' + + def __init__(self, + camera_type: CameraType, + *args, **kwargs): + if camera_type == CameraType.ESP32: + recording = ESP32CameraRecording(stream_addr=kwargs['stream_addr']) + del kwargs['stream_addr'] + else: + raise RuntimeError(f'unsupported camera type {camera_type}') + + super().__init__(recording=recording, + *args, **kwargs) + + +class SoundRecording(Recording): + RECORDER_PROGRAM = 'arecord' + + def get_command(self, output: str) -> str: + arecord = config['arecord']['bin'] + lame = config['lame']['bin'] + b = config['lame']['bitrate'] + + return f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null' + + +class ESP32CameraRecording(Recording): + RECORDER_PROGRAM = 'esp32_capture.py' + + stream_addr: Addr + + def __init__(self, stream_addr: Addr): + super().__init__() + self.stream_addr = stream_addr + + def get_command(self, output: str) -> str: + bin = config['esp32_capture']['bin'] + return f'{bin} --addr {self.stream_addr[0]}:{self.stream_addr[1]} --output-directory {output} >/dev/null 2>/dev/null' \ No newline at end of file diff --git a/src/home/media/record_client.py b/src/home/media/record_client.py new file mode 100644 index 0000000..f264155 --- /dev/null +++ b/src/home/media/record_client.py @@ -0,0 +1,166 @@ +import time +import logging +import threading +import os.path + +from tempfile import gettempdir +from .record import RecordStatus +from .node_client import SoundNodeClient, MediaNodeClient, CameraNodeClient +from ..util import Addr +from typing import Optional, Callable + + +class RecordClient: + DOWNLOAD_EXTENSION = None + + interrupted: bool + logger: logging.Logger + clients: dict[str, MediaNodeClient] + awaiting: dict[str, dict[int, Optional[dict]]] + error_handler: Optional[Callable] + finished_handler: Optional[Callable] + download_on_finish: bool + + def __init__(self, + nodes: dict[str, Addr], + error_handler: Optional[Callable] = None, + finished_handler: Optional[Callable] = None, + download_on_finish=False): + if self.DOWNLOAD_EXTENSION is None: + raise RuntimeError('this is abstract class') + + self.interrupted = False + self.logger = logging.getLogger(self.__class__.__name__) + self.clients = {} + self.awaiting = {} + + self.download_on_finish = download_on_finish + self.error_handler = error_handler + self.finished_handler = finished_handler + + self.awaiting_lock = threading.Lock() + + self.make_clients(nodes) + + try: + t = threading.Thread(target=self.loop) + t.daemon = True + t.start() + except (KeyboardInterrupt, SystemExit) as exc: + self.stop() + self.logger.exception(exc) + + def make_clients(self, nodes: dict[str, Addr]): + pass + + def stop(self): + self.interrupted = True + + def loop(self): + while not self.interrupted: + for node in self.awaiting.keys(): + with self.awaiting_lock: + record_ids = list(self.awaiting[node].keys()) + if not record_ids: + continue + + self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}') + + cl = self.getclient(node) + del_ids = [] + for rid in record_ids: + info = cl.record_info(rid) + + if info['relations']: + for relid in info['relations']: + self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True) + + status = RecordStatus(info['status']) + if status in (RecordStatus.FINISHED, RecordStatus.ERROR): + if status == RecordStatus.FINISHED: + if self.download_on_finish: + local_fn = self.download(node, rid, info['file']['fileid']) + else: + local_fn = None + self._report_finished(info, local_fn, self.awaiting[node][rid]) + else: + self._report_error(info, self.awaiting[node][rid]) + del_ids.append(rid) + self.logger.debug(f'record {rid}: status {status}') + + if del_ids: + self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list') + with self.awaiting_lock: + for del_id in del_ids: + del self.awaiting[node][del_id] + + time.sleep(5) + + self.logger.info('loop ended') + + def getclient(self, node: str): + return self.clients[node] + + def record(self, + node: str, + duration: int, + userdata: Optional[dict] = None) -> int: + self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}') + + cl = self.getclient(node) + record_id = cl.record(duration)['id'] + self.logger.debug(f'record: request sent, record_id={record_id}') + + self.wait_for_record(node, record_id, userdata) + return record_id + + def wait_for_record(self, + node: str, + record_id: int, + userdata: Optional[dict] = None, + is_relative=False): + with self.awaiting_lock: + if record_id not in self.awaiting[node]: + msg = f'wait_for_record: adding {record_id} to {node}' + if is_relative: + msg += ' (by relation)' + self.logger.debug(msg) + + self.awaiting[node][record_id] = userdata + + def download(self, node: str, record_id: int, fileid: str): + dst = os.path.join(gettempdir(), f'{node}_{fileid}.{self.DOWNLOAD_EXTENSION}') + cl = self.getclient(node) + cl.record_download(record_id, dst) + return dst + + def forget(self, node: str, rid: int): + self.getclient(node).record_forget(rid) + + def _report_finished(self, *args): + if self.finished_handler: + self.finished_handler(*args) + + def _report_error(self, *args): + if self.error_handler: + self.error_handler(*args) + + +class SoundRecordClient(RecordClient): + DOWNLOAD_EXTENSION = 'mp3' + # clients: dict[str, SoundNodeClient] + + def make_clients(self, nodes: dict[str, Addr]): + for node, addr in nodes.items(): + self.clients[node] = SoundNodeClient(addr) + self.awaiting[node] = {} + + +class CameraRecordClient(RecordClient): + DOWNLOAD_EXTENSION = 'mp4' + # clients: dict[str, CameraNodeClient] + + def make_clients(self, nodes: dict[str, Addr]): + for node, addr in nodes.items(): + self.clients[node] = CameraNodeClient(addr) + self.awaiting[node] = {} \ No newline at end of file diff --git a/src/home/media/storage.py b/src/home/media/storage.py new file mode 100644 index 0000000..880b899 --- /dev/null +++ b/src/home/media/storage.py @@ -0,0 +1,197 @@ +import os +import re +import shutil +import logging + +from typing import Optional, Union +from datetime import datetime +from ..util import strgen + +logger = logging.getLogger(__name__) + + +# record file +# ----------- + +class RecordFile: + EXTENSION = None + + start_time: Optional[datetime] + stop_time: Optional[datetime] + record_id: Optional[int] + name: str + file_id: Optional[str] + remote: bool + remote_filesize: int + storage_root: str + + human_date_dmt = '%d.%m.%y' + human_time_fmt = '%H:%M:%S' + + @staticmethod + def create(filename: str, *args, **kwargs): + if filename.endswith(f'.{SoundRecordFile.EXTENSION}'): + return SoundRecordFile(filename, *args, **kwargs) + elif filename.endswith(f'.{CameraRecordFile.EXTENSION}'): + return CameraRecordFile(filename, *args, **kwargs) + else: + raise RuntimeError(f'unsupported file extension: {filename}') + + def __init__(self, filename: str, remote=False, remote_filesize=None, storage_root='/'): + if self.EXTENSION is None: + raise RuntimeError('this is abstract class') + + self.name = filename + self.storage_root = storage_root + + self.remote = remote + self.remote_filesize = remote_filesize + + m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.'+self.EXTENSION+'$', filename) + if m: + self.start_time = datetime.strptime(m.group(1), RecordStorage.time_fmt) + self.stop_time = datetime.strptime(m.group(2), RecordStorage.time_fmt) + self.record_id = int(m.group(3)) + self.file_id = (m.group(1) + '_' + m.group(2)).replace('-', '_') + else: + logger.warning(f'unexpected filename: {filename}') + self.start_time = None + self.stop_time = None + self.record_id = None + self.file_id = None + + @property + def path(self): + if self.remote: + return RuntimeError('remote recording, can\'t get real path') + + return os.path.realpath(os.path.join( + self.storage_root, self.name + )) + + @property + def start_humantime(self) -> str: + if self.start_time is None: + return '?' + fmt = f'{RecordFile.human_date_dmt} {RecordFile.human_time_fmt}' + return self.start_time.strftime(fmt) + + @property + def stop_humantime(self) -> str: + if self.stop_time is None: + return '?' + fmt = RecordFile.human_time_fmt + if self.start_time.date() != self.stop_time.date(): + fmt = f'{RecordFile.human_date_dmt} {fmt}' + return self.stop_time.strftime(fmt) + + @property + def start_unixtime(self) -> int: + if self.start_time is None: + return 0 + return int(self.start_time.timestamp()) + + @property + def stop_unixtime(self) -> int: + if self.stop_time is None: + return 0 + return int(self.stop_time.timestamp()) + + @property + def filesize(self): + if self.remote: + if self.remote_filesize is None: + raise RuntimeError('file is remote and remote_filesize is not set') + return self.remote_filesize + return os.path.getsize(self.path) + + def __dict__(self) -> dict: + return { + 'start_unixtime': self.start_unixtime, + 'stop_unixtime': self.stop_unixtime, + 'filename': self.name, + 'filesize': self.filesize, + 'fileid': self.file_id, + 'record_id': self.record_id or 0, + } + + +class SoundRecordFile(RecordFile): + EXTENSION = 'mp3' + + +class CameraRecordFile(RecordFile): + EXTENSION = 'mp4' + + +# record storage +# -------------- + +class RecordStorage: + EXTENSION = None + + time_fmt = '%d%m%y-%H%M%S' + + def __init__(self, root: str): + if self.EXTENSION is None: + raise RuntimeError('this is abstract class') + + self.root = root + + def getfiles(self, as_objects=False) -> Union[list[str], list[RecordFile]]: + files = [] + for name in os.listdir(self.root): + path = os.path.join(self.root, name) + if os.path.isfile(path) and name.endswith(f'.{self.EXTENSION}'): + files.append(name if not as_objects else RecordFile.create(name, storage_root=self.root)) + return files + + def find(self, file_id: str) -> Optional[RecordFile]: + for name in os.listdir(self.root): + if os.path.isfile(os.path.join(self.root, name)) and name.endswith(f'.{self.EXTENSION}'): + item = RecordFile.create(name, storage_root=self.root) + if item.file_id == file_id: + return item + return None + + def purge(self): + files = self.getfiles() + if files: + logger = logging.getLogger(self.__name__) + for f in files: + try: + path = os.path.join(self.root, f) + logger.debug(f'purge: deleting {path}') + os.unlink(path) + except OSError as exc: + logger.exception(exc) + + def delete(self, file: RecordFile): + os.unlink(file.path) + + def save(self, + fn: str, + record_id: int, + start_time: int, + stop_time: int) -> RecordFile: + + start_time_s = datetime.fromtimestamp(start_time).strftime(self.time_fmt) + stop_time_s = datetime.fromtimestamp(stop_time).strftime(self.time_fmt) + + dst_fn = f'{start_time_s}_{stop_time_s}_id{record_id}' + if os.path.exists(os.path.join(self.root, dst_fn)): + dst_fn += strgen(4) + dst_fn += f'.{self.EXTENSION}' + dst_path = os.path.join(self.root, dst_fn) + + shutil.move(fn, dst_path) + return RecordFile.create(dst_fn, storage_root=self.root) + + +class SoundRecordStorage(RecordStorage): + EXTENSION = 'mp3' + + +class CameraRecordStorage(RecordStorage): + EXTENSION = 'mp4' + diff --git a/src/home/media/types.py b/src/home/media/types.py new file mode 100644 index 0000000..acbc291 --- /dev/null +++ b/src/home/media/types.py @@ -0,0 +1,13 @@ +from enum import Enum, auto + + +class MediaNodeType(Enum): + SOUND = auto() + CAMERA = auto() + + +class RecordStatus(Enum): + WAITING = auto() + RECORDING = auto() + FINISHED = auto() + ERROR = auto() diff --git a/src/home/sound/__init__.py b/src/home/sound/__init__.py deleted file mode 100644 index 43ddaff..0000000 --- a/src/home/sound/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -from .node_client import SoundNodeClient -from .record import ( - RecordStatus, - RecordingNotFoundError, - Recorder, -) -from .storage import RecordStorage, RecordFile -from .record_client import RecordClient diff --git a/src/home/sound/amixer.py b/src/home/sound/amixer.py deleted file mode 100644 index 0ab2c64..0000000 --- a/src/home/sound/amixer.py +++ /dev/null @@ -1,91 +0,0 @@ -import subprocess - -from ..config import config -from threading import Lock -from typing import Union - - -_lock = Lock() -_default_step = 5 - - -def has_control(s: str) -> bool: - for control in config['amixer']['controls']: - if control['name'] == s: - return True - return False - - -def get_caps(s: str) -> list[str]: - for control in config['amixer']['controls']: - if control['name'] == s: - return control['caps'] - raise KeyError(f'control {s} not found') - - -def get_all() -> list: - controls = [] - for control in config['amixer']['controls']: - controls.append({ - 'name': control['name'], - 'info': get(control['name']), - 'caps': control['caps'] - }) - return controls - - -def get(control: str): - return call('get', control) - - -def mute(control): - return call('set', control, 'mute') - - -def unmute(control): - return call('set', control, 'unmute') - - -def cap(control): - return call('set', control, 'cap') - - -def nocap(control): - return call('set', control, 'nocap') - - -def _get_default_step() -> int: - if 'step' in config['amixer']: - return int(config['amixer']['step']) - - return _default_step - - -def incr(control, step=None): - if step is None: - step = _get_default_step() - return call('set', control, f'{step}%+') - - -def decr(control, step=None): - if step is None: - step = _get_default_step() - return call('set', control, f'{step}%-') - - -def call(*args, return_code=False) -> Union[int, str]: - with _lock: - result = subprocess.run([config['amixer']['bin'], *args], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - if return_code: - return result.returncode - - if result.returncode != 0: - raise AmixerError(result.stderr.decode().strip()) - - return result.stdout.decode().strip() - - -class AmixerError(OSError): - pass diff --git a/src/home/sound/node_client.py b/src/home/sound/node_client.py deleted file mode 100644 index 7341208..0000000 --- a/src/home/sound/node_client.py +++ /dev/null @@ -1,109 +0,0 @@ -import requests -import logging -import shutil - -from ..util import Addr -from ..api.errors import ApiResponseError -from typing import Optional, Union -from .record import RecordFile - - -class SoundNodeClient: - def __init__(self, addr: Addr): - self.endpoint = f'http://{addr[0]}:{addr[1]}' - self.logger = logging.getLogger(self.__class__.__name__) - - def amixer_get_all(self): - return self._call('amixer/get-all/') - - def amixer_get(self, control: str): - return self._call(f'amixer/get/{control}/') - - def amixer_incr(self, control: str, step: Optional[int] = None): - params = {'step': step} if step is not None else None - return self._call(f'amixer/incr/{control}/', params=params) - - def amixer_decr(self, control: str, step: Optional[int] = None): - params = {'step': step} if step is not None else None - return self._call(f'amixer/decr/{control}/', params=params) - - def amixer_mute(self, control: str): - return self._call(f'amixer/mute/{control}/') - - def amixer_unmute(self, control: str): - return self._call(f'amixer/unmute/{control}/') - - def amixer_cap(self, control: str): - return self._call(f'amixer/cap/{control}/') - - def amixer_nocap(self, control: str): - return self._call(f'amixer/nocap/{control}/') - - def record(self, duration: int): - return self._call('record/', params={"duration": duration}) - - def record_info(self, record_id: int): - return self._call(f'record/info/{record_id}/') - - def record_forget(self, record_id: int): - return self._call(f'record/forget/{record_id}/') - - def record_download(self, record_id: int, output: str): - return self._call(f'record/download/{record_id}/', save_to=output) - - def storage_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: - r = self._call('storage/list/', params={'extended': int(extended)}) - files = r['files'] - if as_objects: - return self.record_list_from_serialized(files) - return files - - @staticmethod - def record_list_from_serialized(files: Union[list[str], list[dict]]): - new_files = [] - for f in files: - kwargs = {'remote': True} - if isinstance(f, dict): - name = f['filename'] - kwargs['remote_filesize'] = f['filesize'] - else: - name = f - item = RecordFile(name, **kwargs) - new_files.append(item) - return new_files - - def storage_delete(self, file_id: str): - return self._call('storage/delete/', params={'file_id': file_id}) - - def storage_download(self, file_id: str, output: str): - return self._call('storage/download/', params={'file_id': file_id}, save_to=output) - - def _call(self, - method: str, - params: dict = None, - save_to: Optional[str] = None): - - kwargs = {} - if isinstance(params, dict): - kwargs['params'] = params - if save_to: - kwargs['stream'] = True - - url = f'{self.endpoint}/{method}' - self.logger.debug(f'calling {url}, kwargs: {kwargs}') - - r = requests.get(url, **kwargs) - if r.status_code != 200: - response = r.json() - raise ApiResponseError(status_code=r.status_code, - error_type=response['error'], - error_message=response['message'] or None, - error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None) - - if save_to: - r.raise_for_status() - with open(save_to, 'wb') as f: - shutil.copyfileobj(r.raw, f) - return True - - return r.json()['response'] diff --git a/src/home/sound/record.py b/src/home/sound/record.py deleted file mode 100644 index 1ad8827..0000000 --- a/src/home/sound/record.py +++ /dev/null @@ -1,400 +0,0 @@ -import threading -import time -import subprocess -import signal -import os -import logging - -from enum import Enum, auto -from typing import Optional -from ..config import config -from ..util import find_child_processes -from .storage import RecordFile, RecordStorage - - -_history_item_timeout = 7200 -_history_cleanup_freq = 3600 - - -class RecordStatus(Enum): - WAITING = auto() - RECORDING = auto() - FINISHED = auto() - ERROR = auto() - - -class RecordHistoryItem: - id: int - request_time: float - start_time: float - stop_time: float - relations: list[int] - status: RecordStatus - error: Optional[Exception] - file: Optional[RecordFile] - creation_time: float - - def __init__(self, id): - self.id = id - self.request_time = 0 - self.start_time = 0 - self.stop_time = 0 - self.relations = [] - self.status = RecordStatus.WAITING - self.file = None - self.error = None - self.creation_time = time.time() - - def add_relation(self, related_id: int): - self.relations.append(related_id) - - def mark_started(self, start_time: float): - self.start_time = start_time - self.status = RecordStatus.RECORDING - - def mark_finished(self, end_time: float, file: RecordFile): - self.stop_time = end_time - self.file = file - self.status = RecordStatus.FINISHED - - def mark_failed(self, error: Exception): - self.status = RecordStatus.ERROR - self.error = error - - def as_dict(self) -> dict: - data = { - 'id': self.id, - 'request_time': self.request_time, - 'status': self.status.value, - 'relations': self.relations, - 'start_time': self.start_time, - 'stop_time': self.stop_time, - } - if self.error: - data['error'] = str(self.error) - if self.file: - data['file'] = self.file.__dict__() - return data - - -class RecordingNotFoundError(Exception): - pass - - -class RecordHistory: - history: dict[int, RecordHistoryItem] - - def __init__(self): - self.history = {} - self.logger = logging.getLogger(self.__class__.__name__) - - def add(self, record_id: int): - self.logger.debug(f'add: record_id={record_id}') - - r = RecordHistoryItem(record_id) - r.request_time = time.time() - - self.history[record_id] = r - - def delete(self, record_id: int): - self.logger.debug(f'delete: record_id={record_id}') - del self.history[record_id] - - def cleanup(self): - del_ids = [] - for rid, item in self.history.items(): - if item.creation_time < time.time()-_history_item_timeout: - del_ids.append(rid) - for rid in del_ids: - self.delete(rid) - - def __getitem__(self, key): - if key not in self.history: - raise RecordingNotFoundError() - - return self.history[key] - - def __setitem__(self, key, value): - raise NotImplementedError('setting history item this way is prohibited') - - def __contains__(self, key): - return key in self.history - - -class Recording: - start_time: float - stop_time: float - duration: int - record_id: int - arecord_pid: Optional[int] - process: Optional[subprocess.Popen] - - g_record_id = 1 - - def __init__(self): - self.start_time = 0 - self.stop_time = 0 - self.duration = 0 - self.process = None - self.arecord_pid = None - self.record_id = Recording.next_id() - self.logger = logging.getLogger(self.__class__.__name__) - - def is_started(self) -> bool: - return self.start_time > 0 and self.stop_time > 0 - - def is_waiting(self): - return self.duration > 0 - - def ask_for(self, duration) -> int: - overtime = 0 - orig_duration = duration - - if self.is_started(): - already_passed = time.time() - self.start_time - max_duration = Recorder.get_max_record_time() - already_passed - self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}') - else: - max_duration = Recorder.get_max_record_time() - - if duration > max_duration: - overtime = duration - max_duration - duration = max_duration - - self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}') - - self.duration += duration - if self.is_started(): - til_end = self.stop_time - time.time() - if til_end < 0: - til_end = 0 - - _prev_stop_time = self.stop_time - _to_add = duration - til_end - if _to_add < 0: - _to_add = 0 - - self.stop_time += _to_add - self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})') - - return overtime - - def start(self, output: str): - assert self.start_time == 0 and self.stop_time == 0, "already started?!" - assert self.process is None, "self.process is not None, what the hell?" - - cur = time.time() - self.start_time = cur - self.stop_time = cur + self.duration - - arecord = config['arecord']['bin'] - lame = config['lame']['bin'] - b = config['lame']['bitrate'] - - cmd = f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null' - self.logger.debug(f'start: running `{cmd}`') - self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True) - - sh_pid = self.process.pid - self.logger.debug(f'start: started, pid of shell is {sh_pid}') - - arecord_pid = self.find_arecord_pid(sh_pid) - if arecord_pid is not None: - self.arecord_pid = arecord_pid - self.logger.debug(f'start: pid of arecord is {arecord_pid}') - - def stop(self): - if self.process: - if self.arecord_pid is None: - self.arecord_pid = self.find_arecord_pid(self.process.pid) - - if self.arecord_pid is not None: - os.kill(self.arecord_pid, signal.SIGINT) - timeout = config['node']['process_wait_timeout'] - - self.logger.debug(f'stop: sent SIGINT to {self.arecord_pid}. now waiting up to {timeout} seconds...') - try: - self.process.wait(timeout=timeout) - except subprocess.TimeoutExpired: - self.logger.warning(f'stop: wait({timeout}): timeout expired, calling terminate()') - self.process.terminate() - else: - self.logger.warning('stop: pid of arecord is unknown, calling terminate()') - self.process.terminate() - - rc = self.process.returncode - self.logger.debug(f'stop: rc={rc}') - - self.process = None - self.arecord_pid = 0 - - self.duration = 0 - self.start_time = 0 - self.stop_time = 0 - - def find_arecord_pid(self, sh_pid: int): - try: - children = find_child_processes(sh_pid) - except OSError as exc: - self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc)) - return None - - for child in children: - if 'arecord' in child.cmd: - return child.pid - - return None - - @staticmethod - def next_id() -> int: - cur_id = Recording.g_record_id - Recording.g_record_id += 1 - return cur_id - - def increment_id(self): - self.record_id = Recording.next_id() - - -class Recorder: - interrupted: bool - lock: threading.Lock - history_lock: threading.Lock - recording: Optional[Recording] - overtime: int - history: RecordHistory - next_history_cleanup_time: float - storage: RecordStorage - - def __init__(self, storage: RecordStorage): - self.storage = storage - self.recording = Recording() - self.interrupted = False - self.lock = threading.Lock() - self.history_lock = threading.Lock() - self.overtime = 0 - self.history = RecordHistory() - self.next_history_cleanup_time = 0 - self.logger = logging.getLogger(self.__class__.__name__) - - def start_thread(self): - t = threading.Thread(target=self.loop) - t.daemon = True - t.start() - - def loop(self) -> None: - tempname = os.path.join(self.storage.root, 'temp.mp3') - - while not self.interrupted: - cur = time.time() - stopped = False - cur_record_id = None - - if self.next_history_cleanup_time == 0: - self.next_history_cleanup_time = time.time() + _history_cleanup_freq - elif self.next_history_cleanup_time <= time.time(): - self.logger.debug('loop: calling history.cleanup()') - try: - self.history.cleanup() - except Exception as e: - self.logger.error('loop: error while history.cleanup(): ' + str(e)) - self.next_history_cleanup_time = time.time() + _history_cleanup_freq - - with self.lock: - cur_record_id = self.recording.record_id - # self.logger.debug(f'cur_record_id={cur_record_id}') - - if not self.recording.is_started(): - if self.recording.is_waiting(): - try: - if os.path.exists(tempname): - self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..') - try: - os.unlink(tempname) - except OSError as e: - self.logger.exception(e) - self.recording.start(tempname) - with self.history_lock: - self.history[cur_record_id].mark_started(self.recording.start_time) - except Exception as exc: - self.logger.exception(exc) - - # there should not be any errors, but still.. - try: - self.recording.stop() - except Exception as exc: - self.logger.exception(exc) - - with self.history_lock: - self.history[cur_record_id].mark_failed(exc) - - self.logger.debug(f'loop: start exc path: calling increment_id()') - self.recording.increment_id() - else: - if cur >= self.recording.stop_time: - try: - start_time = self.recording.start_time - stop_time = self.recording.stop_time - self.recording.stop() - - saved_name = self.storage.save(tempname, - record_id=cur_record_id, - start_time=int(start_time), - stop_time=int(stop_time)) - - with self.history_lock: - self.history[cur_record_id].mark_finished(stop_time, saved_name) - except Exception as exc: - self.logger.exception(exc) - with self.history_lock: - self.history[cur_record_id].mark_failed(exc) - finally: - self.logger.debug(f'loop: stop exc final path: calling increment_id()') - self.recording.increment_id() - - stopped = True - - if stopped and self.overtime > 0: - self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})') - _overtime = self.overtime - self.overtime = 0 - - related_id = self.record(_overtime) - self.logger.info(f'enqueued another record with id {related_id}') - - if cur_record_id is not None: - with self.history_lock: - self.history[cur_record_id].add_relation(related_id) - - time.sleep(0.2) - - def record(self, duration: int) -> int: - self.logger.debug(f'record: duration={duration}') - with self.lock: - overtime = self.recording.ask_for(duration) - self.logger.debug(f'overtime={overtime}') - - if overtime > self.overtime: - self.overtime = overtime - - if not self.recording.is_started(): - with self.history_lock: - self.history.add(self.recording.record_id) - - return self.recording.record_id - - def stop(self): - self.interrupted = True - - def get_info(self, record_id: int) -> RecordHistoryItem: - with self.history_lock: - return self.history[record_id] - - def forget(self, record_id: int): - with self.history_lock: - self.logger.info(f'forget: removing record {record_id} from history') - self.history.delete(record_id) - - @staticmethod - def get_max_record_time() -> int: - return config['node']['record_max_time'] - diff --git a/src/home/sound/record_client.py b/src/home/sound/record_client.py deleted file mode 100644 index 2744a8c..0000000 --- a/src/home/sound/record_client.py +++ /dev/null @@ -1,142 +0,0 @@ -import time -import logging -import threading -import os.path - -from tempfile import gettempdir -from .record import RecordStatus -from .node_client import SoundNodeClient -from ..util import Addr -from typing import Optional, Callable - - -class RecordClient: - interrupted: bool - logger: logging.Logger - clients: dict[str, SoundNodeClient] - awaiting: dict[str, dict[int, Optional[dict]]] - error_handler: Optional[Callable] - finished_handler: Optional[Callable] - download_on_finish: bool - - def __init__(self, - nodes: dict[str, Addr], - error_handler: Optional[Callable] = None, - finished_handler: Optional[Callable] = None, - download_on_finish=False): - self.interrupted = False - self.logger = logging.getLogger(self.__class__.__name__) - self.clients = {} - self.awaiting = {} - self.download_on_finish = download_on_finish - - self.error_handler = error_handler - self.finished_handler = finished_handler - - self.awaiting_lock = threading.Lock() - - for node, addr in nodes.items(): - self.clients[node] = SoundNodeClient(addr) - self.awaiting[node] = {} - - try: - t = threading.Thread(target=self.loop) - t.daemon = True - t.start() - except (KeyboardInterrupt, SystemExit) as exc: - self.stop() - self.logger.exception(exc) - - def stop(self): - self.interrupted = True - - def loop(self): - while not self.interrupted: - # self.logger.debug('loop: tick') - - for node in self.awaiting.keys(): - with self.awaiting_lock: - record_ids = list(self.awaiting[node].keys()) - if not record_ids: - continue - - self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}') - - cl = self.getclient(node) - del_ids = [] - for rid in record_ids: - info = cl.record_info(rid) - - if info['relations']: - for relid in info['relations']: - self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True) - - status = RecordStatus(info['status']) - if status in (RecordStatus.FINISHED, RecordStatus.ERROR): - if status == RecordStatus.FINISHED: - if self.download_on_finish: - local_fn = self.download(node, rid, info['file']['fileid']) - else: - local_fn = None - self._report_finished(info, local_fn, self.awaiting[node][rid]) - else: - self._report_error(info, self.awaiting[node][rid]) - del_ids.append(rid) - self.logger.debug(f'record {rid}: status {status}') - - if del_ids: - self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list') - with self.awaiting_lock: - for del_id in del_ids: - del self.awaiting[node][del_id] - - time.sleep(5) - - self.logger.info('loop ended') - - def getclient(self, node: str): - return self.clients[node] - - def record(self, - node: str, - duration: int, - userdata: Optional[dict] = None) -> int: - self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}') - - cl = self.getclient(node) - record_id = cl.record(duration)['id'] - self.logger.debug(f'record: request sent, record_id={record_id}') - - self.wait_for_record(node, record_id, userdata) - return record_id - - def wait_for_record(self, - node: str, - record_id: int, - userdata: Optional[dict] = None, - is_relative=False): - with self.awaiting_lock: - if record_id not in self.awaiting[node]: - msg = f'wait_for_record: adding {record_id} to {node}' - if is_relative: - msg += ' (by relation)' - self.logger.debug(msg) - - self.awaiting[node][record_id] = userdata - - def download(self, node: str, record_id: int, fileid: str): - dst = os.path.join(gettempdir(), f'{node}_{fileid}.mp3') - cl = self.getclient(node) - cl.record_download(record_id, dst) - return dst - - def forget(self, node: str, rid: int): - self.getclient(node).record_forget(rid) - - def _report_finished(self, *args): - if self.finished_handler: - self.finished_handler(*args) - - def _report_error(self, *args): - if self.error_handler: - self.error_handler(*args) diff --git a/src/home/sound/storage.py b/src/home/sound/storage.py deleted file mode 100644 index c61f6f6..0000000 --- a/src/home/sound/storage.py +++ /dev/null @@ -1,155 +0,0 @@ -import os -import re -import shutil -import logging - -from typing import Optional, Union -from datetime import datetime -from ..util import strgen - -logger = logging.getLogger(__name__) - - -class RecordFile: - start_time: Optional[datetime] - stop_time: Optional[datetime] - record_id: Optional[int] - name: str - file_id: Optional[str] - remote: bool - remote_filesize: int - storage_root: str - - human_date_dmt = '%d.%m.%y' - human_time_fmt = '%H:%M:%S' - - def __init__(self, filename: str, remote=False, remote_filesize=None, storage_root='/'): - self.name = filename - self.storage_root = storage_root - - self.remote = remote - self.remote_filesize = remote_filesize - - m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.mp3$', filename) - if m: - self.start_time = datetime.strptime(m.group(1), RecordStorage.time_fmt) - self.stop_time = datetime.strptime(m.group(2), RecordStorage.time_fmt) - self.record_id = int(m.group(3)) - self.file_id = (m.group(1) + '_' + m.group(2)).replace('-', '_') - else: - logger.warning(f'unexpected filename: {filename}') - self.start_time = None - self.stop_time = None - self.record_id = None - self.file_id = None - - @property - def path(self): - if self.remote: - return RuntimeError('remote recording, can\'t get real path') - - return os.path.realpath(os.path.join( - self.storage_root, self.name - )) - - @property - def start_humantime(self) -> str: - if self.start_time is None: - return '?' - fmt = f'{RecordFile.human_date_dmt} {RecordFile.human_time_fmt}' - return self.start_time.strftime(fmt) - - @property - def stop_humantime(self) -> str: - if self.stop_time is None: - return '?' - fmt = RecordFile.human_time_fmt - if self.start_time.date() != self.stop_time.date(): - fmt = f'{RecordFile.human_date_dmt} {fmt}' - return self.stop_time.strftime(fmt) - - @property - def start_unixtime(self) -> int: - if self.start_time is None: - return 0 - return int(self.start_time.timestamp()) - - @property - def stop_unixtime(self) -> int: - if self.stop_time is None: - return 0 - return int(self.stop_time.timestamp()) - - @property - def filesize(self): - if self.remote: - if self.remote_filesize is None: - raise RuntimeError('file is remote and remote_filesize is not set') - return self.remote_filesize - return os.path.getsize(self.path) - - def __dict__(self) -> dict: - return { - 'start_unixtime': self.start_unixtime, - 'stop_unixtime': self.stop_unixtime, - 'filename': self.name, - 'filesize': self.filesize, - 'fileid': self.file_id, - 'record_id': self.record_id or 0, - } - - -class RecordStorage: - time_fmt = '%d%m%y-%H%M%S' - - def __init__(self, root: str): - self.root = root - - def getfiles(self, as_objects=False) -> Union[list[str], list[RecordFile]]: - files = [] - for name in os.listdir(self.root): - path = os.path.join(self.root, name) - if os.path.isfile(path) and name.endswith('.mp3'): - files.append(name if not as_objects else RecordFile(name, storage_root=self.root)) - return files - - def find(self, file_id: str) -> Optional[RecordFile]: - for name in os.listdir(self.root): - if os.path.isfile(os.path.join(self.root, name)) and name.endswith('.mp3'): - item = RecordFile(name, storage_root=self.root) - if item.file_id == file_id: - return item - return None - - def purge(self): - files = self.getfiles() - if files: - logger = logging.getLogger(self.__name__) - for f in files: - try: - path = os.path.join(self.root, f) - logger.debug(f'purge: deleting {path}') - os.unlink(path) - except OSError as exc: - logger.exception(exc) - - def delete(self, file: RecordFile): - os.unlink(file.path) - - def save(self, - fn: str, - record_id: int, - start_time: int, - stop_time: int) -> RecordFile: - - start_time_s = datetime.fromtimestamp(start_time).strftime(self.time_fmt) - stop_time_s = datetime.fromtimestamp(stop_time).strftime(self.time_fmt) - - dst_fn = f'{start_time_s}_{stop_time_s}_id{record_id}' - if os.path.exists(os.path.join(self.root, dst_fn)): - dst_fn += strgen(4) - dst_fn += '.mp3' - dst_path = os.path.join(self.root, dst_fn) - - shutil.move(fn, dst_path) - return RecordFile(dst_fn, storage_root=self.root) diff --git a/src/home/web_api/web_api.py b/src/home/web_api/web_api.py index c75c031..6b8c54e 100644 --- a/src/home/web_api/web_api.py +++ b/src/home/web_api/web_api.py @@ -12,7 +12,7 @@ from ..config import config, is_development_mode from ..database import BotsDatabase, SensorsDatabase from ..util import stringify, format_tb from ..api.types import BotType, TemperatureSensorLocation, SoundSensorLocation -from ..sound import RecordStorage +from ..media import SoundRecordStorage db: Optional[BotsDatabase] = None sensors_db: Optional[SensorsDatabase] = None @@ -136,7 +136,7 @@ def recordings_list(): if not os.path.isdir(root): raise ValueError(f'invalid node {node}: no such directory') - storage = RecordStorage(root) + storage = SoundRecordStorage(root) files = storage.getfiles(as_objects=extended) if extended: files = list(map(lambda file: file.__dict__(), files)) -- cgit v1.2.3