summaryrefslogtreecommitdiff
path: root/src/home/media/record_client.py
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2022-06-14 02:44:43 +0300
committerEvgeny Zinoviev <me@ch1p.io>2022-06-14 22:56:46 +0300
commite3d3d6b76010a6dd5c417f017339bec17fb07887 (patch)
tree42cb6194504ae863db2bf7d21ef9e2acd41d0fd2 /src/home/media/record_client.py
parent600fdf99ffd893857c9cdb9e68140766a963bd17 (diff)
media: refactor sound_node, introduce camera_node
Diffstat (limited to 'src/home/media/record_client.py')
-rw-r--r--src/home/media/record_client.py166
1 files changed, 166 insertions, 0 deletions
diff --git a/src/home/media/record_client.py b/src/home/media/record_client.py
new file mode 100644
index 0000000..f264155
--- /dev/null
+++ b/src/home/media/record_client.py
@@ -0,0 +1,166 @@
+import time
+import logging
+import threading
+import os.path
+
+from tempfile import gettempdir
+from .record import RecordStatus
+from .node_client import SoundNodeClient, MediaNodeClient, CameraNodeClient
+from ..util import Addr
+from typing import Optional, Callable
+
+
+class RecordClient:
+ DOWNLOAD_EXTENSION = None
+
+ interrupted: bool
+ logger: logging.Logger
+ clients: dict[str, MediaNodeClient]
+ awaiting: dict[str, dict[int, Optional[dict]]]
+ error_handler: Optional[Callable]
+ finished_handler: Optional[Callable]
+ download_on_finish: bool
+
+ def __init__(self,
+ nodes: dict[str, Addr],
+ error_handler: Optional[Callable] = None,
+ finished_handler: Optional[Callable] = None,
+ download_on_finish=False):
+ if self.DOWNLOAD_EXTENSION is None:
+ raise RuntimeError('this is abstract class')
+
+ self.interrupted = False
+ self.logger = logging.getLogger(self.__class__.__name__)
+ self.clients = {}
+ self.awaiting = {}
+
+ self.download_on_finish = download_on_finish
+ self.error_handler = error_handler
+ self.finished_handler = finished_handler
+
+ self.awaiting_lock = threading.Lock()
+
+ self.make_clients(nodes)
+
+ try:
+ t = threading.Thread(target=self.loop)
+ t.daemon = True
+ t.start()
+ except (KeyboardInterrupt, SystemExit) as exc:
+ self.stop()
+ self.logger.exception(exc)
+
+ def make_clients(self, nodes: dict[str, Addr]):
+ pass
+
+ def stop(self):
+ self.interrupted = True
+
+ def loop(self):
+ while not self.interrupted:
+ for node in self.awaiting.keys():
+ with self.awaiting_lock:
+ record_ids = list(self.awaiting[node].keys())
+ if not record_ids:
+ continue
+
+ self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}')
+
+ cl = self.getclient(node)
+ del_ids = []
+ for rid in record_ids:
+ info = cl.record_info(rid)
+
+ if info['relations']:
+ for relid in info['relations']:
+ self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True)
+
+ status = RecordStatus(info['status'])
+ if status in (RecordStatus.FINISHED, RecordStatus.ERROR):
+ if status == RecordStatus.FINISHED:
+ if self.download_on_finish:
+ local_fn = self.download(node, rid, info['file']['fileid'])
+ else:
+ local_fn = None
+ self._report_finished(info, local_fn, self.awaiting[node][rid])
+ else:
+ self._report_error(info, self.awaiting[node][rid])
+ del_ids.append(rid)
+ self.logger.debug(f'record {rid}: status {status}')
+
+ if del_ids:
+ self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list')
+ with self.awaiting_lock:
+ for del_id in del_ids:
+ del self.awaiting[node][del_id]
+
+ time.sleep(5)
+
+ self.logger.info('loop ended')
+
+ def getclient(self, node: str):
+ return self.clients[node]
+
+ def record(self,
+ node: str,
+ duration: int,
+ userdata: Optional[dict] = None) -> int:
+ self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}')
+
+ cl = self.getclient(node)
+ record_id = cl.record(duration)['id']
+ self.logger.debug(f'record: request sent, record_id={record_id}')
+
+ self.wait_for_record(node, record_id, userdata)
+ return record_id
+
+ def wait_for_record(self,
+ node: str,
+ record_id: int,
+ userdata: Optional[dict] = None,
+ is_relative=False):
+ with self.awaiting_lock:
+ if record_id not in self.awaiting[node]:
+ msg = f'wait_for_record: adding {record_id} to {node}'
+ if is_relative:
+ msg += ' (by relation)'
+ self.logger.debug(msg)
+
+ self.awaiting[node][record_id] = userdata
+
+ def download(self, node: str, record_id: int, fileid: str):
+ dst = os.path.join(gettempdir(), f'{node}_{fileid}.{self.DOWNLOAD_EXTENSION}')
+ cl = self.getclient(node)
+ cl.record_download(record_id, dst)
+ return dst
+
+ def forget(self, node: str, rid: int):
+ self.getclient(node).record_forget(rid)
+
+ def _report_finished(self, *args):
+ if self.finished_handler:
+ self.finished_handler(*args)
+
+ def _report_error(self, *args):
+ if self.error_handler:
+ self.error_handler(*args)
+
+
+class SoundRecordClient(RecordClient):
+ DOWNLOAD_EXTENSION = 'mp3'
+ # clients: dict[str, SoundNodeClient]
+
+ def make_clients(self, nodes: dict[str, Addr]):
+ for node, addr in nodes.items():
+ self.clients[node] = SoundNodeClient(addr)
+ self.awaiting[node] = {}
+
+
+class CameraRecordClient(RecordClient):
+ DOWNLOAD_EXTENSION = 'mp4'
+ # clients: dict[str, CameraNodeClient]
+
+ def make_clients(self, nodes: dict[str, Addr]):
+ for node, addr in nodes.items():
+ self.clients[node] = CameraNodeClient(addr)
+ self.awaiting[node] = {} \ No newline at end of file