1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
import time
import logging
import threading
import os.path
from tempfile import gettempdir
from .record import RecordStatus
from .node_client import SoundNodeClient, MediaNodeClient, CameraNodeClient
from ..util import Addr
from typing import Optional, Callable, Dict
class RecordClient:
DOWNLOAD_EXTENSION = None
interrupted: bool
logger: logging.Logger
clients: Dict[str, MediaNodeClient]
awaiting: Dict[str, Dict[int, Optional[dict]]]
error_handler: Optional[Callable]
finished_handler: Optional[Callable]
download_on_finish: bool
def __init__(self,
nodes: Dict[str, Addr],
error_handler: Optional[Callable] = None,
finished_handler: Optional[Callable] = None,
download_on_finish=False):
if self.DOWNLOAD_EXTENSION is None:
raise RuntimeError('this is abstract class')
self.interrupted = False
self.logger = logging.getLogger(self.__class__.__name__)
self.clients = {}
self.awaiting = {}
self.download_on_finish = download_on_finish
self.error_handler = error_handler
self.finished_handler = finished_handler
self.awaiting_lock = threading.Lock()
self.make_clients(nodes)
try:
t = threading.Thread(target=self.loop)
t.daemon = True
t.start()
except (KeyboardInterrupt, SystemExit) as exc:
self.stop()
self.logger.exception(exc)
def make_clients(self, nodes: Dict[str, Addr]):
pass
def stop(self):
self.interrupted = True
def loop(self):
while not self.interrupted:
for node in self.awaiting.keys():
with self.awaiting_lock:
record_ids = list(self.awaiting[node].keys())
if not record_ids:
continue
self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}')
cl = self.getclient(node)
del_ids = []
for rid in record_ids:
info = cl.record_info(rid)
if info['relations']:
for relid in info['relations']:
self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True)
status = RecordStatus(info['status'])
if status in (RecordStatus.FINISHED, RecordStatus.ERROR):
if status == RecordStatus.FINISHED:
if self.download_on_finish:
local_fn = self.download(node, rid, info['file']['fileid'])
else:
local_fn = None
self._report_finished(info, local_fn, self.awaiting[node][rid])
else:
self._report_error(info, self.awaiting[node][rid])
del_ids.append(rid)
self.logger.debug(f'record {rid}: status {status}')
if del_ids:
self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list')
with self.awaiting_lock:
for del_id in del_ids:
del self.awaiting[node][del_id]
time.sleep(5)
self.logger.info('loop ended')
def getclient(self, node: str):
return self.clients[node]
def record(self,
node: str,
duration: int,
userdata: Optional[dict] = None) -> int:
self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}')
cl = self.getclient(node)
record_id = cl.record(duration)['id']
self.logger.debug(f'record: request sent, record_id={record_id}')
self.wait_for_record(node, record_id, userdata)
return record_id
def wait_for_record(self,
node: str,
record_id: int,
userdata: Optional[dict] = None,
is_relative=False):
with self.awaiting_lock:
if record_id not in self.awaiting[node]:
msg = f'wait_for_record: adding {record_id} to {node}'
if is_relative:
msg += ' (by relation)'
self.logger.debug(msg)
self.awaiting[node][record_id] = userdata
def download(self, node: str, record_id: int, fileid: str):
dst = os.path.join(gettempdir(), f'{node}_{fileid}.{self.DOWNLOAD_EXTENSION}')
cl = self.getclient(node)
cl.record_download(record_id, dst)
return dst
def forget(self, node: str, rid: int):
self.getclient(node).record_forget(rid)
def _report_finished(self, *args):
if self.finished_handler:
self.finished_handler(*args)
def _report_error(self, *args):
if self.error_handler:
self.error_handler(*args)
class SoundRecordClient(RecordClient):
DOWNLOAD_EXTENSION = 'mp3'
# clients: Dict[str, SoundNodeClient]
def make_clients(self, nodes: Dict[str, Addr]):
for node, addr in nodes.items():
self.clients[node] = SoundNodeClient(addr)
self.awaiting[node] = {}
class CameraRecordClient(RecordClient):
DOWNLOAD_EXTENSION = 'mp4'
# clients: Dict[str, CameraNodeClient]
def make_clients(self, nodes: Dict[str, Addr]):
for node, addr in nodes.items():
self.clients[node] = CameraNodeClient(addr)
self.awaiting[node] = {}
|