#!/usr/bin/env python3
"""Supervise one ffmpeg process per camera channel (HLS streaming or recording).

The capture type is taken from the CAPTURE_TYPE command-line argument. Each
ffmpeg process has its stdout/stderr relayed with a per-channel prefix and is
restarted after ``restart_delay`` seconds whenever it exits.
"""
# NOTE(review): __py_include presumably prepares sys.path for the homekit
# package; it is kept as the first import, as in the original.
import __py_include
import sys
import os
import subprocess
import asyncio
import signal

from typing import TextIO
from argparse import ArgumentParser
from socket import gethostname
from asyncio.streams import StreamReader
from homekit.config import LinuxBoardsConfig, config as homekit_config
from homekit.camera import IpcamConfig, CaptureType
from homekit.camera.util import get_hls_directory, get_hls_channel_name, get_recordings_path

ipcam_config = IpcamConfig()
lbc_config = LinuxBoardsConfig()
channels = (1, 2)
tasks = []
# seconds to wait before an ffmpeg process is started again
restart_delay = 3
# seconds to wait for ffmpeg to exit after SIGTERM before killing it
terminate_timeout = 5
lock = asyncio.Lock()
worker_type: CaptureType


async def read_output(stream: StreamReader, thread_name: str, output: TextIO):
    """Relay lines from ``stream`` to ``output``, prefixed with ``thread_name``.

    Returns when the stream reaches EOF. An over-long line is reported and
    skipped instead of ending the relay, so the pipe keeps being drained.
    Any other error is reported on stderr and ends the relay.
    """
    try:
        while True:
            try:
                line = await stream.readline()
            except (ValueError, asyncio.LimitOverrunError):
                # readline() signals an over-long line with ValueError and
                # drops the offending data from its buffer, so reading on is safe.
                print(f"[{thread_name}] Output limit exceeded.", file=output)
                continue
            if not line:
                break
            # errors='replace': undecodable bytes must not stop the relay
            text = line.decode(errors='replace').strip()
            print(f"[{thread_name}] {text}", file=output)
    except Exception as e:
        print(f"[{thread_name}] Error occurred while reading output: {e}", file=sys.stderr)


async def _stop_process(process):
    """Terminate ``process`` if it is still running and wait for it to exit."""
    if process.returncode is not None:
        return
    try:
        process.terminate()
    except ProcessLookupError:
        # exited between the returncode check and the signal
        return
    try:
        await asyncio.wait_for(process.wait(), timeout=terminate_timeout)
    except asyncio.TimeoutError:
        # SIGTERM was not honoured in time; force the exit
        try:
            process.kill()
        except ProcessLookupError:
            return
        await process.wait()


async def run_ffmpeg(cam: int, channel: int):
    """Run ffmpeg for one camera channel forever, restarting it when it exits.

    The output arguments depend on the module-level ``worker_type``.
    If the task is cancelled, the running ffmpeg process is terminated.

    Raises:
        ValueError: if ``worker_type`` is neither HLS nor RECORD.
    """
    prefix = get_hls_channel_name(cam, channel)

    if homekit_config.app_config.logging_is_verbose():
        debug_args = ['-v', '-info']
    else:
        debug_args = ['-nostats', '-loglevel', 'error']

    # protocol = 'tcp' if ipcam_config.should_use_tcp_for_rtsp(cam) else 'udp'
    protocol = 'tcp'
    user, pw = ipcam_config.get_rtsp_creds()
    ip = ipcam_config.get_camera_ip(cam)
    path = ipcam_config.get_camera_type(cam).get_channel_url(channel)
    ext = ipcam_config.get_camera_container(cam)

    ffmpeg_command = [
        'ffmpeg', *debug_args,
        '-rtsp_transport', protocol,
        '-i', f'rtsp://{user}:{pw}@{ip}:554{path}',
        '-c', 'copy',
    ]

    if worker_type == CaptureType.HLS:
        ffmpeg_command.extend([
            '-bufsize', '1835k',
            '-pix_fmt', 'yuv420p',
            '-flags', '-global_header',
            '-hls_time', '2',
            '-hls_list_size', '3',
            '-hls_flags', 'delete_segments',
            os.path.join(get_hls_directory(cam, channel), 'live.m3u8'),
        ])
    elif worker_type == CaptureType.RECORD:
        ffmpeg_command.extend([
            '-f', 'segment',
            '-strftime', '1',
            '-segment_time', '00:10:00',
            '-segment_atclocktime', '1',
            os.path.join(get_recordings_path(cam), f'record_%Y-%m-%d-%H.%M.%S.{ext.value}'),
        ])
    else:
        raise ValueError(f'invalid worker type: {worker_type}')

    while True:
        process = None
        try:
            process = await asyncio.create_subprocess_exec(
                *ffmpeg_command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )

            stdout_task = asyncio.create_task(read_output(process.stdout, prefix, sys.stdout))
            stderr_task = asyncio.create_task(read_output(process.stderr, prefix, sys.stderr))

            await asyncio.gather(stdout_task, stderr_task)

            # EOF on both pipes does not guarantee the exit status has been
            # collected yet; wait() makes returncode reliable.
            returncode = await process.wait()
            if returncode != 0:
                raise subprocess.CalledProcessError(returncode, ffmpeg_command)

        except (FileNotFoundError, PermissionError, subprocess.CalledProcessError) as e:
            # an error occurred, print the error message
            error_message = f"Error occurred in {prefix}: {e}"
            print(error_message, file=sys.stderr)

        finally:
            # A process is still alive here only on cancellation or an
            # unexpected error; do not leave ffmpeg running behind us.
            if process is not None:
                await _stop_process(process)

        # wait restart_delay seconds before restarting the process
        await asyncio.sleep(restart_delay)


async def run():
    """Start a ``run_ffmpeg`` task for every camera channel and wait for them.

    For the RECORD capture type the camera list is filtered by this host's
    name. On exit, all tasks are cancelled and awaited; each task terminates
    its own ffmpeg process when cancelled.
    """
    kwargs = {}
    if worker_type == CaptureType.RECORD:
        kwargs['filter_by_server'] = gethostname()

    for cam in ipcam_config.get_all_cam_names(**kwargs):
        for channel in channels:
            task = asyncio.create_task(run_ffmpeg(cam, channel))
            tasks.append(task)

    try:
        await asyncio.gather(*tasks)
    except KeyboardInterrupt:
        print('KeyboardInterrupt: stopping processes...', file=sys.stderr)
    finally:
        for task in tasks:
            task.cancel()

        # wait for the tasks (and the subprocesses they own) to finish
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == '__main__':
    capture_types = [t.value for t in CaptureType]
    parser = ArgumentParser()
    parser.add_argument('type', type=str, metavar='CAPTURE_TYPE', choices=tuple(capture_types),
                        help='capture type (variants: '+', '.join(capture_types)+')')

    arg = homekit_config.load_app(no_config=True, parser=parser)
    worker_type = CaptureType(arg['type'])

    asyncio.run(run())