summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2022-06-08 22:01:22 +0300
committerEvgeny Zinoviev <me@ch1p.io>2022-06-11 19:05:51 +0300
commiteb502ab9c94cc8a89a29f9310e2f56404b432053 (patch)
tree70ec81d40a9fc406960d85773436d3b33a014c7a /src
parent1ed87f69878b85daf94cde4c7b187939d9e15778 (diff)
ipcam: rewrite motion detection system
Diffstat (limited to 'src')
-rw-r--r--src/home/bot/store.py78
-rw-r--r--src/home/camera/util.py100
-rw-r--r--src/home/config/config.py52
-rw-r--r--src/home/database/sqlite.py62
-rw-r--r--src/home/http/__init__.py2
-rw-r--r--src/home/http/http.py92
-rw-r--r--src/home/soundsensor/server.py49
-rw-r--r--src/home/util.py3
-rwxr-xr-xsrc/inverter_bot.py2
-rwxr-xr-xsrc/ipcam_server.py369
-rwxr-xr-xsrc/sound_node.py70
-rwxr-xr-xsrc/test/test_inverter_monitor.py4
12 files changed, 720 insertions, 163 deletions
diff --git a/src/home/bot/store.py b/src/home/bot/store.py
index aeedc47..e655d8f 100644
--- a/src/home/bot/store.py
+++ b/src/home/bot/store.py
@@ -1,80 +1,32 @@
-import sqlite3
-import os.path
-import logging
+from ..database.sqlite import SQLiteBase
-from ..config import config
-
-logger = logging.getLogger(__name__)
-
-
-def _get_database_path() -> str:
- return os.path.join(os.environ['HOME'], '.config', config.app_name, 'bot.db')
-
-
-class Store:
- SCHEMA_VERSION = 1
+class Store(SQLiteBase):
def __init__(self):
- self.sqlite = sqlite3.connect(_get_database_path(), check_same_thread=False)
-
- sqlite_version = self._get_sqlite_version()
- logger.info(f'SQLite version: {sqlite_version}')
-
- schema_version = self._get_schema_version()
- logger.info(f'Schema version: {schema_version}')
-
- if schema_version < 1:
- self._database_init()
- elif schema_version < Store.SCHEMA_VERSION:
- self._database_upgrade(Store.SCHEMA_VERSION)
-
- def __del__(self):
- if self.sqlite:
- self.sqlite.commit()
- self.sqlite.close()
-
- def _get_sqlite_version(self) -> str:
- cursor = self.sqlite.cursor()
- cursor.execute("SELECT sqlite_version()")
-
- return cursor.fetchone()[0]
-
- def _get_schema_version(self) -> int:
- cursor = self.sqlite.execute('PRAGMA user_version')
- return int(cursor.fetchone()[0])
-
- def _set_schema_version(self, v) -> None:
- self.sqlite.execute('PRAGMA user_version={:d}'.format(v))
- logger.info(f'Schema set to {v}')
-
- def _database_init(self) -> None:
- cursor = self.sqlite.cursor()
- cursor.execute("""CREATE TABLE IF NOT EXISTS users (
- id INTEGER PRIMARY KEY,
- lang TEXT NOT NULL
- )""")
- self.sqlite.commit()
- self._set_schema_version(1)
-
- def _database_upgrade(self, version: int) -> None:
- # do the upgrade here
+ super().__init__()
- # self.sqlite.commit()
- self._set_schema_version(version)
+ def schema_init(self, version: int) -> None:
+ if version < 1:
+ cursor = self.cursor()
+ cursor.execute("""CREATE TABLE IF NOT EXISTS users (
+ id INTEGER PRIMARY KEY,
+ lang TEXT NOT NULL
+ )""")
+ self.commit()
def get_user_lang(self, user_id: int, default: str = 'en') -> str:
- cursor = self.sqlite.cursor()
+ cursor = self.cursor()
cursor.execute('SELECT lang FROM users WHERE id=?', (user_id,))
row = cursor.fetchone()
if row is None:
cursor.execute('INSERT INTO users (id, lang) VALUES (?, ?)', (user_id, default))
- self.sqlite.commit()
+ self.commit()
return default
else:
return row[0]
def set_user_lang(self, user_id: int, lang: str) -> None:
- cursor = self.sqlite.cursor()
+ cursor = self.cursor()
cursor.execute('UPDATE users SET lang=? WHERE id=?', (lang, user_id))
- self.sqlite.commit() \ No newline at end of file
+ self.commit()
diff --git a/src/home/camera/util.py b/src/home/camera/util.py
new file mode 100644
index 0000000..5f18a1f
--- /dev/null
+++ b/src/home/camera/util.py
@@ -0,0 +1,100 @@
+import asyncio
+import os.path
+import logging
+import psutil
+
+from ..util import chunks
+from ..config import config
+
+_logger = logging.getLogger(__name__)
+_temporary_fixing = '.temporary_fixing.mp4'
+
+
+def _get_ffmpeg_path() -> str:
+ return 'ffmpeg' if 'ffmpeg' not in config else config['ffmpeg']['path']
+
+
+def time2seconds(time: str) -> int:
+ time, frac = time.split('.')
+ frac = int(frac)
+
+ h, m, s = [int(i) for i in time.split(':')]
+
+ return round(s + m*60 + h*3600 + frac/1000)
+
+
+async def ffmpeg_recreate(filename: str):
+ filedir = os.path.dirname(filename)
+ tempname = os.path.join(filedir, _temporary_fixing)
+ mtime = os.path.getmtime(filename)
+
+ args = [_get_ffmpeg_path(), '-nostats', '-loglevel', 'error', '-i', filename, '-c', 'copy', '-y', tempname]
+ proc = await asyncio.create_subprocess_exec(*args,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE)
+ stdout, stderr = await proc.communicate()
+ if proc.returncode != 0:
+ _logger.error(f'fix_timestamps({filename}): ffmpeg returned {proc.returncode}, stderr: {stderr.decode().strip()}')
+
+ if os.path.isfile(tempname):
+ os.unlink(filename)
+ os.rename(tempname, filename)
+ os.utime(filename, (mtime, mtime))
+ _logger.info(f'fix_timestamps({filename}): OK')
+ else:
+ _logger.error(f'fix_timestamps({filename}): temp file \'{tempname}\' does not exists, fix failed')
+
+
+async def ffmpeg_cut(input: str,
+ output: str,
+ start_pos: int,
+ duration: int):
+ args = [_get_ffmpeg_path(), '-nostats', '-loglevel', 'error', '-i', input,
+ '-ss', str(start_pos), '-t', str(duration),
+ '-c', 'copy', '-y', output]
+ proc = await asyncio.create_subprocess_exec(*args,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE)
+ stdout, stderr = await proc.communicate()
+ if proc.returncode != 0:
+ _logger.error(f'ffmpeg_cut({input}, start_pos={start_pos}, duration={duration}): ffmpeg returned {proc.returncode}, stderr: {stderr.decode().strip()}')
+ else:
+ _logger.info(f'ffmpeg_cut({input}): OK')
+
+
+def dvr_scan_timecodes(timecodes: str) -> list[tuple[int, int]]:
+ timecodes = timecodes.split(',')
+ if len(timecodes) % 2 != 0:
+ raise ValueError('invalid number of timecodes')
+
+ timecodes = list(map(time2seconds, timecodes))
+ timecodes = list(chunks(timecodes, 2))
+
+ # sort out invalid fragments (dvr-scan returns them sometimes, idk why...)
+ timecodes = list(filter(lambda f: f[0] < f[1], timecodes))
+ if not timecodes:
+ raise ValueError('no valid timecodes')
+
+ # https://stackoverflow.com/a/43600953
+ timecodes.sort(key=lambda interval: interval[0])
+ merged = [timecodes[0]]
+ for current in timecodes:
+ previous = merged[-1]
+ if current[0] <= previous[1]:
+ previous[1] = max(previous[1], current[1])
+ else:
+ merged.append(current)
+
+ return merged
+
+
+def has_handle(fpath):
+ for proc in psutil.process_iter():
+ try:
+ for item in proc.open_files():
+ if fpath == item.path:
+ return True
+ except Exception:
+ pass
+
+ return False \ No newline at end of file
diff --git a/src/home/config/config.py b/src/home/config/config.py
index af1c772..8b50609 100644
--- a/src/home/config/config.py
+++ b/src/home/config/config.py
@@ -120,6 +120,7 @@ def setup_logging(verbose=False, log_file=None, default_fmt=False):
logging_level = logging.INFO
if is_development_mode() or verbose:
logging_level = logging.DEBUG
+ _add_logging_level('TRACE', logging.DEBUG-5)
log_config = {'level': logging_level}
if not default_fmt:
@@ -130,3 +131,54 @@ def setup_logging(verbose=False, log_file=None, default_fmt=False):
log_config['encoding'] = 'utf-8'
logging.basicConfig(**log_config)
+
+
+# https://stackoverflow.com/questions/2183233/how-to-add-a-custom-loglevel-to-pythons-logging-facility/35804945#35804945
+def _add_logging_level(levelName, levelNum, methodName=None):
+ """
+ Comprehensively adds a new logging level to the `logging` module and the
+ currently configured logging class.
+
+ `levelName` becomes an attribute of the `logging` module with the value
+ `levelNum`. `methodName` becomes a convenience method for both `logging`
+ itself and the class returned by `logging.getLoggerClass()` (usually just
+ `logging.Logger`). If `methodName` is not specified, `levelName.lower()` is
+ used.
+
+ To avoid accidental clobberings of existing attributes, this method will
+ raise an `AttributeError` if the level name is already an attribute of the
+ `logging` module or if the method name is already present
+
+ Example
+ -------
+ >>> addLoggingLevel('TRACE', logging.DEBUG - 5)
+ >>> logging.getLogger(__name__).setLevel("TRACE")
+ >>> logging.getLogger(__name__).trace('that worked')
+ >>> logging.trace('so did this')
+ >>> logging.TRACE
+ 5
+
+ """
+ if not methodName:
+ methodName = levelName.lower()
+
+ if hasattr(logging, levelName):
+ raise AttributeError('{} already defined in logging module'.format(levelName))
+ if hasattr(logging, methodName):
+ raise AttributeError('{} already defined in logging module'.format(methodName))
+ if hasattr(logging.getLoggerClass(), methodName):
+ raise AttributeError('{} already defined in logger class'.format(methodName))
+
+ # This method was inspired by the answers to Stack Overflow post
+ # http://stackoverflow.com/q/2183233/2988730, especially
+ # http://stackoverflow.com/a/13638084/2988730
+ def logForLevel(self, message, *args, **kwargs):
+ if self.isEnabledFor(levelNum):
+ self._log(levelNum, message, args, **kwargs)
+ def logToRoot(message, *args, **kwargs):
+ logging.log(levelNum, message, *args, **kwargs)
+
+ logging.addLevelName(levelNum, levelName)
+ setattr(logging, levelName, levelNum)
+ setattr(logging.getLoggerClass(), methodName, logForLevel)
+ setattr(logging, methodName, logToRoot) \ No newline at end of file
diff --git a/src/home/database/sqlite.py b/src/home/database/sqlite.py
new file mode 100644
index 0000000..8f1763e
--- /dev/null
+++ b/src/home/database/sqlite.py
@@ -0,0 +1,62 @@
+import sqlite3
+import os.path
+import logging
+
+from ..config import config, is_development_mode
+
+
+def _get_database_path(name) -> str:
+ return os.path.join(os.environ['HOME'], '.config', name, 'bot.db')
+
+
+class SQLiteBase:
+ SCHEMA = 1
+
+ def __init__(self, name=None, check_same_thread=False):
+ if not name:
+ name = config.app_name
+
+ self.logger = logging.getLogger(self.__class__.__name__)
+ self.sqlite = sqlite3.connect(_get_database_path(name),
+ check_same_thread=check_same_thread)
+
+ if is_development_mode():
+ self.sql_logger = logging.getLogger(self.__class__.__name__)
+ self.sql_logger.setLevel('TRACE')
+ self.sqlite.set_trace_callback(self.sql_logger.trace)
+
+ sqlite_version = self._get_sqlite_version()
+ self.logger.debug(f'SQLite version: {sqlite_version}')
+
+ schema_version = self.schema_get_version()
+ self.logger.debug(f'Schema version: {schema_version}')
+
+ self.schema_init(schema_version)
+ self.schema_set_version(self.SCHEMA)
+
+ def __del__(self):
+ if self.sqlite:
+ self.sqlite.commit()
+ self.sqlite.close()
+
+ def _get_sqlite_version(self) -> str:
+ cursor = self.sqlite.cursor()
+ cursor.execute("SELECT sqlite_version()")
+ return cursor.fetchone()[0]
+
+ def schema_get_version(self) -> int:
+ cursor = self.sqlite.execute('PRAGMA user_version')
+ return int(cursor.fetchone()[0])
+
+ def schema_set_version(self, v) -> None:
+ self.sqlite.execute('PRAGMA user_version={:d}'.format(v))
+ self.logger.info(f'Schema set to {v}')
+
+ def cursor(self) -> sqlite3.Cursor:
+ return self.sqlite.cursor()
+
+ def commit(self) -> None:
+ return self.sqlite.commit()
+
+ def schema_init(self, version: int) -> None:
+ raise ValueError(f'{self.__class__.__name__}: must override schema_init')
diff --git a/src/home/http/__init__.py b/src/home/http/__init__.py
new file mode 100644
index 0000000..2597457
--- /dev/null
+++ b/src/home/http/__init__.py
@@ -0,0 +1,2 @@
+from .http import serve, ok, routes, HTTPServer
+from aiohttp.web import FileResponse, Request
diff --git a/src/home/http/http.py b/src/home/http/http.py
new file mode 100644
index 0000000..acacb51
--- /dev/null
+++ b/src/home/http/http.py
@@ -0,0 +1,92 @@
+import logging
+import asyncio
+
+from aiohttp import web
+from aiohttp.web_exceptions import HTTPNotFound
+
+from ..util import stringify, format_tb, Addr
+
+
+@web.middleware
+async def errors_handler_middleware(request, handler):
+ try:
+ response = await handler(request)
+ return response
+
+ except HTTPNotFound:
+ return web.json_response({'error': 'not found'}, status=404)
+
+ except Exception as exc:
+ data = {
+ 'error': exc.__class__.__name__,
+ 'message': exc.message if hasattr(exc, 'message') else str(exc)
+ }
+ tb = format_tb(exc)
+ if tb:
+ data['stacktrace'] = tb
+
+ return web.json_response(data, status=500)
+
+
+def serve(addr: Addr, route_table: web.RouteTableDef, handle_signals: bool = True):
+ app = web.Application()
+ app.add_routes(route_table)
+ app.middlewares.append(errors_handler_middleware)
+
+ host, port = addr
+
+ web.run_app(app,
+ host=host,
+ port=port,
+ handle_signals=handle_signals)
+
+
+def routes() -> web.RouteTableDef:
+ return web.RouteTableDef()
+
+
+def ok(data=None):
+ if data is None:
+ data = 1
+ response = {'response': data}
+ return web.json_response(response, dumps=stringify)
+
+
+class HTTPServer:
+ def __init__(self, addr: Addr, handle_errors=True):
+ self.addr = addr
+ self.app = web.Application()
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ if handle_errors:
+ self.app.middlewares.append(errors_handler_middleware)
+
+ def _add_route(self,
+ method: str,
+ path: str,
+ handler: callable):
+ self.app.router.add_routes([getattr(web, method)(path, handler)])
+
+ def get(self, path, handler):
+ self._add_route('get', path, handler)
+
+ def post(self, path, handler):
+ self._add_route('post', path, handler)
+
+ def run(self, event_loop=None, handle_signals=True):
+ if not event_loop:
+ event_loop = asyncio.get_event_loop()
+
+ runner = web.AppRunner(self.app, handle_signals=handle_signals)
+ event_loop.run_until_complete(runner.setup())
+
+ host, port = self.addr
+ site = web.TCPSite(runner, host=host, port=port)
+ event_loop.run_until_complete(site.start())
+
+ self.logger.info(f'Server started at http://{host}:{port}')
+
+ event_loop.run_forever()
+
+ def ok(self, data=None):
+ return ok(data)
diff --git a/src/home/soundsensor/server.py b/src/home/soundsensor/server.py
index 490fc36..0a53ae6 100644
--- a/src/home/soundsensor/server.py
+++ b/src/home/soundsensor/server.py
@@ -4,13 +4,10 @@ import logging
import threading
from ..config import config
-from aiohttp import web
-from aiohttp.web_exceptions import (
- HTTPNotFound
-)
+from .. import http
from typing import Type
-from ..util import Addr, stringify, format_tb
+from ..util import Addr
logger = logging.getLogger(__name__)
@@ -74,52 +71,22 @@ class SoundSensorServer:
loop.run_forever()
def run_guard_server(self):
- routes = web.RouteTableDef()
-
- def ok(data=None):
- if data is None:
- data = 1
- response = {'response': data}
- return web.json_response(response, dumps=stringify)
-
- @web.middleware
- async def errors_handler_middleware(request, handler):
- try:
- response = await handler(request)
- return response
- except HTTPNotFound:
- return web.json_response({'error': 'not found'}, status=404)
- except Exception as exc:
- data = {
- 'error': exc.__class__.__name__,
- 'message': exc.message if hasattr(exc, 'message') else str(exc)
- }
- tb = format_tb(exc)
- if tb:
- data['stacktrace'] = tb
-
- return web.json_response(data, status=500)
+ routes = http.routes()
@routes.post('/guard/enable')
async def guard_enable(request):
self.set_recording(True)
- return ok()
+ return http.ok()
@routes.post('/guard/disable')
async def guard_disable(request):
self.set_recording(False)
- return ok()
+ return http.ok()
@routes.get('/guard/status')
async def guard_status(request):
- return ok({'enabled': self.is_recording_enabled()})
+ return http.ok({'enabled': self.is_recording_enabled()})
asyncio.set_event_loop(asyncio.new_event_loop()) # need to create new event loop in new thread
- app = web.Application()
- app.add_routes(routes)
- app.middlewares.append(errors_handler_middleware)
-
- web.run_app(app,
- host=self.addr[0],
- port=self.addr[1],
- handle_signals=False) # handle_signals=True doesn't work in separate thread
+ http.serve(self.addr, routes, handle_signals=False) # handle_signals=True doesn't work in separate thread
+
diff --git a/src/home/util.py b/src/home/util.py
index 2c43cb0..a6ac906 100644
--- a/src/home/util.py
+++ b/src/home/util.py
@@ -8,6 +8,7 @@ import logging
import string
import random
+from enum import Enum
from .config import config
from datetime import datetime
from typing import Tuple, Optional
@@ -28,6 +29,8 @@ def json_serial(obj):
"""JSON serializer for datetime objects"""
if isinstance(obj, datetime):
return obj.timestamp()
+ if isinstance(obj, Enum):
+ return obj.value
raise TypeError("Type %s not serializable" % type(obj))
diff --git a/src/inverter_bot.py b/src/inverter_bot.py
index 5ad5e33..5b3fe45 100755
--- a/src/inverter_bot.py
+++ b/src/inverter_bot.py
@@ -452,7 +452,7 @@ class InverterBot(Wrapper):
if __name__ == '__main__':
config.load('inverter_bot')
- inverter.init(host=config['inverter']['ip'], port=config['inverter']['port'])
+ inverter.schema_init(host=config['inverter']['ip'], port=config['inverter']['port'])
monitor = InverterMonitor()
monitor.set_charging_event_handler(monitor_charging)
diff --git a/src/ipcam_server.py b/src/ipcam_server.py
new file mode 100755
index 0000000..d212be2
--- /dev/null
+++ b/src/ipcam_server.py
@@ -0,0 +1,369 @@
+#!/usr/bin/env python3
+import logging
+import os
+import asyncio
+import time
+
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+
+from home.config import config
+from home.util import parse_addr
+from home import http
+from home.database.sqlite import SQLiteBase
+from home.camera import util as camutil
+
+from enum import Enum
+from typing import Optional, Union
+from datetime import datetime, timedelta
+
+
+class TimeFilterType(Enum):
+ FIX = 'fix'
+ MOTION = 'motion'
+
+
+def valid_recording_name(filename: str) -> bool:
+ return filename.startswith('record_') and filename.endswith('.mp4')
+
+
+def filename_to_datetime(filename: str) -> datetime:
+ filename = os.path.basename(filename).replace('record_', '').replace('.mp4', '')
+ return datetime.strptime(filename, datetime_format)
+
+
+# ipcam database
+# --------------
+
+class IPCamServerDatabase(SQLiteBase):
+ SCHEMA = 3
+
+ def __init__(self):
+ super().__init__()
+
+ def schema_init(self, version: int) -> None:
+ cursor = self.cursor()
+
+ if version < 1:
+ # timestamps
+ cursor.execute("""CREATE TABLE IF NOT EXISTS timestamps (
+ camera INTEGER PRIMARY KEY,
+ fix_time INTEGER NOT NULL,
+ motion_time INTEGER NOT NULL
+ )""")
+ for cam in config['camera'].keys():
+ self.add_camera(cam)
+
+ if version < 2:
+ # motion_failures
+ cursor.execute("""CREATE TABLE IF NOT EXISTS motion_failures (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ camera INTEGER NOT NULL,
+ filename TEXT NOT NULL
+ )""")
+
+ if version < 3:
+ cursor.execute("ALTER TABLE motion_failures ADD COLUMN message TEXT NOT NULL DEFAULT ''")
+
+ self.commit()
+
+ def add_camera(self, camera: int):
+ self.cursor().execute("INSERT INTO timestamps (camera, fix_time, motion_time) VALUES (?, ?, ?)",
+ (camera, 0, 0))
+ self.commit()
+
+ def add_motion_failure(self,
+ camera: int,
+ filename: str,
+ message: Optional[str]):
+ self.cursor().execute("INSERT INTO motion_failures (camera, filename, message) VALUES (?, ?, ?)",
+ (camera, filename, message or ''))
+ self.commit()
+
+ def get_all_timestamps(self):
+ cur = self.cursor()
+ data = {}
+
+ cur.execute("SELECT camera, fix_time, motion_time FROM timestamps")
+ for cam, fix_time, motion_time in cur.fetchall():
+ data[int(cam)] = {
+ 'fix': int(fix_time),
+ 'motion': int(motion_time)
+ }
+
+ return data
+
+ def set_timestamp(self,
+ camera: int,
+ time_type: TimeFilterType,
+ time: Union[int, datetime]):
+ cur = self.cursor()
+ if isinstance(time, datetime):
+ time = int(time.timestamp())
+ cur.execute(f"UPDATE timestamps SET {time_type.value}_time=? WHERE camera=?", (time, camera))
+ self.commit()
+
+ def get_timestamp(self,
+ camera: int,
+ time_type: TimeFilterType) -> int:
+ cur = self.cursor()
+ cur.execute(f"SELECT {time_type.value}_time FROM timestamps WHERE camera=?", (camera,))
+ return int(cur.fetchone()[0])
+
+
+# ipcam web api
+# -------------
+
+class IPCamWebServer(http.HTTPServer):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self.get('/api/recordings/{name}', self.get_camera_recordings)
+ self.get('/api/recordings/{name}/download/{file}', self.download_recording)
+ self.get('/api/camera/list', self.camlist)
+ self.get('/api/timestamp/{name}/{type}', self.get_timestamp)
+ self.get('/api/timestamp/all', self.get_all_timestamps)
+
+ self.post('/api/debug/migrate-mtimes', self.debug_migrate_mtimes)
+ self.post('/api/debug/fix', self.debug_fix)
+ self.post('/api/timestamp/{name}/{type}', self.set_timestamp)
+
+ self.post('/api/motion/done/{name}', self.submit_motion)
+ self.post('/api/motion/fail/{name}', self.submit_motion_failure)
+
+ async def get_camera_recordings(self, req):
+ cam = int(req.match_info['name'])
+ try:
+ filter = TimeFilterType(req.query['filter'])
+ except KeyError:
+ filter = None
+
+ files = get_recordings_files(cam, filter)
+
+ return self.ok({'files': files})
+
+ async def download_recording(self, req: http.Request):
+ cam = int(req.match_info['name'])
+ file = req.match_info['file']
+
+ fullpath = os.path.join(config['camera'][cam]['recordings_path'], file)
+ if not os.path.isfile(fullpath):
+ raise ValueError(f'file "{fullpath}" does not exists')
+
+ return http.FileResponse(fullpath)
+
+ async def camlist(self, req: http.Request):
+ return self.ok(config['camera'])
+
+ async def submit_motion(self, req: http.Request):
+ data = await req.post()
+
+ camera = int(req.match_info['name'])
+ timecodes = data['timecodes']
+ filename = data['filename']
+
+ time = filename_to_datetime(filename)
+
+ try:
+ if timecodes != '':
+ fragments = camutil.dvr_scan_timecodes(timecodes)
+ asyncio.ensure_future(process_fragments(camera, filename, fragments))
+
+ db.set_timestamp(camera, TimeFilterType.MOTION, time)
+ return self.ok()
+
+ except ValueError as e:
+ db.set_timestamp(camera, TimeFilterType.MOTION, time)
+ raise e
+
+ async def submit_motion_failure(self, req: http.Request):
+ camera = int(req.match_info['name'])
+
+ data = await req.post()
+ filename = data['filename']
+ message = data['message']
+
+ db.add_motion_failure(camera, filename, message)
+ db.set_timestamp(camera, TimeFilterType.MOTION, filename_to_datetime(filename))
+
+ return self.ok()
+
+ async def debug_migrate_mtimes(self, req: http.Request):
+ written = {}
+ for cam in config['camera'].keys():
+ confdir = os.path.join(os.getenv('HOME'), '.config', f'video-util-{cam}')
+ for time_type in TimeFilterType:
+ txt_file = os.path.join(confdir, f'{time_type.value}_mtime')
+ if os.path.isfile(txt_file):
+ with open(txt_file, 'r') as fd:
+ data = fd.read()
+ db.set_timestamp(cam, time_type, int(data.strip()))
+
+ if cam not in written:
+ written[cam] = []
+ written[cam].append(time_type)
+
+ return self.ok({'written': written})
+
+ async def debug_fix(self, req: http.Request):
+ asyncio.ensure_future(fix_job())
+ return self.ok()
+
+ async def set_timestamp(self, req: http.Request):
+ cam, time_type, time = self._getset_timestamp_params(req, need_time=True)
+ db.set_timestamp(cam, time_type, time)
+ return self.ok()
+
+ async def get_timestamp(self, req: http.Request):
+ cam, time_type = self._getset_timestamp_params(req)
+ return self.ok(db.get_timestamp(cam, time_type))
+
+ async def get_all_timestamps(self, req: http.Request):
+ return self.ok(db.get_all_timestamps())
+
+ @staticmethod
+ def _getset_timestamp_params(req: http.Request, need_time=False):
+ values = []
+
+ cam = int(req.match_info['name'])
+ assert cam in config['camera'], 'invalid camera'
+
+ values.append(cam)
+ values.append(TimeFilterType(req.match_info['type']))
+
+ if need_time:
+ time = req.query['time']
+ if time.startswith('record_'):
+ time = filename_to_datetime(time)
+ elif time.isnumeric():
+ time = int(time)
+ else:
+ raise ValueError('invalid time')
+ values.append(time)
+
+ return values
+
+
+# other global stuff
+# ------------------
+
+def open_database():
+ global db
+ db = IPCamServerDatabase()
+
+ # update cams list in database, if needed
+ cams = db.get_all_timestamps().keys()
+ for cam in config['camera']:
+ if cam not in cams:
+ db.add_camera(cam)
+
+
+def get_recordings_path(cam: int) -> str:
+ return config['camera'][cam]['recordings_path']
+
+
+def get_motion_path(cam: int) -> str:
+ return config['camera'][cam]['motion_path']
+
+
+def get_recordings_files(cam: int,
+ time_filter_type: Optional[TimeFilterType] = None) -> list[dict]:
+ from_time = 0
+ to_time = int(time.time())
+
+ if time_filter_type:
+ from_time = db.get_timestamp(cam, time_filter_type)
+ if time_filter_type == TimeFilterType.MOTION:
+ to_time = db.get_timestamp(cam, TimeFilterType.FIX)
+
+ from_time = datetime.fromtimestamp(from_time)
+ to_time = datetime.fromtimestamp(to_time)
+
+ recdir = get_recordings_path(cam)
+ files = [{
+ 'name': file,
+ 'size': os.path.getsize(os.path.join(recdir, file))}
+ for file in os.listdir(recdir)
+ if valid_recording_name(file) and from_time < filename_to_datetime(file) <= to_time]
+ files.sort(key=lambda file: file['name'])
+
+ if files:
+ last = files[len(files)-1]
+ fullpath = os.path.join(recdir, last['name'])
+ if camutil.has_handle(fullpath):
+ logger.debug(f'get_recordings_files: file {fullpath} has opened handle, ignoring it')
+ files.pop()
+
+ return files
+
+
+async def process_fragments(camera: int,
+ filename: str,
+ fragments: list[tuple[int, int]]) -> None:
+ time = filename_to_datetime(filename)
+
+ rec_dir = get_recordings_path(camera)
+ motion_dir = get_motion_path(camera)
+ if not os.path.exists(motion_dir):
+ os.mkdir(motion_dir)
+
+ for fragment in fragments:
+ start, end = fragment
+
+ start -= config['motion']['padding']
+ end += config['motion']['padding']
+
+ if start < 0:
+ start = 0
+
+ duration = end - start
+
+ dt1 = (time + timedelta(seconds=start)).strftime(datetime_format)
+ dt2 = (time + timedelta(seconds=end)).strftime(datetime_format)
+
+ await camutil.ffmpeg_cut(input=os.path.join(rec_dir, filename),
+ output=os.path.join(motion_dir, f'{dt1}__{dt2}.mp4'),
+ start_pos=start,
+ duration=duration)
+
+
+async def fix_job() -> None:
+ logger.debug('fix_job: starting')
+
+ for cam in config['camera'].keys():
+ files = get_recordings_files(cam, TimeFilterType.FIX)
+ if not files:
+ logger.debug(f'fix_job: no files for camera {cam}')
+ continue
+
+ logger.debug(f'fix_job: got %d files for camera {cam}' % (len(files),))
+
+ for file in files:
+ fullpath = os.path.join(get_recordings_path(cam), file['name'])
+ await camutil.ffmpeg_recreate(fullpath)
+ timestamp = filename_to_datetime(file['name'])
+ if timestamp:
+ db.set_timestamp(cam, TimeFilterType.FIX, timestamp)
+
+
+datetime_format = '%Y-%m-%d-%H.%M.%S'
+db: Optional[IPCamServerDatabase] = None
+server: Optional[IPCamWebServer] = None
+logger = logging.getLogger(__name__)
+
+
+# start of the program
+# --------------------
+
+if __name__ == '__main__':
+ config.load('ipcam_server')
+
+ open_database()
+
+ loop = asyncio.get_event_loop()
+
+ scheduler = AsyncIOScheduler(event_loop=loop)
+ scheduler.add_job(fix_job, 'interval', seconds=config['fix_interval'])
+ scheduler.start()
+
+ server = IPCamWebServer(parse_addr(config['server']['listen']))
+ server.run()
diff --git a/src/sound_node.py b/src/sound_node.py
index 8ba1b50..a96a098 100755
--- a/src/sound_node.py
+++ b/src/sound_node.py
@@ -2,18 +2,16 @@
import os
from typing import Optional
-from aiohttp import web
-from aiohttp.web_exceptions import (
- HTTPNotFound
-)
+
from home.config import config
-from home.util import parse_addr, stringify, format_tb
+from home.util import parse_addr
from home.sound import (
amixer,
Recorder,
RecordStatus,
RecordStorage
)
+from home import http
"""
@@ -27,41 +25,10 @@ This script implements HTTP API for amixer and arecord.
# ---------------------
recorder: Optional[Recorder]
-routes = web.RouteTableDef()
+routes = http.routes()
storage: Optional[RecordStorage]
-# common http funcs & helpers
-# ---------------------------
-
-@web.middleware
-async def errors_handler_middleware(request, handler):
- try:
- response = await handler(request)
- return response
-
- except HTTPNotFound:
- return web.json_response({'error': 'not found'}, status=404)
-
- except Exception as exc:
- data = {
- 'error': exc.__class__.__name__,
- 'message': exc.message if hasattr(exc, 'message') else str(exc)
- }
- tb = format_tb(exc)
- if tb:
- data['stacktrace'] = tb
-
- return web.json_response(data, status=500)
-
-
-def ok(data=None):
- if data is None:
- data = 1
- response = {'response': data}
- return web.json_response(response, dumps=stringify)
-
-
# recording methods
# -----------------
@@ -73,14 +40,14 @@ async def do_record(request):
raise ValueError(f'invalid duration: max duration is {max}')
record_id = recorder.record(duration)
- return ok({'id': record_id})
+ return http.ok({'id': record_id})
@routes.get('/record/info/{id}/')
async def record_info(request):
record_id = int(request.match_info['id'])
info = recorder.get_info(record_id)
- return ok(info.as_dict())
+ return http.ok(info.as_dict())
@routes.get('/record/forget/{id}/')
@@ -91,7 +58,7 @@ async def record_forget(request):
assert info.status in (RecordStatus.FINISHED, RecordStatus.ERROR), f"can't forget: record status is {info.status}"
recorder.forget(record_id)
- return ok()
+ return http.ok()
@routes.get('/record/download/{id}/')
@@ -101,7 +68,7 @@ async def record_download(request):
info = recorder.get_info(record_id)
assert info.status == RecordStatus.FINISHED, f"record status is {info.status}"
- return web.FileResponse(info.file.path)
+ return http.FileResponse(info.file.path)
@routes.get('/storage/list/')
@@ -112,7 +79,7 @@ async def storage_list(request):
if extended:
files = list(map(lambda file: file.__dict__(), files))
- return ok({
+ return http.ok({
'files': files
})
@@ -125,7 +92,7 @@ async def storage_delete(request):
raise ValueError(f'file {file} not found')
storage.delete(file)
- return ok()
+ return http.ok()
@routes.get('/storage/download/')
@@ -135,7 +102,7 @@ async def storage_download(request):
if not file:
raise ValueError(f'file {file} not found')
- return web.FileResponse(file.path)
+ return http.FileResponse(file.path)
# ALSA mixer methods
@@ -144,7 +111,7 @@ async def storage_download(request):
def _amixer_control_response(control):
info = amixer.get(control)
caps = amixer.get_caps(control)
- return ok({
+ return http.ok({
'caps': caps,
'info': info
})
@@ -153,7 +120,7 @@ def _amixer_control_response(control):
@routes.get('/amixer/get-all/')
async def amixer_get_all(request):
controls_info = amixer.get_all()
- return ok(controls_info)
+ return http.ok(controls_info)
@routes.get('/amixer/get/{control}/')
@@ -213,13 +180,4 @@ if __name__ == '__main__':
recorder = Recorder(storage=storage)
recorder.start_thread()
- # start http server
- host, port = parse_addr(config['node']['listen'])
- app = web.Application()
- app.add_routes(routes)
- app.middlewares.append(errors_handler_middleware)
-
- web.run_app(app,
- host=host,
- port=port,
- handle_signals=True)
+ http.serve(parse_addr(config['node']['listen']), routes)
diff --git a/src/test/test_inverter_monitor.py b/src/test/test_inverter_monitor.py
index d9b63d3..4cccb77 100755
--- a/src/test/test_inverter_monitor.py
+++ b/src/test/test_inverter_monitor.py
@@ -347,8 +347,8 @@ def main():
charger.start()
# init inverterd wrapper
- inverter.init(host=config['inverter']['host'],
- port=config['inverter']['port'])
+ inverter.schema_init(host=config['inverter']['host'],
+ port=config['inverter']['port'])
# start monitor
mon = InverterMonitor()