From c5e69cf2c9b89d546ad7a4f6bb26aef47021dd50 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sat, 17 Feb 2024 03:51:08 +0300 Subject: ipcam_ntp_util (wip: only supports hikvision cams for now) --- bin/ipcam_ntp_util.py | 83 ++++++++++++++++++++++++++--------- include/py/homekit/camera/config.py | 24 +++++----- include/py/homekit/camera/types.py | 5 ++- include/py/homekit/config/_configs.py | 13 ++++-- include/py/homekit/config/config.py | 21 +++++++-- include/py/homekit/util.py | 2 +- 6 files changed, 110 insertions(+), 38 deletions(-) diff --git a/bin/ipcam_ntp_util.py b/bin/ipcam_ntp_util.py index 98639bd..0268a06 100755 --- a/bin/ipcam_ntp_util.py +++ b/bin/ipcam_ntp_util.py @@ -4,12 +4,22 @@ import requests import hashlib import xml.etree.ElementTree as ET +from enum import Enum, auto from time import time +from typing import Optional from argparse import ArgumentParser, ArgumentError from homekit.util import validate_ipv4, validate_ipv4_or_hostname from homekit.camera import IpcamConfig +ipcam_config = IpcamConfig() + + +class Action(Enum): + GET_NTP = auto() + SET_NTP = auto() + + def xml_to_dict(xml_data: str) -> dict: # Parse the XML data root = ET.fromstring(xml_data) @@ -131,11 +141,14 @@ class HikvisionISAPIClient: data += '' r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={ - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8' + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', + 'X-Requested-With': 'XMLHttpRequest', }) self.isapi_check_put_response(r) - def set_ntp_server(self, ntp_host: str, ntp_port: int = 123): + def set_ntp_server(self, + ntp_host: str, + ntp_port: int = 123): format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname' data = '' @@ -145,7 +158,8 @@ class HikvisionISAPIClient: data=data, cookies=self.cookies, headers={ - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8' + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', + 'X-Requested-With': 'XMLHttpRequest', }) self.isapi_check_put_response(r) @@ -153,7 +167,12 @@ class HikvisionISAPIClient: return f'http://{self.host}/ISAPI/{path}' def isapi_check_put_response(self, r): - r.raise_for_status() + try: + r.raise_for_status() + except requests.exceptions.HTTPError as e: + # print(r.text) + raise e + resp = xml_to_dict(r.text)['ResponseStatus'] status_code = int(resp['statusCode'][0]) @@ -163,36 +182,60 @@ class HikvisionISAPIClient: raise ResponseError('response status looks bad') +def process_hikvision_camera(host: str, + action: Action, + login: str, + password: str, + ntp_server: Optional[str] = None): + client = HikvisionISAPIClient(host) + try: + client.auth(login, password) + if action == Action.GET_NTP: + print(f'[{host}] {client.get_ntp_server()}') + return + client.set_ntp_server(ntp_server) + except AuthError as e: + print(f'[{host}] ({str(e)})') + except ResponseError as e: + print(f'[{host}] ({str(e)})') + + def main(): parser = ArgumentParser() - parser.add_argument('--host', type=str, required=True) + parser.add_argument('--host', type=str) + parser.add_argument('--all', action='store_true') parser.add_argument('--get-ntp-server', action='store_true') parser.add_argument('--set-ntp-server', type=str) parser.add_argument('--username', type=str) parser.add_argument('--password', type=str) args = parser.parse_args() + if not args.host and not args.all: + raise ArgumentError(None, 'either --all or --host is required') + if not args.get_ntp_server and not args.set_ntp_server: raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required') - ipcam_config = IpcamConfig() + action = Action.GET_NTP if args.get_ntp_server else Action.SET_NTP login = args.username if args.username else ipcam_config['web_creds']['login'] password = args.password if args.password else ipcam_config['web_creds']['password'] - client = HikvisionISAPIClient(args.host) - client.auth(args.username, args.password) - - if args.get_ntp_server: - print(client.get_ntp_server()) - return - - if not args.set_ntp_server: - raise ArgumentError(None, '--set-ntp-server is required') - - if not validate_ipv4_or_hostname(args.set_ntp_server): - raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname') - - client.set_ntp_server(args.set_ntp_server) + if action == Action.SET_NTP: + if not args.set_ntp_server: + raise ArgumentError(None, '--set-ntp-server is required') + if not validate_ipv4_or_hostname(args.set_ntp_server): + raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname') + + kwargs = {} + if args.set_ntp_server: + kwargs['ntp_server'] = args.set_ntp_server + if not args.all: + process_hikvision_camera(args.host, action, login, password, **kwargs) + else: + for cam in ipcam_config.get_all_cam_names(): + if not ipcam_config.get_camera_type(cam).is_hikvision(): + continue + process_hikvision_camera(ipcam_config.get_camera_ip(cam), action, login, password, **kwargs) if __name__ == '__main__': diff --git a/include/py/homekit/camera/config.py b/include/py/homekit/camera/config.py index 8aeb392..0ed75cf 100644 --- a/include/py/homekit/camera/config.py +++ b/include/py/homekit/camera/config.py @@ -30,6 +30,7 @@ class IpcamConfig(ConfigUnit): 'type': 'dict', 'schema': { 'type': {'type': 'string', 'allowed': [t.value for t in CameraType], 'required': True}, + 'enabled': {'type': 'boolean'}, 'motion': { 'type': 'dict', 'schema': { @@ -87,13 +88,16 @@ class IpcamConfig(ConfigUnit): @staticmethod def custom_validator(data): - for n, cam in data['cams'].items(): - linux_box = _lbc[cam['server']] - if 'ext_hdd' not in linux_box: - raise ValueError(f'cam-{n}: linux box {cam["server"]} must have ext_hdd defined') - disk = cam['disk']-1 - if disk < 0 or disk >= len(linux_box['ext_hdd']): - raise ValueError(f'cam-{n}: invalid disk index for linux box {cam["server"]}') + pass + + # FIXME rewrite or delete, looks kinda obsolete + # for n, cam in data['cameras'].items(): + # linux_box = _lbc[cam['server']] + # if 'ext_hdd' not in linux_box: + # raise ValueError(f'cam-{n}: linux box {cam["server"]} must have ext_hdd defined') + # disk = cam['disk']-1 + # if disk < 0 or disk >= len(linux_box['ext_hdd']): + # raise ValueError(f'cam-{n}: invalid disk index for linux box {cam["server"]}') @classmethod def _url_templates_schema(cls) -> dict: @@ -114,7 +118,7 @@ class IpcamConfig(ConfigUnit): cams = [] if filter_by_server is not None and filter_by_server not in _lbc: raise ValueError(f'invalid filter_by_server: {filter_by_server} not found in {_lbc.__class__.__name__}') - for cam, params in self['cams'].items(): + for cam, params in self['cameras'].items(): if filter_by_server is None or params['server'] == filter_by_server: if filter_by_disk is None or params['disk'] == filter_by_disk: cams.append(int(cam)) @@ -126,13 +130,13 @@ class IpcamConfig(ConfigUnit): # filter_by_disk=filter_by_disk) # def get_cam_server_and_disk(self, cam: int) -> tuple[str, int]: - # return self['cams'][cam]['server'], self['cams'][cam]['disk'] + # return self['cameras'][cam]['server'], self['cameras'][cam]['disk'] def get_camera_container(self, camera: int) -> VideoContainerType: return self.get_camera_type(camera).get_container() def get_camera_type(self, camera: int) -> CameraType: - return CameraType(self['cams'][camera]['type']) + return CameraType(self['cameras'][camera]['type']) def get_rtsp_creds(self) -> tuple[str, str]: return self['rtsp_creds']['login'], self['rtsp_creds']['password'] diff --git a/include/py/homekit/camera/types.py b/include/py/homekit/camera/types.py index da0fcc6..1a97e63 100644 --- a/include/py/homekit/camera/types.py +++ b/include/py/homekit/camera/types.py @@ -23,7 +23,7 @@ class CameraType(Enum): if channel == 1: return '' elif channel == 2: - if self.value in (CameraType.HIKVISION_264, CameraType.HIKVISION_265): + if self.is_hikvision(): return '/Streaming/Channels/2' elif self.value == CameraType.ALIEXPRESS_NONAME: return '/?stream=1.sdp' @@ -41,6 +41,9 @@ class CameraType(Enum): def get_container(self) -> VideoContainerType: return VideoContainerType.MP4 if self.get_codec(1) == VideoCodecType.H264 else VideoContainerType.MOV + def is_hikvision(self) -> bool: + return self in (CameraType.HIKVISION_264.value, CameraType.HIKVISION_265) + class TimeFilterType(Enum): FIX = 'fix' diff --git a/include/py/homekit/config/_configs.py b/include/py/homekit/config/_configs.py index 2cd2aca..43af25a 100644 --- a/include/py/homekit/config/_configs.py +++ b/include/py/homekit/config/_configs.py @@ -24,17 +24,18 @@ class LinuxBoardsConfig(ConfigUnit): return { 'type': 'dict', 'schema': { - 'mdns': {'type': 'string', 'required': True}, + # 'mdns': {'type': 'string', 'required': True}, 'board': {'type': 'string', 'required': True}, 'location': {'type': 'string', 'required': True}, + 'mac': cls._addr_schema(mac=True, required=False), # FIXME mac should be required field 'network': { 'type': 'list', 'required': True, 'empty': False, 'allowed': ['wifi', 'ethernet'] }, - 'ram': {'type': 'integer', 'required': True}, - 'online': {'type': 'boolean', 'required': True}, + 'ram': {'type': 'integer', 'required': False}, # FIXME same as below + 'online': {'type': 'boolean', 'required': False}, # FIXME made required=False temporarily, should be always required I guess # optional 'services': { @@ -52,6 +53,12 @@ class LinuxBoardsConfig(ConfigUnit): } }, }, + 'misc': { + 'type': 'dict', + 'schema': { + 'case': {'type': 'string', 'allowed': ['metal', 'plastic']} + } + }, } } diff --git a/include/py/homekit/config/config.py b/include/py/homekit/config/config.py index fec92a6..40ac211 100644 --- a/include/py/homekit/config/config.py +++ b/include/py/homekit/config/config.py @@ -3,6 +3,7 @@ import logging import os import cerberus import cerberus.errors +import re from abc import ABC from typing import Optional, Any, MutableMapping, Union @@ -135,11 +136,25 @@ class ConfigUnit(BaseConfigUnit): return None @classmethod - def _addr_schema(cls, required=False, only_ip=False, **kwargs): + def _addr_schema(cls, required=False, mac=False, only_ip=False, **kwargs): + def validate_mac_address(field, value, error): + if not re.match("[0-9a-fA-F]{2}([-:])[0-9a-fA-F]{2}(\\1[0-9a-fA-F]{2}){4}$", value): + error(field, "Invalid MAC address format") + + if mac: + l_kwargs = { + 'type': 'string', + 'check_with': validate_mac_address + } + else: + l_kwargs = { + 'type': 'addr', + 'coerce': Addr.fromstring if not only_ip else Addr.fromipstring, + } + return { - 'type': 'addr', - 'coerce': Addr.fromstring if not only_ip else Addr.fromipstring, 'required': required, + **l_kwargs, **kwargs } diff --git a/include/py/homekit/util.py b/include/py/homekit/util.py index f718291..7732d3b 100644 --- a/include/py/homekit/util.py +++ b/include/py/homekit/util.py @@ -41,7 +41,7 @@ def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bo def validate_ipv4(address: str) -> bool: try: - ipaddress.IPv6Address(address) + ipaddress.IPv4Address(address) return True except ipaddress.AddressValueError: return False -- cgit v1.2.3