diff options
Diffstat (limited to 'src/home/media/record.py')
-rw-r--r-- | src/home/media/record.py | 453 |
1 files changed, 453 insertions, 0 deletions
diff --git a/src/home/media/record.py b/src/home/media/record.py new file mode 100644 index 0000000..d3abfbb --- /dev/null +++ b/src/home/media/record.py @@ -0,0 +1,453 @@ +import os +import threading +import logging +import time +import subprocess +import signal + +from typing import Optional +from ..util import find_child_processes, Addr +from ..config import config +from .storage import RecordFile, RecordStorage +from .types import RecordStatus +from ..camera.types import CameraType + + +_history_item_timeout = 7200 +_history_cleanup_freq = 3600 + + +class RecordHistoryItem: + id: int + request_time: float + start_time: float + stop_time: float + relations: list[int] + status: RecordStatus + error: Optional[Exception] + file: Optional[RecordFile] + creation_time: float + + def __init__(self, id): + self.id = id + self.request_time = 0 + self.start_time = 0 + self.stop_time = 0 + self.relations = [] + self.status = RecordStatus.WAITING + self.file = None + self.error = None + self.creation_time = time.time() + + def add_relation(self, related_id: int): + self.relations.append(related_id) + + def mark_started(self, start_time: float): + self.start_time = start_time + self.status = RecordStatus.RECORDING + + def mark_finished(self, end_time: float, file: RecordFile): + self.stop_time = end_time + self.file = file + self.status = RecordStatus.FINISHED + + def mark_failed(self, error: Exception): + self.status = RecordStatus.ERROR + self.error = error + + def as_dict(self) -> dict: + data = { + 'id': self.id, + 'request_time': self.request_time, + 'status': self.status.value, + 'relations': self.relations, + 'start_time': self.start_time, + 'stop_time': self.stop_time, + } + if self.error: + data['error'] = str(self.error) + if self.file: + data['file'] = self.file.__dict__() + return data + + +class RecordingNotFoundError(Exception): + pass + + +class RecordHistory: + history: dict[int, RecordHistoryItem] + + def __init__(self): + self.history = {} + self.logger = logging.getLogger(self.__class__.__name__) + + def add(self, record_id: int): + self.logger.debug(f'add: record_id={record_id}') + + r = RecordHistoryItem(record_id) + r.request_time = time.time() + + self.history[record_id] = r + + def delete(self, record_id: int): + self.logger.debug(f'delete: record_id={record_id}') + del self.history[record_id] + + def cleanup(self): + del_ids = [] + for rid, item in self.history.items(): + if item.creation_time < time.time()-_history_item_timeout: + del_ids.append(rid) + for rid in del_ids: + self.delete(rid) + + def __getitem__(self, key): + if key not in self.history: + raise RecordingNotFoundError() + + return self.history[key] + + def __setitem__(self, key, value): + raise NotImplementedError('setting history item this way is prohibited') + + def __contains__(self, key): + return key in self.history + + +class Recording: + RECORDER_PROGRAM = None + + start_time: float + stop_time: float + duration: int + record_id: int + recorder_program_pid: Optional[int] + process: Optional[subprocess.Popen] + + g_record_id = 1 + + def __init__(self): + if self.RECORDER_PROGRAM is None: + raise RuntimeError('this is abstract class') + + self.start_time = 0 + self.stop_time = 0 + self.duration = 0 + self.process = None + self.recorder_program_pid = None + self.record_id = Recording.next_id() + self.logger = logging.getLogger(self.__class__.__name__) + + def is_started(self) -> bool: + return self.start_time > 0 and self.stop_time > 0 + + def is_waiting(self): + return self.duration > 0 + + def ask_for(self, duration) -> int: + overtime = 0 + orig_duration = duration + + if self.is_started(): + already_passed = time.time() - self.start_time + max_duration = Recorder.get_max_record_time() - already_passed + self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}') + else: + max_duration = Recorder.get_max_record_time() + + if duration > max_duration: + overtime = duration - max_duration + duration = max_duration + + self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}') + + self.duration += duration + if self.is_started(): + til_end = self.stop_time - time.time() + if til_end < 0: + til_end = 0 + + _prev_stop_time = self.stop_time + _to_add = duration - til_end + if _to_add < 0: + _to_add = 0 + + self.stop_time += _to_add + self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})') + + return overtime + + def start(self, output: str): + assert self.start_time == 0 and self.stop_time == 0, "already started?!" + assert self.process is None, "self.process is not None, what the hell?" + + cur = time.time() + self.start_time = cur + self.stop_time = cur + self.duration + + cmd = self.get_command(output) + self.logger.debug(f'start: running `{cmd}`') + self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True) + + sh_pid = self.process.pid + self.logger.debug(f'start: started, pid of shell is {sh_pid}') + + pid = self.find_recorder_program_pid(sh_pid) + if pid is not None: + self.recorder_program_pid = pid + self.logger.debug(f'start: pid of {self.RECORDER_PROGRAM} is {pid}') + + def get_command(self, output: str) -> str: + pass + + def stop(self): + if self.process: + if self.recorder_program_pid is None: + self.recorder_program_pid = self.find_recorder_program_pid(self.process.pid) + + if self.recorder_program_pid is not None: + os.kill(self.recorder_program_pid, signal.SIGINT) + timeout = config['node']['process_wait_timeout'] + + self.logger.debug(f'stop: sent SIGINT to {self.recorder_program_pid}. now waiting up to {timeout} seconds...') + try: + self.process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + self.logger.warning(f'stop: wait({timeout}): timeout expired, calling terminate()') + self.process.terminate() + else: + self.logger.warning(f'stop: pid of {self.RECORDER_PROGRAM} is unknown, calling terminate()') + self.process.terminate() + + rc = self.process.returncode + self.logger.debug(f'stop: rc={rc}') + + self.process = None + self.recorder_program_pid = 0 + + self.duration = 0 + self.start_time = 0 + self.stop_time = 0 + + def find_recorder_program_pid(self, sh_pid: int): + try: + children = find_child_processes(sh_pid) + except OSError as exc: + self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc)) + return None + + for child in children: + if self.RECORDER_PROGRAM in child.cmd: + return child.pid + + return None + + @staticmethod + def next_id() -> int: + cur_id = Recording.g_record_id + Recording.g_record_id += 1 + return cur_id + + def increment_id(self): + self.record_id = Recording.next_id() + + +class Recorder: + TEMP_NAME = None + + interrupted: bool + lock: threading.Lock + history_lock: threading.Lock + recording: Optional[Recording] + overtime: int + history: RecordHistory + next_history_cleanup_time: float + storage: RecordStorage + + def __init__(self, + storage: RecordStorage, + recording: Recording): + if self.TEMP_NAME is None: + raise RuntimeError('this is abstract class') + + self.storage = storage + self.recording = recording + self.interrupted = False + self.lock = threading.Lock() + self.history_lock = threading.Lock() + self.overtime = 0 + self.history = RecordHistory() + self.next_history_cleanup_time = 0 + self.logger = logging.getLogger(self.__class__.__name__) + + def start_thread(self): + t = threading.Thread(target=self.loop) + t.daemon = True + t.start() + + def loop(self) -> None: + tempname = os.path.join(self.storage.root, self.TEMP_NAME) + + while not self.interrupted: + cur = time.time() + stopped = False + cur_record_id = None + + if self.next_history_cleanup_time == 0: + self.next_history_cleanup_time = time.time() + _history_cleanup_freq + elif self.next_history_cleanup_time <= time.time(): + self.logger.debug('loop: calling history.cleanup()') + try: + self.history.cleanup() + except Exception as e: + self.logger.error('loop: error while history.cleanup(): ' + str(e)) + self.next_history_cleanup_time = time.time() + _history_cleanup_freq + + with self.lock: + cur_record_id = self.recording.record_id + # self.logger.debug(f'cur_record_id={cur_record_id}') + + if not self.recording.is_started(): + if self.recording.is_waiting(): + try: + if os.path.exists(tempname): + self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..') + try: + os.unlink(tempname) + except OSError as e: + self.logger.exception(e) + self.recording.start(tempname) + with self.history_lock: + self.history[cur_record_id].mark_started(self.recording.start_time) + except Exception as exc: + self.logger.exception(exc) + + # there should not be any errors, but still.. + try: + self.recording.stop() + except Exception as exc: + self.logger.exception(exc) + + with self.history_lock: + self.history[cur_record_id].mark_failed(exc) + + self.logger.debug(f'loop: start exc path: calling increment_id()') + self.recording.increment_id() + else: + if cur >= self.recording.stop_time: + try: + start_time = self.recording.start_time + stop_time = self.recording.stop_time + self.recording.stop() + + saved_name = self.storage.save(tempname, + record_id=cur_record_id, + start_time=int(start_time), + stop_time=int(stop_time)) + + with self.history_lock: + self.history[cur_record_id].mark_finished(stop_time, saved_name) + except Exception as exc: + self.logger.exception(exc) + with self.history_lock: + self.history[cur_record_id].mark_failed(exc) + finally: + self.logger.debug(f'loop: stop exc final path: calling increment_id()') + self.recording.increment_id() + + stopped = True + + if stopped and self.overtime > 0: + self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})') + _overtime = self.overtime + self.overtime = 0 + + related_id = self.record(_overtime) + self.logger.info(f'enqueued another record with id {related_id}') + + if cur_record_id is not None: + with self.history_lock: + self.history[cur_record_id].add_relation(related_id) + + time.sleep(0.2) + + def record(self, duration: int) -> int: + self.logger.debug(f'record: duration={duration}') + with self.lock: + overtime = self.recording.ask_for(duration) + self.logger.debug(f'overtime={overtime}') + + if overtime > self.overtime: + self.overtime = overtime + + if not self.recording.is_started(): + with self.history_lock: + self.history.add(self.recording.record_id) + + return self.recording.record_id + + def stop(self): + self.interrupted = True + + def get_info(self, record_id: int) -> RecordHistoryItem: + with self.history_lock: + return self.history[record_id] + + def forget(self, record_id: int): + with self.history_lock: + self.logger.info(f'forget: removing record {record_id} from history') + self.history.delete(record_id) + + @staticmethod + def get_max_record_time() -> int: + return config['node']['record_max_time'] + + +class SoundRecorder(Recorder): + TEMP_NAME = 'temp.mp3' + + def __init__(self, *args, **kwargs): + super().__init__(recording=SoundRecording(), + *args, **kwargs) + + +class CameraRecorder(Recorder): + TEMP_NAME = 'temp.mp4' + + def __init__(self, + camera_type: CameraType, + *args, **kwargs): + if camera_type == CameraType.ESP32: + recording = ESP32CameraRecording(stream_addr=kwargs['stream_addr']) + del kwargs['stream_addr'] + else: + raise RuntimeError(f'unsupported camera type {camera_type}') + + super().__init__(recording=recording, + *args, **kwargs) + + +class SoundRecording(Recording): + RECORDER_PROGRAM = 'arecord' + + def get_command(self, output: str) -> str: + arecord = config['arecord']['bin'] + lame = config['lame']['bin'] + b = config['lame']['bitrate'] + + return f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null' + + +class ESP32CameraRecording(Recording): + RECORDER_PROGRAM = 'esp32_capture.py' + + stream_addr: Addr + + def __init__(self, stream_addr: Addr): + super().__init__() + self.stream_addr = stream_addr + + def get_command(self, output: str) -> str: + bin = config['esp32_capture']['bin'] + return f'{bin} --addr {self.stream_addr[0]}:{self.stream_addr[1]} --output-directory {output} >/dev/null 2>/dev/null'
\ No newline at end of file |