summaryrefslogtreecommitdiff
path: root/src/home/media/record_client.py
blob: 322495cfb35c7f2c7d7f15d8d214a3a37d77b4ff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import time
import logging
import threading
import os.path

from tempfile import gettempdir
from .record import RecordStatus
from .node_client import SoundNodeClient, MediaNodeClient, CameraNodeClient
from ..util import Addr
from typing import Optional, Callable, Dict


class RecordClient:
    """Base class that starts recordings on media nodes and tracks them until they end.

    The constructor starts a daemon thread running :meth:`loop`. That thread
    periodically asks each node about every awaited record; once a record
    reports ``RecordStatus.FINISHED`` or ``RecordStatus.ERROR`` the matching
    handler is called and the record is dropped from the awaiting list.

    Concrete subclasses must set ``DOWNLOAD_EXTENSION`` and override
    :meth:`make_clients` so that it fills ``self.clients`` and
    ``self.awaiting`` for every node.
    """

    # File extension used for downloaded records; None marks the class as abstract.
    DOWNLOAD_EXTENSION = None

    # Seconds the polling thread waits between two passes over the awaiting lists.
    POLL_INTERVAL = 5

    interrupted: bool
    logger: logging.Logger
    clients: Dict[str, MediaNodeClient]
    # node name -> {record id -> userdata passed when the record was registered}
    awaiting: Dict[str, Dict[int, Optional[dict]]]
    error_handler: Optional[Callable]
    finished_handler: Optional[Callable]
    download_on_finish: bool

    def __init__(self,
                 nodes: Dict[str, Addr],
                 error_handler: Optional[Callable] = None,
                 finished_handler: Optional[Callable] = None,
                 download_on_finish=False):
        """Create the node clients and start the background polling thread.

        :param nodes: node name -> address, handed to :meth:`make_clients`.
        :param error_handler: called as ``error_handler(info, userdata)`` when
            a record ends with ``RecordStatus.ERROR``.
        :param finished_handler: called as
            ``finished_handler(info, local_fn, userdata)`` when a record ends
            with ``RecordStatus.FINISHED``; ``local_fn`` is the downloaded file
            path, or None when ``download_on_finish`` is false.
        :param download_on_finish: download the record file before reporting
            a finished record.
        :raises RuntimeError: if the class does not define ``DOWNLOAD_EXTENSION``.
        """
        if self.DOWNLOAD_EXTENSION is None:
            raise RuntimeError('this is abstract class')

        self.interrupted = False
        self.logger = logging.getLogger(self.__class__.__name__)
        self.clients = {}
        self.awaiting = {}

        self.download_on_finish = download_on_finish
        self.error_handler = error_handler
        self.finished_handler = finished_handler

        self.awaiting_lock = threading.Lock()
        # Set by stop() so the polling thread does not have to sit out the
        # rest of its wait between two passes.
        self._stop_event = threading.Event()

        self.make_clients(nodes)

        try:
            t = threading.Thread(target=self.loop)
            t.daemon = True
            t.start()
        except (KeyboardInterrupt, SystemExit) as exc:
            self.stop()
            self.logger.exception(exc)

    def make_clients(self, nodes: Dict[str, Addr]):
        """Populate ``self.clients`` and ``self.awaiting``; overridden by subclasses."""
        pass

    def stop(self):
        """Ask the polling thread to finish and wake it if it is waiting."""
        self.interrupted = True
        self._stop_event.set()

    def loop(self):
        """Poll every awaited record until :meth:`stop` is called.

        A failure while processing one record (for example an exception raised
        by the node client or by the download) is logged and the record stays
        in the awaiting list, so it is retried on the next pass instead of
        terminating the polling thread.
        """
        while not self.interrupted:
            # Iterate over a snapshot so the dict is never walked while
            # another thread may be changing it.
            with self.awaiting_lock:
                nodes = list(self.awaiting)

            for node in nodes:
                with self.awaiting_lock:
                    record_ids = list(self.awaiting[node])
                if not record_ids:
                    continue

                self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}')

                cl = self.getclient(node)
                del_ids = []
                for rid in record_ids:
                    try:
                        done = self._poll_record(node, cl, rid)
                    except Exception:
                        # Keep the thread alive; the record is retried later.
                        self.logger.exception(f'loop: failed to process record {rid} of node `{node}`')
                        continue
                    if done:
                        del_ids.append(rid)

                if del_ids:
                    self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list')
                    with self.awaiting_lock:
                        for del_id in del_ids:
                            self.awaiting[node].pop(del_id, None)

            # Same delay as a plain sleep, but stop() can cut it short.
            self._stop_event.wait(self.POLL_INTERVAL)

        self.logger.info('loop ended')

    def _poll_record(self, node: str, cl, rid: int) -> bool:
        """Query one record and report it if it reached a terminal status.

        :return: True when the record was reported (finished or error) and
            must be removed from the awaiting list, False when it is still
            in progress.
        """
        with self.awaiting_lock:
            userdata = self.awaiting[node].get(rid)

        info = cl.record_info(rid)

        # Related records inherit the userdata of the record that referenced them.
        # NOTE(review): a relation that already completed and was dropped is
        # re-added here for as long as its parent keeps listing it - confirm
        # whether nodes can report relations on a still-running record.
        if info['relations']:
            for relid in info['relations']:
                self.wait_for_record(node, relid, userdata, is_relative=True)

        status = RecordStatus(info['status'])
        if status == RecordStatus.FINISHED:
            local_fn = None
            if self.download_on_finish:
                local_fn = self.download(node, rid, info['file']['fileid'])
            self._report_finished(info, local_fn, userdata)
        elif status == RecordStatus.ERROR:
            self._report_error(info, userdata)
        else:
            return False

        self.logger.debug(f'record {rid}: status {status}')
        return True

    def getclient(self, node: str):
        """Return the node client registered under ``node`` (KeyError if unknown)."""
        return self.clients[node]

    def record(self,
               node: str,
               duration: int,
               userdata: Optional[dict] = None) -> int:
        """Start a recording on ``node`` and register it for polling.

        :param node: name of the node to record on.
        :param duration: passed as-is to the node client's ``record()``.
        :param userdata: opaque value handed back to the handlers.
        :return: the record id taken from the node's response.
        """
        self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}')

        cl = self.getclient(node)
        record_id = cl.record(duration)['id']
        self.logger.debug(f'record: request sent, record_id={record_id}')

        self.wait_for_record(node, record_id, userdata)
        return record_id

    def wait_for_record(self,
                        node: str,
                        record_id: int,
                        userdata: Optional[dict] = None,
                        is_relative=False):
        """Add ``record_id`` to the awaiting list of ``node`` unless already there.

        :param is_relative: only affects the debug message; marks records that
            were discovered through another record's relations.
        """
        with self.awaiting_lock:
            if record_id not in self.awaiting[node]:
                msg = f'wait_for_record: adding {record_id} to {node}'
                if is_relative:
                    msg += ' (by relation)'
                self.logger.debug(msg)

                self.awaiting[node][record_id] = userdata

    def download(self, node: str, record_id: int, fileid: str):
        """Download a record into the system temp directory and return the path.

        The file is named ``<node>_<fileid>.<DOWNLOAD_EXTENSION>``.
        """
        dst = os.path.join(gettempdir(), f'{node}_{fileid}.{self.DOWNLOAD_EXTENSION}')
        cl = self.getclient(node)
        cl.record_download(record_id, dst)
        return dst

    def forget(self, node: str, rid: int):
        """Forward a ``record_forget`` request for ``rid`` to the node client."""
        self.getclient(node).record_forget(rid)

    def _report_finished(self, *args):
        """Call the finished handler, if any, with ``(info, local_fn, userdata)``."""
        if self.finished_handler:
            self._invoke_handler(self.finished_handler, *args)

    def _report_error(self, *args):
        """Call the error handler, if any, with ``(info, userdata)``."""
        if self.error_handler:
            self._invoke_handler(self.error_handler, *args)

    def _invoke_handler(self, handler: Callable, *args):
        """Run a user callback, logging (not propagating) any exception it raises."""
        try:
            handler(*args)
        except Exception:
            # A faulty callback must not take the polling thread down with it.
            self.logger.exception(f'handler {handler!r} raised an exception')


class SoundRecordClient(RecordClient):
    """Record client that talks to nodes through ``SoundNodeClient``."""

    DOWNLOAD_EXTENSION = 'mp3'
    # clients: Dict[str, SoundNodeClient]

    def make_clients(self, nodes: Dict[str, Addr]):
        """Create a ``SoundNodeClient`` and an empty awaiting map for each node."""
        for name in nodes:
            self.clients[name] = SoundNodeClient(nodes[name])
            self.awaiting[name] = {}


class CameraRecordClient(RecordClient):
    """Record client that talks to nodes through ``CameraNodeClient``."""

    DOWNLOAD_EXTENSION = 'mp4'
    # clients: Dict[str, CameraNodeClient]

    def make_clients(self, nodes: Dict[str, Addr]):
        """Create a ``CameraNodeClient`` and an empty awaiting map for each node."""
        for name in nodes:
            self.clients[name] = CameraNodeClient(nodes[name])
            self.awaiting[name] = {}