summaryrefslogtreecommitdiff
path: root/src/home/media
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2023-09-27 00:54:57 +0300
committerEvgeny Zinoviev <me@ch1p.io>2023-09-27 00:54:57 +0300
commitd3a295872c49defb55fc8e4e43e55550991e0927 (patch)
treeb9dca15454f9027d5a9dad0d4443a20de04dbc5d /src/home/media
parentb7cbc2571c1870b4582ead45277d0aa7f961bec8 (diff)
parentbdbb296697f55f4c3a07af43c9aaf7a9ea86f3d0 (diff)
Merge branch 'master' of ch1p.io:homekit
Diffstat (limited to 'src/home/media')
-rw-r--r--src/home/media/__init__.py21
-rw-r--r--src/home/media/__init__.pyi27
-rw-r--r--src/home/media/node_client.py119
-rw-r--r--src/home/media/node_server.py86
-rw-r--r--src/home/media/record.py461
-rw-r--r--src/home/media/record_client.py166
-rw-r--r--src/home/media/storage.py210
-rw-r--r--src/home/media/types.py13
8 files changed, 0 insertions, 1103 deletions
diff --git a/src/home/media/__init__.py b/src/home/media/__init__.py
deleted file mode 100644
index 976c990..0000000
--- a/src/home/media/__init__.py
+++ /dev/null
@@ -1,21 +0,0 @@
-import importlib
-import itertools
-
-__map__ = {
- 'types': ['MediaNodeType'],
- 'record_client': ['SoundRecordClient', 'CameraRecordClient', 'RecordClient'],
- 'node_server': ['MediaNodeServer'],
- 'node_client': ['SoundNodeClient', 'CameraNodeClient', 'MediaNodeClient'],
- 'storage': ['SoundRecordStorage', 'ESP32CameraRecordStorage', 'SoundRecordFile', 'CameraRecordFile', 'RecordFile'],
- 'record': ['SoundRecorder', 'CameraRecorder']
-}
-
-__all__ = list(itertools.chain(*__map__.values()))
-
-def __getattr__(name):
- if name in __all__:
- for file, names in __map__.items():
- if name in names:
- module = importlib.import_module(f'.{file}', __name__)
- return getattr(module, name)
- raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/src/home/media/__init__.pyi b/src/home/media/__init__.pyi
deleted file mode 100644
index 77c2176..0000000
--- a/src/home/media/__init__.pyi
+++ /dev/null
@@ -1,27 +0,0 @@
-from .types import (
- MediaNodeType as MediaNodeType
-)
-from .record_client import (
- SoundRecordClient as SoundRecordClient,
- CameraRecordClient as CameraRecordClient,
- RecordClient as RecordClient
-)
-from .node_server import (
- MediaNodeServer as MediaNodeServer
-)
-from .node_client import (
- SoundNodeClient as SoundNodeClient,
- CameraNodeClient as CameraNodeClient,
- MediaNodeClient as MediaNodeClient
-)
-from .storage import (
- SoundRecordStorage as SoundRecordStorage,
- ESP32CameraRecordStorage as ESP32CameraRecordStorage,
- SoundRecordFile as SoundRecordFile,
- CameraRecordFile as CameraRecordFile,
- RecordFile as RecordFile
-)
-from .record import (
- SoundRecorder as SoundRecorder,
- CameraRecorder as CameraRecorder
-) \ No newline at end of file
diff --git a/src/home/media/node_client.py b/src/home/media/node_client.py
deleted file mode 100644
index eb39898..0000000
--- a/src/home/media/node_client.py
+++ /dev/null
@@ -1,119 +0,0 @@
-import requests
-import shutil
-import logging
-
-from typing import Optional, Union, List
-from .storage import RecordFile
-from ..util import Addr
-from ..api.errors import ApiResponseError
-
-
-class MediaNodeClient:
- def __init__(self, addr: Addr):
- self.endpoint = f'http://{addr[0]}:{addr[1]}'
- self.logger = logging.getLogger(self.__class__.__name__)
-
- def record(self, duration: int):
- return self._call('record/', params={"duration": duration})
-
- def record_info(self, record_id: int):
- return self._call(f'record/info/{record_id}/')
-
- def record_forget(self, record_id: int):
- return self._call(f'record/forget/{record_id}/')
-
- def record_download(self, record_id: int, output: str):
- return self._call(f'record/download/{record_id}/', save_to=output)
-
- def storage_list(self, extended=False, as_objects=False) -> Union[List[str], List[dict], List[RecordFile]]:
- r = self._call('storage/list/', params={'extended': int(extended)})
- files = r['files']
- if as_objects:
- return self.record_list_from_serialized(files)
- return files
-
- @staticmethod
- def record_list_from_serialized(files: Union[List[str], List[dict]]):
- new_files = []
- for f in files:
- kwargs = {'remote': True}
- if isinstance(f, dict):
- name = f['filename']
- kwargs['remote_filesize'] = f['filesize']
- else:
- name = f
- item = RecordFile.create(name, **kwargs)
- new_files.append(item)
- return new_files
-
- def storage_delete(self, file_id: str):
- return self._call('storage/delete/', params={'file_id': file_id})
-
- def storage_download(self, file_id: str, output: str):
- return self._call('storage/download/', params={'file_id': file_id}, save_to=output)
-
- def _call(self,
- method: str,
- params: dict = None,
- save_to: Optional[str] = None):
- kwargs = {}
- if isinstance(params, dict):
- kwargs['params'] = params
- if save_to:
- kwargs['stream'] = True
-
- url = f'{self.endpoint}/{method}'
- self.logger.debug(f'calling {url}, kwargs: {kwargs}')
-
- r = requests.get(url, **kwargs)
- if r.status_code != 200:
- response = r.json()
- raise ApiResponseError(status_code=r.status_code,
- error_type=response['error'],
- error_message=response['message'] or None,
- error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None)
-
- if save_to:
- r.raise_for_status()
- with open(save_to, 'wb') as f:
- shutil.copyfileobj(r.raw, f)
- return True
-
- return r.json()['response']
-
-
-class SoundNodeClient(MediaNodeClient):
- def amixer_get_all(self):
- return self._call('amixer/get-all/')
-
- def amixer_get(self, control: str):
- return self._call(f'amixer/get/{control}/')
-
- def amixer_incr(self, control: str, step: Optional[int] = None):
- params = {'step': step} if step is not None else None
- return self._call(f'amixer/incr/{control}/', params=params)
-
- def amixer_decr(self, control: str, step: Optional[int] = None):
- params = {'step': step} if step is not None else None
- return self._call(f'amixer/decr/{control}/', params=params)
-
- def amixer_mute(self, control: str):
- return self._call(f'amixer/mute/{control}/')
-
- def amixer_unmute(self, control: str):
- return self._call(f'amixer/unmute/{control}/')
-
- def amixer_cap(self, control: str):
- return self._call(f'amixer/cap/{control}/')
-
- def amixer_nocap(self, control: str):
- return self._call(f'amixer/nocap/{control}/')
-
-
-class CameraNodeClient(MediaNodeClient):
- def capture(self,
- save_to: str,
- with_flash: bool = False):
- return self._call('capture/',
- {'with_flash': int(with_flash)},
- save_to=save_to)
diff --git a/src/home/media/node_server.py b/src/home/media/node_server.py
deleted file mode 100644
index 5d0803c..0000000
--- a/src/home/media/node_server.py
+++ /dev/null
@@ -1,86 +0,0 @@
-from .. import http
-from .record import Recorder
-from .types import RecordStatus
-from .storage import RecordStorage
-
-
-class MediaNodeServer(http.HTTPServer):
- recorder: Recorder
- storage: RecordStorage
-
- def __init__(self,
- recorder: Recorder,
- storage: RecordStorage,
- *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- self.recorder = recorder
- self.storage = storage
-
- self.get('/record/', self.do_record)
- self.get('/record/info/{id}/', self.record_info)
- self.get('/record/forget/{id}/', self.record_forget)
- self.get('/record/download/{id}/', self.record_download)
-
- self.get('/storage/list/', self.storage_list)
- self.get('/storage/delete/', self.storage_delete)
- self.get('/storage/download/', self.storage_download)
-
- async def do_record(self, request: http.Request):
- duration = int(request.query['duration'])
- max = Recorder.get_max_record_time()*15
- if not 0 < duration <= max:
- raise ValueError(f'invalid duration: max duration is {max}')
-
- record_id = self.recorder.record(duration)
- return http.ok({'id': record_id})
-
- async def record_info(self, request: http.Request):
- record_id = int(request.match_info['id'])
- info = self.recorder.get_info(record_id)
- return http.ok(info.as_dict())
-
- async def record_forget(self, request: http.Request):
- record_id = int(request.match_info['id'])
-
- info = self.recorder.get_info(record_id)
- assert info.status in (RecordStatus.FINISHED, RecordStatus.ERROR), f"can't forget: record status is {info.status}"
-
- self.recorder.forget(record_id)
- return http.ok()
-
- async def record_download(self, request: http.Request):
- record_id = int(request.match_info['id'])
-
- info = self.recorder.get_info(record_id)
- assert info.status == RecordStatus.FINISHED, f"record status is {info.status}"
-
- return http.FileResponse(info.file.path)
-
- async def storage_list(self, request: http.Request):
- extended = 'extended' in request.query and int(request.query['extended']) == 1
-
- files = self.storage.getfiles(as_objects=extended)
- if extended:
- files = list(map(lambda file: file.__dict__(), files))
-
- return http.ok({
- 'files': files
- })
-
- async def storage_delete(self, request: http.Request):
- file_id = request.query['file_id']
- file = self.storage.find(file_id)
- if not file:
- raise ValueError(f'file {file} not found')
-
- self.storage.delete(file)
- return http.ok()
-
- async def storage_download(self, request):
- file_id = request.query['file_id']
- file = self.storage.find(file_id)
- if not file:
- raise ValueError(f'file {file} not found')
-
- return http.FileResponse(file.path)
diff --git a/src/home/media/record.py b/src/home/media/record.py
deleted file mode 100644
index cd7447a..0000000
--- a/src/home/media/record.py
+++ /dev/null
@@ -1,461 +0,0 @@
-import os
-import threading
-import logging
-import time
-import subprocess
-import signal
-
-from typing import Optional, List, Dict
-from ..util import find_child_processes, Addr
-from ..config import config
-from .storage import RecordFile, RecordStorage
-from .types import RecordStatus
-from ..camera.types import CameraType
-
-
-_history_item_timeout = 7200
-_history_cleanup_freq = 3600
-
-
-class RecordHistoryItem:
- id: int
- request_time: float
- start_time: float
- stop_time: float
- relations: List[int]
- status: RecordStatus
- error: Optional[Exception]
- file: Optional[RecordFile]
- creation_time: float
-
- def __init__(self, id):
- self.id = id
- self.request_time = 0
- self.start_time = 0
- self.stop_time = 0
- self.relations = []
- self.status = RecordStatus.WAITING
- self.file = None
- self.error = None
- self.creation_time = time.time()
-
- def add_relation(self, related_id: int):
- self.relations.append(related_id)
-
- def mark_started(self, start_time: float):
- self.start_time = start_time
- self.status = RecordStatus.RECORDING
-
- def mark_finished(self, end_time: float, file: RecordFile):
- self.stop_time = end_time
- self.file = file
- self.status = RecordStatus.FINISHED
-
- def mark_failed(self, error: Exception):
- self.status = RecordStatus.ERROR
- self.error = error
-
- def as_dict(self) -> dict:
- data = {
- 'id': self.id,
- 'request_time': self.request_time,
- 'status': self.status.value,
- 'relations': self.relations,
- 'start_time': self.start_time,
- 'stop_time': self.stop_time,
- }
- if self.error:
- data['error'] = str(self.error)
- if self.file:
- data['file'] = self.file.__dict__()
- return data
-
-
-class RecordingNotFoundError(Exception):
- pass
-
-
-class RecordHistory:
- history: Dict[int, RecordHistoryItem]
-
- def __init__(self):
- self.history = {}
- self.logger = logging.getLogger(self.__class__.__name__)
-
- def add(self, record_id: int):
- self.logger.debug(f'add: record_id={record_id}')
-
- r = RecordHistoryItem(record_id)
- r.request_time = time.time()
-
- self.history[record_id] = r
-
- def delete(self, record_id: int):
- self.logger.debug(f'delete: record_id={record_id}')
- del self.history[record_id]
-
- def cleanup(self):
- del_ids = []
- for rid, item in self.history.items():
- if item.creation_time < time.time()-_history_item_timeout:
- del_ids.append(rid)
- for rid in del_ids:
- self.delete(rid)
-
- def __getitem__(self, key):
- if key not in self.history:
- raise RecordingNotFoundError()
-
- return self.history[key]
-
- def __setitem__(self, key, value):
- raise NotImplementedError('setting history item this way is prohibited')
-
- def __contains__(self, key):
- return key in self.history
-
-
-class Recording:
- RECORDER_PROGRAM = None
-
- start_time: float
- stop_time: float
- duration: int
- record_id: int
- recorder_program_pid: Optional[int]
- process: Optional[subprocess.Popen]
-
- g_record_id = 1
-
- def __init__(self):
- if self.RECORDER_PROGRAM is None:
- raise RuntimeError('this is abstract class')
-
- self.start_time = 0
- self.stop_time = 0
- self.duration = 0
- self.process = None
- self.recorder_program_pid = None
- self.record_id = Recording.next_id()
- self.logger = logging.getLogger(self.__class__.__name__)
-
- def is_started(self) -> bool:
- return self.start_time > 0 and self.stop_time > 0
-
- def is_waiting(self):
- return self.duration > 0
-
- def ask_for(self, duration) -> int:
- overtime = 0
- orig_duration = duration
-
- if self.is_started():
- already_passed = time.time() - self.start_time
- max_duration = Recorder.get_max_record_time() - already_passed
- self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}')
- else:
- max_duration = Recorder.get_max_record_time()
-
- if duration > max_duration:
- overtime = duration - max_duration
- duration = max_duration
-
- self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}')
-
- self.duration += duration
- if self.is_started():
- til_end = self.stop_time - time.time()
- if til_end < 0:
- til_end = 0
-
- _prev_stop_time = self.stop_time
- _to_add = duration - til_end
- if _to_add < 0:
- _to_add = 0
-
- self.stop_time += _to_add
- self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})')
-
- return overtime
-
- def start(self, output: str):
- assert self.start_time == 0 and self.stop_time == 0, "already started?!"
- assert self.process is None, "self.process is not None, what the hell?"
-
- cur = time.time()
- self.start_time = cur
- self.stop_time = cur + self.duration
-
- cmd = self.get_command(output)
- self.logger.debug(f'start: running `{cmd}`')
- self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True)
-
- sh_pid = self.process.pid
- self.logger.debug(f'start: started, pid of shell is {sh_pid}')
-
- pid = self.find_recorder_program_pid(sh_pid)
- if pid is not None:
- self.recorder_program_pid = pid
- self.logger.debug(f'start: pid of {self.RECORDER_PROGRAM} is {pid}')
-
- def get_command(self, output: str) -> str:
- pass
-
- def stop(self):
- if self.process:
- if self.recorder_program_pid is None:
- self.recorder_program_pid = self.find_recorder_program_pid(self.process.pid)
-
- if self.recorder_program_pid is not None:
- os.kill(self.recorder_program_pid, signal.SIGINT)
- timeout = config['node']['process_wait_timeout']
-
- self.logger.debug(f'stop: sent SIGINT to {self.recorder_program_pid}. now waiting up to {timeout} seconds...')
- try:
- self.process.wait(timeout=timeout)
- except subprocess.TimeoutExpired:
- self.logger.warning(f'stop: wait({timeout}): timeout expired, killing it')
- try:
- os.kill(self.recorder_program_pid, signal.SIGKILL)
- self.process.terminate()
- except Exception as exc:
- self.logger.exception(exc)
- else:
- self.logger.warning(f'stop: pid of {self.RECORDER_PROGRAM} is unknown, calling terminate()')
- self.process.terminate()
-
- rc = self.process.returncode
- self.logger.debug(f'stop: rc={rc}')
-
- self.process = None
- self.recorder_program_pid = 0
-
- self.duration = 0
- self.start_time = 0
- self.stop_time = 0
-
- def find_recorder_program_pid(self, sh_pid: int):
- try:
- children = find_child_processes(sh_pid)
- except OSError as exc:
- self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc))
- return None
-
- for child in children:
- if self.RECORDER_PROGRAM in child.cmd:
- return child.pid
-
- return None
-
- @staticmethod
- def next_id() -> int:
- cur_id = Recording.g_record_id
- Recording.g_record_id += 1
- return cur_id
-
- def increment_id(self):
- self.record_id = Recording.next_id()
-
-
-class Recorder:
- TEMP_NAME = None
-
- interrupted: bool
- lock: threading.Lock
- history_lock: threading.Lock
- recording: Optional[Recording]
- overtime: int
- history: RecordHistory
- next_history_cleanup_time: float
- storage: RecordStorage
-
- def __init__(self,
- storage: RecordStorage,
- recording: Recording):
- if self.TEMP_NAME is None:
- raise RuntimeError('this is abstract class')
-
- self.storage = storage
- self.recording = recording
- self.interrupted = False
- self.lock = threading.Lock()
- self.history_lock = threading.Lock()
- self.overtime = 0
- self.history = RecordHistory()
- self.next_history_cleanup_time = 0
- self.logger = logging.getLogger(self.__class__.__name__)
-
- def start_thread(self):
- t = threading.Thread(target=self.loop)
- t.daemon = True
- t.start()
-
- def loop(self) -> None:
- tempname = os.path.join(self.storage.root, self.TEMP_NAME)
-
- while not self.interrupted:
- cur = time.time()
- stopped = False
- cur_record_id = None
-
- if self.next_history_cleanup_time == 0:
- self.next_history_cleanup_time = time.time() + _history_cleanup_freq
- elif self.next_history_cleanup_time <= time.time():
- self.logger.debug('loop: calling history.cleanup()')
- try:
- self.history.cleanup()
- except Exception as e:
- self.logger.error('loop: error while history.cleanup(): ' + str(e))
- self.next_history_cleanup_time = time.time() + _history_cleanup_freq
-
- with self.lock:
- cur_record_id = self.recording.record_id
- # self.logger.debug(f'cur_record_id={cur_record_id}')
-
- if not self.recording.is_started():
- if self.recording.is_waiting():
- try:
- if os.path.exists(tempname):
- self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..')
- try:
- os.unlink(tempname)
- except OSError as e:
- self.logger.exception(e)
- self.recording.start(tempname)
- with self.history_lock:
- self.history[cur_record_id].mark_started(self.recording.start_time)
- except Exception as exc:
- self.logger.exception(exc)
-
- # there should not be any errors, but still..
- try:
- self.recording.stop()
- except Exception as exc:
- self.logger.exception(exc)
-
- with self.history_lock:
- self.history[cur_record_id].mark_failed(exc)
-
- self.logger.debug(f'loop: start exc path: calling increment_id()')
- self.recording.increment_id()
- else:
- if cur >= self.recording.stop_time:
- try:
- start_time = self.recording.start_time
- stop_time = self.recording.stop_time
- self.recording.stop()
-
- saved_name = self.storage.save(tempname,
- record_id=cur_record_id,
- start_time=int(start_time),
- stop_time=int(stop_time))
-
- with self.history_lock:
- self.history[cur_record_id].mark_finished(stop_time, saved_name)
- except Exception as exc:
- self.logger.exception(exc)
- with self.history_lock:
- self.history[cur_record_id].mark_failed(exc)
- finally:
- self.logger.debug(f'loop: stop exc final path: calling increment_id()')
- self.recording.increment_id()
-
- stopped = True
-
- if stopped and self.overtime > 0:
- self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})')
- _overtime = self.overtime
- self.overtime = 0
-
- related_id = self.record(_overtime)
- self.logger.info(f'enqueued another record with id {related_id}')
-
- if cur_record_id is not None:
- with self.history_lock:
- self.history[cur_record_id].add_relation(related_id)
-
- time.sleep(0.2)
-
- def record(self, duration: int) -> int:
- self.logger.debug(f'record: duration={duration}')
- with self.lock:
- overtime = self.recording.ask_for(duration)
- self.logger.debug(f'overtime={overtime}')
-
- if overtime > self.overtime:
- self.overtime = overtime
-
- if not self.recording.is_started():
- with self.history_lock:
- self.history.add(self.recording.record_id)
-
- return self.recording.record_id
-
- def stop(self):
- self.interrupted = True
-
- def get_info(self, record_id: int) -> RecordHistoryItem:
- with self.history_lock:
- return self.history[record_id]
-
- def forget(self, record_id: int):
- with self.history_lock:
- self.logger.info(f'forget: removing record {record_id} from history')
- self.history.delete(record_id)
-
- @staticmethod
- def get_max_record_time() -> int:
- return config['node']['record_max_time']
-
-
-class SoundRecorder(Recorder):
- TEMP_NAME = 'temp.mp3'
-
- def __init__(self, *args, **kwargs):
- super().__init__(recording=SoundRecording(),
- *args, **kwargs)
-
-
-class CameraRecorder(Recorder):
- TEMP_NAME = 'temp.mp4'
-
- def __init__(self,
- camera_type: CameraType,
- *args, **kwargs):
- if camera_type == CameraType.ESP32:
- recording = ESP32CameraRecording(stream_addr=kwargs['stream_addr'])
- del kwargs['stream_addr']
- else:
- raise RuntimeError(f'unsupported camera type {camera_type}')
-
- super().__init__(recording=recording,
- *args, **kwargs)
-
-
-class SoundRecording(Recording):
- RECORDER_PROGRAM = 'arecord'
-
- def get_command(self, output: str) -> str:
- arecord = config['arecord']['bin']
- lame = config['lame']['bin']
- b = config['lame']['bitrate']
-
- return f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null'
-
-
-class ESP32CameraRecording(Recording):
- RECORDER_PROGRAM = 'esp32_capture.py'
-
- stream_addr: Addr
-
- def __init__(self, stream_addr: Addr):
- super().__init__()
- self.stream_addr = stream_addr
-
- def get_command(self, output: str) -> str:
- bin = config['esp32_capture']['bin']
- return f'{bin} --addr {self.stream_addr[0]}:{self.stream_addr[1]} --output-directory {output} >/dev/null 2>/dev/null'
-
- def start(self, output: str):
- output = os.path.dirname(output)
- return super().start(output) \ No newline at end of file
diff --git a/src/home/media/record_client.py b/src/home/media/record_client.py
deleted file mode 100644
index 322495c..0000000
--- a/src/home/media/record_client.py
+++ /dev/null
@@ -1,166 +0,0 @@
-import time
-import logging
-import threading
-import os.path
-
-from tempfile import gettempdir
-from .record import RecordStatus
-from .node_client import SoundNodeClient, MediaNodeClient, CameraNodeClient
-from ..util import Addr
-from typing import Optional, Callable, Dict
-
-
-class RecordClient:
- DOWNLOAD_EXTENSION = None
-
- interrupted: bool
- logger: logging.Logger
- clients: Dict[str, MediaNodeClient]
- awaiting: Dict[str, Dict[int, Optional[dict]]]
- error_handler: Optional[Callable]
- finished_handler: Optional[Callable]
- download_on_finish: bool
-
- def __init__(self,
- nodes: Dict[str, Addr],
- error_handler: Optional[Callable] = None,
- finished_handler: Optional[Callable] = None,
- download_on_finish=False):
- if self.DOWNLOAD_EXTENSION is None:
- raise RuntimeError('this is abstract class')
-
- self.interrupted = False
- self.logger = logging.getLogger(self.__class__.__name__)
- self.clients = {}
- self.awaiting = {}
-
- self.download_on_finish = download_on_finish
- self.error_handler = error_handler
- self.finished_handler = finished_handler
-
- self.awaiting_lock = threading.Lock()
-
- self.make_clients(nodes)
-
- try:
- t = threading.Thread(target=self.loop)
- t.daemon = True
- t.start()
- except (KeyboardInterrupt, SystemExit) as exc:
- self.stop()
- self.logger.exception(exc)
-
- def make_clients(self, nodes: Dict[str, Addr]):
- pass
-
- def stop(self):
- self.interrupted = True
-
- def loop(self):
- while not self.interrupted:
- for node in self.awaiting.keys():
- with self.awaiting_lock:
- record_ids = list(self.awaiting[node].keys())
- if not record_ids:
- continue
-
- self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}')
-
- cl = self.getclient(node)
- del_ids = []
- for rid in record_ids:
- info = cl.record_info(rid)
-
- if info['relations']:
- for relid in info['relations']:
- self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True)
-
- status = RecordStatus(info['status'])
- if status in (RecordStatus.FINISHED, RecordStatus.ERROR):
- if status == RecordStatus.FINISHED:
- if self.download_on_finish:
- local_fn = self.download(node, rid, info['file']['fileid'])
- else:
- local_fn = None
- self._report_finished(info, local_fn, self.awaiting[node][rid])
- else:
- self._report_error(info, self.awaiting[node][rid])
- del_ids.append(rid)
- self.logger.debug(f'record {rid}: status {status}')
-
- if del_ids:
- self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list')
- with self.awaiting_lock:
- for del_id in del_ids:
- del self.awaiting[node][del_id]
-
- time.sleep(5)
-
- self.logger.info('loop ended')
-
- def getclient(self, node: str):
- return self.clients[node]
-
- def record(self,
- node: str,
- duration: int,
- userdata: Optional[dict] = None) -> int:
- self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}')
-
- cl = self.getclient(node)
- record_id = cl.record(duration)['id']
- self.logger.debug(f'record: request sent, record_id={record_id}')
-
- self.wait_for_record(node, record_id, userdata)
- return record_id
-
- def wait_for_record(self,
- node: str,
- record_id: int,
- userdata: Optional[dict] = None,
- is_relative=False):
- with self.awaiting_lock:
- if record_id not in self.awaiting[node]:
- msg = f'wait_for_record: adding {record_id} to {node}'
- if is_relative:
- msg += ' (by relation)'
- self.logger.debug(msg)
-
- self.awaiting[node][record_id] = userdata
-
- def download(self, node: str, record_id: int, fileid: str):
- dst = os.path.join(gettempdir(), f'{node}_{fileid}.{self.DOWNLOAD_EXTENSION}')
- cl = self.getclient(node)
- cl.record_download(record_id, dst)
- return dst
-
- def forget(self, node: str, rid: int):
- self.getclient(node).record_forget(rid)
-
- def _report_finished(self, *args):
- if self.finished_handler:
- self.finished_handler(*args)
-
- def _report_error(self, *args):
- if self.error_handler:
- self.error_handler(*args)
-
-
-class SoundRecordClient(RecordClient):
- DOWNLOAD_EXTENSION = 'mp3'
- # clients: Dict[str, SoundNodeClient]
-
- def make_clients(self, nodes: Dict[str, Addr]):
- for node, addr in nodes.items():
- self.clients[node] = SoundNodeClient(addr)
- self.awaiting[node] = {}
-
-
-class CameraRecordClient(RecordClient):
- DOWNLOAD_EXTENSION = 'mp4'
- # clients: Dict[str, CameraNodeClient]
-
- def make_clients(self, nodes: Dict[str, Addr]):
- for node, addr in nodes.items():
- self.clients[node] = CameraNodeClient(addr)
- self.awaiting[node] = {} \ No newline at end of file
diff --git a/src/home/media/storage.py b/src/home/media/storage.py
deleted file mode 100644
index dd74ff8..0000000
--- a/src/home/media/storage.py
+++ /dev/null
@@ -1,210 +0,0 @@
-import os
-import re
-import shutil
-import logging
-
-from typing import Optional, Union, List
-from datetime import datetime
-from ..util import strgen
-
-logger = logging.getLogger(__name__)
-
-
-# record file
-# -----------
-
-class RecordFile:
- EXTENSION = None
-
- start_time: Optional[datetime]
- stop_time: Optional[datetime]
- record_id: Optional[int]
- name: str
- file_id: Optional[str]
- remote: bool
- remote_filesize: int
- storage_root: str
-
- human_date_dmt = '%d.%m.%y'
- human_time_fmt = '%H:%M:%S'
-
- @staticmethod
- def create(filename: str, *args, **kwargs):
- if filename.endswith(f'.{SoundRecordFile.EXTENSION}'):
- return SoundRecordFile(filename, *args, **kwargs)
- elif filename.endswith(f'.{CameraRecordFile.EXTENSION}'):
- return CameraRecordFile(filename, *args, **kwargs)
- else:
- raise RuntimeError(f'unsupported file extension: {filename}')
-
- def __init__(self, filename: str, remote=False, remote_filesize=None, storage_root='/'):
- if self.EXTENSION is None:
- raise RuntimeError('this is abstract class')
-
- self.name = filename
- self.storage_root = storage_root
-
- self.remote = remote
- self.remote_filesize = remote_filesize
-
- m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.'+self.EXTENSION+'$', filename)
- if m:
- self.start_time = datetime.strptime(m.group(1), RecordStorage.time_fmt)
- self.stop_time = datetime.strptime(m.group(2), RecordStorage.time_fmt)
- self.record_id = int(m.group(3))
- self.file_id = (m.group(1) + '_' + m.group(2)).replace('-', '_')
- else:
- logger.warning(f'unexpected filename: {filename}')
- self.start_time = None
- self.stop_time = None
- self.record_id = None
- self.file_id = None
-
- @property
- def path(self):
- if self.remote:
- return RuntimeError('remote recording, can\'t get real path')
-
- return os.path.realpath(os.path.join(
- self.storage_root, self.name
- ))
-
- @property
- def start_humantime(self) -> str:
- if self.start_time is None:
- return '?'
- fmt = f'{RecordFile.human_date_dmt} {RecordFile.human_time_fmt}'
- return self.start_time.strftime(fmt)
-
- @property
- def stop_humantime(self) -> str:
- if self.stop_time is None:
- return '?'
- fmt = RecordFile.human_time_fmt
- if self.start_time.date() != self.stop_time.date():
- fmt = f'{RecordFile.human_date_dmt} {fmt}'
- return self.stop_time.strftime(fmt)
-
- @property
- def start_unixtime(self) -> int:
- if self.start_time is None:
- return 0
- return int(self.start_time.timestamp())
-
- @property
- def stop_unixtime(self) -> int:
- if self.stop_time is None:
- return 0
- return int(self.stop_time.timestamp())
-
- @property
- def filesize(self):
- if self.remote:
- if self.remote_filesize is None:
- raise RuntimeError('file is remote and remote_filesize is not set')
- return self.remote_filesize
- return os.path.getsize(self.path)
-
- def __dict__(self) -> dict:
- return {
- 'start_unixtime': self.start_unixtime,
- 'stop_unixtime': self.stop_unixtime,
- 'filename': self.name,
- 'filesize': self.filesize,
- 'fileid': self.file_id,
- 'record_id': self.record_id or 0,
- }
-
-
-class PseudoRecordFile(RecordFile):
- EXTENSION = 'null'
-
- def __init__(self):
- super().__init__('pseudo.null')
-
- @property
- def filesize(self):
- return 0
-
-
-class SoundRecordFile(RecordFile):
- EXTENSION = 'mp3'
-
-
-class CameraRecordFile(RecordFile):
- EXTENSION = 'mp4'
-
-
-# record storage
-# --------------
-
-class RecordStorage:
- EXTENSION = None
-
- time_fmt = '%d%m%y-%H%M%S'
-
- def __init__(self, root: str):
- if self.EXTENSION is None:
- raise RuntimeError('this is abstract class')
-
- self.root = root
-
- def getfiles(self, as_objects=False) -> Union[List[str], List[RecordFile]]:
- files = []
- for name in os.listdir(self.root):
- path = os.path.join(self.root, name)
- if os.path.isfile(path) and name.endswith(f'.{self.EXTENSION}'):
- files.append(name if not as_objects else RecordFile.create(name, storage_root=self.root))
- return files
-
- def find(self, file_id: str) -> Optional[RecordFile]:
- for name in os.listdir(self.root):
- if os.path.isfile(os.path.join(self.root, name)) and name.endswith(f'.{self.EXTENSION}'):
- item = RecordFile.create(name, storage_root=self.root)
- if item.file_id == file_id:
- return item
- return None
-
- def purge(self):
- files = self.getfiles()
- if files:
- logger = logging.getLogger(self.__name__)
- for f in files:
- try:
- path = os.path.join(self.root, f)
- logger.debug(f'purge: deleting {path}')
- os.unlink(path)
- except OSError as exc:
- logger.exception(exc)
-
- def delete(self, file: RecordFile):
- os.unlink(file.path)
-
- def save(self,
- fn: str,
- record_id: int,
- start_time: int,
- stop_time: int) -> RecordFile:
-
- start_time_s = datetime.fromtimestamp(start_time).strftime(self.time_fmt)
- stop_time_s = datetime.fromtimestamp(stop_time).strftime(self.time_fmt)
-
- dst_fn = f'{start_time_s}_{stop_time_s}_id{record_id}'
- if os.path.exists(os.path.join(self.root, dst_fn)):
- dst_fn += strgen(4)
- dst_fn += f'.{self.EXTENSION}'
- dst_path = os.path.join(self.root, dst_fn)
-
- shutil.move(fn, dst_path)
- return RecordFile.create(dst_fn, storage_root=self.root)
-
-
-class SoundRecordStorage(RecordStorage):
- EXTENSION = 'mp3'
-
-
-class ESP32CameraRecordStorage(RecordStorage):
- EXTENSION = 'jpg' # not used anyway
-
- def save(self, *args, **kwargs):
- return PseudoRecordFile() \ No newline at end of file
diff --git a/src/home/media/types.py b/src/home/media/types.py
deleted file mode 100644
index acbc291..0000000
--- a/src/home/media/types.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from enum import Enum, auto
-
-
-class MediaNodeType(Enum):
- SOUND = auto()
- CAMERA = auto()
-
-
-class RecordStatus(Enum):
- WAITING = auto()
- RECORDING = auto()
- FINISHED = auto()
- ERROR = auto()