diff options
Diffstat (limited to 'src/home/media/record_client.py')
-rw-r--r-- | src/home/media/record_client.py | 166 |
1 files changed, 0 insertions, 166 deletions
diff --git a/src/home/media/record_client.py b/src/home/media/record_client.py deleted file mode 100644 index 322495c..0000000 --- a/src/home/media/record_client.py +++ /dev/null @@ -1,166 +0,0 @@ -import time -import logging -import threading -import os.path - -from tempfile import gettempdir -from .record import RecordStatus -from .node_client import SoundNodeClient, MediaNodeClient, CameraNodeClient -from ..util import Addr -from typing import Optional, Callable, Dict - - -class RecordClient: - DOWNLOAD_EXTENSION = None - - interrupted: bool - logger: logging.Logger - clients: Dict[str, MediaNodeClient] - awaiting: Dict[str, Dict[int, Optional[dict]]] - error_handler: Optional[Callable] - finished_handler: Optional[Callable] - download_on_finish: bool - - def __init__(self, - nodes: Dict[str, Addr], - error_handler: Optional[Callable] = None, - finished_handler: Optional[Callable] = None, - download_on_finish=False): - if self.DOWNLOAD_EXTENSION is None: - raise RuntimeError('this is abstract class') - - self.interrupted = False - self.logger = logging.getLogger(self.__class__.__name__) - self.clients = {} - self.awaiting = {} - - self.download_on_finish = download_on_finish - self.error_handler = error_handler - self.finished_handler = finished_handler - - self.awaiting_lock = threading.Lock() - - self.make_clients(nodes) - - try: - t = threading.Thread(target=self.loop) - t.daemon = True - t.start() - except (KeyboardInterrupt, SystemExit) as exc: - self.stop() - self.logger.exception(exc) - - def make_clients(self, nodes: Dict[str, Addr]): - pass - - def stop(self): - self.interrupted = True - - def loop(self): - while not self.interrupted: - for node in self.awaiting.keys(): - with self.awaiting_lock: - record_ids = list(self.awaiting[node].keys()) - if not record_ids: - continue - - self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}') - - cl = self.getclient(node) - del_ids = [] - for rid in record_ids: - info = cl.record_info(rid) - - if info['relations']: - for relid in info['relations']: - self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True) - - status = RecordStatus(info['status']) - if status in (RecordStatus.FINISHED, RecordStatus.ERROR): - if status == RecordStatus.FINISHED: - if self.download_on_finish: - local_fn = self.download(node, rid, info['file']['fileid']) - else: - local_fn = None - self._report_finished(info, local_fn, self.awaiting[node][rid]) - else: - self._report_error(info, self.awaiting[node][rid]) - del_ids.append(rid) - self.logger.debug(f'record {rid}: status {status}') - - if del_ids: - self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list') - with self.awaiting_lock: - for del_id in del_ids: - del self.awaiting[node][del_id] - - time.sleep(5) - - self.logger.info('loop ended') - - def getclient(self, node: str): - return self.clients[node] - - def record(self, - node: str, - duration: int, - userdata: Optional[dict] = None) -> int: - self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}') - - cl = self.getclient(node) - record_id = cl.record(duration)['id'] - self.logger.debug(f'record: request sent, record_id={record_id}') - - self.wait_for_record(node, record_id, userdata) - return record_id - - def wait_for_record(self, - node: str, - record_id: int, - userdata: Optional[dict] = None, - is_relative=False): - with self.awaiting_lock: - if record_id not in self.awaiting[node]: - msg = f'wait_for_record: adding {record_id} to {node}' - if is_relative: - msg += ' (by relation)' - self.logger.debug(msg) - - self.awaiting[node][record_id] = userdata - - def download(self, node: str, record_id: int, fileid: str): - dst = os.path.join(gettempdir(), f'{node}_{fileid}.{self.DOWNLOAD_EXTENSION}') - cl = self.getclient(node) - cl.record_download(record_id, dst) - return dst - - def forget(self, node: str, rid: int): - self.getclient(node).record_forget(rid) - - def _report_finished(self, *args): - if self.finished_handler: - self.finished_handler(*args) - - def _report_error(self, *args): - if self.error_handler: - self.error_handler(*args) - - -class SoundRecordClient(RecordClient): - DOWNLOAD_EXTENSION = 'mp3' - # clients: Dict[str, SoundNodeClient] - - def make_clients(self, nodes: Dict[str, Addr]): - for node, addr in nodes.items(): - self.clients[node] = SoundNodeClient(addr) - self.awaiting[node] = {} - - -class CameraRecordClient(RecordClient): - DOWNLOAD_EXTENSION = 'mp4' - # clients: Dict[str, CameraNodeClient] - - def make_clients(self, nodes: Dict[str, Addr]): - for node, addr in nodes.items(): - self.clients[node] = CameraNodeClient(addr) - self.awaiting[node] = {}
\ No newline at end of file |