aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2024-02-17 03:51:08 +0300
committerEvgeny Zinoviev <me@ch1p.io>2024-02-17 03:51:08 +0300
commitc5e69cf2c9b89d546ad7a4f6bb26aef47021dd50 (patch)
tree61ea0185392909cf9d0198c51f439a2ee01e8089
parent0ce2e41a2bad790c5232fafb4b6ed631ca8cd957 (diff)
ipcam_ntp_util (wip: only supports hikvision cams for now)
-rwxr-xr-xbin/ipcam_ntp_util.py83
-rw-r--r--include/py/homekit/camera/config.py24
-rw-r--r--include/py/homekit/camera/types.py5
-rw-r--r--include/py/homekit/config/_configs.py13
-rw-r--r--include/py/homekit/config/config.py21
-rw-r--r--include/py/homekit/util.py2
6 files changed, 110 insertions, 38 deletions
diff --git a/bin/ipcam_ntp_util.py b/bin/ipcam_ntp_util.py
index 98639bd..0268a06 100755
--- a/bin/ipcam_ntp_util.py
+++ b/bin/ipcam_ntp_util.py
@@ -4,12 +4,22 @@ import requests
import hashlib
import xml.etree.ElementTree as ET
+from enum import Enum, auto
from time import time
+from typing import Optional
from argparse import ArgumentParser, ArgumentError
from homekit.util import validate_ipv4, validate_ipv4_or_hostname
from homekit.camera import IpcamConfig
+ipcam_config = IpcamConfig()
+
+
+class Action(Enum):
+ GET_NTP = auto()
+ SET_NTP = auto()
+
+
def xml_to_dict(xml_data: str) -> dict:
# Parse the XML data
root = ET.fromstring(xml_data)
@@ -131,11 +141,14 @@ class HikvisionISAPIClient:
data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={
- 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
+ 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
+ 'X-Requested-With': 'XMLHttpRequest',
})
self.isapi_check_put_response(r)
- def set_ntp_server(self, ntp_host: str, ntp_port: int = 123):
+ def set_ntp_server(self,
+ ntp_host: str,
+ ntp_port: int = 123):
format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
data = '<?xml version="1.0" encoding="UTF-8"?>'
@@ -145,7 +158,8 @@ class HikvisionISAPIClient:
data=data,
cookies=self.cookies,
headers={
- 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
+ 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
+ 'X-Requested-With': 'XMLHttpRequest',
})
self.isapi_check_put_response(r)
@@ -153,7 +167,12 @@ class HikvisionISAPIClient:
return f'http://{self.host}/ISAPI/{path}'
def isapi_check_put_response(self, r):
- r.raise_for_status()
+ try:
+ r.raise_for_status()
+ except requests.exceptions.HTTPError as e:
+ # print(r.text)
+ raise e
+
resp = xml_to_dict(r.text)['ResponseStatus']
status_code = int(resp['statusCode'][0])
@@ -163,36 +182,60 @@ class HikvisionISAPIClient:
raise ResponseError('response status looks bad')
+def process_hikvision_camera(host: str,
+ action: Action,
+ login: str,
+ password: str,
+ ntp_server: Optional[str] = None):
+ client = HikvisionISAPIClient(host)
+ try:
+ client.auth(login, password)
+ if action == Action.GET_NTP:
+ print(f'[{host}] {client.get_ntp_server()}')
+ return
+ client.set_ntp_server(ntp_server)
+ except AuthError as e:
+ print(f'[{host}] ({str(e)})')
+ except ResponseError as e:
+ print(f'[{host}] ({str(e)})')
+
+
def main():
parser = ArgumentParser()
- parser.add_argument('--host', type=str, required=True)
+ parser.add_argument('--host', type=str)
+ parser.add_argument('--all', action='store_true')
parser.add_argument('--get-ntp-server', action='store_true')
parser.add_argument('--set-ntp-server', type=str)
parser.add_argument('--username', type=str)
parser.add_argument('--password', type=str)
args = parser.parse_args()
+ if not args.host and not args.all:
+ raise ArgumentError(None, 'either --all or --host <IP> is required')
+
if not args.get_ntp_server and not args.set_ntp_server:
raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required')
- ipcam_config = IpcamConfig()
+ action = Action.GET_NTP if args.get_ntp_server else Action.SET_NTP
login = args.username if args.username else ipcam_config['web_creds']['login']
password = args.password if args.password else ipcam_config['web_creds']['password']
- client = HikvisionISAPIClient(args.host)
- client.auth(args.username, args.password)
-
- if args.get_ntp_server:
- print(client.get_ntp_server())
- return
-
- if not args.set_ntp_server:
- raise ArgumentError(None, '--set-ntp-server is required')
-
- if not validate_ipv4_or_hostname(args.set_ntp_server):
- raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname')
-
- client.set_ntp_server(args.set_ntp_server)
+ if action == Action.SET_NTP:
+ if not args.set_ntp_server:
+ raise ArgumentError(None, '--set-ntp-server is required')
+ if not validate_ipv4_or_hostname(args.set_ntp_server):
+ raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname')
+
+ kwargs = {}
+ if args.set_ntp_server:
+ kwargs['ntp_server'] = args.set_ntp_server
+ if not args.all:
+ process_hikvision_camera(args.host, action, login, password, **kwargs)
+ else:
+ for cam in ipcam_config.get_all_cam_names():
+ if not ipcam_config.get_camera_type(cam).is_hikvision():
+ continue
+ process_hikvision_camera(ipcam_config.get_camera_ip(cam), action, login, password, **kwargs)
if __name__ == '__main__':
diff --git a/include/py/homekit/camera/config.py b/include/py/homekit/camera/config.py
index 8aeb392..0ed75cf 100644
--- a/include/py/homekit/camera/config.py
+++ b/include/py/homekit/camera/config.py
@@ -30,6 +30,7 @@ class IpcamConfig(ConfigUnit):
'type': 'dict',
'schema': {
'type': {'type': 'string', 'allowed': [t.value for t in CameraType], 'required': True},
+ 'enabled': {'type': 'boolean'},
'motion': {
'type': 'dict',
'schema': {
@@ -87,13 +88,16 @@ class IpcamConfig(ConfigUnit):
@staticmethod
def custom_validator(data):
- for n, cam in data['cams'].items():
- linux_box = _lbc[cam['server']]
- if 'ext_hdd' not in linux_box:
- raise ValueError(f'cam-{n}: linux box {cam["server"]} must have ext_hdd defined')
- disk = cam['disk']-1
- if disk < 0 or disk >= len(linux_box['ext_hdd']):
- raise ValueError(f'cam-{n}: invalid disk index for linux box {cam["server"]}')
+ pass
+
+ # FIXME rewrite or delete, looks kinda obsolete
+ # for n, cam in data['cameras'].items():
+ # linux_box = _lbc[cam['server']]
+ # if 'ext_hdd' not in linux_box:
+ # raise ValueError(f'cam-{n}: linux box {cam["server"]} must have ext_hdd defined')
+ # disk = cam['disk']-1
+ # if disk < 0 or disk >= len(linux_box['ext_hdd']):
+ # raise ValueError(f'cam-{n}: invalid disk index for linux box {cam["server"]}')
@classmethod
def _url_templates_schema(cls) -> dict:
@@ -114,7 +118,7 @@ class IpcamConfig(ConfigUnit):
cams = []
if filter_by_server is not None and filter_by_server not in _lbc:
raise ValueError(f'invalid filter_by_server: {filter_by_server} not found in {_lbc.__class__.__name__}')
- for cam, params in self['cams'].items():
+ for cam, params in self['cameras'].items():
if filter_by_server is None or params['server'] == filter_by_server:
if filter_by_disk is None or params['disk'] == filter_by_disk:
cams.append(int(cam))
@@ -126,13 +130,13 @@ class IpcamConfig(ConfigUnit):
# filter_by_disk=filter_by_disk)
# def get_cam_server_and_disk(self, cam: int) -> tuple[str, int]:
- # return self['cams'][cam]['server'], self['cams'][cam]['disk']
+ # return self['cameras'][cam]['server'], self['cameras'][cam]['disk']
def get_camera_container(self, camera: int) -> VideoContainerType:
return self.get_camera_type(camera).get_container()
def get_camera_type(self, camera: int) -> CameraType:
- return CameraType(self['cams'][camera]['type'])
+ return CameraType(self['cameras'][camera]['type'])
def get_rtsp_creds(self) -> tuple[str, str]:
return self['rtsp_creds']['login'], self['rtsp_creds']['password']
diff --git a/include/py/homekit/camera/types.py b/include/py/homekit/camera/types.py
index da0fcc6..1a97e63 100644
--- a/include/py/homekit/camera/types.py
+++ b/include/py/homekit/camera/types.py
@@ -23,7 +23,7 @@ class CameraType(Enum):
if channel == 1:
return ''
elif channel == 2:
- if self.value in (CameraType.HIKVISION_264, CameraType.HIKVISION_265):
+ if self.is_hikvision():
return '/Streaming/Channels/2'
elif self.value == CameraType.ALIEXPRESS_NONAME:
return '/?stream=1.sdp'
@@ -41,6 +41,9 @@ class CameraType(Enum):
def get_container(self) -> VideoContainerType:
return VideoContainerType.MP4 if self.get_codec(1) == VideoCodecType.H264 else VideoContainerType.MOV
+ def is_hikvision(self) -> bool:
+ return self in (CameraType.HIKVISION_264.value, CameraType.HIKVISION_265)
+
class TimeFilterType(Enum):
FIX = 'fix'
diff --git a/include/py/homekit/config/_configs.py b/include/py/homekit/config/_configs.py
index 2cd2aca..43af25a 100644
--- a/include/py/homekit/config/_configs.py
+++ b/include/py/homekit/config/_configs.py
@@ -24,17 +24,18 @@ class LinuxBoardsConfig(ConfigUnit):
return {
'type': 'dict',
'schema': {
- 'mdns': {'type': 'string', 'required': True},
+ # 'mdns': {'type': 'string', 'required': True},
'board': {'type': 'string', 'required': True},
'location': {'type': 'string', 'required': True},
+ 'mac': cls._addr_schema(mac=True, required=False), # FIXME mac should be required field
'network': {
'type': 'list',
'required': True,
'empty': False,
'allowed': ['wifi', 'ethernet']
},
- 'ram': {'type': 'integer', 'required': True},
- 'online': {'type': 'boolean', 'required': True},
+ 'ram': {'type': 'integer', 'required': False}, # FIXME same as below
+ 'online': {'type': 'boolean', 'required': False}, # FIXME made required=False temporarily, should be always required I guess
# optional
'services': {
@@ -52,6 +53,12 @@ class LinuxBoardsConfig(ConfigUnit):
}
},
},
+ 'misc': {
+ 'type': 'dict',
+ 'schema': {
+ 'case': {'type': 'string', 'allowed': ['metal', 'plastic']}
+ }
+ },
}
}
diff --git a/include/py/homekit/config/config.py b/include/py/homekit/config/config.py
index fec92a6..40ac211 100644
--- a/include/py/homekit/config/config.py
+++ b/include/py/homekit/config/config.py
@@ -3,6 +3,7 @@ import logging
import os
import cerberus
import cerberus.errors
+import re
from abc import ABC
from typing import Optional, Any, MutableMapping, Union
@@ -135,11 +136,25 @@ class ConfigUnit(BaseConfigUnit):
return None
@classmethod
- def _addr_schema(cls, required=False, only_ip=False, **kwargs):
+ def _addr_schema(cls, required=False, mac=False, only_ip=False, **kwargs):
+ def validate_mac_address(field, value, error):
+ if not re.match("[0-9a-fA-F]{2}([-:])[0-9a-fA-F]{2}(\\1[0-9a-fA-F]{2}){4}$", value):
+ error(field, "Invalid MAC address format")
+
+ if mac:
+ l_kwargs = {
+ 'type': 'string',
+ 'check_with': validate_mac_address
+ }
+ else:
+ l_kwargs = {
+ 'type': 'addr',
+ 'coerce': Addr.fromstring if not only_ip else Addr.fromipstring,
+ }
+
return {
- 'type': 'addr',
- 'coerce': Addr.fromstring if not only_ip else Addr.fromipstring,
'required': required,
+ **l_kwargs,
**kwargs
}
diff --git a/include/py/homekit/util.py b/include/py/homekit/util.py
index f718291..7732d3b 100644
--- a/include/py/homekit/util.py
+++ b/include/py/homekit/util.py
@@ -41,7 +41,7 @@ def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bo
def validate_ipv4(address: str) -> bool:
try:
- ipaddress.IPv6Address(address)
+ ipaddress.IPv4Address(address)
return True
except ipaddress.AddressValueError:
return False