summaryrefslogtreecommitdiff
path: root/py_include/homekit/media
diff options
context:
space:
mode:
Diffstat (limited to 'py_include/homekit/media')
-rw-r--r--py_include/homekit/media/__init__.py22
-rw-r--r--py_include/homekit/media/__init__.pyi27
-rw-r--r--py_include/homekit/media/node_client.py119
-rw-r--r--py_include/homekit/media/node_server.py86
-rw-r--r--py_include/homekit/media/record.py461
-rw-r--r--py_include/homekit/media/record_client.py166
-rw-r--r--py_include/homekit/media/storage.py210
-rw-r--r--py_include/homekit/media/types.py13
8 files changed, 1104 insertions, 0 deletions
diff --git a/py_include/homekit/media/__init__.py b/py_include/homekit/media/__init__.py
new file mode 100644
index 0000000..6923105
--- /dev/null
+++ b/py_include/homekit/media/__init__.py
@@ -0,0 +1,22 @@
+import importlib
+import itertools
+
+__map__ = {
+ 'types': ['MediaNodeType'],
+ 'record_client': ['SoundRecordClient', 'CameraRecordClient', 'RecordClient'],
+ 'node_server': ['MediaNodeServer'],
+ 'node_client': ['SoundNodeClient', 'CameraNodeClient', 'MediaNodeClient'],
+ 'storage': ['SoundRecordStorage', 'ESP32CameraRecordStorage', 'SoundRecordFile', 'CameraRecordFile', 'RecordFile'],
+ 'record': ['SoundRecorder', 'CameraRecorder']
+}
+
+__all__ = list(itertools.chain(*__map__.values()))
+
+
+def __getattr__(name):
+ if name in __all__:
+ for file, names in __map__.items():
+ if name in names:
+ module = importlib.import_module(f'.{file}', __name__)
+ return getattr(module, name)
+ raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/py_include/homekit/media/__init__.pyi b/py_include/homekit/media/__init__.pyi
new file mode 100644
index 0000000..77c2176
--- /dev/null
+++ b/py_include/homekit/media/__init__.pyi
@@ -0,0 +1,27 @@
+from .types import (
+ MediaNodeType as MediaNodeType
+)
+from .record_client import (
+ SoundRecordClient as SoundRecordClient,
+ CameraRecordClient as CameraRecordClient,
+ RecordClient as RecordClient
+)
+from .node_server import (
+ MediaNodeServer as MediaNodeServer
+)
+from .node_client import (
+ SoundNodeClient as SoundNodeClient,
+ CameraNodeClient as CameraNodeClient,
+ MediaNodeClient as MediaNodeClient
+)
+from .storage import (
+ SoundRecordStorage as SoundRecordStorage,
+ ESP32CameraRecordStorage as ESP32CameraRecordStorage,
+ SoundRecordFile as SoundRecordFile,
+ CameraRecordFile as CameraRecordFile,
+ RecordFile as RecordFile
+)
+from .record import (
+ SoundRecorder as SoundRecorder,
+ CameraRecorder as CameraRecorder
+) \ No newline at end of file
diff --git a/py_include/homekit/media/node_client.py b/py_include/homekit/media/node_client.py
new file mode 100644
index 0000000..eb39898
--- /dev/null
+++ b/py_include/homekit/media/node_client.py
@@ -0,0 +1,119 @@
+import requests
+import shutil
+import logging
+
+from typing import Optional, Union, List
+from .storage import RecordFile
+from ..util import Addr
+from ..api.errors import ApiResponseError
+
+
+class MediaNodeClient:
+ def __init__(self, addr: Addr):
+ self.endpoint = f'http://{addr[0]}:{addr[1]}'
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def record(self, duration: int):
+ return self._call('record/', params={"duration": duration})
+
+ def record_info(self, record_id: int):
+ return self._call(f'record/info/{record_id}/')
+
+ def record_forget(self, record_id: int):
+ return self._call(f'record/forget/{record_id}/')
+
+ def record_download(self, record_id: int, output: str):
+ return self._call(f'record/download/{record_id}/', save_to=output)
+
+ def storage_list(self, extended=False, as_objects=False) -> Union[List[str], List[dict], List[RecordFile]]:
+ r = self._call('storage/list/', params={'extended': int(extended)})
+ files = r['files']
+ if as_objects:
+ return self.record_list_from_serialized(files)
+ return files
+
+ @staticmethod
+ def record_list_from_serialized(files: Union[List[str], List[dict]]):
+ new_files = []
+ for f in files:
+ kwargs = {'remote': True}
+ if isinstance(f, dict):
+ name = f['filename']
+ kwargs['remote_filesize'] = f['filesize']
+ else:
+ name = f
+ item = RecordFile.create(name, **kwargs)
+ new_files.append(item)
+ return new_files
+
+ def storage_delete(self, file_id: str):
+ return self._call('storage/delete/', params={'file_id': file_id})
+
+ def storage_download(self, file_id: str, output: str):
+ return self._call('storage/download/', params={'file_id': file_id}, save_to=output)
+
+ def _call(self,
+ method: str,
+ params: dict = None,
+ save_to: Optional[str] = None):
+ kwargs = {}
+ if isinstance(params, dict):
+ kwargs['params'] = params
+ if save_to:
+ kwargs['stream'] = True
+
+ url = f'{self.endpoint}/{method}'
+ self.logger.debug(f'calling {url}, kwargs: {kwargs}')
+
+ r = requests.get(url, **kwargs)
+ if r.status_code != 200:
+ response = r.json()
+ raise ApiResponseError(status_code=r.status_code,
+ error_type=response['error'],
+ error_message=response['message'] or None,
+ error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None)
+
+ if save_to:
+ r.raise_for_status()
+ with open(save_to, 'wb') as f:
+ shutil.copyfileobj(r.raw, f)
+ return True
+
+ return r.json()['response']
+
+
+class SoundNodeClient(MediaNodeClient):
+ def amixer_get_all(self):
+ return self._call('amixer/get-all/')
+
+ def amixer_get(self, control: str):
+ return self._call(f'amixer/get/{control}/')
+
+ def amixer_incr(self, control: str, step: Optional[int] = None):
+ params = {'step': step} if step is not None else None
+ return self._call(f'amixer/incr/{control}/', params=params)
+
+ def amixer_decr(self, control: str, step: Optional[int] = None):
+ params = {'step': step} if step is not None else None
+ return self._call(f'amixer/decr/{control}/', params=params)
+
+ def amixer_mute(self, control: str):
+ return self._call(f'amixer/mute/{control}/')
+
+ def amixer_unmute(self, control: str):
+ return self._call(f'amixer/unmute/{control}/')
+
+ def amixer_cap(self, control: str):
+ return self._call(f'amixer/cap/{control}/')
+
+ def amixer_nocap(self, control: str):
+ return self._call(f'amixer/nocap/{control}/')
+
+
+class CameraNodeClient(MediaNodeClient):
+ def capture(self,
+ save_to: str,
+ with_flash: bool = False):
+ return self._call('capture/',
+ {'with_flash': int(with_flash)},
+ save_to=save_to)
diff --git a/py_include/homekit/media/node_server.py b/py_include/homekit/media/node_server.py
new file mode 100644
index 0000000..5d0803c
--- /dev/null
+++ b/py_include/homekit/media/node_server.py
@@ -0,0 +1,86 @@
+from .. import http
+from .record import Recorder
+from .types import RecordStatus
+from .storage import RecordStorage
+
+
+class MediaNodeServer(http.HTTPServer):
+ recorder: Recorder
+ storage: RecordStorage
+
+ def __init__(self,
+ recorder: Recorder,
+ storage: RecordStorage,
+ *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self.recorder = recorder
+ self.storage = storage
+
+ self.get('/record/', self.do_record)
+ self.get('/record/info/{id}/', self.record_info)
+ self.get('/record/forget/{id}/', self.record_forget)
+ self.get('/record/download/{id}/', self.record_download)
+
+ self.get('/storage/list/', self.storage_list)
+ self.get('/storage/delete/', self.storage_delete)
+ self.get('/storage/download/', self.storage_download)
+
+ async def do_record(self, request: http.Request):
+ duration = int(request.query['duration'])
+ max = Recorder.get_max_record_time()*15
+ if not 0 < duration <= max:
+ raise ValueError(f'invalid duration: max duration is {max}')
+
+ record_id = self.recorder.record(duration)
+ return http.ok({'id': record_id})
+
+ async def record_info(self, request: http.Request):
+ record_id = int(request.match_info['id'])
+ info = self.recorder.get_info(record_id)
+ return http.ok(info.as_dict())
+
+ async def record_forget(self, request: http.Request):
+ record_id = int(request.match_info['id'])
+
+ info = self.recorder.get_info(record_id)
+ assert info.status in (RecordStatus.FINISHED, RecordStatus.ERROR), f"can't forget: record status is {info.status}"
+
+ self.recorder.forget(record_id)
+ return http.ok()
+
+ async def record_download(self, request: http.Request):
+ record_id = int(request.match_info['id'])
+
+ info = self.recorder.get_info(record_id)
+ assert info.status == RecordStatus.FINISHED, f"record status is {info.status}"
+
+ return http.FileResponse(info.file.path)
+
+ async def storage_list(self, request: http.Request):
+ extended = 'extended' in request.query and int(request.query['extended']) == 1
+
+ files = self.storage.getfiles(as_objects=extended)
+ if extended:
+ files = list(map(lambda file: file.__dict__(), files))
+
+ return http.ok({
+ 'files': files
+ })
+
+ async def storage_delete(self, request: http.Request):
+ file_id = request.query['file_id']
+ file = self.storage.find(file_id)
+ if not file:
+ raise ValueError(f'file {file} not found')
+
+ self.storage.delete(file)
+ return http.ok()
+
+ async def storage_download(self, request):
+ file_id = request.query['file_id']
+ file = self.storage.find(file_id)
+ if not file:
+ raise ValueError(f'file {file} not found')
+
+ return http.FileResponse(file.path)
diff --git a/py_include/homekit/media/record.py b/py_include/homekit/media/record.py
new file mode 100644
index 0000000..cd7447a
--- /dev/null
+++ b/py_include/homekit/media/record.py
@@ -0,0 +1,461 @@
+import os
+import threading
+import logging
+import time
+import subprocess
+import signal
+
+from typing import Optional, List, Dict
+from ..util import find_child_processes, Addr
+from ..config import config
+from .storage import RecordFile, RecordStorage
+from .types import RecordStatus
+from ..camera.types import CameraType
+
+
+_history_item_timeout = 7200
+_history_cleanup_freq = 3600
+
+
+class RecordHistoryItem:
+ id: int
+ request_time: float
+ start_time: float
+ stop_time: float
+ relations: List[int]
+ status: RecordStatus
+ error: Optional[Exception]
+ file: Optional[RecordFile]
+ creation_time: float
+
+ def __init__(self, id):
+ self.id = id
+ self.request_time = 0
+ self.start_time = 0
+ self.stop_time = 0
+ self.relations = []
+ self.status = RecordStatus.WAITING
+ self.file = None
+ self.error = None
+ self.creation_time = time.time()
+
+ def add_relation(self, related_id: int):
+ self.relations.append(related_id)
+
+ def mark_started(self, start_time: float):
+ self.start_time = start_time
+ self.status = RecordStatus.RECORDING
+
+ def mark_finished(self, end_time: float, file: RecordFile):
+ self.stop_time = end_time
+ self.file = file
+ self.status = RecordStatus.FINISHED
+
+ def mark_failed(self, error: Exception):
+ self.status = RecordStatus.ERROR
+ self.error = error
+
+ def as_dict(self) -> dict:
+ data = {
+ 'id': self.id,
+ 'request_time': self.request_time,
+ 'status': self.status.value,
+ 'relations': self.relations,
+ 'start_time': self.start_time,
+ 'stop_time': self.stop_time,
+ }
+ if self.error:
+ data['error'] = str(self.error)
+ if self.file:
+ data['file'] = self.file.__dict__()
+ return data
+
+
+class RecordingNotFoundError(Exception):
+ pass
+
+
+class RecordHistory:
+ history: Dict[int, RecordHistoryItem]
+
+ def __init__(self):
+ self.history = {}
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def add(self, record_id: int):
+ self.logger.debug(f'add: record_id={record_id}')
+
+ r = RecordHistoryItem(record_id)
+ r.request_time = time.time()
+
+ self.history[record_id] = r
+
+ def delete(self, record_id: int):
+ self.logger.debug(f'delete: record_id={record_id}')
+ del self.history[record_id]
+
+ def cleanup(self):
+ del_ids = []
+ for rid, item in self.history.items():
+ if item.creation_time < time.time()-_history_item_timeout:
+ del_ids.append(rid)
+ for rid in del_ids:
+ self.delete(rid)
+
+ def __getitem__(self, key):
+ if key not in self.history:
+ raise RecordingNotFoundError()
+
+ return self.history[key]
+
+ def __setitem__(self, key, value):
+ raise NotImplementedError('setting history item this way is prohibited')
+
+ def __contains__(self, key):
+ return key in self.history
+
+
+class Recording:
+ RECORDER_PROGRAM = None
+
+ start_time: float
+ stop_time: float
+ duration: int
+ record_id: int
+ recorder_program_pid: Optional[int]
+ process: Optional[subprocess.Popen]
+
+ g_record_id = 1
+
+ def __init__(self):
+ if self.RECORDER_PROGRAM is None:
+ raise RuntimeError('this is abstract class')
+
+ self.start_time = 0
+ self.stop_time = 0
+ self.duration = 0
+ self.process = None
+ self.recorder_program_pid = None
+ self.record_id = Recording.next_id()
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def is_started(self) -> bool:
+ return self.start_time > 0 and self.stop_time > 0
+
+ def is_waiting(self):
+ return self.duration > 0
+
+ def ask_for(self, duration) -> int:
+ overtime = 0
+ orig_duration = duration
+
+ if self.is_started():
+ already_passed = time.time() - self.start_time
+ max_duration = Recorder.get_max_record_time() - already_passed
+ self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}')
+ else:
+ max_duration = Recorder.get_max_record_time()
+
+ if duration > max_duration:
+ overtime = duration - max_duration
+ duration = max_duration
+
+ self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}')
+
+ self.duration += duration
+ if self.is_started():
+ til_end = self.stop_time - time.time()
+ if til_end < 0:
+ til_end = 0
+
+ _prev_stop_time = self.stop_time
+ _to_add = duration - til_end
+ if _to_add < 0:
+ _to_add = 0
+
+ self.stop_time += _to_add
+ self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})')
+
+ return overtime
+
+ def start(self, output: str):
+ assert self.start_time == 0 and self.stop_time == 0, "already started?!"
+ assert self.process is None, "self.process is not None, what the hell?"
+
+ cur = time.time()
+ self.start_time = cur
+ self.stop_time = cur + self.duration
+
+ cmd = self.get_command(output)
+ self.logger.debug(f'start: running `{cmd}`')
+ self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True)
+
+ sh_pid = self.process.pid
+ self.logger.debug(f'start: started, pid of shell is {sh_pid}')
+
+ pid = self.find_recorder_program_pid(sh_pid)
+ if pid is not None:
+ self.recorder_program_pid = pid
+ self.logger.debug(f'start: pid of {self.RECORDER_PROGRAM} is {pid}')
+
+ def get_command(self, output: str) -> str:
+ pass
+
+ def stop(self):
+ if self.process:
+ if self.recorder_program_pid is None:
+ self.recorder_program_pid = self.find_recorder_program_pid(self.process.pid)
+
+ if self.recorder_program_pid is not None:
+ os.kill(self.recorder_program_pid, signal.SIGINT)
+ timeout = config['node']['process_wait_timeout']
+
+ self.logger.debug(f'stop: sent SIGINT to {self.recorder_program_pid}. now waiting up to {timeout} seconds...')
+ try:
+ self.process.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ self.logger.warning(f'stop: wait({timeout}): timeout expired, killing it')
+ try:
+ os.kill(self.recorder_program_pid, signal.SIGKILL)
+ self.process.terminate()
+ except Exception as exc:
+ self.logger.exception(exc)
+ else:
+ self.logger.warning(f'stop: pid of {self.RECORDER_PROGRAM} is unknown, calling terminate()')
+ self.process.terminate()
+
+ rc = self.process.returncode
+ self.logger.debug(f'stop: rc={rc}')
+
+ self.process = None
+ self.recorder_program_pid = 0
+
+ self.duration = 0
+ self.start_time = 0
+ self.stop_time = 0
+
+ def find_recorder_program_pid(self, sh_pid: int):
+ try:
+ children = find_child_processes(sh_pid)
+ except OSError as exc:
+ self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc))
+ return None
+
+ for child in children:
+ if self.RECORDER_PROGRAM in child.cmd:
+ return child.pid
+
+ return None
+
+ @staticmethod
+ def next_id() -> int:
+ cur_id = Recording.g_record_id
+ Recording.g_record_id += 1
+ return cur_id
+
+ def increment_id(self):
+ self.record_id = Recording.next_id()
+
+
+class Recorder:
+ TEMP_NAME = None
+
+ interrupted: bool
+ lock: threading.Lock
+ history_lock: threading.Lock
+ recording: Optional[Recording]
+ overtime: int
+ history: RecordHistory
+ next_history_cleanup_time: float
+ storage: RecordStorage
+
+ def __init__(self,
+ storage: RecordStorage,
+ recording: Recording):
+ if self.TEMP_NAME is None:
+ raise RuntimeError('this is abstract class')
+
+ self.storage = storage
+ self.recording = recording
+ self.interrupted = False
+ self.lock = threading.Lock()
+ self.history_lock = threading.Lock()
+ self.overtime = 0
+ self.history = RecordHistory()
+ self.next_history_cleanup_time = 0
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def start_thread(self):
+ t = threading.Thread(target=self.loop)
+ t.daemon = True
+ t.start()
+
+ def loop(self) -> None:
+ tempname = os.path.join(self.storage.root, self.TEMP_NAME)
+
+ while not self.interrupted:
+ cur = time.time()
+ stopped = False
+ cur_record_id = None
+
+ if self.next_history_cleanup_time == 0:
+ self.next_history_cleanup_time = time.time() + _history_cleanup_freq
+ elif self.next_history_cleanup_time <= time.time():
+ self.logger.debug('loop: calling history.cleanup()')
+ try:
+ self.history.cleanup()
+ except Exception as e:
+ self.logger.error('loop: error while history.cleanup(): ' + str(e))
+ self.next_history_cleanup_time = time.time() + _history_cleanup_freq
+
+ with self.lock:
+ cur_record_id = self.recording.record_id
+ # self.logger.debug(f'cur_record_id={cur_record_id}')
+
+ if not self.recording.is_started():
+ if self.recording.is_waiting():
+ try:
+ if os.path.exists(tempname):
+ self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..')
+ try:
+ os.unlink(tempname)
+ except OSError as e:
+ self.logger.exception(e)
+ self.recording.start(tempname)
+ with self.history_lock:
+ self.history[cur_record_id].mark_started(self.recording.start_time)
+ except Exception as exc:
+ self.logger.exception(exc)
+
+ # there should not be any errors, but still..
+ try:
+ self.recording.stop()
+ except Exception as exc:
+ self.logger.exception(exc)
+
+ with self.history_lock:
+ self.history[cur_record_id].mark_failed(exc)
+
+ self.logger.debug(f'loop: start exc path: calling increment_id()')
+ self.recording.increment_id()
+ else:
+ if cur >= self.recording.stop_time:
+ try:
+ start_time = self.recording.start_time
+ stop_time = self.recording.stop_time
+ self.recording.stop()
+
+ saved_name = self.storage.save(tempname,
+ record_id=cur_record_id,
+ start_time=int(start_time),
+ stop_time=int(stop_time))
+
+ with self.history_lock:
+ self.history[cur_record_id].mark_finished(stop_time, saved_name)
+ except Exception as exc:
+ self.logger.exception(exc)
+ with self.history_lock:
+ self.history[cur_record_id].mark_failed(exc)
+ finally:
+ self.logger.debug(f'loop: stop exc final path: calling increment_id()')
+ self.recording.increment_id()
+
+ stopped = True
+
+ if stopped and self.overtime > 0:
+ self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})')
+ _overtime = self.overtime
+ self.overtime = 0
+
+ related_id = self.record(_overtime)
+ self.logger.info(f'enqueued another record with id {related_id}')
+
+ if cur_record_id is not None:
+ with self.history_lock:
+ self.history[cur_record_id].add_relation(related_id)
+
+ time.sleep(0.2)
+
+ def record(self, duration: int) -> int:
+ self.logger.debug(f'record: duration={duration}')
+ with self.lock:
+ overtime = self.recording.ask_for(duration)
+ self.logger.debug(f'overtime={overtime}')
+
+ if overtime > self.overtime:
+ self.overtime = overtime
+
+ if not self.recording.is_started():
+ with self.history_lock:
+ self.history.add(self.recording.record_id)
+
+ return self.recording.record_id
+
+ def stop(self):
+ self.interrupted = True
+
+ def get_info(self, record_id: int) -> RecordHistoryItem:
+ with self.history_lock:
+ return self.history[record_id]
+
+ def forget(self, record_id: int):
+ with self.history_lock:
+ self.logger.info(f'forget: removing record {record_id} from history')
+ self.history.delete(record_id)
+
+ @staticmethod
+ def get_max_record_time() -> int:
+ return config['node']['record_max_time']
+
+
+class SoundRecorder(Recorder):
+ TEMP_NAME = 'temp.mp3'
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(recording=SoundRecording(),
+ *args, **kwargs)
+
+
+class CameraRecorder(Recorder):
+ TEMP_NAME = 'temp.mp4'
+
+ def __init__(self,
+ camera_type: CameraType,
+ *args, **kwargs):
+ if camera_type == CameraType.ESP32:
+ recording = ESP32CameraRecording(stream_addr=kwargs['stream_addr'])
+ del kwargs['stream_addr']
+ else:
+ raise RuntimeError(f'unsupported camera type {camera_type}')
+
+ super().__init__(recording=recording,
+ *args, **kwargs)
+
+
+class SoundRecording(Recording):
+ RECORDER_PROGRAM = 'arecord'
+
+ def get_command(self, output: str) -> str:
+ arecord = config['arecord']['bin']
+ lame = config['lame']['bin']
+ b = config['lame']['bitrate']
+
+ return f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null'
+
+
+class ESP32CameraRecording(Recording):
+ RECORDER_PROGRAM = 'esp32_capture.py'
+
+ stream_addr: Addr
+
+ def __init__(self, stream_addr: Addr):
+ super().__init__()
+ self.stream_addr = stream_addr
+
+ def get_command(self, output: str) -> str:
+ bin = config['esp32_capture']['bin']
+ return f'{bin} --addr {self.stream_addr[0]}:{self.stream_addr[1]} --output-directory {output} >/dev/null 2>/dev/null'
+
+ def start(self, output: str):
+ output = os.path.dirname(output)
+ return super().start(output) \ No newline at end of file
diff --git a/py_include/homekit/media/record_client.py b/py_include/homekit/media/record_client.py
new file mode 100644
index 0000000..322495c
--- /dev/null
+++ b/py_include/homekit/media/record_client.py
@@ -0,0 +1,166 @@
+import time
+import logging
+import threading
+import os.path
+
+from tempfile import gettempdir
+from .record import RecordStatus
+from .node_client import SoundNodeClient, MediaNodeClient, CameraNodeClient
+from ..util import Addr
+from typing import Optional, Callable, Dict
+
+
+class RecordClient:
+ DOWNLOAD_EXTENSION = None
+
+ interrupted: bool
+ logger: logging.Logger
+ clients: Dict[str, MediaNodeClient]
+ awaiting: Dict[str, Dict[int, Optional[dict]]]
+ error_handler: Optional[Callable]
+ finished_handler: Optional[Callable]
+ download_on_finish: bool
+
+ def __init__(self,
+ nodes: Dict[str, Addr],
+ error_handler: Optional[Callable] = None,
+ finished_handler: Optional[Callable] = None,
+ download_on_finish=False):
+ if self.DOWNLOAD_EXTENSION is None:
+ raise RuntimeError('this is abstract class')
+
+ self.interrupted = False
+ self.logger = logging.getLogger(self.__class__.__name__)
+ self.clients = {}
+ self.awaiting = {}
+
+ self.download_on_finish = download_on_finish
+ self.error_handler = error_handler
+ self.finished_handler = finished_handler
+
+ self.awaiting_lock = threading.Lock()
+
+ self.make_clients(nodes)
+
+ try:
+ t = threading.Thread(target=self.loop)
+ t.daemon = True
+ t.start()
+ except (KeyboardInterrupt, SystemExit) as exc:
+ self.stop()
+ self.logger.exception(exc)
+
+ def make_clients(self, nodes: Dict[str, Addr]):
+ pass
+
+ def stop(self):
+ self.interrupted = True
+
+ def loop(self):
+ while not self.interrupted:
+ for node in self.awaiting.keys():
+ with self.awaiting_lock:
+ record_ids = list(self.awaiting[node].keys())
+ if not record_ids:
+ continue
+
+ self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}')
+
+ cl = self.getclient(node)
+ del_ids = []
+ for rid in record_ids:
+ info = cl.record_info(rid)
+
+ if info['relations']:
+ for relid in info['relations']:
+ self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True)
+
+ status = RecordStatus(info['status'])
+ if status in (RecordStatus.FINISHED, RecordStatus.ERROR):
+ if status == RecordStatus.FINISHED:
+ if self.download_on_finish:
+ local_fn = self.download(node, rid, info['file']['fileid'])
+ else:
+ local_fn = None
+ self._report_finished(info, local_fn, self.awaiting[node][rid])
+ else:
+ self._report_error(info, self.awaiting[node][rid])
+ del_ids.append(rid)
+ self.logger.debug(f'record {rid}: status {status}')
+
+ if del_ids:
+ self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list')
+ with self.awaiting_lock:
+ for del_id in del_ids:
+ del self.awaiting[node][del_id]
+
+ time.sleep(5)
+
+ self.logger.info('loop ended')
+
+ def getclient(self, node: str):
+ return self.clients[node]
+
+ def record(self,
+ node: str,
+ duration: int,
+ userdata: Optional[dict] = None) -> int:
+ self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}')
+
+ cl = self.getclient(node)
+ record_id = cl.record(duration)['id']
+ self.logger.debug(f'record: request sent, record_id={record_id}')
+
+ self.wait_for_record(node, record_id, userdata)
+ return record_id
+
+ def wait_for_record(self,
+ node: str,
+ record_id: int,
+ userdata: Optional[dict] = None,
+ is_relative=False):
+ with self.awaiting_lock:
+ if record_id not in self.awaiting[node]:
+ msg = f'wait_for_record: adding {record_id} to {node}'
+ if is_relative:
+ msg += ' (by relation)'
+ self.logger.debug(msg)
+
+ self.awaiting[node][record_id] = userdata
+
+ def download(self, node: str, record_id: int, fileid: str):
+ dst = os.path.join(gettempdir(), f'{node}_{fileid}.{self.DOWNLOAD_EXTENSION}')
+ cl = self.getclient(node)
+ cl.record_download(record_id, dst)
+ return dst
+
+ def forget(self, node: str, rid: int):
+ self.getclient(node).record_forget(rid)
+
+ def _report_finished(self, *args):
+ if self.finished_handler:
+ self.finished_handler(*args)
+
+ def _report_error(self, *args):
+ if self.error_handler:
+ self.error_handler(*args)
+
+
+class SoundRecordClient(RecordClient):
+ DOWNLOAD_EXTENSION = 'mp3'
+ # clients: Dict[str, SoundNodeClient]
+
+ def make_clients(self, nodes: Dict[str, Addr]):
+ for node, addr in nodes.items():
+ self.clients[node] = SoundNodeClient(addr)
+ self.awaiting[node] = {}
+
+
+class CameraRecordClient(RecordClient):
+ DOWNLOAD_EXTENSION = 'mp4'
+ # clients: Dict[str, CameraNodeClient]
+
+ def make_clients(self, nodes: Dict[str, Addr]):
+ for node, addr in nodes.items():
+ self.clients[node] = CameraNodeClient(addr)
+ self.awaiting[node] = {} \ No newline at end of file
diff --git a/py_include/homekit/media/storage.py b/py_include/homekit/media/storage.py
new file mode 100644
index 0000000..dd74ff8
--- /dev/null
+++ b/py_include/homekit/media/storage.py
@@ -0,0 +1,210 @@
+import os
+import re
+import shutil
+import logging
+
+from typing import Optional, Union, List
+from datetime import datetime
+from ..util import strgen
+
+logger = logging.getLogger(__name__)
+
+
+# record file
+# -----------
+
+class RecordFile:
+ EXTENSION = None
+
+ start_time: Optional[datetime]
+ stop_time: Optional[datetime]
+ record_id: Optional[int]
+ name: str
+ file_id: Optional[str]
+ remote: bool
+ remote_filesize: int
+ storage_root: str
+
+ human_date_dmt = '%d.%m.%y'
+ human_time_fmt = '%H:%M:%S'
+
+ @staticmethod
+ def create(filename: str, *args, **kwargs):
+ if filename.endswith(f'.{SoundRecordFile.EXTENSION}'):
+ return SoundRecordFile(filename, *args, **kwargs)
+ elif filename.endswith(f'.{CameraRecordFile.EXTENSION}'):
+ return CameraRecordFile(filename, *args, **kwargs)
+ else:
+ raise RuntimeError(f'unsupported file extension: {filename}')
+
+ def __init__(self, filename: str, remote=False, remote_filesize=None, storage_root='/'):
+ if self.EXTENSION is None:
+ raise RuntimeError('this is abstract class')
+
+ self.name = filename
+ self.storage_root = storage_root
+
+ self.remote = remote
+ self.remote_filesize = remote_filesize
+
+ m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.'+self.EXTENSION+'$', filename)
+ if m:
+ self.start_time = datetime.strptime(m.group(1), RecordStorage.time_fmt)
+ self.stop_time = datetime.strptime(m.group(2), RecordStorage.time_fmt)
+ self.record_id = int(m.group(3))
+ self.file_id = (m.group(1) + '_' + m.group(2)).replace('-', '_')
+ else:
+ logger.warning(f'unexpected filename: {filename}')
+ self.start_time = None
+ self.stop_time = None
+ self.record_id = None
+ self.file_id = None
+
+ @property
+ def path(self):
+ if self.remote:
+ return RuntimeError('remote recording, can\'t get real path')
+
+ return os.path.realpath(os.path.join(
+ self.storage_root, self.name
+ ))
+
+ @property
+ def start_humantime(self) -> str:
+ if self.start_time is None:
+ return '?'
+ fmt = f'{RecordFile.human_date_dmt} {RecordFile.human_time_fmt}'
+ return self.start_time.strftime(fmt)
+
+ @property
+ def stop_humantime(self) -> str:
+ if self.stop_time is None:
+ return '?'
+ fmt = RecordFile.human_time_fmt
+ if self.start_time.date() != self.stop_time.date():
+ fmt = f'{RecordFile.human_date_dmt} {fmt}'
+ return self.stop_time.strftime(fmt)
+
+ @property
+ def start_unixtime(self) -> int:
+ if self.start_time is None:
+ return 0
+ return int(self.start_time.timestamp())
+
+ @property
+ def stop_unixtime(self) -> int:
+ if self.stop_time is None:
+ return 0
+ return int(self.stop_time.timestamp())
+
+ @property
+ def filesize(self):
+ if self.remote:
+ if self.remote_filesize is None:
+ raise RuntimeError('file is remote and remote_filesize is not set')
+ return self.remote_filesize
+ return os.path.getsize(self.path)
+
+ def __dict__(self) -> dict:
+ return {
+ 'start_unixtime': self.start_unixtime,
+ 'stop_unixtime': self.stop_unixtime,
+ 'filename': self.name,
+ 'filesize': self.filesize,
+ 'fileid': self.file_id,
+ 'record_id': self.record_id or 0,
+ }
+
+
+class PseudoRecordFile(RecordFile):
+ EXTENSION = 'null'
+
+ def __init__(self):
+ super().__init__('pseudo.null')
+
+ @property
+ def filesize(self):
+ return 0
+
+
+class SoundRecordFile(RecordFile):
+ EXTENSION = 'mp3'
+
+
+class CameraRecordFile(RecordFile):
+ EXTENSION = 'mp4'
+
+
+# record storage
+# --------------
+
+class RecordStorage:
+ EXTENSION = None
+
+ time_fmt = '%d%m%y-%H%M%S'
+
+ def __init__(self, root: str):
+ if self.EXTENSION is None:
+ raise RuntimeError('this is abstract class')
+
+ self.root = root
+
+ def getfiles(self, as_objects=False) -> Union[List[str], List[RecordFile]]:
+ files = []
+ for name in os.listdir(self.root):
+ path = os.path.join(self.root, name)
+ if os.path.isfile(path) and name.endswith(f'.{self.EXTENSION}'):
+ files.append(name if not as_objects else RecordFile.create(name, storage_root=self.root))
+ return files
+
+ def find(self, file_id: str) -> Optional[RecordFile]:
+ for name in os.listdir(self.root):
+ if os.path.isfile(os.path.join(self.root, name)) and name.endswith(f'.{self.EXTENSION}'):
+ item = RecordFile.create(name, storage_root=self.root)
+ if item.file_id == file_id:
+ return item
+ return None
+
+ def purge(self):
+ files = self.getfiles()
+ if files:
+ logger = logging.getLogger(self.__name__)
+ for f in files:
+ try:
+ path = os.path.join(self.root, f)
+ logger.debug(f'purge: deleting {path}')
+ os.unlink(path)
+ except OSError as exc:
+ logger.exception(exc)
+
+ def delete(self, file: RecordFile):
+ os.unlink(file.path)
+
+ def save(self,
+ fn: str,
+ record_id: int,
+ start_time: int,
+ stop_time: int) -> RecordFile:
+
+ start_time_s = datetime.fromtimestamp(start_time).strftime(self.time_fmt)
+ stop_time_s = datetime.fromtimestamp(stop_time).strftime(self.time_fmt)
+
+ dst_fn = f'{start_time_s}_{stop_time_s}_id{record_id}'
+ if os.path.exists(os.path.join(self.root, dst_fn)):
+ dst_fn += strgen(4)
+ dst_fn += f'.{self.EXTENSION}'
+ dst_path = os.path.join(self.root, dst_fn)
+
+ shutil.move(fn, dst_path)
+ return RecordFile.create(dst_fn, storage_root=self.root)
+
+
+class SoundRecordStorage(RecordStorage):
+ EXTENSION = 'mp3'
+
+
+class ESP32CameraRecordStorage(RecordStorage):
+ EXTENSION = 'jpg' # not used anyway
+
+ def save(self, *args, **kwargs):
+ return PseudoRecordFile() \ No newline at end of file
diff --git a/py_include/homekit/media/types.py b/py_include/homekit/media/types.py
new file mode 100644
index 0000000..acbc291
--- /dev/null
+++ b/py_include/homekit/media/types.py
@@ -0,0 +1,13 @@
+from enum import Enum, auto
+
+
+class MediaNodeType(Enum):
+ SOUND = auto()
+ CAMERA = auto()
+
+
+class RecordStatus(Enum):
+ WAITING = auto()
+ RECORDING = auto()
+ FINISHED = auto()
+ ERROR = auto()