diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2022-06-14 02:44:43 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2022-06-14 22:56:46 +0300 |
commit | e3d3d6b76010a6dd5c417f017339bec17fb07887 (patch) | |
tree | 42cb6194504ae863db2bf7d21ef9e2acd41d0fd2 /src/home/media/record_client.py | |
parent | 600fdf99ffd893857c9cdb9e68140766a963bd17 (diff) |
media: refactor sound_node, introduce camera_node
Diffstat (limited to 'src/home/media/record_client.py')
-rw-r--r-- | src/home/media/record_client.py | 166 |
1 files changed, 166 insertions, 0 deletions
diff --git a/src/home/media/record_client.py b/src/home/media/record_client.py new file mode 100644 index 0000000..f264155 --- /dev/null +++ b/src/home/media/record_client.py @@ -0,0 +1,166 @@ +import time +import logging +import threading +import os.path + +from tempfile import gettempdir +from .record import RecordStatus +from .node_client import SoundNodeClient, MediaNodeClient, CameraNodeClient +from ..util import Addr +from typing import Optional, Callable + + +class RecordClient: + DOWNLOAD_EXTENSION = None + + interrupted: bool + logger: logging.Logger + clients: dict[str, MediaNodeClient] + awaiting: dict[str, dict[int, Optional[dict]]] + error_handler: Optional[Callable] + finished_handler: Optional[Callable] + download_on_finish: bool + + def __init__(self, + nodes: dict[str, Addr], + error_handler: Optional[Callable] = None, + finished_handler: Optional[Callable] = None, + download_on_finish=False): + if self.DOWNLOAD_EXTENSION is None: + raise RuntimeError('this is abstract class') + + self.interrupted = False + self.logger = logging.getLogger(self.__class__.__name__) + self.clients = {} + self.awaiting = {} + + self.download_on_finish = download_on_finish + self.error_handler = error_handler + self.finished_handler = finished_handler + + self.awaiting_lock = threading.Lock() + + self.make_clients(nodes) + + try: + t = threading.Thread(target=self.loop) + t.daemon = True + t.start() + except (KeyboardInterrupt, SystemExit) as exc: + self.stop() + self.logger.exception(exc) + + def make_clients(self, nodes: dict[str, Addr]): + pass + + def stop(self): + self.interrupted = True + + def loop(self): + while not self.interrupted: + for node in self.awaiting.keys(): + with self.awaiting_lock: + record_ids = list(self.awaiting[node].keys()) + if not record_ids: + continue + + self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}') + + cl = self.getclient(node) + del_ids = [] + for rid in record_ids: + info = cl.record_info(rid) + + if info['relations']: + for relid in info['relations']: + self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True) + + status = RecordStatus(info['status']) + if status in (RecordStatus.FINISHED, RecordStatus.ERROR): + if status == RecordStatus.FINISHED: + if self.download_on_finish: + local_fn = self.download(node, rid, info['file']['fileid']) + else: + local_fn = None + self._report_finished(info, local_fn, self.awaiting[node][rid]) + else: + self._report_error(info, self.awaiting[node][rid]) + del_ids.append(rid) + self.logger.debug(f'record {rid}: status {status}') + + if del_ids: + self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list') + with self.awaiting_lock: + for del_id in del_ids: + del self.awaiting[node][del_id] + + time.sleep(5) + + self.logger.info('loop ended') + + def getclient(self, node: str): + return self.clients[node] + + def record(self, + node: str, + duration: int, + userdata: Optional[dict] = None) -> int: + self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}') + + cl = self.getclient(node) + record_id = cl.record(duration)['id'] + self.logger.debug(f'record: request sent, record_id={record_id}') + + self.wait_for_record(node, record_id, userdata) + return record_id + + def wait_for_record(self, + node: str, + record_id: int, + userdata: Optional[dict] = None, + is_relative=False): + with self.awaiting_lock: + if record_id not in self.awaiting[node]: + msg = f'wait_for_record: adding {record_id} to {node}' + if is_relative: + msg += ' (by relation)' + self.logger.debug(msg) + + self.awaiting[node][record_id] = userdata + + def download(self, node: str, record_id: int, fileid: str): + dst = os.path.join(gettempdir(), f'{node}_{fileid}.{self.DOWNLOAD_EXTENSION}') + cl = self.getclient(node) + cl.record_download(record_id, dst) + return dst + + def forget(self, node: str, rid: int): + self.getclient(node).record_forget(rid) + + def _report_finished(self, *args): + if self.finished_handler: + self.finished_handler(*args) + + def _report_error(self, *args): + if self.error_handler: + self.error_handler(*args) + + +class SoundRecordClient(RecordClient): + DOWNLOAD_EXTENSION = 'mp3' + # clients: dict[str, SoundNodeClient] + + def make_clients(self, nodes: dict[str, Addr]): + for node, addr in nodes.items(): + self.clients[node] = SoundNodeClient(addr) + self.awaiting[node] = {} + + +class CameraRecordClient(RecordClient): + DOWNLOAD_EXTENSION = 'mp4' + # clients: dict[str, CameraNodeClient] + + def make_clients(self, nodes: dict[str, Addr]): + for node, addr in nodes.items(): + self.clients[node] = CameraNodeClient(addr) + self.awaiting[node] = {}
\ No newline at end of file |