summaryrefslogtreecommitdiff
path: root/src/home/media/record.py
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2023-09-27 00:54:57 +0300
committerEvgeny Zinoviev <me@ch1p.io>2023-09-27 00:54:57 +0300
commitd3a295872c49defb55fc8e4e43e55550991e0927 (patch)
treeb9dca15454f9027d5a9dad0d4443a20de04dbc5d /src/home/media/record.py
parentb7cbc2571c1870b4582ead45277d0aa7f961bec8 (diff)
parentbdbb296697f55f4c3a07af43c9aaf7a9ea86f3d0 (diff)
Merge branch 'master' of ch1p.io:homekit
Diffstat (limited to 'src/home/media/record.py')
-rw-r--r--src/home/media/record.py461
1 files changed, 0 insertions, 461 deletions
diff --git a/src/home/media/record.py b/src/home/media/record.py
deleted file mode 100644
index cd7447a..0000000
--- a/src/home/media/record.py
+++ /dev/null
@@ -1,461 +0,0 @@
-import os
-import threading
-import logging
-import time
-import subprocess
-import signal
-
-from typing import Optional, List, Dict
-from ..util import find_child_processes, Addr
-from ..config import config
-from .storage import RecordFile, RecordStorage
-from .types import RecordStatus
-from ..camera.types import CameraType
-
-
-_history_item_timeout = 7200
-_history_cleanup_freq = 3600
-
-
-class RecordHistoryItem:
- id: int
- request_time: float
- start_time: float
- stop_time: float
- relations: List[int]
- status: RecordStatus
- error: Optional[Exception]
- file: Optional[RecordFile]
- creation_time: float
-
- def __init__(self, id):
- self.id = id
- self.request_time = 0
- self.start_time = 0
- self.stop_time = 0
- self.relations = []
- self.status = RecordStatus.WAITING
- self.file = None
- self.error = None
- self.creation_time = time.time()
-
- def add_relation(self, related_id: int):
- self.relations.append(related_id)
-
- def mark_started(self, start_time: float):
- self.start_time = start_time
- self.status = RecordStatus.RECORDING
-
- def mark_finished(self, end_time: float, file: RecordFile):
- self.stop_time = end_time
- self.file = file
- self.status = RecordStatus.FINISHED
-
- def mark_failed(self, error: Exception):
- self.status = RecordStatus.ERROR
- self.error = error
-
- def as_dict(self) -> dict:
- data = {
- 'id': self.id,
- 'request_time': self.request_time,
- 'status': self.status.value,
- 'relations': self.relations,
- 'start_time': self.start_time,
- 'stop_time': self.stop_time,
- }
- if self.error:
- data['error'] = str(self.error)
- if self.file:
- data['file'] = self.file.__dict__()
- return data
-
-
-class RecordingNotFoundError(Exception):
- pass
-
-
-class RecordHistory:
- history: Dict[int, RecordHistoryItem]
-
- def __init__(self):
- self.history = {}
- self.logger = logging.getLogger(self.__class__.__name__)
-
- def add(self, record_id: int):
- self.logger.debug(f'add: record_id={record_id}')
-
- r = RecordHistoryItem(record_id)
- r.request_time = time.time()
-
- self.history[record_id] = r
-
- def delete(self, record_id: int):
- self.logger.debug(f'delete: record_id={record_id}')
- del self.history[record_id]
-
- def cleanup(self):
- del_ids = []
- for rid, item in self.history.items():
- if item.creation_time < time.time()-_history_item_timeout:
- del_ids.append(rid)
- for rid in del_ids:
- self.delete(rid)
-
- def __getitem__(self, key):
- if key not in self.history:
- raise RecordingNotFoundError()
-
- return self.history[key]
-
- def __setitem__(self, key, value):
- raise NotImplementedError('setting history item this way is prohibited')
-
- def __contains__(self, key):
- return key in self.history
-
-
-class Recording:
- RECORDER_PROGRAM = None
-
- start_time: float
- stop_time: float
- duration: int
- record_id: int
- recorder_program_pid: Optional[int]
- process: Optional[subprocess.Popen]
-
- g_record_id = 1
-
- def __init__(self):
- if self.RECORDER_PROGRAM is None:
- raise RuntimeError('this is abstract class')
-
- self.start_time = 0
- self.stop_time = 0
- self.duration = 0
- self.process = None
- self.recorder_program_pid = None
- self.record_id = Recording.next_id()
- self.logger = logging.getLogger(self.__class__.__name__)
-
- def is_started(self) -> bool:
- return self.start_time > 0 and self.stop_time > 0
-
- def is_waiting(self):
- return self.duration > 0
-
- def ask_for(self, duration) -> int:
- overtime = 0
- orig_duration = duration
-
- if self.is_started():
- already_passed = time.time() - self.start_time
- max_duration = Recorder.get_max_record_time() - already_passed
- self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}')
- else:
- max_duration = Recorder.get_max_record_time()
-
- if duration > max_duration:
- overtime = duration - max_duration
- duration = max_duration
-
- self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}')
-
- self.duration += duration
- if self.is_started():
- til_end = self.stop_time - time.time()
- if til_end < 0:
- til_end = 0
-
- _prev_stop_time = self.stop_time
- _to_add = duration - til_end
- if _to_add < 0:
- _to_add = 0
-
- self.stop_time += _to_add
- self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})')
-
- return overtime
-
- def start(self, output: str):
- assert self.start_time == 0 and self.stop_time == 0, "already started?!"
- assert self.process is None, "self.process is not None, what the hell?"
-
- cur = time.time()
- self.start_time = cur
- self.stop_time = cur + self.duration
-
- cmd = self.get_command(output)
- self.logger.debug(f'start: running `{cmd}`')
- self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True)
-
- sh_pid = self.process.pid
- self.logger.debug(f'start: started, pid of shell is {sh_pid}')
-
- pid = self.find_recorder_program_pid(sh_pid)
- if pid is not None:
- self.recorder_program_pid = pid
- self.logger.debug(f'start: pid of {self.RECORDER_PROGRAM} is {pid}')
-
- def get_command(self, output: str) -> str:
- pass
-
- def stop(self):
- if self.process:
- if self.recorder_program_pid is None:
- self.recorder_program_pid = self.find_recorder_program_pid(self.process.pid)
-
- if self.recorder_program_pid is not None:
- os.kill(self.recorder_program_pid, signal.SIGINT)
- timeout = config['node']['process_wait_timeout']
-
- self.logger.debug(f'stop: sent SIGINT to {self.recorder_program_pid}. now waiting up to {timeout} seconds...')
- try:
- self.process.wait(timeout=timeout)
- except subprocess.TimeoutExpired:
- self.logger.warning(f'stop: wait({timeout}): timeout expired, killing it')
- try:
- os.kill(self.recorder_program_pid, signal.SIGKILL)
- self.process.terminate()
- except Exception as exc:
- self.logger.exception(exc)
- else:
- self.logger.warning(f'stop: pid of {self.RECORDER_PROGRAM} is unknown, calling terminate()')
- self.process.terminate()
-
- rc = self.process.returncode
- self.logger.debug(f'stop: rc={rc}')
-
- self.process = None
- self.recorder_program_pid = 0
-
- self.duration = 0
- self.start_time = 0
- self.stop_time = 0
-
- def find_recorder_program_pid(self, sh_pid: int):
- try:
- children = find_child_processes(sh_pid)
- except OSError as exc:
- self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc))
- return None
-
- for child in children:
- if self.RECORDER_PROGRAM in child.cmd:
- return child.pid
-
- return None
-
- @staticmethod
- def next_id() -> int:
- cur_id = Recording.g_record_id
- Recording.g_record_id += 1
- return cur_id
-
- def increment_id(self):
- self.record_id = Recording.next_id()
-
-
-class Recorder:
- TEMP_NAME = None
-
- interrupted: bool
- lock: threading.Lock
- history_lock: threading.Lock
- recording: Optional[Recording]
- overtime: int
- history: RecordHistory
- next_history_cleanup_time: float
- storage: RecordStorage
-
- def __init__(self,
- storage: RecordStorage,
- recording: Recording):
- if self.TEMP_NAME is None:
- raise RuntimeError('this is abstract class')
-
- self.storage = storage
- self.recording = recording
- self.interrupted = False
- self.lock = threading.Lock()
- self.history_lock = threading.Lock()
- self.overtime = 0
- self.history = RecordHistory()
- self.next_history_cleanup_time = 0
- self.logger = logging.getLogger(self.__class__.__name__)
-
- def start_thread(self):
- t = threading.Thread(target=self.loop)
- t.daemon = True
- t.start()
-
- def loop(self) -> None:
- tempname = os.path.join(self.storage.root, self.TEMP_NAME)
-
- while not self.interrupted:
- cur = time.time()
- stopped = False
- cur_record_id = None
-
- if self.next_history_cleanup_time == 0:
- self.next_history_cleanup_time = time.time() + _history_cleanup_freq
- elif self.next_history_cleanup_time <= time.time():
- self.logger.debug('loop: calling history.cleanup()')
- try:
- self.history.cleanup()
- except Exception as e:
- self.logger.error('loop: error while history.cleanup(): ' + str(e))
- self.next_history_cleanup_time = time.time() + _history_cleanup_freq
-
- with self.lock:
- cur_record_id = self.recording.record_id
- # self.logger.debug(f'cur_record_id={cur_record_id}')
-
- if not self.recording.is_started():
- if self.recording.is_waiting():
- try:
- if os.path.exists(tempname):
- self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..')
- try:
- os.unlink(tempname)
- except OSError as e:
- self.logger.exception(e)
- self.recording.start(tempname)
- with self.history_lock:
- self.history[cur_record_id].mark_started(self.recording.start_time)
- except Exception as exc:
- self.logger.exception(exc)
-
- # there should not be any errors, but still..
- try:
- self.recording.stop()
- except Exception as exc:
- self.logger.exception(exc)
-
- with self.history_lock:
- self.history[cur_record_id].mark_failed(exc)
-
- self.logger.debug(f'loop: start exc path: calling increment_id()')
- self.recording.increment_id()
- else:
- if cur >= self.recording.stop_time:
- try:
- start_time = self.recording.start_time
- stop_time = self.recording.stop_time
- self.recording.stop()
-
- saved_name = self.storage.save(tempname,
- record_id=cur_record_id,
- start_time=int(start_time),
- stop_time=int(stop_time))
-
- with self.history_lock:
- self.history[cur_record_id].mark_finished(stop_time, saved_name)
- except Exception as exc:
- self.logger.exception(exc)
- with self.history_lock:
- self.history[cur_record_id].mark_failed(exc)
- finally:
- self.logger.debug(f'loop: stop exc final path: calling increment_id()')
- self.recording.increment_id()
-
- stopped = True
-
- if stopped and self.overtime > 0:
- self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})')
- _overtime = self.overtime
- self.overtime = 0
-
- related_id = self.record(_overtime)
- self.logger.info(f'enqueued another record with id {related_id}')
-
- if cur_record_id is not None:
- with self.history_lock:
- self.history[cur_record_id].add_relation(related_id)
-
- time.sleep(0.2)
-
- def record(self, duration: int) -> int:
- self.logger.debug(f'record: duration={duration}')
- with self.lock:
- overtime = self.recording.ask_for(duration)
- self.logger.debug(f'overtime={overtime}')
-
- if overtime > self.overtime:
- self.overtime = overtime
-
- if not self.recording.is_started():
- with self.history_lock:
- self.history.add(self.recording.record_id)
-
- return self.recording.record_id
-
- def stop(self):
- self.interrupted = True
-
- def get_info(self, record_id: int) -> RecordHistoryItem:
- with self.history_lock:
- return self.history[record_id]
-
- def forget(self, record_id: int):
- with self.history_lock:
- self.logger.info(f'forget: removing record {record_id} from history')
- self.history.delete(record_id)
-
- @staticmethod
- def get_max_record_time() -> int:
- return config['node']['record_max_time']
-
-
-class SoundRecorder(Recorder):
- TEMP_NAME = 'temp.mp3'
-
- def __init__(self, *args, **kwargs):
- super().__init__(recording=SoundRecording(),
- *args, **kwargs)
-
-
-class CameraRecorder(Recorder):
- TEMP_NAME = 'temp.mp4'
-
- def __init__(self,
- camera_type: CameraType,
- *args, **kwargs):
- if camera_type == CameraType.ESP32:
- recording = ESP32CameraRecording(stream_addr=kwargs['stream_addr'])
- del kwargs['stream_addr']
- else:
- raise RuntimeError(f'unsupported camera type {camera_type}')
-
- super().__init__(recording=recording,
- *args, **kwargs)
-
-
-class SoundRecording(Recording):
- RECORDER_PROGRAM = 'arecord'
-
- def get_command(self, output: str) -> str:
- arecord = config['arecord']['bin']
- lame = config['lame']['bin']
- b = config['lame']['bitrate']
-
- return f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null'
-
-
-class ESP32CameraRecording(Recording):
- RECORDER_PROGRAM = 'esp32_capture.py'
-
- stream_addr: Addr
-
- def __init__(self, stream_addr: Addr):
- super().__init__()
- self.stream_addr = stream_addr
-
- def get_command(self, output: str) -> str:
- bin = config['esp32_capture']['bin']
- return f'{bin} --addr {self.stream_addr[0]}:{self.stream_addr[1]} --output-directory {output} >/dev/null 2>/dev/null'
-
- def start(self, output: str):
- output = os.path.dirname(output)
- return super().start(output) \ No newline at end of file