#!/usr/bin/env python3
"""IP camera recordings server.

Exposes an HTTP API for listing/downloading camera recordings and for
reporting motion-detection results, keeps per-camera processing timestamps
in an SQLite database, and runs two periodic jobs: ``fix_job`` (re-creates
recorded files via ffmpeg) and ``cleanup_job`` (deletes the oldest video
files when a storage disk runs low on free space).
"""
import logging
import os
import asyncio
import time
import shutil

# NOTE(review): imported before the homekit packages, as in the original
# file; presumably it adjusts the import path -- keep this order.
import __py_include
import homekit.telegram.aio as telegram

from socket import gethostname
from argparse import ArgumentParser
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from asyncio import Lock

from homekit.config import config as homekit_config, LinuxBoardsConfig
from homekit.util import Addr
from homekit import http
from homekit.database.sqlite import SQLiteBase
from homekit.camera import util as camutil, IpcamConfig
from homekit.camera.types import (
    TimeFilterType,
    TelegramLinkType,
    VideoContainerType
)
from homekit.camera.util import (
    get_recordings_path,
    get_motion_path,
    is_valid_recording_name,
    datetime_from_filename
)
from typing import Optional, Union, List, Tuple
from datetime import datetime, timedelta
from functools import cmp_to_key


ipcam_config = IpcamConfig()
lbc_config = LinuxBoardsConfig()


# ipcam database
# --------------

class IpcamServerDatabase(SQLiteBase):
    """SQLite storage for per-camera timestamps and motion-detection failures."""

    # Current schema version; schema_init() below contains the migration
    # steps for every version up to this one.
    SCHEMA = 4

    def __init__(self, path=None):
        super().__init__(path=path)

    def schema_init(self, version: int) -> None:
        """Apply every migration step newer than ``version`` and commit.

        :param version: schema version the database file is currently at.
        """
        cursor = self.cursor()

        if version < 1:
            # timestamps
            cursor.execute("""CREATE TABLE IF NOT EXISTS timestamps (
                camera INTEGER PRIMARY KEY,
                fix_time INTEGER NOT NULL,
                motion_time INTEGER NOT NULL
            )""")
            for cam in ipcam_config.get_all_cam_names_for_this_server():
                self.add_camera(cam)

        if version < 2:
            # motion_failures
            cursor.execute("""CREATE TABLE IF NOT EXISTS motion_failures (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                camera INTEGER NOT NULL,
                filename TEXT NOT NULL
            )""")

        if version < 3:
            cursor.execute("ALTER TABLE motion_failures ADD COLUMN message TEXT NOT NULL DEFAULT ''")

        if version < 4:
            cursor.execute("ALTER TABLE timestamps ADD COLUMN motion_start_time INTEGER NOT NULL DEFAULT 0")
            # Seed the new column from the existing motion timestamp.
            cursor.execute("UPDATE timestamps SET motion_start_time=motion_time")

        self.commit()

    def add_camera(self, camera: int):
        """Insert a ``timestamps`` row for ``camera`` with zeroed fix/motion times."""
        self.cursor().execute("INSERT INTO timestamps (camera, fix_time, motion_time) VALUES (?, ?, ?)",
                              (camera, 0, 0))
        self.commit()

    def add_motion_failure(self,
                           camera: int,
                           filename: str,
                           message: Optional[str]):
        """Record a motion-detection failure; a falsy ``message`` is stored as ''."""
        self.cursor().execute("INSERT INTO motion_failures (camera, filename, message) VALUES (?, ?, ?)",
                              (camera, filename, message or ''))
        self.commit()

    def get_all_timestamps(self):
        """Return ``{camera: {'fix': int, 'motion': int, 'motion_start': int}}`` for all rows."""
        cur = self.cursor()
        cur.execute("SELECT camera, fix_time, motion_time, motion_start_time FROM timestamps")
        return {
            int(cam): {
                'fix': int(fix_time),
                'motion': int(motion_time),
                'motion_start': int(motion_start_time)
            }
            for cam, fix_time, motion_time, motion_start_time in cur.fetchall()
        }

    def set_timestamp(self,
                      camera: int,
                      time_type: TimeFilterType,
                      time: Union[int, datetime]):
        """Store ``time`` in the ``<time_type.value>_time`` column for ``camera``.

        :param time: unix timestamp, or a ``datetime`` which is converted
            with ``int(time.timestamp())``.
        """
        cur = self.cursor()
        if isinstance(time, datetime):
            time = int(time.timestamp())
        # The column name comes from the TimeFilterType enum value, not from
        # free-form input; the values themselves are bound as parameters.
        cur.execute(f"UPDATE timestamps SET {time_type.value}_time=? WHERE camera=?", (time, camera))
        self.commit()

    def get_timestamp(self,
                      camera: int,
                      time_type: TimeFilterType) -> int:
        """Return the ``<time_type.value>_time`` column value for ``camera`` as int."""
        cur = self.cursor()
        cur.execute(f"SELECT {time_type.value}_time FROM timestamps WHERE camera=?", (camera,))
        return int(cur.fetchone()[0])


# ipcam web api
# -------------

class IpcamWebServer(http.HTTPServer):
    """HTTP API: recordings listing/download, timestamps, motion results, debug triggers."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.get('/api/recordings', self.get_motion_queue)
        self.get('/api/recordings/{name}', self.get_camera_recordings)
        self.get('/api/recordings/{name}/download/{file}', self.download_recording)
        self.get('/api/camera/list', self.camlist)
        self.get('/api/timestamp/{name}/{type}', self.get_timestamp)
        self.get('/api/timestamp/all', self.get_all_timestamps)

        self.post('/api/debug/fix', self.debug_fix)
        self.post('/api/debug/cleanup', self.debug_cleanup)

        self.post('/api/timestamp/{name}/{type}', self.set_timestamp)

        self.post('/api/motion/done/{name}', self.submit_motion)
        self.post('/api/motion/fail/{name}', self.submit_motion_failure)

        # self.get('/api/motion/params/{name}', self.get_motion_params)
        # self.get('/api/motion/params/{name}/roi', self.get_motion_roi_params)

        # Taken by get_motion_queue() around its list-then-update step.
        self.queue_lock = Lock()

    async def get_camera_recordings(self, req):
        """List recordings of one camera.

        Optional query parameters: ``filter`` (a TimeFilterType value) and
        ``limit``. When any files are returned, the camera's MOTION_START
        timestamp is set to the time of the last returned file.
        """
        camera = int(req.match_info['name'])
        try:
            time_filter = TimeFilterType(req.query['filter'])
        except KeyError:
            time_filter = None

        try:
            limit = int(req.query['limit'])
        except KeyError:
            limit = 0

        files = get_recordings_files(camera, time_filter, limit)
        if files:
            last_time = datetime_from_filename(files[-1]['name'])
            db.set_timestamp(camera, TimeFilterType.MOTION_START, last_time)

        return self.ok({'files': files})

    async def get_motion_queue(self, req):
        """List recordings of all cameras newer than each camera's MOTION_START time.

        For every camera that contributed files, MOTION_START is advanced to
        the newest returned file of that camera. Runs under ``queue_lock``.
        """
        try:
            limit = int(req.query['limit'])
        except KeyError:
            limit = 0

        async with self.queue_lock:
            files = get_recordings_files(None, TimeFilterType.MOTION_START, limit)
            if files:
                newest_by_cam = {}
                for file in files:
                    file_time = datetime_from_filename(file['name'])
                    cam = file['cam']
                    if cam not in newest_by_cam or newest_by_cam[cam] < file_time:
                        newest_by_cam[cam] = file_time
                for cam, file_time in newest_by_cam.items():
                    db.set_timestamp(cam, TimeFilterType.MOTION_START, file_time)

        return self.ok({'files': files})

    async def download_recording(self, req: http.Request):
        """Send one recording file of a camera.

        :raises ValueError: if the name contains a directory component or
            the file does not exist.
        """
        cam = int(req.match_info['name'])
        file = req.match_info['file']

        # The name comes from the URL; refuse anything that is not a bare
        # file name so the join below cannot leave the recordings directory.
        if file != os.path.basename(file):
            raise ValueError('invalid file name')

        fullpath = os.path.join(get_recordings_path(cam), file)
        if not os.path.isfile(fullpath):
            raise ValueError(f'file "{fullpath}" does not exists')

        return http.FileResponse(fullpath)

    async def camlist(self, req: http.Request):
        """Return the cameras configured for this server."""
        return self.ok(ipcam_config.get_all_cam_names_for_this_server())

    async def submit_motion(self, req: http.Request):
        """Accept motion-detection results for a recording.

        Expects POST fields ``timecodes`` and ``filename``. Non-empty
        timecodes are parsed and fragment cutting is started in the
        background. The camera's MOTION timestamp is set to the file's time
        whether or not the timecodes were valid; invalid timecodes are also
        recorded as a motion failure.
        """
        data = await req.post()

        camera = int(req.match_info['name'])
        timecodes = data['timecodes']
        filename = data['filename']

        file_time = datetime_from_filename(filename)

        try:
            if timecodes != '':
                fragments = camutil.dvr_scan_timecodes(timecodes)
                _spawn_background(process_fragments(camera, filename, fragments))

            db.set_timestamp(camera, TimeFilterType.MOTION, file_time)
            return self.ok()

        except camutil.DVRScanInvalidTimecodes as e:
            db.add_motion_failure(camera, filename, str(e))
            db.set_timestamp(camera, TimeFilterType.MOTION, file_time)
            return self.ok('invalid timecodes')

    async def submit_motion_failure(self, req: http.Request):
        """Record a motion-detection failure and advance the MOTION timestamp.

        Expects POST fields ``filename`` and ``message``.
        """
        camera = int(req.match_info['name'])

        data = await req.post()
        filename = data['filename']
        message = data['message']

        db.add_motion_failure(camera, filename, message)
        db.set_timestamp(camera, TimeFilterType.MOTION, datetime_from_filename(filename))

        return self.ok()

    async def debug_fix(self, req: http.Request):
        """Start fix_job() in the background and reply immediately."""
        _spawn_background(fix_job())
        return self.ok()

    async def debug_cleanup(self, req: http.Request):
        """Start cleanup_job() in the background and reply immediately."""
        _spawn_background(cleanup_job())
        return self.ok()

    async def set_timestamp(self, req: http.Request):
        """Set one timestamp of a camera from the ``time`` query parameter."""
        cam, time_type, value = self._getset_timestamp_params(req, need_time=True)
        db.set_timestamp(cam, time_type, value)
        return self.ok()

    async def get_timestamp(self, req: http.Request):
        """Return one timestamp of a camera."""
        cam, time_type = self._getset_timestamp_params(req)
        return self.ok(db.get_timestamp(cam, time_type))

    async def get_all_timestamps(self, req: http.Request):
        """Return all timestamps of all cameras."""
        return self.ok(db.get_all_timestamps())

    # async def get_motion_params(self, req: http.Request):
    #     data = config['motion_params'][int(req.match_info['name'])]
    #     lines = [
    #         f'threshold={data["threshold"]}',
    #         f'min_event_length=3s',
    #         f'frame_skip=2',
    #         f'downscale_factor=3',
    #     ]
    #     return self.plain('\n'.join(lines)+'\n')
    #
    # async def get_motion_roi_params(self, req: http.Request):
    #     data = config['motion_params'][int(req.match_info['name'])]
    #     return self.plain('\n'.join(data['roi'])+'\n')

    @staticmethod
    def _getset_timestamp_params(req: http.Request, need_time=False):
        """Parse the ``{name}``/``{type}`` path parameters (and ``time``).

        :param need_time: also parse the ``time`` query parameter, which is
            either a ``record_...`` file name or a numeric unix timestamp.
        :return: ``[camera, time_type]`` or ``[camera, time_type, time]``.
        :raises ValueError: for an unknown camera or a malformed time.
        """
        cam = int(req.match_info['name'])
        # Raised explicitly rather than asserted: asserts are stripped
        # when Python runs with -O.
        if cam not in ipcam_config.get_all_cam_names_for_this_server():
            raise ValueError('invalid camera')

        values = [cam, TimeFilterType(req.match_info['type'])]

        if need_time:
            raw_time = req.query['time']
            if raw_time.startswith('record_'):
                values.append(datetime_from_filename(raw_time))
            elif raw_time.isnumeric():
                values.append(int(raw_time))
            else:
                raise ValueError('invalid time')

        return values


# other global stuff
# ------------------

# Strong references to fire-and-forget tasks. The event loop itself only
# keeps weak references, so an otherwise unreferenced task may be garbage
# collected before it finishes.
_background_tasks: set = set()


def _spawn_background(coro):
    """Schedule ``coro`` with asyncio.ensure_future and keep it referenced until done."""
    task = asyncio.ensure_future(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def open_database(database_path: str):
    """Open the global database and add rows for cameras it does not know yet."""
    global db
    db = IpcamServerDatabase(database_path)

    # update cams list in database, if needed
    stored_cams = db.get_all_timestamps().keys()
    for cam in ipcam_config.get_all_cam_names_for_this_server():
        if cam not in stored_cams:
            db.add_camera(cam)


def get_recordings_files(cam: Optional[int] = None,
                         time_filter_type: Optional[TimeFilterType] = None,
                         limit=0) -> List[dict]:
    """List recording files, optionally restricted to a time window.

    For each camera the window is ``from < file time <= to`` where ``from``
    is the stored timestamp of ``time_filter_type`` (or 0 without a filter)
    and ``to`` is the camera's FIX timestamp for the MOTION/MOTION_START
    filters, or the current time otherwise.

    The last file (by name) of each camera is left out when
    ``camutil.has_handle`` reports true for it.

    :param cam: a single camera, or None for all cameras of this server.
    :param time_filter_type: which stored timestamp opens the window.
    :param limit: if > 0, truncate the combined result to this many entries.
    :return: dicts with the keys ``cam``, ``name`` and ``size``.
    """
    now = int(time.time())
    cams = [cam] if cam is not None else ipcam_config.get_all_cam_names_for_this_server()

    files = []
    for cam_id in cams:
        # The window is computed from scratch for every camera. Previously
        # the bounds lived outside the loop and were overwritten with
        # datetime objects, so a second camera could end up passing a
        # datetime to datetime.fromtimestamp().
        from_ts = 0
        to_ts = now
        if time_filter_type:
            from_ts = db.get_timestamp(cam_id, time_filter_type)
            if time_filter_type in (TimeFilterType.MOTION, TimeFilterType.MOTION_START):
                to_ts = db.get_timestamp(cam_id, TimeFilterType.FIX)

        from_dt = datetime.fromtimestamp(from_ts)
        to_dt = datetime.fromtimestamp(to_ts)

        recdir = get_recordings_path(cam_id)
        cam_files = [{
            'cam': cam_id,
            'name': file,
            'size': os.path.getsize(os.path.join(recdir, file))}
            for file in os.listdir(recdir)
            if is_valid_recording_name(file) and from_dt < datetime_from_filename(file) <= to_dt]
        cam_files.sort(key=lambda file: file['name'])

        if cam_files:
            fullpath = os.path.join(recdir, cam_files[-1]['name'])
            if camutil.has_handle(fullpath):
                logger.debug(f'get_recordings_files: file {fullpath} has opened handle, ignoring it')
                cam_files.pop()
        files.extend(cam_files)

    if limit > 0:
        files = files[:limit]

    return files


async def process_fragments(camera: int,
                            filename: str,
                            fragments: List[Tuple[int, int]]) -> None:
    """Cut motion fragments out of a recording into the camera's motion directory.

    Each ``(start, end)`` pair is widened by ``motion_padding`` on both
    sides (start clamped at 0) and cut with ``camutil.ffmpeg_cut`` into
    ``<start datetime>__<end datetime>.mp4``. Afterwards, if there were
    fragments and ``motion_telegram`` is enabled, a Telegram notification
    is started in the background.
    """
    file_time = datetime_from_filename(filename)

    rec_dir = get_recordings_path(camera)
    motion_dir = get_motion_path(camera)
    if not os.path.exists(motion_dir):
        os.mkdir(motion_dir)

    padding = ipcam_config['motion_padding']
    for start, end in fragments:
        start = max(start - padding, 0)
        end += padding
        duration = end - start

        dt1 = (file_time + timedelta(seconds=start)).strftime(datetime_format)
        dt2 = (file_time + timedelta(seconds=end)).strftime(datetime_format)

        await camutil.ffmpeg_cut(input=os.path.join(rec_dir, filename),
                                 output=os.path.join(motion_dir, f'{dt1}__{dt2}.mp4'),
                                 start_pos=start,
                                 duration=duration)

    if fragments and ipcam_config['motion_telegram']:
        _spawn_background(motion_notify_tg(camera, filename, fragments))


async def motion_notify_tg(camera: int,
                           filename: str,
                           fragments: List[Tuple[int, int]]):
    """Send a Telegram message listing the motion fragments of a recording.

    Fragment bounds are padded the same way as in process_fragments() so
    the generated fragment file names match the files it writes.
    """
    dt_file = datetime_from_filename(filename)
    fmt = '%H:%M:%S'
    padding = ipcam_config['motion_padding']

    text = f'Camera: {camera}\n'
    text += f'Original file: {filename} '
    text += _tg_links(TelegramLinkType.ORIGINAL_FILE, camera, filename)

    for start, end in fragments:
        start = max(start - padding, 0)
        end += padding
        duration = max(end - start, 0)

        dt1 = dt_file + timedelta(seconds=start)
        dt2 = dt_file + timedelta(seconds=end)

        text += f'\nFragment: {duration}s, {dt1.strftime(fmt)}-{dt2.strftime(fmt)} '
        text += _tg_links(TelegramLinkType.FRAGMENT,
                          camera,
                          f'{dt1.strftime(datetime_format)}__{dt2.strftime(datetime_format)}.mp4')

    await telegram.send_message(text)


def _tg_links(link_type: TelegramLinkType,
              camera: int,
              file: str) -> str:
    """Build space-separated links for a file from the configured URL templates.

    Templates are read from ``ipcam_config['<link_type.value>_url_templates']``
    as ``(name, template)`` pairs; ``{camera}`` and ``{file}`` are substituted.
    """
    links = []
    for link_name, link_template in ipcam_config[f'{link_type.value}_url_templates']:
        link = link_template.replace('{camera}', str(camera)).replace('{file}', file)
        # The computed URL used to be dropped and only the bare name was
        # emitted. NOTE(review): assumes telegram.send_message renders HTML
        # markup -- confirm the parse mode.
        links.append(f'<a href="{link}">{link_name}</a>')
    return ' '.join(links)


async def fix_job() -> None:
    """Re-create every recording newer than each camera's FIX timestamp.

    After each file is processed with ``camutil.ffmpeg_recreate`` the
    camera's FIX timestamp is advanced to that file's time. Only one run
    may be active at a time; an overlapping call logs an error and returns.
    """
    global fix_job_running
    logger.debug('fix_job: starting')

    if fix_job_running:
        logger.error('fix_job: already running')
        return

    try:
        fix_job_running = True
        for cam in ipcam_config.get_all_cam_names_for_this_server():
            files = get_recordings_files(cam, TimeFilterType.FIX)
            if not files:
                logger.debug(f'fix_job: no files for camera {cam}')
                continue

            logger.debug('fix_job: got %d files for camera %s', len(files), cam)

            for file in files:
                fullpath = os.path.join(get_recordings_path(cam), file['name'])
                await camutil.ffmpeg_recreate(fullpath)
                timestamp = datetime_from_filename(file['name'])
                if timestamp:
                    db.set_timestamp(cam, TimeFilterType.FIX, timestamp)

    finally:
        fix_job_running = False


def _list_cleanup_candidates(disk_number: int) -> List[str]:
    """Return video file paths of all cameras on a disk, oldest first."""
    extensions = tuple(f'.{t.value}' for t in VideoContainerType)

    candidates = []
    for cam in ipcam_config.get_all_cam_names_for_this_server(filter_by_disk=disk_number):
        for directory in (get_recordings_path(cam), get_motion_path(cam)):
            # The motion directory is only created once fragments have been
            # written for the camera, so it may legitimately be missing.
            if not os.path.isdir(directory):
                continue
            for name in os.listdir(directory):
                path = os.path.join(directory, name)
                if os.path.isfile(path) and path.endswith(extensions):
                    candidates.append(path)

    candidates.sort(key=datetime_from_filename)
    return candidates


async def cleanup_job() -> None:
    """Free disk space by deleting the oldest video files.

    For every disk of this board whose free space is below
    ``cleanup_min_gb``, recordings and motion fragments of the cameras on
    that disk are deleted oldest-first until the free space (as measured at
    the start plus the bytes deleted) reaches the threshold. Only one run
    may be active at a time; an overlapping call logs an error and returns.
    """
    global cleanup_job_running
    logger.debug('cleanup_job: starting')

    if cleanup_job_running:
        logger.error('cleanup_job: already running')
        return

    try:
        cleanup_job_running = True

        gb = float(1 << 30)
        # Disks are numbered from 1 in the order the board config lists them.
        for disk_number, storage in enumerate(lbc_config.get_board_disks(gethostname()), start=1):
            mountpoint = storage['mountpoint']
            if not os.path.exists(mountpoint):
                logger.error(f"cleanup_job: {storage['mountpoint']} not found")
                continue

            total, used, free = shutil.disk_usage(mountpoint)
            if free // gb >= ipcam_config['cleanup_min_gb']:
                continue

            cleaned = 0
            for file in _list_cleanup_candidates(disk_number):
                try:
                    size = os.stat(file).st_size
                    os.unlink(file)
                    cleaned += size
                except OSError as e:
                    # Best effort: log and carry on with the next file.
                    logger.exception(e)

                if (free + cleaned) // gb >= ipcam_config['cleanup_min_gb']:
                    break
    finally:
        cleanup_job_running = False


fix_job_running = False
cleanup_job_running = False

datetime_format = '%Y-%m-%d-%H.%M.%S'
# Regex counterpart of datetime_format (every '.' is a literal dot).
datetime_format_re = r'\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}\.\d{2}'
db: Optional[IpcamServerDatabase] = None
server: Optional[IpcamWebServer] = None
logger = logging.getLogger(__name__)


# start of the program
# --------------------

if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('--listen', type=str, required=True)
    parser.add_argument('--database-path', type=str, required=True)
    arg = homekit_config.load_app(no_config=True, parser=parser)

    open_database(arg.database_path)

    loop = asyncio.get_event_loop()

    try:
        scheduler = AsyncIOScheduler(event_loop=loop)
        if ipcam_config['fix_enabled']:
            scheduler.add_job(fix_job, 'interval',
                              seconds=ipcam_config['fix_interval'],
                              misfire_grace_time=None)

        scheduler.add_job(cleanup_job, 'interval',
                          seconds=ipcam_config['cleanup_interval'],
                          misfire_grace_time=None)
        scheduler.start()
    except KeyError as e:
        # Still best effort, but no longer silent: without these keys the
        # periodic jobs are not scheduled.
        logger.warning('scheduler not started, missing config key: %s', e)

    # Run both jobs once at startup.
    _spawn_background(fix_job())
    _spawn_background(cleanup_job())

    server = IpcamWebServer(Addr.fromstring(arg.listen))
    server.run()