1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import requests
import shutil
import logging
from typing import Optional, Union
from .storage import RecordFile
from ..util import Addr
from ..api.errors import ApiResponseError
class MediaNodeClient:
def __init__(self, addr: Addr):
self.endpoint = f'http://{addr[0]}:{addr[1]}'
self.logger = logging.getLogger(self.__class__.__name__)
def record(self, duration: int):
return self._call('record/', params={"duration": duration})
def record_info(self, record_id: int):
return self._call(f'record/info/{record_id}/')
def record_forget(self, record_id: int):
return self._call(f'record/forget/{record_id}/')
def record_download(self, record_id: int, output: str):
return self._call(f'record/download/{record_id}/', save_to=output)
def storage_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]:
r = self._call('storage/list/', params={'extended': int(extended)})
files = r['files']
if as_objects:
return self.record_list_from_serialized(files)
return files
@staticmethod
def record_list_from_serialized(files: Union[list[str], list[dict]]):
new_files = []
for f in files:
kwargs = {'remote': True}
if isinstance(f, dict):
name = f['filename']
kwargs['remote_filesize'] = f['filesize']
else:
name = f
item = RecordFile.create(name, **kwargs)
new_files.append(item)
return new_files
def storage_delete(self, file_id: str):
return self._call('storage/delete/', params={'file_id': file_id})
def storage_download(self, file_id: str, output: str):
return self._call('storage/download/', params={'file_id': file_id}, save_to=output)
def _call(self,
method: str,
params: dict = None,
save_to: Optional[str] = None):
kwargs = {}
if isinstance(params, dict):
kwargs['params'] = params
if save_to:
kwargs['stream'] = True
url = f'{self.endpoint}/{method}'
self.logger.debug(f'calling {url}, kwargs: {kwargs}')
r = requests.get(url, **kwargs)
if r.status_code != 200:
response = r.json()
raise ApiResponseError(status_code=r.status_code,
error_type=response['error'],
error_message=response['message'] or None,
error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None)
if save_to:
r.raise_for_status()
with open(save_to, 'wb') as f:
shutil.copyfileobj(r.raw, f)
return True
return r.json()['response']
class SoundNodeClient(MediaNodeClient):
def amixer_get_all(self):
return self._call('amixer/get-all/')
def amixer_get(self, control: str):
return self._call(f'amixer/get/{control}/')
def amixer_incr(self, control: str, step: Optional[int] = None):
params = {'step': step} if step is not None else None
return self._call(f'amixer/incr/{control}/', params=params)
def amixer_decr(self, control: str, step: Optional[int] = None):
params = {'step': step} if step is not None else None
return self._call(f'amixer/decr/{control}/', params=params)
def amixer_mute(self, control: str):
return self._call(f'amixer/mute/{control}/')
def amixer_unmute(self, control: str):
return self._call(f'amixer/unmute/{control}/')
def amixer_cap(self, control: str):
return self._call(f'amixer/cap/{control}/')
def amixer_nocap(self, control: str):
return self._call(f'amixer/nocap/{control}/')
class CameraNodeClient(MediaNodeClient):
def capture(self,
save_to: str,
with_flash: bool = False):
return self._call('capture/',
{'with_flash': int(with_flash)},
save_to=save_to)
|