summaryrefslogtreecommitdiff
path: root/src/home/sound
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-11-27 16:17:05 +0300
committerEvgeny Zinoviev <me@ch1p.io>2022-04-24 01:33:04 +0300
commitc412bf2ee0a3fbf9032fc32a26837d4fbc7585c5 (patch)
tree5cca6bcab79331ad82cab4219c7692b9dd4eea21 /src/home/sound
initial public
Diffstat (limited to 'src/home/sound')
-rw-r--r--src/home/sound/__init__.py8
-rw-r--r--src/home/sound/amixer.py91
-rw-r--r--src/home/sound/node_client.py109
-rw-r--r--src/home/sound/record.py400
-rw-r--r--src/home/sound/record_client.py142
-rw-r--r--src/home/sound/storage.py155
6 files changed, 905 insertions, 0 deletions
diff --git a/src/home/sound/__init__.py b/src/home/sound/__init__.py
new file mode 100644
index 0000000..43ddaff
--- /dev/null
+++ b/src/home/sound/__init__.py
@@ -0,0 +1,8 @@
+from .node_client import SoundNodeClient
+from .record import (
+ RecordStatus,
+ RecordingNotFoundError,
+ Recorder,
+)
+from .storage import RecordStorage, RecordFile
+from .record_client import RecordClient
diff --git a/src/home/sound/amixer.py b/src/home/sound/amixer.py
new file mode 100644
index 0000000..0ab2c64
--- /dev/null
+++ b/src/home/sound/amixer.py
@@ -0,0 +1,91 @@
+import subprocess
+
+from ..config import config
+from threading import Lock
+from typing import Union
+
+
+_lock = Lock()
+_default_step = 5
+
+
+def has_control(s: str) -> bool:
+ for control in config['amixer']['controls']:
+ if control['name'] == s:
+ return True
+ return False
+
+
+def get_caps(s: str) -> list[str]:
+ for control in config['amixer']['controls']:
+ if control['name'] == s:
+ return control['caps']
+ raise KeyError(f'control {s} not found')
+
+
+def get_all() -> list:
+ controls = []
+ for control in config['amixer']['controls']:
+ controls.append({
+ 'name': control['name'],
+ 'info': get(control['name']),
+ 'caps': control['caps']
+ })
+ return controls
+
+
+def get(control: str):
+ return call('get', control)
+
+
+def mute(control):
+ return call('set', control, 'mute')
+
+
+def unmute(control):
+ return call('set', control, 'unmute')
+
+
+def cap(control):
+ return call('set', control, 'cap')
+
+
+def nocap(control):
+ return call('set', control, 'nocap')
+
+
+def _get_default_step() -> int:
+ if 'step' in config['amixer']:
+ return int(config['amixer']['step'])
+
+ return _default_step
+
+
+def incr(control, step=None):
+ if step is None:
+ step = _get_default_step()
+ return call('set', control, f'{step}%+')
+
+
+def decr(control, step=None):
+ if step is None:
+ step = _get_default_step()
+ return call('set', control, f'{step}%-')
+
+
+def call(*args, return_code=False) -> Union[int, str]:
+ with _lock:
+ result = subprocess.run([config['amixer']['bin'], *args],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ if return_code:
+ return result.returncode
+
+ if result.returncode != 0:
+ raise AmixerError(result.stderr.decode().strip())
+
+ return result.stdout.decode().strip()
+
+
+class AmixerError(OSError):
+ pass
diff --git a/src/home/sound/node_client.py b/src/home/sound/node_client.py
new file mode 100644
index 0000000..7341208
--- /dev/null
+++ b/src/home/sound/node_client.py
@@ -0,0 +1,109 @@
+import requests
+import logging
+import shutil
+
+from ..util import Addr
+from ..api.errors import ApiResponseError
+from typing import Optional, Union
+from .record import RecordFile
+
+
+class SoundNodeClient:
+ def __init__(self, addr: Addr):
+ self.endpoint = f'http://{addr[0]}:{addr[1]}'
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def amixer_get_all(self):
+ return self._call('amixer/get-all/')
+
+ def amixer_get(self, control: str):
+ return self._call(f'amixer/get/{control}/')
+
+ def amixer_incr(self, control: str, step: Optional[int] = None):
+ params = {'step': step} if step is not None else None
+ return self._call(f'amixer/incr/{control}/', params=params)
+
+ def amixer_decr(self, control: str, step: Optional[int] = None):
+ params = {'step': step} if step is not None else None
+ return self._call(f'amixer/decr/{control}/', params=params)
+
+ def amixer_mute(self, control: str):
+ return self._call(f'amixer/mute/{control}/')
+
+ def amixer_unmute(self, control: str):
+ return self._call(f'amixer/unmute/{control}/')
+
+ def amixer_cap(self, control: str):
+ return self._call(f'amixer/cap/{control}/')
+
+ def amixer_nocap(self, control: str):
+ return self._call(f'amixer/nocap/{control}/')
+
+ def record(self, duration: int):
+ return self._call('record/', params={"duration": duration})
+
+ def record_info(self, record_id: int):
+ return self._call(f'record/info/{record_id}/')
+
+ def record_forget(self, record_id: int):
+ return self._call(f'record/forget/{record_id}/')
+
+ def record_download(self, record_id: int, output: str):
+ return self._call(f'record/download/{record_id}/', save_to=output)
+
+ def storage_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]:
+ r = self._call('storage/list/', params={'extended': int(extended)})
+ files = r['files']
+ if as_objects:
+ return self.record_list_from_serialized(files)
+ return files
+
+ @staticmethod
+ def record_list_from_serialized(files: Union[list[str], list[dict]]):
+ new_files = []
+ for f in files:
+ kwargs = {'remote': True}
+ if isinstance(f, dict):
+ name = f['filename']
+ kwargs['remote_filesize'] = f['filesize']
+ else:
+ name = f
+ item = RecordFile(name, **kwargs)
+ new_files.append(item)
+ return new_files
+
+ def storage_delete(self, file_id: str):
+ return self._call('storage/delete/', params={'file_id': file_id})
+
+ def storage_download(self, file_id: str, output: str):
+ return self._call('storage/download/', params={'file_id': file_id}, save_to=output)
+
+ def _call(self,
+ method: str,
+ params: dict = None,
+ save_to: Optional[str] = None):
+
+ kwargs = {}
+ if isinstance(params, dict):
+ kwargs['params'] = params
+ if save_to:
+ kwargs['stream'] = True
+
+ url = f'{self.endpoint}/{method}'
+ self.logger.debug(f'calling {url}, kwargs: {kwargs}')
+
+ r = requests.get(url, **kwargs)
+ if r.status_code != 200:
+ response = r.json()
+ raise ApiResponseError(status_code=r.status_code,
+ error_type=response['error'],
+ error_message=response['message'] or None,
+ error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None)
+
+ if save_to:
+ r.raise_for_status()
+ with open(save_to, 'wb') as f:
+ shutil.copyfileobj(r.raw, f)
+ return True
+
+ return r.json()['response']
diff --git a/src/home/sound/record.py b/src/home/sound/record.py
new file mode 100644
index 0000000..1ad8827
--- /dev/null
+++ b/src/home/sound/record.py
@@ -0,0 +1,400 @@
+import threading
+import time
+import subprocess
+import signal
+import os
+import logging
+
+from enum import Enum, auto
+from typing import Optional
+from ..config import config
+from ..util import find_child_processes
+from .storage import RecordFile, RecordStorage
+
+
+_history_item_timeout = 7200
+_history_cleanup_freq = 3600
+
+
+class RecordStatus(Enum):
+ WAITING = auto()
+ RECORDING = auto()
+ FINISHED = auto()
+ ERROR = auto()
+
+
+class RecordHistoryItem:
+ id: int
+ request_time: float
+ start_time: float
+ stop_time: float
+ relations: list[int]
+ status: RecordStatus
+ error: Optional[Exception]
+ file: Optional[RecordFile]
+ creation_time: float
+
+ def __init__(self, id):
+ self.id = id
+ self.request_time = 0
+ self.start_time = 0
+ self.stop_time = 0
+ self.relations = []
+ self.status = RecordStatus.WAITING
+ self.file = None
+ self.error = None
+ self.creation_time = time.time()
+
+ def add_relation(self, related_id: int):
+ self.relations.append(related_id)
+
+ def mark_started(self, start_time: float):
+ self.start_time = start_time
+ self.status = RecordStatus.RECORDING
+
+ def mark_finished(self, end_time: float, file: RecordFile):
+ self.stop_time = end_time
+ self.file = file
+ self.status = RecordStatus.FINISHED
+
+ def mark_failed(self, error: Exception):
+ self.status = RecordStatus.ERROR
+ self.error = error
+
+ def as_dict(self) -> dict:
+ data = {
+ 'id': self.id,
+ 'request_time': self.request_time,
+ 'status': self.status.value,
+ 'relations': self.relations,
+ 'start_time': self.start_time,
+ 'stop_time': self.stop_time,
+ }
+ if self.error:
+ data['error'] = str(self.error)
+ if self.file:
+ data['file'] = self.file.__dict__()
+ return data
+
+
+class RecordingNotFoundError(Exception):
+ pass
+
+
+class RecordHistory:
+ history: dict[int, RecordHistoryItem]
+
+ def __init__(self):
+ self.history = {}
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def add(self, record_id: int):
+ self.logger.debug(f'add: record_id={record_id}')
+
+ r = RecordHistoryItem(record_id)
+ r.request_time = time.time()
+
+ self.history[record_id] = r
+
+ def delete(self, record_id: int):
+ self.logger.debug(f'delete: record_id={record_id}')
+ del self.history[record_id]
+
+ def cleanup(self):
+ del_ids = []
+ for rid, item in self.history.items():
+ if item.creation_time < time.time()-_history_item_timeout:
+ del_ids.append(rid)
+ for rid in del_ids:
+ self.delete(rid)
+
+ def __getitem__(self, key):
+ if key not in self.history:
+ raise RecordingNotFoundError()
+
+ return self.history[key]
+
+ def __setitem__(self, key, value):
+ raise NotImplementedError('setting history item this way is prohibited')
+
+ def __contains__(self, key):
+ return key in self.history
+
+
+class Recording:
+ start_time: float
+ stop_time: float
+ duration: int
+ record_id: int
+ arecord_pid: Optional[int]
+ process: Optional[subprocess.Popen]
+
+ g_record_id = 1
+
+ def __init__(self):
+ self.start_time = 0
+ self.stop_time = 0
+ self.duration = 0
+ self.process = None
+ self.arecord_pid = None
+ self.record_id = Recording.next_id()
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def is_started(self) -> bool:
+ return self.start_time > 0 and self.stop_time > 0
+
+ def is_waiting(self):
+ return self.duration > 0
+
+ def ask_for(self, duration) -> int:
+ overtime = 0
+ orig_duration = duration
+
+ if self.is_started():
+ already_passed = time.time() - self.start_time
+ max_duration = Recorder.get_max_record_time() - already_passed
+ self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}')
+ else:
+ max_duration = Recorder.get_max_record_time()
+
+ if duration > max_duration:
+ overtime = duration - max_duration
+ duration = max_duration
+
+ self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}')
+
+ self.duration += duration
+ if self.is_started():
+ til_end = self.stop_time - time.time()
+ if til_end < 0:
+ til_end = 0
+
+ _prev_stop_time = self.stop_time
+ _to_add = duration - til_end
+ if _to_add < 0:
+ _to_add = 0
+
+ self.stop_time += _to_add
+ self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})')
+
+ return overtime
+
+ def start(self, output: str):
+ assert self.start_time == 0 and self.stop_time == 0, "already started?!"
+ assert self.process is None, "self.process is not None, what the hell?"
+
+ cur = time.time()
+ self.start_time = cur
+ self.stop_time = cur + self.duration
+
+ arecord = config['arecord']['bin']
+ lame = config['lame']['bin']
+ b = config['lame']['bitrate']
+
+ cmd = f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null'
+ self.logger.debug(f'start: running `{cmd}`')
+ self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True)
+
+ sh_pid = self.process.pid
+ self.logger.debug(f'start: started, pid of shell is {sh_pid}')
+
+ arecord_pid = self.find_arecord_pid(sh_pid)
+ if arecord_pid is not None:
+ self.arecord_pid = arecord_pid
+ self.logger.debug(f'start: pid of arecord is {arecord_pid}')
+
+ def stop(self):
+ if self.process:
+ if self.arecord_pid is None:
+ self.arecord_pid = self.find_arecord_pid(self.process.pid)
+
+ if self.arecord_pid is not None:
+ os.kill(self.arecord_pid, signal.SIGINT)
+ timeout = config['node']['process_wait_timeout']
+
+ self.logger.debug(f'stop: sent SIGINT to {self.arecord_pid}. now waiting up to {timeout} seconds...')
+ try:
+ self.process.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ self.logger.warning(f'stop: wait({timeout}): timeout expired, calling terminate()')
+ self.process.terminate()
+ else:
+ self.logger.warning('stop: pid of arecord is unknown, calling terminate()')
+ self.process.terminate()
+
+ rc = self.process.returncode
+ self.logger.debug(f'stop: rc={rc}')
+
+ self.process = None
+ self.arecord_pid = 0
+
+ self.duration = 0
+ self.start_time = 0
+ self.stop_time = 0
+
+ def find_arecord_pid(self, sh_pid: int):
+ try:
+ children = find_child_processes(sh_pid)
+ except OSError as exc:
+ self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc))
+ return None
+
+ for child in children:
+ if 'arecord' in child.cmd:
+ return child.pid
+
+ return None
+
+ @staticmethod
+ def next_id() -> int:
+ cur_id = Recording.g_record_id
+ Recording.g_record_id += 1
+ return cur_id
+
+ def increment_id(self):
+ self.record_id = Recording.next_id()
+
+
+class Recorder:
+ interrupted: bool
+ lock: threading.Lock
+ history_lock: threading.Lock
+ recording: Optional[Recording]
+ overtime: int
+ history: RecordHistory
+ next_history_cleanup_time: float
+ storage: RecordStorage
+
+ def __init__(self, storage: RecordStorage):
+ self.storage = storage
+ self.recording = Recording()
+ self.interrupted = False
+ self.lock = threading.Lock()
+ self.history_lock = threading.Lock()
+ self.overtime = 0
+ self.history = RecordHistory()
+ self.next_history_cleanup_time = 0
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def start_thread(self):
+ t = threading.Thread(target=self.loop)
+ t.daemon = True
+ t.start()
+
+ def loop(self) -> None:
+ tempname = os.path.join(self.storage.root, 'temp.mp3')
+
+ while not self.interrupted:
+ cur = time.time()
+ stopped = False
+ cur_record_id = None
+
+ if self.next_history_cleanup_time == 0:
+ self.next_history_cleanup_time = time.time() + _history_cleanup_freq
+ elif self.next_history_cleanup_time <= time.time():
+ self.logger.debug('loop: calling history.cleanup()')
+ try:
+ self.history.cleanup()
+ except Exception as e:
+ self.logger.error('loop: error while history.cleanup(): ' + str(e))
+ self.next_history_cleanup_time = time.time() + _history_cleanup_freq
+
+ with self.lock:
+ cur_record_id = self.recording.record_id
+ # self.logger.debug(f'cur_record_id={cur_record_id}')
+
+ if not self.recording.is_started():
+ if self.recording.is_waiting():
+ try:
+ if os.path.exists(tempname):
+ self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..')
+ try:
+ os.unlink(tempname)
+ except OSError as e:
+ self.logger.exception(e)
+ self.recording.start(tempname)
+ with self.history_lock:
+ self.history[cur_record_id].mark_started(self.recording.start_time)
+ except Exception as exc:
+ self.logger.exception(exc)
+
+ # there should not be any errors, but still..
+ try:
+ self.recording.stop()
+ except Exception as exc:
+ self.logger.exception(exc)
+
+ with self.history_lock:
+ self.history[cur_record_id].mark_failed(exc)
+
+ self.logger.debug(f'loop: start exc path: calling increment_id()')
+ self.recording.increment_id()
+ else:
+ if cur >= self.recording.stop_time:
+ try:
+ start_time = self.recording.start_time
+ stop_time = self.recording.stop_time
+ self.recording.stop()
+
+ saved_name = self.storage.save(tempname,
+ record_id=cur_record_id,
+ start_time=int(start_time),
+ stop_time=int(stop_time))
+
+ with self.history_lock:
+ self.history[cur_record_id].mark_finished(stop_time, saved_name)
+ except Exception as exc:
+ self.logger.exception(exc)
+ with self.history_lock:
+ self.history[cur_record_id].mark_failed(exc)
+ finally:
+ self.logger.debug(f'loop: stop exc final path: calling increment_id()')
+ self.recording.increment_id()
+
+ stopped = True
+
+ if stopped and self.overtime > 0:
+ self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})')
+ _overtime = self.overtime
+ self.overtime = 0
+
+ related_id = self.record(_overtime)
+ self.logger.info(f'enqueued another record with id {related_id}')
+
+ if cur_record_id is not None:
+ with self.history_lock:
+ self.history[cur_record_id].add_relation(related_id)
+
+ time.sleep(0.2)
+
+ def record(self, duration: int) -> int:
+ self.logger.debug(f'record: duration={duration}')
+ with self.lock:
+ overtime = self.recording.ask_for(duration)
+ self.logger.debug(f'overtime={overtime}')
+
+ if overtime > self.overtime:
+ self.overtime = overtime
+
+ if not self.recording.is_started():
+ with self.history_lock:
+ self.history.add(self.recording.record_id)
+
+ return self.recording.record_id
+
+ def stop(self):
+ self.interrupted = True
+
+ def get_info(self, record_id: int) -> RecordHistoryItem:
+ with self.history_lock:
+ return self.history[record_id]
+
+ def forget(self, record_id: int):
+ with self.history_lock:
+ self.logger.info(f'forget: removing record {record_id} from history')
+ self.history.delete(record_id)
+
+ @staticmethod
+ def get_max_record_time() -> int:
+ return config['node']['record_max_time']
+
diff --git a/src/home/sound/record_client.py b/src/home/sound/record_client.py
new file mode 100644
index 0000000..2744a8c
--- /dev/null
+++ b/src/home/sound/record_client.py
@@ -0,0 +1,142 @@
+import time
+import logging
+import threading
+import os.path
+
+from tempfile import gettempdir
+from .record import RecordStatus
+from .node_client import SoundNodeClient
+from ..util import Addr
+from typing import Optional, Callable
+
+
+class RecordClient:
+ interrupted: bool
+ logger: logging.Logger
+ clients: dict[str, SoundNodeClient]
+ awaiting: dict[str, dict[int, Optional[dict]]]
+ error_handler: Optional[Callable]
+ finished_handler: Optional[Callable]
+ download_on_finish: bool
+
+ def __init__(self,
+ nodes: dict[str, Addr],
+ error_handler: Optional[Callable] = None,
+ finished_handler: Optional[Callable] = None,
+ download_on_finish=False):
+ self.interrupted = False
+ self.logger = logging.getLogger(self.__class__.__name__)
+ self.clients = {}
+ self.awaiting = {}
+ self.download_on_finish = download_on_finish
+
+ self.error_handler = error_handler
+ self.finished_handler = finished_handler
+
+ self.awaiting_lock = threading.Lock()
+
+ for node, addr in nodes.items():
+ self.clients[node] = SoundNodeClient(addr)
+ self.awaiting[node] = {}
+
+ try:
+ t = threading.Thread(target=self.loop)
+ t.daemon = True
+ t.start()
+ except (KeyboardInterrupt, SystemExit) as exc:
+ self.stop()
+ self.logger.exception(exc)
+
+ def stop(self):
+ self.interrupted = True
+
+ def loop(self):
+ while not self.interrupted:
+ # self.logger.debug('loop: tick')
+
+ for node in self.awaiting.keys():
+ with self.awaiting_lock:
+ record_ids = list(self.awaiting[node].keys())
+ if not record_ids:
+ continue
+
+ self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}')
+
+ cl = self.getclient(node)
+ del_ids = []
+ for rid in record_ids:
+ info = cl.record_info(rid)
+
+ if info['relations']:
+ for relid in info['relations']:
+ self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True)
+
+ status = RecordStatus(info['status'])
+ if status in (RecordStatus.FINISHED, RecordStatus.ERROR):
+ if status == RecordStatus.FINISHED:
+ if self.download_on_finish:
+ local_fn = self.download(node, rid, info['file']['fileid'])
+ else:
+ local_fn = None
+ self._report_finished(info, local_fn, self.awaiting[node][rid])
+ else:
+ self._report_error(info, self.awaiting[node][rid])
+ del_ids.append(rid)
+ self.logger.debug(f'record {rid}: status {status}')
+
+ if del_ids:
+ self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list')
+ with self.awaiting_lock:
+ for del_id in del_ids:
+ del self.awaiting[node][del_id]
+
+ time.sleep(5)
+
+ self.logger.info('loop ended')
+
+ def getclient(self, node: str):
+ return self.clients[node]
+
+ def record(self,
+ node: str,
+ duration: int,
+ userdata: Optional[dict] = None) -> int:
+ self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}')
+
+ cl = self.getclient(node)
+ record_id = cl.record(duration)['id']
+ self.logger.debug(f'record: request sent, record_id={record_id}')
+
+ self.wait_for_record(node, record_id, userdata)
+ return record_id
+
+ def wait_for_record(self,
+ node: str,
+ record_id: int,
+ userdata: Optional[dict] = None,
+ is_relative=False):
+ with self.awaiting_lock:
+ if record_id not in self.awaiting[node]:
+ msg = f'wait_for_record: adding {record_id} to {node}'
+ if is_relative:
+ msg += ' (by relation)'
+ self.logger.debug(msg)
+
+ self.awaiting[node][record_id] = userdata
+
+ def download(self, node: str, record_id: int, fileid: str):
+ dst = os.path.join(gettempdir(), f'{node}_{fileid}.mp3')
+ cl = self.getclient(node)
+ cl.record_download(record_id, dst)
+ return dst
+
+ def forget(self, node: str, rid: int):
+ self.getclient(node).record_forget(rid)
+
+ def _report_finished(self, *args):
+ if self.finished_handler:
+ self.finished_handler(*args)
+
+ def _report_error(self, *args):
+ if self.error_handler:
+ self.error_handler(*args)
diff --git a/src/home/sound/storage.py b/src/home/sound/storage.py
new file mode 100644
index 0000000..c61f6f6
--- /dev/null
+++ b/src/home/sound/storage.py
@@ -0,0 +1,155 @@
+import os
+import re
+import shutil
+import logging
+
+from typing import Optional, Union
+from datetime import datetime
+from ..util import strgen
+
+logger = logging.getLogger(__name__)
+
+
+class RecordFile:
+ start_time: Optional[datetime]
+ stop_time: Optional[datetime]
+ record_id: Optional[int]
+ name: str
+ file_id: Optional[str]
+ remote: bool
+ remote_filesize: int
+ storage_root: str
+
+ human_date_dmt = '%d.%m.%y'
+ human_time_fmt = '%H:%M:%S'
+
+ def __init__(self, filename: str, remote=False, remote_filesize=None, storage_root='/'):
+ self.name = filename
+ self.storage_root = storage_root
+
+ self.remote = remote
+ self.remote_filesize = remote_filesize
+
+ m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.mp3$', filename)
+ if m:
+ self.start_time = datetime.strptime(m.group(1), RecordStorage.time_fmt)
+ self.stop_time = datetime.strptime(m.group(2), RecordStorage.time_fmt)
+ self.record_id = int(m.group(3))
+ self.file_id = (m.group(1) + '_' + m.group(2)).replace('-', '_')
+ else:
+ logger.warning(f'unexpected filename: {filename}')
+ self.start_time = None
+ self.stop_time = None
+ self.record_id = None
+ self.file_id = None
+
+ @property
+ def path(self):
+ if self.remote:
+ return RuntimeError('remote recording, can\'t get real path')
+
+ return os.path.realpath(os.path.join(
+ self.storage_root, self.name
+ ))
+
+ @property
+ def start_humantime(self) -> str:
+ if self.start_time is None:
+ return '?'
+ fmt = f'{RecordFile.human_date_dmt} {RecordFile.human_time_fmt}'
+ return self.start_time.strftime(fmt)
+
+ @property
+ def stop_humantime(self) -> str:
+ if self.stop_time is None:
+ return '?'
+ fmt = RecordFile.human_time_fmt
+ if self.start_time.date() != self.stop_time.date():
+ fmt = f'{RecordFile.human_date_dmt} {fmt}'
+ return self.stop_time.strftime(fmt)
+
+ @property
+ def start_unixtime(self) -> int:
+ if self.start_time is None:
+ return 0
+ return int(self.start_time.timestamp())
+
+ @property
+ def stop_unixtime(self) -> int:
+ if self.stop_time is None:
+ return 0
+ return int(self.stop_time.timestamp())
+
+ @property
+ def filesize(self):
+ if self.remote:
+ if self.remote_filesize is None:
+ raise RuntimeError('file is remote and remote_filesize is not set')
+ return self.remote_filesize
+ return os.path.getsize(self.path)
+
+ def __dict__(self) -> dict:
+ return {
+ 'start_unixtime': self.start_unixtime,
+ 'stop_unixtime': self.stop_unixtime,
+ 'filename': self.name,
+ 'filesize': self.filesize,
+ 'fileid': self.file_id,
+ 'record_id': self.record_id or 0,
+ }
+
+
+class RecordStorage:
+ time_fmt = '%d%m%y-%H%M%S'
+
+ def __init__(self, root: str):
+ self.root = root
+
+ def getfiles(self, as_objects=False) -> Union[list[str], list[RecordFile]]:
+ files = []
+ for name in os.listdir(self.root):
+ path = os.path.join(self.root, name)
+ if os.path.isfile(path) and name.endswith('.mp3'):
+ files.append(name if not as_objects else RecordFile(name, storage_root=self.root))
+ return files
+
+ def find(self, file_id: str) -> Optional[RecordFile]:
+ for name in os.listdir(self.root):
+ if os.path.isfile(os.path.join(self.root, name)) and name.endswith('.mp3'):
+ item = RecordFile(name, storage_root=self.root)
+ if item.file_id == file_id:
+ return item
+ return None
+
+ def purge(self):
+ files = self.getfiles()
+ if files:
+ logger = logging.getLogger(self.__name__)
+ for f in files:
+ try:
+ path = os.path.join(self.root, f)
+ logger.debug(f'purge: deleting {path}')
+ os.unlink(path)
+ except OSError as exc:
+ logger.exception(exc)
+
+ def delete(self, file: RecordFile):
+ os.unlink(file.path)
+
+ def save(self,
+ fn: str,
+ record_id: int,
+ start_time: int,
+ stop_time: int) -> RecordFile:
+
+ start_time_s = datetime.fromtimestamp(start_time).strftime(self.time_fmt)
+ stop_time_s = datetime.fromtimestamp(stop_time).strftime(self.time_fmt)
+
+ dst_fn = f'{start_time_s}_{stop_time_s}_id{record_id}'
+ if os.path.exists(os.path.join(self.root, dst_fn)):
+ dst_fn += strgen(4)
+ dst_fn += '.mp3'
+ dst_path = os.path.join(self.root, dst_fn)
+
+ shutil.move(fn, dst_path)
+ return RecordFile(dst_fn, storage_root=self.root)