#!/usr/bin/env python3
import logging
import os
import asyncio
import time
import shutil
import include_homekit
import homekit.telegram.aio as telegram
from socket import gethostname
from argparse import ArgumentParser
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from asyncio import Lock
from homekit.config import config as homekit_config
from homekit.linux import LinuxBoardsConfig
from homekit.util import Addr
from homekit import http
from homekit.database.sqlite import SQLiteBase
from homekit.camera import util as camutil, IpcamConfig
from homekit.camera.types import (
TimeFilterType,
TelegramLinkType,
VideoContainerType
)
from homekit.camera.util import (
get_recordings_path,
get_motion_path,
is_valid_recording_name,
datetime_from_filename
)
from typing import Optional, Union, List, Tuple
from datetime import datetime, timedelta
from functools import cmp_to_key
ipcam_config = IpcamConfig()
lbc_config = LinuxBoardsConfig()
# ipcam database
# --------------
class IpcamServerDatabase(SQLiteBase):
SCHEMA = 4
def __init__(self, path=None):
super().__init__(path=path)
def schema_init(self, version: int) -> None:
cursor = self.cursor()
if version < 1:
# timestamps
cursor.execute("""CREATE TABLE IF NOT EXISTS timestamps (
camera INTEGER PRIMARY KEY,
fix_time INTEGER NOT NULL,
motion_time INTEGER NOT NULL
)""")
for cam in ipcam_config.get_all_cam_names_for_this_server():
self.add_camera(cam)
if version < 2:
# motion_failures
cursor.execute("""CREATE TABLE IF NOT EXISTS motion_failures (
id INTEGER PRIMARY KEY AUTOINCREMENT,
camera INTEGER NOT NULL,
filename TEXT NOT NULL
)""")
if version < 3:
cursor.execute("ALTER TABLE motion_failures ADD COLUMN message TEXT NOT NULL DEFAULT ''")
if version < 4:
cursor.execute("ALTER TABLE timestamps ADD COLUMN motion_start_time INTEGER NOT NULL DEFAULT 0")
cursor.execute("UPDATE timestamps SET motion_start_time=motion_time")
self.commit()
def add_camera(self, camera: int):
self.cursor().execute("INSERT INTO timestamps (camera, fix_time, motion_time) VALUES (?, ?, ?)",
(camera, 0, 0))
self.commit()
def add_motion_failure(self,
camera: int,
filename: str,
message: Optional[str]):
self.cursor().execute("INSERT INTO motion_failures (camera, filename, message) VALUES (?, ?, ?)",
(camera, filename, message or ''))
self.commit()
def get_all_timestamps(self):
cur = self.cursor()
data = {}
cur.execute("SELECT camera, fix_time, motion_time, motion_start_time FROM timestamps")
for cam, fix_time, motion_time, motion_start_time in cur.fetchall():
data[int(cam)] = {
'fix': int(fix_time),
'motion': int(motion_time),
'motion_start': int(motion_start_time)
}
return data
def set_timestamp(self,
camera: int,
time_type: TimeFilterType,
time: Union[int, datetime]):
cur = self.cursor()
if isinstance(time, datetime):
time = int(time.timestamp())
cur.execute(f"UPDATE timestamps SET {time_type.value}_time=? WHERE camera=?", (time, camera))
self.commit()
def get_timestamp(self,
camera: int,
time_type: TimeFilterType) -> int:
cur = self.cursor()
cur.execute(f"SELECT {time_type.value}_time FROM timestamps WHERE camera=?", (camera,))
return int(cur.fetchone()[0])
# ipcam web api
# -------------
class IpcamWebServer(http.HTTPServer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.get('/api/recordings', self.get_motion_queue)
self.get('/api/recordings/{name}', self.get_camera_recordings)
self.get('/api/recordings/{name}/download/{file}', self.download_recording)
self.get('/api/camera/list', self.camlist)
self.get('/api/timestamp/{name}/{type}', self.get_timestamp)
self.get('/api/timestamp/all', self.get_all_timestamps)
self.post('/api/debug/fix', self.debug_fix)
self.post('/api/debug/cleanup', self.debug_cleanup)
self.post('/api/timestamp/{name}/{type}', self.set_timestamp)
self.post('/api/motion/done/{name}', self.submit_motion)
self.post('/api/motion/fail/{name}', self.submit_motion_failure)
# self.get('/api/motion/params/{name}', self.get_motion_params)
# self.get('/api/motion/params/{name}/roi', self.get_motion_roi_params)
self.queue_lock = Lock()
async def get_camera_recordings(self, req):
camera = int(req.match_info['name'])
try:
filter = TimeFilterType(req.query['filter'])
except KeyError:
filter = None
try:
limit = int(req.query['limit'])
except KeyError:
limit = 0
files = get_recordings_files(camera, filter, limit)
if files:
time = datetime_from_filename(files[len(files)-1]['name'])
db.set_timestamp(camera, TimeFilterType.MOTION_START, time)
return self.ok({'files': files})
async def get_motion_queue(self, req):
try:
limit = int(req.query['limit'])
except KeyError:
limit = 0
async with self.queue_lock:
files = get_recordings_files(None, TimeFilterType.MOTION_START, limit)
if files:
times_by_cam = {}
for file in files:
time = datetime_from_filename(file['name'])
if file['cam'] not in times_by_cam or times_by_cam[file['cam']] < time:
times_by_cam[file['cam']] = time
for cam, time in times_by_cam.items():
db.set_timestamp(cam, TimeFilterType.MOTION_START, time)
return self.ok({'files': files})
async def download_recording(self, req: http.Request):
cam = int(req.match_info['name'])
file = req.match_info['file']
fullpath = os.path.join(get_recordings_path(cam), file)
if not os.path.isfile(fullpath):
raise ValueError(f'file "{fullpath}" does not exists')
return http.FileResponse(fullpath)
async def camlist(self, req: http.Request):
return self.ok(ipcam_config.get_all_cam_names_for_this_server())
async def submit_motion(self, req: http.Request):
data = await req.post()
camera = int(req.match_info['name'])
timecodes = data['timecodes']
filename = data['filename']
time = datetime_from_filename(filename)
try:
if timecodes != '':
fragments = camutil.dvr_scan_timecodes(timecodes)
asyncio.ensure_future(process_fragments(camera, filename, fragments))
db.set_timestamp(camera, TimeFilterType.MOTION, time)
return self.ok()
except camutil.DVRScanInvalidTimecodes as e:
db.add_motion_failure(camera, filename, str(e))
db.set_timestamp(camera, TimeFilterType.MOTION, time)
return self.ok('invalid timecodes')
async def submit_motion_failure(self, req: http.Request):
camera = int(req.match_info['name'])
data = await req.post()
filename = data['filename']
message = data['message']
db.add_motion_failure(camera, filename, message)
db.set_timestamp(camera, TimeFilterType.MOTION, datetime_from_filename(filename))
return self.ok()
async def debug_fix(self, req: http.Request):
asyncio.ensure_future(fix_job())
return self.ok()
async def debug_cleanup(self, req: http.Request):
asyncio.ensure_future(cleanup_job())
return self.ok()
async def set_timestamp(self, req: http.Request):
cam, time_type, time = self._getset_timestamp_params(req, need_time=True)
db.set_timestamp(cam, time_type, time)
return self.ok()
async def get_timestamp(self, req: http.Request):
cam, time_type = self._getset_timestamp_params(req)
return self.ok(db.get_timestamp(cam, time_type))
async def get_all_timestamps(self, req: http.Request):
return self.ok(db.get_all_timestamps())
# async def get_motion_params(self, req: http.Request):
# data = config['motion_params'][int(req.match_info['name'])]
# lines = [
# f'threshold={data["threshold"]}',
# f'min_event_length=3s',
# f'frame_skip=2',
# f'downscale_factor=3',
# ]
# return self.plain('\n'.join(lines)+'\n')
#
# async def get_motion_roi_params(self, req: http.Request):
# data = config['motion_params'][int(req.match_info['name'])]
# return self.plain('\n'.join(data['roi'])+'\n')
@staticmethod
def _getset_timestamp_params(req: http.Request, need_time=False):
values = []
cam = int(req.match_info['name'])
assert cam in ipcam_config.get_all_cam_names_for_this_server(), 'invalid camera'
values.append(cam)
values.append(TimeFilterType(req.match_info['type']))
if need_time:
time = req.query['time']
if time.startswith('record_'):
time = datetime_from_filename(time)
elif time.isnumeric():
time = int(time)
else:
raise ValueError('invalid time')
values.append(time)
return values
# other global stuff
# ------------------
def open_database(database_path: str):
global db
db = IpcamServerDatabase(database_path)
# update cams list in database, if needed
stored_cams = db.get_all_timestamps().keys()
for cam in ipcam_config.get_all_cam_names_for_this_server():
if cam not in stored_cams:
db.add_camera(cam)
def get_recordings_files(cam: Optional[int] = None,
time_filter_type: Optional[TimeFilterType] = None,
limit=0) -> List[dict]:
from_time = 0
to_time = int(time.time())
cams = [cam] if cam is not None else ipcam_config.get_all_cam_names_for_this_server()
files = []
for cam in cams:
if time_filter_type:
from_time = db.get_timestamp(cam, time_filter_type)
if time_filter_type in (TimeFilterType.MOTION, TimeFilterType.MOTION_START):
to_time = db.get_timestamp(cam, TimeFilterType.FIX)
from_time = datetime.fromtimestamp(from_time)
to_time = datetime.fromtimestamp(to_time)
recdir = get_recordings_path(cam)
cam_files = [{
'cam': cam,
'name': file,
'size': os.path.getsize(os.path.join(recdir, file))}
for file in os.listdir(recdir)
if is_valid_recording_name(file) and from_time < datetime_from_filename(file) <= to_time]
cam_files.sort(key=lambda file: file['name'])
if cam_files:
last = cam_files[len(cam_files)-1]
fullpath = os.path.join(recdir, last['name'])
if camutil.has_handle(fullpath):
logger.debug(f'get_recordings_files: file {fullpath} has opened handle, ignoring it')
cam_files.pop()
files.extend(cam_files)
if limit > 0:
files = files[:limit]
return files
async def process_fragments(camera: int,
filename: str,
fragments: List[Tuple[int, int]]) -> None:
time = datetime_from_filename(filename)
rec_dir = get_recordings_path(camera)
motion_dir = get_motion_path(camera)
if not os.path.exists(motion_dir):
os.mkdir(motion_dir)
for fragment in fragments:
start, end = fragment
start -= ipcam_config['motion_padding']
end += ipcam_config['motion_padding']
if start < 0:
start = 0
duration = end - start
dt1 = (time + timedelta(seconds=start)).strftime(datetime_format)
dt2 = (time + timedelta(seconds=end)).strftime(datetime_format)
await camutil.ffmpeg_cut(input=os.path.join(rec_dir, filename),
output=os.path.join(motion_dir, f'{dt1}__{dt2}.mp4'),
start_pos=start,
duration=duration)
if fragments and ipcam_config['motion_telegram']:
asyncio.ensure_future(motion_notify_tg(camera, filename, fragments))
async def motion_notify_tg(camera: int,
filename: str,
fragments: List[Tuple[int, int]]):
dt_file = datetime_from_filename(filename)
fmt = '%H:%M:%S'
text = f'Camera: {camera}\n'
text += f'Original file: {filename} '
text += _tg_links(TelegramLinkType.ORIGINAL_FILE, camera, filename)
for start, end in fragments:
start -= ipcam_config['motion_padding']
end += ipcam_config['motion_padding']
if start < 0:
start = 0
duration = end - start
if duration < 0:
duration = 0
dt1 = dt_file + timedelta(seconds=start)
dt2 = dt_file + timedelta(seconds=end)
text += f'\nFragment: {duration}s, {dt1.strftime(fmt)}-{dt2.strftime(fmt)} '
text += _tg_links(TelegramLinkType.FRAGMENT, camera, f'{dt1.strftime(datetime_format)}__{dt2.strftime(datetime_format)}.mp4')
await telegram.send_message(text)
def _tg_links(link_type: TelegramLinkType,
camera: int,
file: str) -> str:
links = []
for link_name, link_template in ipcam_config[f'{link_type.value}_url_templates']:
link = link_template.replace('{camera}', str(camera)).replace('{file}', file)
links.append(f'{link_name}')
return ' '.join(links)
async def fix_job() -> None:
global fix_job_running
logger.debug('fix_job: starting')
if fix_job_running:
logger.error('fix_job: already running')
return
try:
fix_job_running = True
for cam in ipcam_config.get_all_cam_names_for_this_server():
files = get_recordings_files(cam, TimeFilterType.FIX)
if not files:
logger.debug(f'fix_job: no files for camera {cam}')
continue
logger.debug(f'fix_job: got %d files for camera {cam}' % (len(files),))
for file in files:
fullpath = os.path.join(get_recordings_path(cam), file['name'])
await camutil.ffmpeg_recreate(fullpath)
timestamp = datetime_from_filename(file['name'])
if timestamp:
db.set_timestamp(cam, TimeFilterType.FIX, timestamp)
finally:
fix_job_running = False
async def cleanup_job() -> None:
def compare(i1: str, i2: str) -> int:
dt1 = datetime_from_filename(i1)
dt2 = datetime_from_filename(i2)
if dt1 < dt2:
return -1
elif dt1 > dt2:
return 1
else:
return 0
global cleanup_job_running
logger.debug('cleanup_job: starting')
if cleanup_job_running:
logger.error('cleanup_job: already running')
return
try:
cleanup_job_running = True
gb = float(1 << 30)
disk_number = 0
for storage in lbc_config.get_board_disks(gethostname()):
disk_number += 1
if os.path.exists(storage['mountpoint']):
total, used, free = shutil.disk_usage(storage['mountpoint'])
free_gb = free // gb
if free_gb < ipcam_config['cleanup_min_gb']:
cleaned = 0
files = []
for cam in ipcam_config.get_all_cam_names_for_this_server(filter_by_disk=disk_number):
for _dir in (get_recordings_path(cam), get_motion_path(cam)):
files += list(map(lambda file: os.path.join(_dir, file), os.listdir(_dir)))
files = list(filter(lambda path: os.path.isfile(path) and path.endswith(tuple([f'.{t.value}' for t in VideoContainerType])), files))
files.sort(key=cmp_to_key(compare))
for file in files:
size = os.stat(file).st_size
try:
os.unlink(file)
cleaned += size
except OSError as e:
logger.exception(e)
if (free + cleaned) // gb >= ipcam_config['cleanup_min_gb']:
break
else:
logger.error(f"cleanup_job: {storage['mountpoint']} not found")
finally:
cleanup_job_running = False
fix_job_running = False
cleanup_job_running = False
datetime_format = '%Y-%m-%d-%H.%M.%S'
datetime_format_re = r'\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}.\d{2}'
db: Optional[IpcamServerDatabase] = None
server: Optional[IpcamWebServer] = None
logger = logging.getLogger(__name__)
# start of the program
# --------------------
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('--listen', type=str, required=True)
parser.add_argument('--database-path', type=str, required=True)
arg = homekit_config.load_app(no_config=True, parser=parser)
open_database(arg.database_path)
loop = asyncio.get_event_loop()
try:
scheduler = AsyncIOScheduler(event_loop=loop)
if ipcam_config['fix_enabled']:
scheduler.add_job(fix_job, 'interval',
seconds=ipcam_config['fix_interval'],
misfire_grace_time=None)
scheduler.add_job(cleanup_job, 'interval',
seconds=ipcam_config['cleanup_interval'],
misfire_grace_time=None)
scheduler.start()
except KeyError:
pass
asyncio.ensure_future(fix_job())
asyncio.ensure_future(cleanup_job())
server = IpcamWebServer(Addr.fromstring(arg.listen))
server.run()