summaryrefslogtreecommitdiff
path: root/bin/ipcam_capture.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/ipcam_capture.py')
-rwxr-xr-xbin/ipcam_capture.py142
1 files changed, 142 insertions, 0 deletions
diff --git a/bin/ipcam_capture.py b/bin/ipcam_capture.py
new file mode 100755
index 0000000..226e12e
--- /dev/null
+++ b/bin/ipcam_capture.py
@@ -0,0 +1,142 @@
+#!/usr/bin/env python3
+import __py_include
+import sys
+import os
+import subprocess
+import asyncio
+import signal
+
+from typing import TextIO
+from argparse import ArgumentParser
+from socket import gethostname
+from asyncio.streams import StreamReader
+from homekit.config import LinuxBoardsConfig, config as homekit_config
+from homekit.camera import IpcamConfig, CaptureType
+from homekit.camera.util import get_hls_directory, get_hls_channel_name, get_recordings_path
+
+ipcam_config = IpcamConfig()
+lbc_config = LinuxBoardsConfig()
+channels = (1, 2)
+tasks = []
+restart_delay = 3
+lock = asyncio.Lock()
+worker_type: CaptureType
+
+
+async def read_output(stream: StreamReader,
+ thread_name: str,
+ output: TextIO):
+ try:
+ while True:
+ line = await stream.readline()
+ if not line:
+ break
+ print(f"[{thread_name}] {line.decode().strip()}", file=output)
+
+ except asyncio.LimitOverrunError:
+ print(f"[{thread_name}] Output limit exceeded.", file=output)
+
+ except Exception as e:
+ print(f"[{thread_name}] Error occurred while reading output: {e}", file=sys.stderr)
+
+
+async def run_ffmpeg(cam: int, channel: int):
+ prefix = get_hls_channel_name(cam, channel)
+
+ if homekit_config.app_config.logging_is_verbose():
+ debug_args = ['-v', '-info']
+ else:
+ debug_args = ['-nostats', '-loglevel', 'error']
+
+ # protocol = 'tcp' if ipcam_config.should_use_tcp_for_rtsp(cam) else 'udp'
+ protocol = 'tcp'
+ user, pw = ipcam_config.get_rtsp_creds()
+ ip = ipcam_config.get_camera_ip(cam)
+ path = ipcam_config.get_camera_type(cam).get_channel_url(channel)
+ ext = ipcam_config.get_camera_container(cam)
+ ffmpeg_command = ['ffmpeg', *debug_args,
+ '-rtsp_transport', protocol,
+ '-i', f'rtsp://{user}:{pw}@{ip}:554{path}',
+ '-c', 'copy',]
+
+ if worker_type == CaptureType.HLS:
+ ffmpeg_command.extend(['-bufsize', '1835k',
+ '-pix_fmt', 'yuv420p',
+ '-flags', '-global_header',
+ '-hls_time', '2',
+ '-hls_list_size', '3',
+ '-hls_flags', 'delete_segments',
+ os.path.join(get_hls_directory(cam, channel), 'live.m3u8')])
+
+ elif worker_type == CaptureType.RECORD:
+ ffmpeg_command.extend(['-f', 'segment',
+ '-strftime', '1',
+ '-segment_time', '00:10:00',
+ '-segment_atclocktime', '1',
+ os.path.join(get_recordings_path(cam), f'record_%Y-%m-%d-%H.%M.%S.{ext.value}')])
+
+ else:
+ raise ValueError(f'invalid worker type: {worker_type}')
+
+ while True:
+ try:
+ process = await asyncio.create_subprocess_exec(
+ *ffmpeg_command,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE
+ )
+
+ stdout_task = asyncio.create_task(read_output(process.stdout, prefix, sys.stdout))
+ stderr_task = asyncio.create_task(read_output(process.stderr, prefix, sys.stderr))
+
+ await asyncio.gather(stdout_task, stderr_task)
+
+ # check the return code of the process
+ if process.returncode != 0:
+ raise subprocess.CalledProcessError(process.returncode, ffmpeg_command)
+
+ except (FileNotFoundError, PermissionError, subprocess.CalledProcessError) as e:
+ # an error occurred, print the error message
+ error_message = f"Error occurred in {prefix}: {e}"
+ print(error_message, file=sys.stderr)
+
+ # sleep for 5 seconds before restarting the process
+ await asyncio.sleep(restart_delay)
+
+
+async def run():
+ kwargs = {}
+ if worker_type == CaptureType.RECORD:
+ kwargs['filter_by_server'] = gethostname()
+ for cam in ipcam_config.get_all_cam_names(**kwargs):
+ for channel in channels:
+ task = asyncio.create_task(run_ffmpeg(cam, channel))
+ tasks.append(task)
+
+ try:
+ await asyncio.gather(*tasks)
+ except KeyboardInterrupt:
+ print('KeyboardInterrupt: stopping processes...', file=sys.stderr)
+ for task in tasks:
+ task.cancel()
+
+ # wait for subprocesses to terminate
+ await asyncio.gather(*tasks, return_exceptions=True)
+
+ # send termination signal to all subprocesses
+ for task in tasks:
+ process = task.get_stack()
+ if process:
+ process.send_signal(signal.SIGTERM)
+
+
+if __name__ == '__main__':
+ capture_types = [t.value for t in CaptureType]
+ parser = ArgumentParser()
+ parser.add_argument('type', type=str, metavar='CAPTURE_TYPE', choices=tuple(capture_types),
+ help='capture type (variants: '+', '.join(capture_types)+')')
+
+ arg = homekit_config.load_app(no_config=True, parser=parser)
+ worker_type = CaptureType(arg['type'])
+
+ asyncio.run(run())