1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
from ..config import ConfigUnit, LinuxBoardsConfig
from typing import Optional
from .types import CameraType, VideoContainerType, VideoCodecType
_lbc = LinuxBoardsConfig()
def _validate_roi_line(field, value, error) -> bool:
p = value.split(' ')
if len(p) != 4:
error(field, f'{field}: must contain four coordinates separated by space')
for n in p:
if not n.isnumeric():
error(field, f'{field}: invalid coordinates (not a number)')
return True
class IpcamConfig(ConfigUnit):
NAME = 'ipcam'
@classmethod
def schema(cls) -> Optional[dict]:
return {
'cams': {
'type': 'dict',
'keysrules': {'type': ['string', 'integer']},
'valuesrules': {
'type': 'dict',
'schema': {
'type': {'type': 'string', 'allowed': [t.value for t in CameraType], 'required': True},
'codec': {'type': 'string', 'allowed': [t.value for t in VideoCodecType], 'required': True},
'container': {'type': 'string', 'allowed': [t.value for t in VideoContainerType], 'required': True},
'server': {'type': 'string', 'allowed': list(_lbc.get().keys()), 'required': True},
'disk': {'type': 'integer', 'required': True},
'motion': {
'type': 'dict',
'schema': {
'threshold': {'type': ['float', 'integer']},
'roi': {
'type': 'list',
'schema': {'type': 'string', 'check_with': _validate_roi_line}
}
}
}
}
}
},
'motion_padding': {'type': 'integer', 'required': True},
'motion_telegram': {'type': 'boolean', 'required': True},
'fix_interval': {'type': 'integer', 'required': True},
'fix_enabled': {'type': 'boolean', 'required': True},
'cleanup_min_gb': {'type': 'integer', 'required': True},
'cleanup_interval': {'type': 'integer', 'required': True},
# TODO FIXME
'fragment_url_templates': cls._url_templates_schema(),
'original_file_url_templates': cls._url_templates_schema()
}
@staticmethod
def custom_validator(data):
for n, cam in data['cams'].items():
linux_box = _lbc[cam['server']]
if 'ext_hdd' not in linux_box:
raise ValueError(f'cam-{n}: linux box {cam["server"]} must have ext_hdd defined')
disk = cam['disk']-1
if disk < 0 or disk >= len(linux_box['ext_hdd']):
raise ValueError(f'cam-{n}: invalid disk index for linux box {cam["server"]}')
@classmethod
def _url_templates_schema(cls) -> dict:
return {
'type': 'list',
'empty': False,
'schema': {
'type': 'list',
'empty': False,
'schema': {'type': 'string'}
}
}
|