diff options
Diffstat (limited to 'src/ipcam_server.py')
-rwxr-xr-x | src/ipcam_server.py | 579 |
1 files changed, 0 insertions, 579 deletions
diff --git a/src/ipcam_server.py b/src/ipcam_server.py deleted file mode 100755 index 2c4915d..0000000 --- a/src/ipcam_server.py +++ /dev/null @@ -1,579 +0,0 @@ -#!/usr/bin/env python3 -import logging -import os -import re -import asyncio -import time -import shutil -import home.telegram.aio as telegram - -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from asyncio import Lock - -from home.config import config -from home import http -from home.database.sqlite import SQLiteBase -from home.camera import util as camutil - -from enum import Enum -from typing import Optional, Union, List, Tuple -from datetime import datetime, timedelta -from functools import cmp_to_key - - -class TimeFilterType(Enum): - FIX = 'fix' - MOTION = 'motion' - MOTION_START = 'motion_start' - - -class TelegramLinkType(Enum): - FRAGMENT = 'fragment' - ORIGINAL_FILE = 'original_file' - - -def valid_recording_name(filename: str) -> bool: - return filename.startswith('record_') and filename.endswith('.mp4') - - -def filename_to_datetime(filename: str) -> datetime: - filename = os.path.basename(filename).replace('record_', '').replace('.mp4', '') - return datetime.strptime(filename, datetime_format) - - -def get_all_cams() -> list: - return [cam for cam in config['camera'].keys()] - - -# ipcam database -# -------------- - -class IPCamServerDatabase(SQLiteBase): - SCHEMA = 4 - - def __init__(self): - super().__init__() - - def schema_init(self, version: int) -> None: - cursor = self.cursor() - - if version < 1: - # timestamps - cursor.execute("""CREATE TABLE IF NOT EXISTS timestamps ( - camera INTEGER PRIMARY KEY, - fix_time INTEGER NOT NULL, - motion_time INTEGER NOT NULL - )""") - for cam in config['camera'].keys(): - self.add_camera(cam) - - if version < 2: - # motion_failures - cursor.execute("""CREATE TABLE IF NOT EXISTS motion_failures ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - camera INTEGER NOT NULL, - filename TEXT NOT NULL - )""") - - if version < 3: - cursor.execute("ALTER TABLE motion_failures ADD COLUMN message TEXT NOT NULL DEFAULT ''") - - if version < 4: - cursor.execute("ALTER TABLE timestamps ADD COLUMN motion_start_time INTEGER NOT NULL DEFAULT 0") - cursor.execute("UPDATE timestamps SET motion_start_time=motion_time") - - self.commit() - - def add_camera(self, camera: int): - self.cursor().execute("INSERT INTO timestamps (camera, fix_time, motion_time) VALUES (?, ?, ?)", - (camera, 0, 0)) - self.commit() - - def add_motion_failure(self, - camera: int, - filename: str, - message: Optional[str]): - self.cursor().execute("INSERT INTO motion_failures (camera, filename, message) VALUES (?, ?, ?)", - (camera, filename, message or '')) - self.commit() - - def get_all_timestamps(self): - cur = self.cursor() - data = {} - - cur.execute("SELECT camera, fix_time, motion_time, motion_start_time FROM timestamps") - for cam, fix_time, motion_time, motion_start_time in cur.fetchall(): - data[int(cam)] = { - 'fix': int(fix_time), - 'motion': int(motion_time), - 'motion_start': int(motion_start_time) - } - - return data - - def set_timestamp(self, - camera: int, - time_type: TimeFilterType, - time: Union[int, datetime]): - cur = self.cursor() - if isinstance(time, datetime): - time = int(time.timestamp()) - cur.execute(f"UPDATE timestamps SET {time_type.value}_time=? WHERE camera=?", (time, camera)) - self.commit() - - def get_timestamp(self, - camera: int, - time_type: TimeFilterType) -> int: - cur = self.cursor() - cur.execute(f"SELECT {time_type.value}_time FROM timestamps WHERE camera=?", (camera,)) - return int(cur.fetchone()[0]) - - -# ipcam web api -# ------------- - -class IPCamWebServer(http.HTTPServer): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - self.get('/api/recordings', self.get_motion_queue) - self.get('/api/recordings/{name}', self.get_camera_recordings) - self.get('/api/recordings/{name}/download/{file}', self.download_recording) - self.get('/api/camera/list', self.camlist) - self.get('/api/timestamp/{name}/{type}', self.get_timestamp) - self.get('/api/timestamp/all', self.get_all_timestamps) - - self.post('/api/debug/migrate-mtimes', self.debug_migrate_mtimes) - self.post('/api/debug/fix', self.debug_fix) - self.post('/api/debug/cleanup', self.debug_cleanup) - self.post('/api/timestamp/{name}/{type}', self.set_timestamp) - - self.post('/api/motion/done/{name}', self.submit_motion) - self.post('/api/motion/fail/{name}', self.submit_motion_failure) - - self.get('/api/motion/params/{name}', self.get_motion_params) - self.get('/api/motion/params/{name}/roi', self.get_motion_roi_params) - - self.queue_lock = Lock() - - async def get_camera_recordings(self, req): - camera = int(req.match_info['name']) - try: - filter = TimeFilterType(req.query['filter']) - except KeyError: - filter = None - - try: - limit = int(req.query['limit']) - except KeyError: - limit = 0 - - files = get_recordings_files(camera, filter, limit) - if files: - time = filename_to_datetime(files[len(files)-1]['name']) - db.set_timestamp(camera, TimeFilterType.MOTION_START, time) - return self.ok({'files': files}) - - async def get_motion_queue(self, req): - try: - limit = int(req.query['limit']) - except KeyError: - limit = 0 - - async with self.queue_lock: - files = get_recordings_files(None, TimeFilterType.MOTION_START, limit) - if files: - times_by_cam = {} - for file in files: - time = filename_to_datetime(file['name']) - if file['cam'] not in times_by_cam or times_by_cam[file['cam']] < time: - times_by_cam[file['cam']] = time - for cam, time in times_by_cam.items(): - db.set_timestamp(cam, TimeFilterType.MOTION_START, time) - - return self.ok({'files': files}) - - async def download_recording(self, req: http.Request): - cam = int(req.match_info['name']) - file = req.match_info['file'] - - fullpath = os.path.join(config['camera'][cam]['recordings_path'], file) - if not os.path.isfile(fullpath): - raise ValueError(f'file "{fullpath}" does not exists') - - return http.FileResponse(fullpath) - - async def camlist(self, req: http.Request): - return self.ok(config['camera']) - - async def submit_motion(self, req: http.Request): - data = await req.post() - - camera = int(req.match_info['name']) - timecodes = data['timecodes'] - filename = data['filename'] - - time = filename_to_datetime(filename) - - try: - if timecodes != '': - fragments = camutil.dvr_scan_timecodes(timecodes) - asyncio.ensure_future(process_fragments(camera, filename, fragments)) - - db.set_timestamp(camera, TimeFilterType.MOTION, time) - return self.ok() - - except camutil.DVRScanInvalidTimecodes as e: - db.add_motion_failure(camera, filename, str(e)) - db.set_timestamp(camera, TimeFilterType.MOTION, time) - return self.ok('invalid timecodes') - - async def submit_motion_failure(self, req: http.Request): - camera = int(req.match_info['name']) - - data = await req.post() - filename = data['filename'] - message = data['message'] - - db.add_motion_failure(camera, filename, message) - db.set_timestamp(camera, TimeFilterType.MOTION, filename_to_datetime(filename)) - - return self.ok() - - async def debug_migrate_mtimes(self, req: http.Request): - written = {} - for cam in config['camera'].keys(): - confdir = os.path.join(os.getenv('HOME'), '.config', f'video-util-{cam}') - for time_type in TimeFilterType: - txt_file = os.path.join(confdir, f'{time_type.value}_mtime') - if os.path.isfile(txt_file): - with open(txt_file, 'r') as fd: - data = fd.read() - db.set_timestamp(cam, time_type, int(data.strip())) - - if cam not in written: - written[cam] = [] - written[cam].append(time_type) - - return self.ok({'written': written}) - - async def debug_fix(self, req: http.Request): - asyncio.ensure_future(fix_job()) - return self.ok() - - async def debug_cleanup(self, req: http.Request): - asyncio.ensure_future(cleanup_job()) - return self.ok() - - async def set_timestamp(self, req: http.Request): - cam, time_type, time = self._getset_timestamp_params(req, need_time=True) - db.set_timestamp(cam, time_type, time) - return self.ok() - - async def get_timestamp(self, req: http.Request): - cam, time_type = self._getset_timestamp_params(req) - return self.ok(db.get_timestamp(cam, time_type)) - - async def get_all_timestamps(self, req: http.Request): - return self.ok(db.get_all_timestamps()) - - async def get_motion_params(self, req: http.Request): - data = config['motion_params'][int(req.match_info['name'])] - lines = [ - f'threshold={data["threshold"]}', - f'min_event_length=3s', - f'frame_skip=2', - f'downscale_factor=3', - ] - return self.plain('\n'.join(lines)+'\n') - - async def get_motion_roi_params(self, req: http.Request): - data = config['motion_params'][int(req.match_info['name'])] - return self.plain('\n'.join(data['roi'])+'\n') - - @staticmethod - def _getset_timestamp_params(req: http.Request, need_time=False): - values = [] - - cam = int(req.match_info['name']) - assert cam in config['camera'], 'invalid camera' - - values.append(cam) - values.append(TimeFilterType(req.match_info['type'])) - - if need_time: - time = req.query['time'] - if time.startswith('record_'): - time = filename_to_datetime(time) - elif time.isnumeric(): - time = int(time) - else: - raise ValueError('invalid time') - values.append(time) - - return values - - -# other global stuff -# ------------------ - -def open_database(): - global db - db = IPCamServerDatabase() - - # update cams list in database, if needed - cams = db.get_all_timestamps().keys() - for cam in config['camera']: - if cam not in cams: - db.add_camera(cam) - - -def get_recordings_path(cam: int) -> str: - return config['camera'][cam]['recordings_path'] - - -def get_motion_path(cam: int) -> str: - return config['camera'][cam]['motion_path'] - - -def get_recordings_files(cam: Optional[int] = None, - time_filter_type: Optional[TimeFilterType] = None, - limit=0) -> List[dict]: - from_time = 0 - to_time = int(time.time()) - - cams = [cam] if cam is not None else get_all_cams() - files = [] - for cam in cams: - if time_filter_type: - from_time = db.get_timestamp(cam, time_filter_type) - if time_filter_type in (TimeFilterType.MOTION, TimeFilterType.MOTION_START): - to_time = db.get_timestamp(cam, TimeFilterType.FIX) - - from_time = datetime.fromtimestamp(from_time) - to_time = datetime.fromtimestamp(to_time) - - recdir = get_recordings_path(cam) - cam_files = [{ - 'cam': cam, - 'name': file, - 'size': os.path.getsize(os.path.join(recdir, file))} - for file in os.listdir(recdir) - if valid_recording_name(file) and from_time < filename_to_datetime(file) <= to_time] - cam_files.sort(key=lambda file: file['name']) - - if cam_files: - last = cam_files[len(cam_files)-1] - fullpath = os.path.join(recdir, last['name']) - if camutil.has_handle(fullpath): - logger.debug(f'get_recordings_files: file {fullpath} has opened handle, ignoring it') - cam_files.pop() - files.extend(cam_files) - - if limit > 0: - files = files[:limit] - - return files - - -async def process_fragments(camera: int, - filename: str, - fragments: List[Tuple[int, int]]) -> None: - time = filename_to_datetime(filename) - - rec_dir = get_recordings_path(camera) - motion_dir = get_motion_path(camera) - if not os.path.exists(motion_dir): - os.mkdir(motion_dir) - - for fragment in fragments: - start, end = fragment - - start -= config['motion']['padding'] - end += config['motion']['padding'] - - if start < 0: - start = 0 - - duration = end - start - - dt1 = (time + timedelta(seconds=start)).strftime(datetime_format) - dt2 = (time + timedelta(seconds=end)).strftime(datetime_format) - - await camutil.ffmpeg_cut(input=os.path.join(rec_dir, filename), - output=os.path.join(motion_dir, f'{dt1}__{dt2}.mp4'), - start_pos=start, - duration=duration) - - if fragments and 'telegram' in config['motion'] and config['motion']['telegram']: - asyncio.ensure_future(motion_notify_tg(camera, filename, fragments)) - - -async def motion_notify_tg(camera: int, - filename: str, - fragments: List[Tuple[int, int]]): - dt_file = filename_to_datetime(filename) - fmt = '%H:%M:%S' - - text = f'Camera: <b>{camera}</b>\n' - text += f'Original file: <b>{filename}</b> ' - text += _tg_links(TelegramLinkType.ORIGINAL_FILE, camera, filename) - - for start, end in fragments: - start -= config['motion']['padding'] - end += config['motion']['padding'] - - if start < 0: - start = 0 - - duration = end - start - if duration < 0: - duration = 0 - - dt1 = dt_file + timedelta(seconds=start) - dt2 = dt_file + timedelta(seconds=end) - - text += f'\nFragment: <b>{duration}s</b>, {dt1.strftime(fmt)}-{dt2.strftime(fmt)} ' - text += _tg_links(TelegramLinkType.FRAGMENT, camera, f'{dt1.strftime(datetime_format)}__{dt2.strftime(datetime_format)}.mp4') - - await telegram.send_message(text) - - -def _tg_links(link_type: TelegramLinkType, - camera: int, - file: str) -> str: - links = [] - for link_name, link_template in config['telegram'][f'{link_type.value}_url_templates']: - link = link_template.replace('{camera}', str(camera)).replace('{file}', file) - links.append(f'<a href="{link}">{link_name}</a>') - return ' '.join(links) - - -async def fix_job() -> None: - global fix_job_running - logger.debug('fix_job: starting') - - if fix_job_running: - logger.error('fix_job: already running') - return - - try: - fix_job_running = True - for cam in config['camera'].keys(): - files = get_recordings_files(cam, TimeFilterType.FIX) - if not files: - logger.debug(f'fix_job: no files for camera {cam}') - continue - - logger.debug(f'fix_job: got %d files for camera {cam}' % (len(files),)) - - for file in files: - fullpath = os.path.join(get_recordings_path(cam), file['name']) - await camutil.ffmpeg_recreate(fullpath) - timestamp = filename_to_datetime(file['name']) - if timestamp: - db.set_timestamp(cam, TimeFilterType.FIX, timestamp) - - finally: - fix_job_running = False - - -async def cleanup_job() -> None: - def fn2dt(name: str) -> datetime: - name = os.path.basename(name) - - if name.startswith('record_'): - return datetime.strptime(re.match(r'record_(.*?)\.mp4', name).group(1), datetime_format) - - m = re.match(rf'({datetime_format_re})__{datetime_format_re}\.mp4', name) - if m: - return datetime.strptime(m.group(1), datetime_format) - - raise ValueError(f'unrecognized filename format: {name}') - - def compare(i1: str, i2: str) -> int: - dt1 = fn2dt(i1) - dt2 = fn2dt(i2) - - if dt1 < dt2: - return -1 - elif dt1 > dt2: - return 1 - else: - return 0 - - global cleanup_job_running - logger.debug('cleanup_job: starting') - - if cleanup_job_running: - logger.error('cleanup_job: already running') - return - - try: - cleanup_job_running = True - - gb = float(1 << 30) - for storage in config['storages']: - if os.path.exists(storage['mountpoint']): - total, used, free = shutil.disk_usage(storage['mountpoint']) - free_gb = free // gb - if free_gb < config['cleanup_min_gb']: - # print(f"{storage['mountpoint']}: free={free}, free_gb={free_gb}") - cleaned = 0 - files = [] - for cam in storage['cams']: - for _dir in (config['camera'][cam]['recordings_path'], config['camera'][cam]['motion_path']): - files += list(map(lambda file: os.path.join(_dir, file), os.listdir(_dir))) - files = list(filter(lambda path: os.path.isfile(path) and path.endswith('.mp4'), files)) - files.sort(key=cmp_to_key(compare)) - - for file in files: - size = os.stat(file).st_size - try: - os.unlink(file) - cleaned += size - except OSError as e: - logger.exception(e) - if (free + cleaned) // gb >= config['cleanup_min_gb']: - break - else: - logger.error(f"cleanup_job: {storage['mountpoint']} not found") - finally: - cleanup_job_running = False - - -fix_job_running = False -cleanup_job_running = False - -datetime_format = '%Y-%m-%d-%H.%M.%S' -datetime_format_re = r'\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}.\d{2}' -db: Optional[IPCamServerDatabase] = None -server: Optional[IPCamWebServer] = None -logger = logging.getLogger(__name__) - - -# start of the program -# -------------------- - -if __name__ == '__main__': - config.load('ipcam_server') - - open_database() - - loop = asyncio.get_event_loop() - - try: - scheduler = AsyncIOScheduler(event_loop=loop) - if config['fix_enabled']: - scheduler.add_job(fix_job, 'interval', seconds=config['fix_interval'], misfire_grace_time=None) - - scheduler.add_job(cleanup_job, 'interval', seconds=config['cleanup_interval'], misfire_grace_time=None) - scheduler.start() - except KeyError: - pass - - asyncio.ensure_future(fix_job()) - asyncio.ensure_future(cleanup_job()) - - server = IPCamWebServer(config.get_addr('server.listen')) - server.run() |