diff options
Diffstat (limited to 'bin/ipcam_server.py')
-rwxr-xr-x | bin/ipcam_server.py | 544 |
1 files changed, 544 insertions, 0 deletions
diff --git a/bin/ipcam_server.py b/bin/ipcam_server.py new file mode 100755 index 0000000..71d5ea1 --- /dev/null +++ b/bin/ipcam_server.py @@ -0,0 +1,544 @@ +#!/usr/bin/env python3 +import logging +import os +import asyncio +import time +import shutil +import __py_include + +import homekit.telegram.aio as telegram + +from socket import gethostname +from argparse import ArgumentParser +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from asyncio import Lock + +from homekit.config import config as homekit_config, LinuxBoardsConfig +from homekit.util import Addr +from homekit import http +from homekit.database.sqlite import SQLiteBase +from homekit.camera import util as camutil, IpcamConfig +from homekit.camera.types import ( + TimeFilterType, + TelegramLinkType, + VideoContainerType +) +from homekit.camera.util import ( + get_recordings_path, + get_motion_path, + is_valid_recording_name, + datetime_from_filename +) + +from typing import Optional, Union, List, Tuple +from datetime import datetime, timedelta +from functools import cmp_to_key + + +ipcam_config = IpcamConfig() +lbc_config = LinuxBoardsConfig() + + +# ipcam database +# -------------- + +class IpcamServerDatabase(SQLiteBase): + SCHEMA = 4 + + def __init__(self, path=None): + super().__init__(path=path) + + def schema_init(self, version: int) -> None: + cursor = self.cursor() + + if version < 1: + # timestamps + cursor.execute("""CREATE TABLE IF NOT EXISTS timestamps ( + camera INTEGER PRIMARY KEY, + fix_time INTEGER NOT NULL, + motion_time INTEGER NOT NULL + )""") + for cam in ipcam_config.get_all_cam_names_for_this_server(): + self.add_camera(cam) + + if version < 2: + # motion_failures + cursor.execute("""CREATE TABLE IF NOT EXISTS motion_failures ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + camera INTEGER NOT NULL, + filename TEXT NOT NULL + )""") + + if version < 3: + cursor.execute("ALTER TABLE motion_failures ADD COLUMN message TEXT NOT NULL DEFAULT ''") + + if version < 4: + cursor.execute("ALTER TABLE timestamps ADD COLUMN motion_start_time INTEGER NOT NULL DEFAULT 0") + cursor.execute("UPDATE timestamps SET motion_start_time=motion_time") + + self.commit() + + def add_camera(self, camera: int): + self.cursor().execute("INSERT INTO timestamps (camera, fix_time, motion_time) VALUES (?, ?, ?)", + (camera, 0, 0)) + self.commit() + + def add_motion_failure(self, + camera: int, + filename: str, + message: Optional[str]): + self.cursor().execute("INSERT INTO motion_failures (camera, filename, message) VALUES (?, ?, ?)", + (camera, filename, message or '')) + self.commit() + + def get_all_timestamps(self): + cur = self.cursor() + data = {} + + cur.execute("SELECT camera, fix_time, motion_time, motion_start_time FROM timestamps") + for cam, fix_time, motion_time, motion_start_time in cur.fetchall(): + data[int(cam)] = { + 'fix': int(fix_time), + 'motion': int(motion_time), + 'motion_start': int(motion_start_time) + } + + return data + + def set_timestamp(self, + camera: int, + time_type: TimeFilterType, + time: Union[int, datetime]): + cur = self.cursor() + if isinstance(time, datetime): + time = int(time.timestamp()) + cur.execute(f"UPDATE timestamps SET {time_type.value}_time=? WHERE camera=?", (time, camera)) + self.commit() + + def get_timestamp(self, + camera: int, + time_type: TimeFilterType) -> int: + cur = self.cursor() + cur.execute(f"SELECT {time_type.value}_time FROM timestamps WHERE camera=?", (camera,)) + return int(cur.fetchone()[0]) + + +# ipcam web api +# ------------- + +class IpcamWebServer(http.HTTPServer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.get('/api/recordings', self.get_motion_queue) + self.get('/api/recordings/{name}', self.get_camera_recordings) + self.get('/api/recordings/{name}/download/{file}', self.download_recording) + self.get('/api/camera/list', self.camlist) + self.get('/api/timestamp/{name}/{type}', self.get_timestamp) + self.get('/api/timestamp/all', self.get_all_timestamps) + + self.post('/api/debug/fix', self.debug_fix) + self.post('/api/debug/cleanup', self.debug_cleanup) + + self.post('/api/timestamp/{name}/{type}', self.set_timestamp) + + self.post('/api/motion/done/{name}', self.submit_motion) + self.post('/api/motion/fail/{name}', self.submit_motion_failure) + + # self.get('/api/motion/params/{name}', self.get_motion_params) + # self.get('/api/motion/params/{name}/roi', self.get_motion_roi_params) + + self.queue_lock = Lock() + + async def get_camera_recordings(self, req): + camera = int(req.match_info['name']) + try: + filter = TimeFilterType(req.query['filter']) + except KeyError: + filter = None + + try: + limit = int(req.query['limit']) + except KeyError: + limit = 0 + + files = get_recordings_files(camera, filter, limit) + if files: + time = datetime_from_filename(files[len(files)-1]['name']) + db.set_timestamp(camera, TimeFilterType.MOTION_START, time) + return self.ok({'files': files}) + + async def get_motion_queue(self, req): + try: + limit = int(req.query['limit']) + except KeyError: + limit = 0 + + async with self.queue_lock: + files = get_recordings_files(None, TimeFilterType.MOTION_START, limit) + if files: + times_by_cam = {} + for file in files: + time = datetime_from_filename(file['name']) + if file['cam'] not in times_by_cam or times_by_cam[file['cam']] < time: + times_by_cam[file['cam']] = time + for cam, time in times_by_cam.items(): + db.set_timestamp(cam, TimeFilterType.MOTION_START, time) + + return self.ok({'files': files}) + + async def download_recording(self, req: http.Request): + cam = int(req.match_info['name']) + file = req.match_info['file'] + + fullpath = os.path.join(get_recordings_path(cam), file) + if not os.path.isfile(fullpath): + raise ValueError(f'file "{fullpath}" does not exists') + + return http.FileResponse(fullpath) + + async def camlist(self, req: http.Request): + return self.ok(ipcam_config.get_all_cam_names_for_this_server()) + + async def submit_motion(self, req: http.Request): + data = await req.post() + + camera = int(req.match_info['name']) + timecodes = data['timecodes'] + filename = data['filename'] + + time = datetime_from_filename(filename) + + try: + if timecodes != '': + fragments = camutil.dvr_scan_timecodes(timecodes) + asyncio.ensure_future(process_fragments(camera, filename, fragments)) + + db.set_timestamp(camera, TimeFilterType.MOTION, time) + return self.ok() + + except camutil.DVRScanInvalidTimecodes as e: + db.add_motion_failure(camera, filename, str(e)) + db.set_timestamp(camera, TimeFilterType.MOTION, time) + return self.ok('invalid timecodes') + + async def submit_motion_failure(self, req: http.Request): + camera = int(req.match_info['name']) + + data = await req.post() + filename = data['filename'] + message = data['message'] + + db.add_motion_failure(camera, filename, message) + db.set_timestamp(camera, TimeFilterType.MOTION, datetime_from_filename(filename)) + + return self.ok() + + async def debug_fix(self, req: http.Request): + asyncio.ensure_future(fix_job()) + return self.ok() + + async def debug_cleanup(self, req: http.Request): + asyncio.ensure_future(cleanup_job()) + return self.ok() + + async def set_timestamp(self, req: http.Request): + cam, time_type, time = self._getset_timestamp_params(req, need_time=True) + db.set_timestamp(cam, time_type, time) + return self.ok() + + async def get_timestamp(self, req: http.Request): + cam, time_type = self._getset_timestamp_params(req) + return self.ok(db.get_timestamp(cam, time_type)) + + async def get_all_timestamps(self, req: http.Request): + return self.ok(db.get_all_timestamps()) + + # async def get_motion_params(self, req: http.Request): + # data = config['motion_params'][int(req.match_info['name'])] + # lines = [ + # f'threshold={data["threshold"]}', + # f'min_event_length=3s', + # f'frame_skip=2', + # f'downscale_factor=3', + # ] + # return self.plain('\n'.join(lines)+'\n') + # + # async def get_motion_roi_params(self, req: http.Request): + # data = config['motion_params'][int(req.match_info['name'])] + # return self.plain('\n'.join(data['roi'])+'\n') + + @staticmethod + def _getset_timestamp_params(req: http.Request, need_time=False): + values = [] + + cam = int(req.match_info['name']) + assert cam in ipcam_config.get_all_cam_names_for_this_server(), 'invalid camera' + + values.append(cam) + values.append(TimeFilterType(req.match_info['type'])) + + if need_time: + time = req.query['time'] + if time.startswith('record_'): + time = datetime_from_filename(time) + elif time.isnumeric(): + time = int(time) + else: + raise ValueError('invalid time') + values.append(time) + + return values + + +# other global stuff +# ------------------ + +def open_database(database_path: str): + global db + db = IpcamServerDatabase(database_path) + + # update cams list in database, if needed + stored_cams = db.get_all_timestamps().keys() + for cam in ipcam_config.get_all_cam_names_for_this_server(): + if cam not in stored_cams: + db.add_camera(cam) + + +def get_recordings_files(cam: Optional[int] = None, + time_filter_type: Optional[TimeFilterType] = None, + limit=0) -> List[dict]: + from_time = 0 + to_time = int(time.time()) + + cams = [cam] if cam is not None else ipcam_config.get_all_cam_names_for_this_server() + files = [] + for cam in cams: + if time_filter_type: + from_time = db.get_timestamp(cam, time_filter_type) + if time_filter_type in (TimeFilterType.MOTION, TimeFilterType.MOTION_START): + to_time = db.get_timestamp(cam, TimeFilterType.FIX) + + from_time = datetime.fromtimestamp(from_time) + to_time = datetime.fromtimestamp(to_time) + + recdir = get_recordings_path(cam) + cam_files = [{ + 'cam': cam, + 'name': file, + 'size': os.path.getsize(os.path.join(recdir, file))} + for file in os.listdir(recdir) + if is_valid_recording_name(file) and from_time < datetime_from_filename(file) <= to_time] + cam_files.sort(key=lambda file: file['name']) + + if cam_files: + last = cam_files[len(cam_files)-1] + fullpath = os.path.join(recdir, last['name']) + if camutil.has_handle(fullpath): + logger.debug(f'get_recordings_files: file {fullpath} has opened handle, ignoring it') + cam_files.pop() + files.extend(cam_files) + + if limit > 0: + files = files[:limit] + + return files + + +async def process_fragments(camera: int, + filename: str, + fragments: List[Tuple[int, int]]) -> None: + time = datetime_from_filename(filename) + + rec_dir = get_recordings_path(camera) + motion_dir = get_motion_path(camera) + if not os.path.exists(motion_dir): + os.mkdir(motion_dir) + + for fragment in fragments: + start, end = fragment + + start -= ipcam_config['motion_padding'] + end += ipcam_config['motion_padding'] + + if start < 0: + start = 0 + + duration = end - start + + dt1 = (time + timedelta(seconds=start)).strftime(datetime_format) + dt2 = (time + timedelta(seconds=end)).strftime(datetime_format) + + await camutil.ffmpeg_cut(input=os.path.join(rec_dir, filename), + output=os.path.join(motion_dir, f'{dt1}__{dt2}.mp4'), + start_pos=start, + duration=duration) + + if fragments and ipcam_config['motion_telegram']: + asyncio.ensure_future(motion_notify_tg(camera, filename, fragments)) + + +async def motion_notify_tg(camera: int, + filename: str, + fragments: List[Tuple[int, int]]): + dt_file = datetime_from_filename(filename) + fmt = '%H:%M:%S' + + text = f'Camera: <b>{camera}</b>\n' + text += f'Original file: <b>{filename}</b> ' + text += _tg_links(TelegramLinkType.ORIGINAL_FILE, camera, filename) + + for start, end in fragments: + start -= ipcam_config['motion_padding'] + end += ipcam_config['motion_padding'] + + if start < 0: + start = 0 + + duration = end - start + if duration < 0: + duration = 0 + + dt1 = dt_file + timedelta(seconds=start) + dt2 = dt_file + timedelta(seconds=end) + + text += f'\nFragment: <b>{duration}s</b>, {dt1.strftime(fmt)}-{dt2.strftime(fmt)} ' + text += _tg_links(TelegramLinkType.FRAGMENT, camera, f'{dt1.strftime(datetime_format)}__{dt2.strftime(datetime_format)}.mp4') + + await telegram.send_message(text) + + +def _tg_links(link_type: TelegramLinkType, + camera: int, + file: str) -> str: + links = [] + for link_name, link_template in ipcam_config[f'{link_type.value}_url_templates']: + link = link_template.replace('{camera}', str(camera)).replace('{file}', file) + links.append(f'<a href="{link}">{link_name}</a>') + return ' '.join(links) + + +async def fix_job() -> None: + global fix_job_running + logger.debug('fix_job: starting') + + if fix_job_running: + logger.error('fix_job: already running') + return + + try: + fix_job_running = True + for cam in ipcam_config.get_all_cam_names_for_this_server(): + files = get_recordings_files(cam, TimeFilterType.FIX) + if not files: + logger.debug(f'fix_job: no files for camera {cam}') + continue + + logger.debug(f'fix_job: got %d files for camera {cam}' % (len(files),)) + + for file in files: + fullpath = os.path.join(get_recordings_path(cam), file['name']) + await camutil.ffmpeg_recreate(fullpath) + timestamp = datetime_from_filename(file['name']) + if timestamp: + db.set_timestamp(cam, TimeFilterType.FIX, timestamp) + + finally: + fix_job_running = False + + +async def cleanup_job() -> None: + def compare(i1: str, i2: str) -> int: + dt1 = datetime_from_filename(i1) + dt2 = datetime_from_filename(i2) + + if dt1 < dt2: + return -1 + elif dt1 > dt2: + return 1 + else: + return 0 + + global cleanup_job_running + logger.debug('cleanup_job: starting') + + if cleanup_job_running: + logger.error('cleanup_job: already running') + return + + try: + cleanup_job_running = True + + gb = float(1 << 30) + disk_number = 0 + for storage in lbc_config.get_board_disks(gethostname()): + disk_number += 1 + if os.path.exists(storage['mountpoint']): + total, used, free = shutil.disk_usage(storage['mountpoint']) + free_gb = free // gb + if free_gb < ipcam_config['cleanup_min_gb']: + cleaned = 0 + files = [] + for cam in ipcam_config.get_all_cam_names_for_this_server(filter_by_disk=disk_number): + for _dir in (get_recordings_path(cam), get_motion_path(cam)): + files += list(map(lambda file: os.path.join(_dir, file), os.listdir(_dir))) + files = list(filter(lambda path: os.path.isfile(path) and path.endswith(tuple([f'.{t.value}' for t in VideoContainerType])), files)) + files.sort(key=cmp_to_key(compare)) + + for file in files: + size = os.stat(file).st_size + try: + os.unlink(file) + cleaned += size + except OSError as e: + logger.exception(e) + if (free + cleaned) // gb >= ipcam_config['cleanup_min_gb']: + break + else: + logger.error(f"cleanup_job: {storage['mountpoint']} not found") + finally: + cleanup_job_running = False + + +fix_job_running = False +cleanup_job_running = False + +datetime_format = '%Y-%m-%d-%H.%M.%S' +datetime_format_re = r'\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}.\d{2}' +db: Optional[IpcamServerDatabase] = None +server: Optional[IpcamWebServer] = None +logger = logging.getLogger(__name__) + + +# start of the program +# -------------------- + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--listen', type=str, required=True) + parser.add_argument('--database-path', type=str, required=True) + arg = homekit_config.load_app(no_config=True, parser=parser) + + open_database(arg.database_path) + + loop = asyncio.get_event_loop() + + try: + scheduler = AsyncIOScheduler(event_loop=loop) + if ipcam_config['fix_enabled']: + scheduler.add_job(fix_job, 'interval', + seconds=ipcam_config['fix_interval'], + misfire_grace_time=None) + + scheduler.add_job(cleanup_job, 'interval', + seconds=ipcam_config['cleanup_interval'], + misfire_grace_time=None) + scheduler.start() + except KeyError: + pass + + asyncio.ensure_future(fix_job()) + asyncio.ensure_future(cleanup_job()) + + server = IpcamWebServer(Addr.fromstring(arg.listen)) + server.run() |