summaryrefslogtreecommitdiff
path: root/src/home/sound/record.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/home/sound/record.py')
-rw-r--r--src/home/sound/record.py400
1 files changed, 400 insertions, 0 deletions
diff --git a/src/home/sound/record.py b/src/home/sound/record.py
new file mode 100644
index 0000000..1ad8827
--- /dev/null
+++ b/src/home/sound/record.py
@@ -0,0 +1,400 @@
+import threading
+import time
+import subprocess
+import signal
+import os
+import logging
+
+from enum import Enum, auto
+from typing import Optional
+from ..config import config
+from ..util import find_child_processes
+from .storage import RecordFile, RecordStorage
+
+
+_history_item_timeout = 7200
+_history_cleanup_freq = 3600
+
+
+class RecordStatus(Enum):
+ WAITING = auto()
+ RECORDING = auto()
+ FINISHED = auto()
+ ERROR = auto()
+
+
+class RecordHistoryItem:
+ id: int
+ request_time: float
+ start_time: float
+ stop_time: float
+ relations: list[int]
+ status: RecordStatus
+ error: Optional[Exception]
+ file: Optional[RecordFile]
+ creation_time: float
+
+ def __init__(self, id):
+ self.id = id
+ self.request_time = 0
+ self.start_time = 0
+ self.stop_time = 0
+ self.relations = []
+ self.status = RecordStatus.WAITING
+ self.file = None
+ self.error = None
+ self.creation_time = time.time()
+
+ def add_relation(self, related_id: int):
+ self.relations.append(related_id)
+
+ def mark_started(self, start_time: float):
+ self.start_time = start_time
+ self.status = RecordStatus.RECORDING
+
+ def mark_finished(self, end_time: float, file: RecordFile):
+ self.stop_time = end_time
+ self.file = file
+ self.status = RecordStatus.FINISHED
+
+ def mark_failed(self, error: Exception):
+ self.status = RecordStatus.ERROR
+ self.error = error
+
+ def as_dict(self) -> dict:
+ data = {
+ 'id': self.id,
+ 'request_time': self.request_time,
+ 'status': self.status.value,
+ 'relations': self.relations,
+ 'start_time': self.start_time,
+ 'stop_time': self.stop_time,
+ }
+ if self.error:
+ data['error'] = str(self.error)
+ if self.file:
+ data['file'] = self.file.__dict__()
+ return data
+
+
+class RecordingNotFoundError(Exception):
+ pass
+
+
+class RecordHistory:
+ history: dict[int, RecordHistoryItem]
+
+ def __init__(self):
+ self.history = {}
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def add(self, record_id: int):
+ self.logger.debug(f'add: record_id={record_id}')
+
+ r = RecordHistoryItem(record_id)
+ r.request_time = time.time()
+
+ self.history[record_id] = r
+
+ def delete(self, record_id: int):
+ self.logger.debug(f'delete: record_id={record_id}')
+ del self.history[record_id]
+
+ def cleanup(self):
+ del_ids = []
+ for rid, item in self.history.items():
+ if item.creation_time < time.time()-_history_item_timeout:
+ del_ids.append(rid)
+ for rid in del_ids:
+ self.delete(rid)
+
+ def __getitem__(self, key):
+ if key not in self.history:
+ raise RecordingNotFoundError()
+
+ return self.history[key]
+
+ def __setitem__(self, key, value):
+ raise NotImplementedError('setting history item this way is prohibited')
+
+ def __contains__(self, key):
+ return key in self.history
+
+
+class Recording:
+ start_time: float
+ stop_time: float
+ duration: int
+ record_id: int
+ arecord_pid: Optional[int]
+ process: Optional[subprocess.Popen]
+
+ g_record_id = 1
+
+ def __init__(self):
+ self.start_time = 0
+ self.stop_time = 0
+ self.duration = 0
+ self.process = None
+ self.arecord_pid = None
+ self.record_id = Recording.next_id()
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def is_started(self) -> bool:
+ return self.start_time > 0 and self.stop_time > 0
+
+ def is_waiting(self):
+ return self.duration > 0
+
+ def ask_for(self, duration) -> int:
+ overtime = 0
+ orig_duration = duration
+
+ if self.is_started():
+ already_passed = time.time() - self.start_time
+ max_duration = Recorder.get_max_record_time() - already_passed
+ self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}')
+ else:
+ max_duration = Recorder.get_max_record_time()
+
+ if duration > max_duration:
+ overtime = duration - max_duration
+ duration = max_duration
+
+ self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}')
+
+ self.duration += duration
+ if self.is_started():
+ til_end = self.stop_time - time.time()
+ if til_end < 0:
+ til_end = 0
+
+ _prev_stop_time = self.stop_time
+ _to_add = duration - til_end
+ if _to_add < 0:
+ _to_add = 0
+
+ self.stop_time += _to_add
+ self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})')
+
+ return overtime
+
+ def start(self, output: str):
+ assert self.start_time == 0 and self.stop_time == 0, "already started?!"
+ assert self.process is None, "self.process is not None, what the hell?"
+
+ cur = time.time()
+ self.start_time = cur
+ self.stop_time = cur + self.duration
+
+ arecord = config['arecord']['bin']
+ lame = config['lame']['bin']
+ b = config['lame']['bitrate']
+
+ cmd = f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null'
+ self.logger.debug(f'start: running `{cmd}`')
+ self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True)
+
+ sh_pid = self.process.pid
+ self.logger.debug(f'start: started, pid of shell is {sh_pid}')
+
+ arecord_pid = self.find_arecord_pid(sh_pid)
+ if arecord_pid is not None:
+ self.arecord_pid = arecord_pid
+ self.logger.debug(f'start: pid of arecord is {arecord_pid}')
+
+ def stop(self):
+ if self.process:
+ if self.arecord_pid is None:
+ self.arecord_pid = self.find_arecord_pid(self.process.pid)
+
+ if self.arecord_pid is not None:
+ os.kill(self.arecord_pid, signal.SIGINT)
+ timeout = config['node']['process_wait_timeout']
+
+ self.logger.debug(f'stop: sent SIGINT to {self.arecord_pid}. now waiting up to {timeout} seconds...')
+ try:
+ self.process.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ self.logger.warning(f'stop: wait({timeout}): timeout expired, calling terminate()')
+ self.process.terminate()
+ else:
+ self.logger.warning('stop: pid of arecord is unknown, calling terminate()')
+ self.process.terminate()
+
+ rc = self.process.returncode
+ self.logger.debug(f'stop: rc={rc}')
+
+ self.process = None
+ self.arecord_pid = 0
+
+ self.duration = 0
+ self.start_time = 0
+ self.stop_time = 0
+
+ def find_arecord_pid(self, sh_pid: int):
+ try:
+ children = find_child_processes(sh_pid)
+ except OSError as exc:
+ self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc))
+ return None
+
+ for child in children:
+ if 'arecord' in child.cmd:
+ return child.pid
+
+ return None
+
+ @staticmethod
+ def next_id() -> int:
+ cur_id = Recording.g_record_id
+ Recording.g_record_id += 1
+ return cur_id
+
+ def increment_id(self):
+ self.record_id = Recording.next_id()
+
+
+class Recorder:
+ interrupted: bool
+ lock: threading.Lock
+ history_lock: threading.Lock
+ recording: Optional[Recording]
+ overtime: int
+ history: RecordHistory
+ next_history_cleanup_time: float
+ storage: RecordStorage
+
+ def __init__(self, storage: RecordStorage):
+ self.storage = storage
+ self.recording = Recording()
+ self.interrupted = False
+ self.lock = threading.Lock()
+ self.history_lock = threading.Lock()
+ self.overtime = 0
+ self.history = RecordHistory()
+ self.next_history_cleanup_time = 0
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def start_thread(self):
+ t = threading.Thread(target=self.loop)
+ t.daemon = True
+ t.start()
+
+ def loop(self) -> None:
+ tempname = os.path.join(self.storage.root, 'temp.mp3')
+
+ while not self.interrupted:
+ cur = time.time()
+ stopped = False
+ cur_record_id = None
+
+ if self.next_history_cleanup_time == 0:
+ self.next_history_cleanup_time = time.time() + _history_cleanup_freq
+ elif self.next_history_cleanup_time <= time.time():
+ self.logger.debug('loop: calling history.cleanup()')
+ try:
+ self.history.cleanup()
+ except Exception as e:
+ self.logger.error('loop: error while history.cleanup(): ' + str(e))
+ self.next_history_cleanup_time = time.time() + _history_cleanup_freq
+
+ with self.lock:
+ cur_record_id = self.recording.record_id
+ # self.logger.debug(f'cur_record_id={cur_record_id}')
+
+ if not self.recording.is_started():
+ if self.recording.is_waiting():
+ try:
+ if os.path.exists(tempname):
+ self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..')
+ try:
+ os.unlink(tempname)
+ except OSError as e:
+ self.logger.exception(e)
+ self.recording.start(tempname)
+ with self.history_lock:
+ self.history[cur_record_id].mark_started(self.recording.start_time)
+ except Exception as exc:
+ self.logger.exception(exc)
+
+ # there should not be any errors, but still..
+ try:
+ self.recording.stop()
+ except Exception as exc:
+ self.logger.exception(exc)
+
+ with self.history_lock:
+ self.history[cur_record_id].mark_failed(exc)
+
+ self.logger.debug(f'loop: start exc path: calling increment_id()')
+ self.recording.increment_id()
+ else:
+ if cur >= self.recording.stop_time:
+ try:
+ start_time = self.recording.start_time
+ stop_time = self.recording.stop_time
+ self.recording.stop()
+
+ saved_name = self.storage.save(tempname,
+ record_id=cur_record_id,
+ start_time=int(start_time),
+ stop_time=int(stop_time))
+
+ with self.history_lock:
+ self.history[cur_record_id].mark_finished(stop_time, saved_name)
+ except Exception as exc:
+ self.logger.exception(exc)
+ with self.history_lock:
+ self.history[cur_record_id].mark_failed(exc)
+ finally:
+ self.logger.debug(f'loop: stop exc final path: calling increment_id()')
+ self.recording.increment_id()
+
+ stopped = True
+
+ if stopped and self.overtime > 0:
+ self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})')
+ _overtime = self.overtime
+ self.overtime = 0
+
+ related_id = self.record(_overtime)
+ self.logger.info(f'enqueued another record with id {related_id}')
+
+ if cur_record_id is not None:
+ with self.history_lock:
+ self.history[cur_record_id].add_relation(related_id)
+
+ time.sleep(0.2)
+
+ def record(self, duration: int) -> int:
+ self.logger.debug(f'record: duration={duration}')
+ with self.lock:
+ overtime = self.recording.ask_for(duration)
+ self.logger.debug(f'overtime={overtime}')
+
+ if overtime > self.overtime:
+ self.overtime = overtime
+
+ if not self.recording.is_started():
+ with self.history_lock:
+ self.history.add(self.recording.record_id)
+
+ return self.recording.record_id
+
+ def stop(self):
+ self.interrupted = True
+
+ def get_info(self, record_id: int) -> RecordHistoryItem:
+ with self.history_lock:
+ return self.history[record_id]
+
+ def forget(self, record_id: int):
+ with self.history_lock:
+ self.logger.info(f'forget: removing record {record_id} from history')
+ self.history.delete(record_id)
+
+ @staticmethod
+ def get_max_record_time() -> int:
+ return config['node']['record_max_time']
+