diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2023-06-10 23:02:34 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2023-06-10 23:02:34 +0300 |
commit | b0bf43e6a272d42a55158e657bd937cb82fc3d8d (patch) | |
tree | f1bc13253bc028abcaed9c88882f5aee384a269c /bin/ipcam_server.py | |
parent | f3b9d50496257d87757802dfb472b5ffae11962c (diff) |
move files, rename home package to homekit
Diffstat (limited to 'bin/ipcam_server.py')
-rwxr-xr-x | bin/ipcam_server.py | 581 |
1 files changed, 581 insertions, 0 deletions
diff --git a/bin/ipcam_server.py b/bin/ipcam_server.py new file mode 100755 index 0000000..211bc86 --- /dev/null +++ b/bin/ipcam_server.py @@ -0,0 +1,581 @@ +#!/usr/bin/env python3 +import logging +import os +import re +import asyncio +import time +import shutil +import __py_include + +import homekit.telegram.aio as telegram + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from asyncio import Lock + +from homekit.config import config +from homekit import http +from homekit.database.sqlite import SQLiteBase +from homekit.camera import util as camutil + +from enum import Enum +from typing import Optional, Union, List, Tuple +from datetime import datetime, timedelta +from functools import cmp_to_key + + +class TimeFilterType(Enum): + FIX = 'fix' + MOTION = 'motion' + MOTION_START = 'motion_start' + + +class TelegramLinkType(Enum): + FRAGMENT = 'fragment' + ORIGINAL_FILE = 'original_file' + + +def valid_recording_name(filename: str) -> bool: + return filename.startswith('record_') and filename.endswith('.mp4') + + +def filename_to_datetime(filename: str) -> datetime: + filename = os.path.basename(filename).replace('record_', '').replace('.mp4', '') + return datetime.strptime(filename, datetime_format) + + +def get_all_cams() -> list: + return [cam for cam in config['camera'].keys()] + + +# ipcam database +# -------------- + +class IPCamServerDatabase(SQLiteBase): + SCHEMA = 4 + + def __init__(self): + super().__init__() + + def schema_init(self, version: int) -> None: + cursor = self.cursor() + + if version < 1: + # timestamps + cursor.execute("""CREATE TABLE IF NOT EXISTS timestamps ( + camera INTEGER PRIMARY KEY, + fix_time INTEGER NOT NULL, + motion_time INTEGER NOT NULL + )""") + for cam in config['camera'].keys(): + self.add_camera(cam) + + if version < 2: + # motion_failures + cursor.execute("""CREATE TABLE IF NOT EXISTS motion_failures ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + camera INTEGER NOT NULL, + filename TEXT NOT NULL + )""") + + if version < 3: + cursor.execute("ALTER TABLE motion_failures ADD COLUMN message TEXT NOT NULL DEFAULT ''") + + if version < 4: + cursor.execute("ALTER TABLE timestamps ADD COLUMN motion_start_time INTEGER NOT NULL DEFAULT 0") + cursor.execute("UPDATE timestamps SET motion_start_time=motion_time") + + self.commit() + + def add_camera(self, camera: int): + self.cursor().execute("INSERT INTO timestamps (camera, fix_time, motion_time) VALUES (?, ?, ?)", + (camera, 0, 0)) + self.commit() + + def add_motion_failure(self, + camera: int, + filename: str, + message: Optional[str]): + self.cursor().execute("INSERT INTO motion_failures (camera, filename, message) VALUES (?, ?, ?)", + (camera, filename, message or '')) + self.commit() + + def get_all_timestamps(self): + cur = self.cursor() + data = {} + + cur.execute("SELECT camera, fix_time, motion_time, motion_start_time FROM timestamps") + for cam, fix_time, motion_time, motion_start_time in cur.fetchall(): + data[int(cam)] = { + 'fix': int(fix_time), + 'motion': int(motion_time), + 'motion_start': int(motion_start_time) + } + + return data + + def set_timestamp(self, + camera: int, + time_type: TimeFilterType, + time: Union[int, datetime]): + cur = self.cursor() + if isinstance(time, datetime): + time = int(time.timestamp()) + cur.execute(f"UPDATE timestamps SET {time_type.value}_time=? WHERE camera=?", (time, camera)) + self.commit() + + def get_timestamp(self, + camera: int, + time_type: TimeFilterType) -> int: + cur = self.cursor() + cur.execute(f"SELECT {time_type.value}_time FROM timestamps WHERE camera=?", (camera,)) + return int(cur.fetchone()[0]) + + +# ipcam web api +# ------------- + +class IPCamWebServer(http.HTTPServer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.get('/api/recordings', self.get_motion_queue) + self.get('/api/recordings/{name}', self.get_camera_recordings) + self.get('/api/recordings/{name}/download/{file}', self.download_recording) + self.get('/api/camera/list', self.camlist) + self.get('/api/timestamp/{name}/{type}', self.get_timestamp) + self.get('/api/timestamp/all', self.get_all_timestamps) + + self.post('/api/debug/migrate-mtimes', self.debug_migrate_mtimes) + self.post('/api/debug/fix', self.debug_fix) + self.post('/api/debug/cleanup', self.debug_cleanup) + self.post('/api/timestamp/{name}/{type}', self.set_timestamp) + + self.post('/api/motion/done/{name}', self.submit_motion) + self.post('/api/motion/fail/{name}', self.submit_motion_failure) + + self.get('/api/motion/params/{name}', self.get_motion_params) + self.get('/api/motion/params/{name}/roi', self.get_motion_roi_params) + + self.queue_lock = Lock() + + async def get_camera_recordings(self, req): + camera = int(req.match_info['name']) + try: + filter = TimeFilterType(req.query['filter']) + except KeyError: + filter = None + + try: + limit = int(req.query['limit']) + except KeyError: + limit = 0 + + files = get_recordings_files(camera, filter, limit) + if files: + time = filename_to_datetime(files[len(files)-1]['name']) + db.set_timestamp(camera, TimeFilterType.MOTION_START, time) + return self.ok({'files': files}) + + async def get_motion_queue(self, req): + try: + limit = int(req.query['limit']) + except KeyError: + limit = 0 + + async with self.queue_lock: + files = get_recordings_files(None, TimeFilterType.MOTION_START, limit) + if files: + times_by_cam = {} + for file in files: + time = filename_to_datetime(file['name']) + if file['cam'] not in times_by_cam or times_by_cam[file['cam']] < time: + times_by_cam[file['cam']] = time + for cam, time in times_by_cam.items(): + db.set_timestamp(cam, TimeFilterType.MOTION_START, time) + + return self.ok({'files': files}) + + async def download_recording(self, req: http.Request): + cam = int(req.match_info['name']) + file = req.match_info['file'] + + fullpath = os.path.join(config['camera'][cam]['recordings_path'], file) + if not os.path.isfile(fullpath): + raise ValueError(f'file "{fullpath}" does not exists') + + return http.FileResponse(fullpath) + + async def camlist(self, req: http.Request): + return self.ok(config['camera']) + + async def submit_motion(self, req: http.Request): + data = await req.post() + + camera = int(req.match_info['name']) + timecodes = data['timecodes'] + filename = data['filename'] + + time = filename_to_datetime(filename) + + try: + if timecodes != '': + fragments = camutil.dvr_scan_timecodes(timecodes) + asyncio.ensure_future(process_fragments(camera, filename, fragments)) + + db.set_timestamp(camera, TimeFilterType.MOTION, time) + return self.ok() + + except camutil.DVRScanInvalidTimecodes as e: + db.add_motion_failure(camera, filename, str(e)) + db.set_timestamp(camera, TimeFilterType.MOTION, time) + return self.ok('invalid timecodes') + + async def submit_motion_failure(self, req: http.Request): + camera = int(req.match_info['name']) + + data = await req.post() + filename = data['filename'] + message = data['message'] + + db.add_motion_failure(camera, filename, message) + db.set_timestamp(camera, TimeFilterType.MOTION, filename_to_datetime(filename)) + + return self.ok() + + async def debug_migrate_mtimes(self, req: http.Request): + written = {} + for cam in config['camera'].keys(): + confdir = os.path.join(os.getenv('HOME'), '.config', f'video-util-{cam}') + for time_type in TimeFilterType: + txt_file = os.path.join(confdir, f'{time_type.value}_mtime') + if os.path.isfile(txt_file): + with open(txt_file, 'r') as fd: + data = fd.read() + db.set_timestamp(cam, time_type, int(data.strip())) + + if cam not in written: + written[cam] = [] + written[cam].append(time_type) + + return self.ok({'written': written}) + + async def debug_fix(self, req: http.Request): + asyncio.ensure_future(fix_job()) + return self.ok() + + async def debug_cleanup(self, req: http.Request): + asyncio.ensure_future(cleanup_job()) + return self.ok() + + async def set_timestamp(self, req: http.Request): + cam, time_type, time = self._getset_timestamp_params(req, need_time=True) + db.set_timestamp(cam, time_type, time) + return self.ok() + + async def get_timestamp(self, req: http.Request): + cam, time_type = self._getset_timestamp_params(req) + return self.ok(db.get_timestamp(cam, time_type)) + + async def get_all_timestamps(self, req: http.Request): + return self.ok(db.get_all_timestamps()) + + async def get_motion_params(self, req: http.Request): + data = config['motion_params'][int(req.match_info['name'])] + lines = [ + f'threshold={data["threshold"]}', + f'min_event_length=3s', + f'frame_skip=2', + f'downscale_factor=3', + ] + return self.plain('\n'.join(lines)+'\n') + + async def get_motion_roi_params(self, req: http.Request): + data = config['motion_params'][int(req.match_info['name'])] + return self.plain('\n'.join(data['roi'])+'\n') + + @staticmethod + def _getset_timestamp_params(req: http.Request, need_time=False): + values = [] + + cam = int(req.match_info['name']) + assert cam in config['camera'], 'invalid camera' + + values.append(cam) + values.append(TimeFilterType(req.match_info['type'])) + + if need_time: + time = req.query['time'] + if time.startswith('record_'): + time = filename_to_datetime(time) + elif time.isnumeric(): + time = int(time) + else: + raise ValueError('invalid time') + values.append(time) + + return values + + +# other global stuff +# ------------------ + +def open_database(): + global db + db = IPCamServerDatabase() + + # update cams list in database, if needed + cams = db.get_all_timestamps().keys() + for cam in config['camera']: + if cam not in cams: + db.add_camera(cam) + + +def get_recordings_path(cam: int) -> str: + return config['camera'][cam]['recordings_path'] + + +def get_motion_path(cam: int) -> str: + return config['camera'][cam]['motion_path'] + + +def get_recordings_files(cam: Optional[int] = None, + time_filter_type: Optional[TimeFilterType] = None, + limit=0) -> List[dict]: + from_time = 0 + to_time = int(time.time()) + + cams = [cam] if cam is not None else get_all_cams() + files = [] + for cam in cams: + if time_filter_type: + from_time = db.get_timestamp(cam, time_filter_type) + if time_filter_type in (TimeFilterType.MOTION, TimeFilterType.MOTION_START): + to_time = db.get_timestamp(cam, TimeFilterType.FIX) + + from_time = datetime.fromtimestamp(from_time) + to_time = datetime.fromtimestamp(to_time) + + recdir = get_recordings_path(cam) + cam_files = [{ + 'cam': cam, + 'name': file, + 'size': os.path.getsize(os.path.join(recdir, file))} + for file in os.listdir(recdir) + if valid_recording_name(file) and from_time < filename_to_datetime(file) <= to_time] + cam_files.sort(key=lambda file: file['name']) + + if cam_files: + last = cam_files[len(cam_files)-1] + fullpath = os.path.join(recdir, last['name']) + if camutil.has_handle(fullpath): + logger.debug(f'get_recordings_files: file {fullpath} has opened handle, ignoring it') + cam_files.pop() + files.extend(cam_files) + + if limit > 0: + files = files[:limit] + + return files + + +async def process_fragments(camera: int, + filename: str, + fragments: List[Tuple[int, int]]) -> None: + time = filename_to_datetime(filename) + + rec_dir = get_recordings_path(camera) + motion_dir = get_motion_path(camera) + if not os.path.exists(motion_dir): + os.mkdir(motion_dir) + + for fragment in fragments: + start, end = fragment + + start -= config['motion']['padding'] + end += config['motion']['padding'] + + if start < 0: + start = 0 + + duration = end - start + + dt1 = (time + timedelta(seconds=start)).strftime(datetime_format) + dt2 = (time + timedelta(seconds=end)).strftime(datetime_format) + + await camutil.ffmpeg_cut(input=os.path.join(rec_dir, filename), + output=os.path.join(motion_dir, f'{dt1}__{dt2}.mp4'), + start_pos=start, + duration=duration) + + if fragments and 'telegram' in config['motion'] and config['motion']['telegram']: + asyncio.ensure_future(motion_notify_tg(camera, filename, fragments)) + + +async def motion_notify_tg(camera: int, + filename: str, + fragments: List[Tuple[int, int]]): + dt_file = filename_to_datetime(filename) + fmt = '%H:%M:%S' + + text = f'Camera: <b>{camera}</b>\n' + text += f'Original file: <b>{filename}</b> ' + text += _tg_links(TelegramLinkType.ORIGINAL_FILE, camera, filename) + + for start, end in fragments: + start -= config['motion']['padding'] + end += config['motion']['padding'] + + if start < 0: + start = 0 + + duration = end - start + if duration < 0: + duration = 0 + + dt1 = dt_file + timedelta(seconds=start) + dt2 = dt_file + timedelta(seconds=end) + + text += f'\nFragment: <b>{duration}s</b>, {dt1.strftime(fmt)}-{dt2.strftime(fmt)} ' + text += _tg_links(TelegramLinkType.FRAGMENT, camera, f'{dt1.strftime(datetime_format)}__{dt2.strftime(datetime_format)}.mp4') + + await telegram.send_message(text) + + +def _tg_links(link_type: TelegramLinkType, + camera: int, + file: str) -> str: + links = [] + for link_name, link_template in config['telegram'][f'{link_type.value}_url_templates']: + link = link_template.replace('{camera}', str(camera)).replace('{file}', file) + links.append(f'<a href="{link}">{link_name}</a>') + return ' '.join(links) + + +async def fix_job() -> None: + global fix_job_running + logger.debug('fix_job: starting') + + if fix_job_running: + logger.error('fix_job: already running') + return + + try: + fix_job_running = True + for cam in config['camera'].keys(): + files = get_recordings_files(cam, TimeFilterType.FIX) + if not files: + logger.debug(f'fix_job: no files for camera {cam}') + continue + + logger.debug(f'fix_job: got %d files for camera {cam}' % (len(files),)) + + for file in files: + fullpath = os.path.join(get_recordings_path(cam), file['name']) + await camutil.ffmpeg_recreate(fullpath) + timestamp = filename_to_datetime(file['name']) + if timestamp: + db.set_timestamp(cam, TimeFilterType.FIX, timestamp) + + finally: + fix_job_running = False + + +async def cleanup_job() -> None: + def fn2dt(name: str) -> datetime: + name = os.path.basename(name) + + if name.startswith('record_'): + return datetime.strptime(re.match(r'record_(.*?)\.mp4', name).group(1), datetime_format) + + m = re.match(rf'({datetime_format_re})__{datetime_format_re}\.mp4', name) + if m: + return datetime.strptime(m.group(1), datetime_format) + + raise ValueError(f'unrecognized filename format: {name}') + + def compare(i1: str, i2: str) -> int: + dt1 = fn2dt(i1) + dt2 = fn2dt(i2) + + if dt1 < dt2: + return -1 + elif dt1 > dt2: + return 1 + else: + return 0 + + global cleanup_job_running + logger.debug('cleanup_job: starting') + + if cleanup_job_running: + logger.error('cleanup_job: already running') + return + + try: + cleanup_job_running = True + + gb = float(1 << 30) + for storage in config['storages']: + if os.path.exists(storage['mountpoint']): + total, used, free = shutil.disk_usage(storage['mountpoint']) + free_gb = free // gb + if free_gb < config['cleanup_min_gb']: + # print(f"{storage['mountpoint']}: free={free}, free_gb={free_gb}") + cleaned = 0 + files = [] + for cam in storage['cams']: + for _dir in (config['camera'][cam]['recordings_path'], config['camera'][cam]['motion_path']): + files += list(map(lambda file: os.path.join(_dir, file), os.listdir(_dir))) + files = list(filter(lambda path: os.path.isfile(path) and path.endswith('.mp4'), files)) + files.sort(key=cmp_to_key(compare)) + + for file in files: + size = os.stat(file).st_size + try: + os.unlink(file) + cleaned += size + except OSError as e: + logger.exception(e) + if (free + cleaned) // gb >= config['cleanup_min_gb']: + break + else: + logger.error(f"cleanup_job: {storage['mountpoint']} not found") + finally: + cleanup_job_running = False + + +fix_job_running = False +cleanup_job_running = False + +datetime_format = '%Y-%m-%d-%H.%M.%S' +datetime_format_re = r'\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}.\d{2}' +db: Optional[IPCamServerDatabase] = None +server: Optional[IPCamWebServer] = None +logger = logging.getLogger(__name__) + + +# start of the program +# -------------------- + +if __name__ == '__main__': + config.load_app('ipcam_server') + + open_database() + + loop = asyncio.get_event_loop() + + try: + scheduler = AsyncIOScheduler(event_loop=loop) + if config['fix_enabled']: + scheduler.add_job(fix_job, 'interval', seconds=config['fix_interval'], misfire_grace_time=None) + + scheduler.add_job(cleanup_job, 'interval', seconds=config['cleanup_interval'], misfire_grace_time=None) + scheduler.start() + except KeyError: + pass + + asyncio.ensure_future(fix_job()) + asyncio.ensure_future(cleanup_job()) + + server = IPCamWebServer(config.get_addr('server.listen')) + server.run() |