import asyncio import os.path import logging import psutil import re from datetime import datetime from typing import List, Tuple from ..util import chunks from ..config import config, LinuxBoardsConfig from .config import IpcamConfig from .types import VideoContainerType _logger = logging.getLogger(__name__) _ipcam_config = IpcamConfig() _lbc_config = LinuxBoardsConfig() datetime_format = '%Y-%m-%d-%H.%M.%S' datetime_format_re = r'\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}.\d{2}' def _get_ffmpeg_path() -> str: return 'ffmpeg' if 'ffmpeg' not in config else config['ffmpeg']['path'] def time2seconds(time: str) -> int: time, frac = time.split('.') frac = int(frac) h, m, s = [int(i) for i in time.split(':')] return round(s + m*60 + h*3600 + frac/1000) async def ffmpeg_recreate(filename: str): filedir = os.path.dirname(filename) _, fileext = os.path.splitext(filename) tempname = os.path.join(filedir, f'.temporary_fixing.{fileext}') mtime = os.path.getmtime(filename) args = [_get_ffmpeg_path(), '-nostats', '-loglevel', 'error', '-i', filename, '-c', 'copy', '-y', tempname] proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await proc.communicate() if proc.returncode != 0: _logger.error(f'fix_timestamps({filename}): ffmpeg returned {proc.returncode}, stderr: {stderr.decode().strip()}') if os.path.isfile(tempname): os.unlink(filename) os.rename(tempname, filename) os.utime(filename, (mtime, mtime)) _logger.info(f'fix_timestamps({filename}): OK') else: _logger.error(f'fix_timestamps({filename}): temp file \'{tempname}\' does not exists, fix failed') async def ffmpeg_cut(input: str, output: str, start_pos: int, duration: int): args = [_get_ffmpeg_path(), '-nostats', '-loglevel', 'error', '-i', input, '-ss', str(start_pos), '-t', str(duration), '-c', 'copy', '-y', output] proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await proc.communicate() if proc.returncode != 0: _logger.error(f'ffmpeg_cut({input}, start_pos={start_pos}, duration={duration}): ffmpeg returned {proc.returncode}, stderr: {stderr.decode().strip()}') else: _logger.info(f'ffmpeg_cut({input}): OK') def dvr_scan_timecodes(timecodes: str) -> List[Tuple[int, int]]: tc_backup = timecodes timecodes = timecodes.split(',') if len(timecodes) % 2 != 0: raise DVRScanInvalidTimecodes(f'invalid number of timecodes. input: {tc_backup}') timecodes = list(map(time2seconds, timecodes)) timecodes = list(chunks(timecodes, 2)) # sort out invalid fragments (dvr-scan returns them sometimes, idk why...) timecodes = list(filter(lambda f: f[0] < f[1], timecodes)) if not timecodes: raise DVRScanInvalidTimecodes(f'no valid timecodes. input: {tc_backup}') # https://stackoverflow.com/a/43600953 timecodes.sort(key=lambda interval: interval[0]) merged = [timecodes[0]] for current in timecodes: previous = merged[-1] if current[0] <= previous[1]: previous[1] = max(previous[1], current[1]) else: merged.append(current) return merged class DVRScanInvalidTimecodes(Exception): pass def has_handle(fpath): for proc in psutil.process_iter(): try: for item in proc.open_files(): if fpath == item.path: return True except Exception: pass return False def get_recordings_path(cam: int) -> str: server, disk = _ipcam_config.get_cam_server_and_disk(cam) disks = _lbc_config.get_board_disks(server) disk_mountpoint = disks[disk-1] return f'{disk_mountpoint}/cam-{cam}' def get_motion_path(cam: int) -> str: return f'{get_recordings_path(cam)}/motion' def is_valid_recording_name(filename: str) -> bool: if not filename.startswith('record_'): return False for container_type in VideoContainerType: if filename.endswith(f'.{container_type.value}'): return True return False def datetime_from_filename(name: str) -> datetime: name = os.path.basename(name) exts = '|'.join([t.value for t in VideoContainerType]) if name.startswith('record_'): return datetime.strptime(re.match(rf'record_(.*?)\.(?:{exts})', name).group(1), datetime_format) m = re.match(rf'({datetime_format_re})__{datetime_format_re}\.(?:{exts})', name) if m: return datetime.strptime(m.group(1), datetime_format) raise ValueError(f'unrecognized filename format: {name}') def get_hls_channel_name(cam: int, channel: int) -> str: name = str(cam) if channel == 2: name += '-low' return name def get_hls_directory(cam, channel) -> str: dirname = os.path.join( _ipcam_config['hls_path'], get_hls_channel_name(cam, channel) ) if not os.path.exists(dirname): os.makedirs(dirname) return dirname