summaryrefslogtreecommitdiff
path: root/bin/ipcam_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/ipcam_server.py')
-rwxr-xr-xbin/ipcam_server.py581
1 files changed, 581 insertions, 0 deletions
diff --git a/bin/ipcam_server.py b/bin/ipcam_server.py
new file mode 100755
index 0000000..211bc86
--- /dev/null
+++ b/bin/ipcam_server.py
@@ -0,0 +1,581 @@
+#!/usr/bin/env python3
+import logging
+import os
+import re
+import asyncio
+import time
+import shutil
+import __py_include
+
+import homekit.telegram.aio as telegram
+
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from asyncio import Lock
+
+from homekit.config import config
+from homekit import http
+from homekit.database.sqlite import SQLiteBase
+from homekit.camera import util as camutil
+
+from enum import Enum
+from typing import Optional, Union, List, Tuple
+from datetime import datetime, timedelta
+from functools import cmp_to_key
+
+
+class TimeFilterType(Enum):
+ FIX = 'fix'
+ MOTION = 'motion'
+ MOTION_START = 'motion_start'
+
+
+class TelegramLinkType(Enum):
+ FRAGMENT = 'fragment'
+ ORIGINAL_FILE = 'original_file'
+
+
+def valid_recording_name(filename: str) -> bool:
+ return filename.startswith('record_') and filename.endswith('.mp4')
+
+
+def filename_to_datetime(filename: str) -> datetime:
+ filename = os.path.basename(filename).replace('record_', '').replace('.mp4', '')
+ return datetime.strptime(filename, datetime_format)
+
+
+def get_all_cams() -> list:
+ return [cam for cam in config['camera'].keys()]
+
+
+# ipcam database
+# --------------
+
+class IPCamServerDatabase(SQLiteBase):
+ SCHEMA = 4
+
+ def __init__(self):
+ super().__init__()
+
+ def schema_init(self, version: int) -> None:
+ cursor = self.cursor()
+
+ if version < 1:
+ # timestamps
+ cursor.execute("""CREATE TABLE IF NOT EXISTS timestamps (
+ camera INTEGER PRIMARY KEY,
+ fix_time INTEGER NOT NULL,
+ motion_time INTEGER NOT NULL
+ )""")
+ for cam in config['camera'].keys():
+ self.add_camera(cam)
+
+ if version < 2:
+ # motion_failures
+ cursor.execute("""CREATE TABLE IF NOT EXISTS motion_failures (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ camera INTEGER NOT NULL,
+ filename TEXT NOT NULL
+ )""")
+
+ if version < 3:
+ cursor.execute("ALTER TABLE motion_failures ADD COLUMN message TEXT NOT NULL DEFAULT ''")
+
+ if version < 4:
+ cursor.execute("ALTER TABLE timestamps ADD COLUMN motion_start_time INTEGER NOT NULL DEFAULT 0")
+ cursor.execute("UPDATE timestamps SET motion_start_time=motion_time")
+
+ self.commit()
+
+ def add_camera(self, camera: int):
+ self.cursor().execute("INSERT INTO timestamps (camera, fix_time, motion_time) VALUES (?, ?, ?)",
+ (camera, 0, 0))
+ self.commit()
+
+ def add_motion_failure(self,
+ camera: int,
+ filename: str,
+ message: Optional[str]):
+ self.cursor().execute("INSERT INTO motion_failures (camera, filename, message) VALUES (?, ?, ?)",
+ (camera, filename, message or ''))
+ self.commit()
+
+ def get_all_timestamps(self):
+ cur = self.cursor()
+ data = {}
+
+ cur.execute("SELECT camera, fix_time, motion_time, motion_start_time FROM timestamps")
+ for cam, fix_time, motion_time, motion_start_time in cur.fetchall():
+ data[int(cam)] = {
+ 'fix': int(fix_time),
+ 'motion': int(motion_time),
+ 'motion_start': int(motion_start_time)
+ }
+
+ return data
+
+ def set_timestamp(self,
+ camera: int,
+ time_type: TimeFilterType,
+ time: Union[int, datetime]):
+ cur = self.cursor()
+ if isinstance(time, datetime):
+ time = int(time.timestamp())
+ cur.execute(f"UPDATE timestamps SET {time_type.value}_time=? WHERE camera=?", (time, camera))
+ self.commit()
+
+ def get_timestamp(self,
+ camera: int,
+ time_type: TimeFilterType) -> int:
+ cur = self.cursor()
+ cur.execute(f"SELECT {time_type.value}_time FROM timestamps WHERE camera=?", (camera,))
+ return int(cur.fetchone()[0])
+
+
+# ipcam web api
+# -------------
+
+class IPCamWebServer(http.HTTPServer):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self.get('/api/recordings', self.get_motion_queue)
+ self.get('/api/recordings/{name}', self.get_camera_recordings)
+ self.get('/api/recordings/{name}/download/{file}', self.download_recording)
+ self.get('/api/camera/list', self.camlist)
+ self.get('/api/timestamp/{name}/{type}', self.get_timestamp)
+ self.get('/api/timestamp/all', self.get_all_timestamps)
+
+ self.post('/api/debug/migrate-mtimes', self.debug_migrate_mtimes)
+ self.post('/api/debug/fix', self.debug_fix)
+ self.post('/api/debug/cleanup', self.debug_cleanup)
+ self.post('/api/timestamp/{name}/{type}', self.set_timestamp)
+
+ self.post('/api/motion/done/{name}', self.submit_motion)
+ self.post('/api/motion/fail/{name}', self.submit_motion_failure)
+
+ self.get('/api/motion/params/{name}', self.get_motion_params)
+ self.get('/api/motion/params/{name}/roi', self.get_motion_roi_params)
+
+ self.queue_lock = Lock()
+
+ async def get_camera_recordings(self, req):
+ camera = int(req.match_info['name'])
+ try:
+ filter = TimeFilterType(req.query['filter'])
+ except KeyError:
+ filter = None
+
+ try:
+ limit = int(req.query['limit'])
+ except KeyError:
+ limit = 0
+
+ files = get_recordings_files(camera, filter, limit)
+ if files:
+ time = filename_to_datetime(files[len(files)-1]['name'])
+ db.set_timestamp(camera, TimeFilterType.MOTION_START, time)
+ return self.ok({'files': files})
+
+ async def get_motion_queue(self, req):
+ try:
+ limit = int(req.query['limit'])
+ except KeyError:
+ limit = 0
+
+ async with self.queue_lock:
+ files = get_recordings_files(None, TimeFilterType.MOTION_START, limit)
+ if files:
+ times_by_cam = {}
+ for file in files:
+ time = filename_to_datetime(file['name'])
+ if file['cam'] not in times_by_cam or times_by_cam[file['cam']] < time:
+ times_by_cam[file['cam']] = time
+ for cam, time in times_by_cam.items():
+ db.set_timestamp(cam, TimeFilterType.MOTION_START, time)
+
+ return self.ok({'files': files})
+
+ async def download_recording(self, req: http.Request):
+ cam = int(req.match_info['name'])
+ file = req.match_info['file']
+
+ fullpath = os.path.join(config['camera'][cam]['recordings_path'], file)
+ if not os.path.isfile(fullpath):
+ raise ValueError(f'file "{fullpath}" does not exists')
+
+ return http.FileResponse(fullpath)
+
+ async def camlist(self, req: http.Request):
+ return self.ok(config['camera'])
+
+ async def submit_motion(self, req: http.Request):
+ data = await req.post()
+
+ camera = int(req.match_info['name'])
+ timecodes = data['timecodes']
+ filename = data['filename']
+
+ time = filename_to_datetime(filename)
+
+ try:
+ if timecodes != '':
+ fragments = camutil.dvr_scan_timecodes(timecodes)
+ asyncio.ensure_future(process_fragments(camera, filename, fragments))
+
+ db.set_timestamp(camera, TimeFilterType.MOTION, time)
+ return self.ok()
+
+ except camutil.DVRScanInvalidTimecodes as e:
+ db.add_motion_failure(camera, filename, str(e))
+ db.set_timestamp(camera, TimeFilterType.MOTION, time)
+ return self.ok('invalid timecodes')
+
+ async def submit_motion_failure(self, req: http.Request):
+ camera = int(req.match_info['name'])
+
+ data = await req.post()
+ filename = data['filename']
+ message = data['message']
+
+ db.add_motion_failure(camera, filename, message)
+ db.set_timestamp(camera, TimeFilterType.MOTION, filename_to_datetime(filename))
+
+ return self.ok()
+
+ async def debug_migrate_mtimes(self, req: http.Request):
+ written = {}
+ for cam in config['camera'].keys():
+ confdir = os.path.join(os.getenv('HOME'), '.config', f'video-util-{cam}')
+ for time_type in TimeFilterType:
+ txt_file = os.path.join(confdir, f'{time_type.value}_mtime')
+ if os.path.isfile(txt_file):
+ with open(txt_file, 'r') as fd:
+ data = fd.read()
+ db.set_timestamp(cam, time_type, int(data.strip()))
+
+ if cam not in written:
+ written[cam] = []
+ written[cam].append(time_type)
+
+ return self.ok({'written': written})
+
+ async def debug_fix(self, req: http.Request):
+ asyncio.ensure_future(fix_job())
+ return self.ok()
+
+ async def debug_cleanup(self, req: http.Request):
+ asyncio.ensure_future(cleanup_job())
+ return self.ok()
+
+ async def set_timestamp(self, req: http.Request):
+ cam, time_type, time = self._getset_timestamp_params(req, need_time=True)
+ db.set_timestamp(cam, time_type, time)
+ return self.ok()
+
+ async def get_timestamp(self, req: http.Request):
+ cam, time_type = self._getset_timestamp_params(req)
+ return self.ok(db.get_timestamp(cam, time_type))
+
+ async def get_all_timestamps(self, req: http.Request):
+ return self.ok(db.get_all_timestamps())
+
+ async def get_motion_params(self, req: http.Request):
+ data = config['motion_params'][int(req.match_info['name'])]
+ lines = [
+ f'threshold={data["threshold"]}',
+ f'min_event_length=3s',
+ f'frame_skip=2',
+ f'downscale_factor=3',
+ ]
+ return self.plain('\n'.join(lines)+'\n')
+
+ async def get_motion_roi_params(self, req: http.Request):
+ data = config['motion_params'][int(req.match_info['name'])]
+ return self.plain('\n'.join(data['roi'])+'\n')
+
+ @staticmethod
+ def _getset_timestamp_params(req: http.Request, need_time=False):
+ values = []
+
+ cam = int(req.match_info['name'])
+ assert cam in config['camera'], 'invalid camera'
+
+ values.append(cam)
+ values.append(TimeFilterType(req.match_info['type']))
+
+ if need_time:
+ time = req.query['time']
+ if time.startswith('record_'):
+ time = filename_to_datetime(time)
+ elif time.isnumeric():
+ time = int(time)
+ else:
+ raise ValueError('invalid time')
+ values.append(time)
+
+ return values
+
+
+# other global stuff
+# ------------------
+
+def open_database():
+ global db
+ db = IPCamServerDatabase()
+
+ # update cams list in database, if needed
+ cams = db.get_all_timestamps().keys()
+ for cam in config['camera']:
+ if cam not in cams:
+ db.add_camera(cam)
+
+
+def get_recordings_path(cam: int) -> str:
+ return config['camera'][cam]['recordings_path']
+
+
+def get_motion_path(cam: int) -> str:
+ return config['camera'][cam]['motion_path']
+
+
+def get_recordings_files(cam: Optional[int] = None,
+ time_filter_type: Optional[TimeFilterType] = None,
+ limit=0) -> List[dict]:
+ from_time = 0
+ to_time = int(time.time())
+
+ cams = [cam] if cam is not None else get_all_cams()
+ files = []
+ for cam in cams:
+ if time_filter_type:
+ from_time = db.get_timestamp(cam, time_filter_type)
+ if time_filter_type in (TimeFilterType.MOTION, TimeFilterType.MOTION_START):
+ to_time = db.get_timestamp(cam, TimeFilterType.FIX)
+
+ from_time = datetime.fromtimestamp(from_time)
+ to_time = datetime.fromtimestamp(to_time)
+
+ recdir = get_recordings_path(cam)
+ cam_files = [{
+ 'cam': cam,
+ 'name': file,
+ 'size': os.path.getsize(os.path.join(recdir, file))}
+ for file in os.listdir(recdir)
+ if valid_recording_name(file) and from_time < filename_to_datetime(file) <= to_time]
+ cam_files.sort(key=lambda file: file['name'])
+
+ if cam_files:
+ last = cam_files[len(cam_files)-1]
+ fullpath = os.path.join(recdir, last['name'])
+ if camutil.has_handle(fullpath):
+ logger.debug(f'get_recordings_files: file {fullpath} has opened handle, ignoring it')
+ cam_files.pop()
+ files.extend(cam_files)
+
+ if limit > 0:
+ files = files[:limit]
+
+ return files
+
+
+async def process_fragments(camera: int,
+ filename: str,
+ fragments: List[Tuple[int, int]]) -> None:
+ time = filename_to_datetime(filename)
+
+ rec_dir = get_recordings_path(camera)
+ motion_dir = get_motion_path(camera)
+ if not os.path.exists(motion_dir):
+ os.mkdir(motion_dir)
+
+ for fragment in fragments:
+ start, end = fragment
+
+ start -= config['motion']['padding']
+ end += config['motion']['padding']
+
+ if start < 0:
+ start = 0
+
+ duration = end - start
+
+ dt1 = (time + timedelta(seconds=start)).strftime(datetime_format)
+ dt2 = (time + timedelta(seconds=end)).strftime(datetime_format)
+
+ await camutil.ffmpeg_cut(input=os.path.join(rec_dir, filename),
+ output=os.path.join(motion_dir, f'{dt1}__{dt2}.mp4'),
+ start_pos=start,
+ duration=duration)
+
+ if fragments and 'telegram' in config['motion'] and config['motion']['telegram']:
+ asyncio.ensure_future(motion_notify_tg(camera, filename, fragments))
+
+
+async def motion_notify_tg(camera: int,
+ filename: str,
+ fragments: List[Tuple[int, int]]):
+ dt_file = filename_to_datetime(filename)
+ fmt = '%H:%M:%S'
+
+ text = f'Camera: <b>{camera}</b>\n'
+ text += f'Original file: <b>{filename}</b> '
+ text += _tg_links(TelegramLinkType.ORIGINAL_FILE, camera, filename)
+
+ for start, end in fragments:
+ start -= config['motion']['padding']
+ end += config['motion']['padding']
+
+ if start < 0:
+ start = 0
+
+ duration = end - start
+ if duration < 0:
+ duration = 0
+
+ dt1 = dt_file + timedelta(seconds=start)
+ dt2 = dt_file + timedelta(seconds=end)
+
+ text += f'\nFragment: <b>{duration}s</b>, {dt1.strftime(fmt)}-{dt2.strftime(fmt)} '
+ text += _tg_links(TelegramLinkType.FRAGMENT, camera, f'{dt1.strftime(datetime_format)}__{dt2.strftime(datetime_format)}.mp4')
+
+ await telegram.send_message(text)
+
+
+def _tg_links(link_type: TelegramLinkType,
+ camera: int,
+ file: str) -> str:
+ links = []
+ for link_name, link_template in config['telegram'][f'{link_type.value}_url_templates']:
+ link = link_template.replace('{camera}', str(camera)).replace('{file}', file)
+ links.append(f'<a href="{link}">{link_name}</a>')
+ return ' '.join(links)
+
+
+async def fix_job() -> None:
+ global fix_job_running
+ logger.debug('fix_job: starting')
+
+ if fix_job_running:
+ logger.error('fix_job: already running')
+ return
+
+ try:
+ fix_job_running = True
+ for cam in config['camera'].keys():
+ files = get_recordings_files(cam, TimeFilterType.FIX)
+ if not files:
+ logger.debug(f'fix_job: no files for camera {cam}')
+ continue
+
+ logger.debug(f'fix_job: got %d files for camera {cam}' % (len(files),))
+
+ for file in files:
+ fullpath = os.path.join(get_recordings_path(cam), file['name'])
+ await camutil.ffmpeg_recreate(fullpath)
+ timestamp = filename_to_datetime(file['name'])
+ if timestamp:
+ db.set_timestamp(cam, TimeFilterType.FIX, timestamp)
+
+ finally:
+ fix_job_running = False
+
+
+async def cleanup_job() -> None:
+ def fn2dt(name: str) -> datetime:
+ name = os.path.basename(name)
+
+ if name.startswith('record_'):
+ return datetime.strptime(re.match(r'record_(.*?)\.mp4', name).group(1), datetime_format)
+
+ m = re.match(rf'({datetime_format_re})__{datetime_format_re}\.mp4', name)
+ if m:
+ return datetime.strptime(m.group(1), datetime_format)
+
+ raise ValueError(f'unrecognized filename format: {name}')
+
+ def compare(i1: str, i2: str) -> int:
+ dt1 = fn2dt(i1)
+ dt2 = fn2dt(i2)
+
+ if dt1 < dt2:
+ return -1
+ elif dt1 > dt2:
+ return 1
+ else:
+ return 0
+
+ global cleanup_job_running
+ logger.debug('cleanup_job: starting')
+
+ if cleanup_job_running:
+ logger.error('cleanup_job: already running')
+ return
+
+ try:
+ cleanup_job_running = True
+
+ gb = float(1 << 30)
+ for storage in config['storages']:
+ if os.path.exists(storage['mountpoint']):
+ total, used, free = shutil.disk_usage(storage['mountpoint'])
+ free_gb = free // gb
+ if free_gb < config['cleanup_min_gb']:
+ # print(f"{storage['mountpoint']}: free={free}, free_gb={free_gb}")
+ cleaned = 0
+ files = []
+ for cam in storage['cams']:
+ for _dir in (config['camera'][cam]['recordings_path'], config['camera'][cam]['motion_path']):
+ files += list(map(lambda file: os.path.join(_dir, file), os.listdir(_dir)))
+ files = list(filter(lambda path: os.path.isfile(path) and path.endswith('.mp4'), files))
+ files.sort(key=cmp_to_key(compare))
+
+ for file in files:
+ size = os.stat(file).st_size
+ try:
+ os.unlink(file)
+ cleaned += size
+ except OSError as e:
+ logger.exception(e)
+ if (free + cleaned) // gb >= config['cleanup_min_gb']:
+ break
+ else:
+ logger.error(f"cleanup_job: {storage['mountpoint']} not found")
+ finally:
+ cleanup_job_running = False
+
+
+fix_job_running = False
+cleanup_job_running = False
+
+datetime_format = '%Y-%m-%d-%H.%M.%S'
+datetime_format_re = r'\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}.\d{2}'
+db: Optional[IPCamServerDatabase] = None
+server: Optional[IPCamWebServer] = None
+logger = logging.getLogger(__name__)
+
+
+# start of the program
+# --------------------
+
+if __name__ == '__main__':
+ config.load_app('ipcam_server')
+
+ open_database()
+
+ loop = asyncio.get_event_loop()
+
+ try:
+ scheduler = AsyncIOScheduler(event_loop=loop)
+ if config['fix_enabled']:
+ scheduler.add_job(fix_job, 'interval', seconds=config['fix_interval'], misfire_grace_time=None)
+
+ scheduler.add_job(cleanup_job, 'interval', seconds=config['cleanup_interval'], misfire_grace_time=None)
+ scheduler.start()
+ except KeyError:
+ pass
+
+ asyncio.ensure_future(fix_job())
+ asyncio.ensure_future(cleanup_job())
+
+ server = IPCamWebServer(config.get_addr('server.listen'))
+ server.run()