From 70b4a4f044cac8052bb0af7c585572e54489ea2f Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sat, 17 Feb 2024 23:20:49 +0300 Subject: ipcam_ntp_util: support chinese noname cameras --- bin/ipcam_ntp_util.py | 267 ++++++++++++-------------------------------------- 1 file changed, 65 insertions(+), 202 deletions(-) (limited to 'bin') diff --git a/bin/ipcam_ntp_util.py b/bin/ipcam_ntp_util.py index b27995c..81f6fe0 100755 --- a/bin/ipcam_ntp_util.py +++ b/bin/ipcam_ntp_util.py @@ -1,16 +1,13 @@ #!/usr/bin/env python3 import __py_include -import requests -import hashlib -import xml.etree.ElementTree as ET +import homekit.camera.hikvision as hikvision +import homekit.camera.alinoname as alinoname from enum import Enum, auto -from time import time from typing import Optional from argparse import ArgumentParser, ArgumentError -from homekit.util import validate_ipv4, validate_ipv4_or_hostname -from homekit.camera import IpcamConfig - +from homekit.util import validate_ipv4_or_hostname +from homekit.camera import IpcamConfig, CameraType ipcam_config = IpcamConfig() @@ -20,216 +17,63 @@ class Action(Enum): SET_NTP = auto() -def xml_to_dict(xml_data: str) -> dict: - # Parse the XML data - root = ET.fromstring(xml_data) - - # Function to remove namespace from the tag name - def remove_namespace(tag): - return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace - - # Function to recursively convert XML elements to a dictionary - def elem_to_dict(elem): - tag = remove_namespace(elem.tag) - elem_dict = {tag: {}} - - # If the element has attributes, add them to the dictionary - elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()}) - - # Handle the element's text content, if present and not just whitespace - text = elem.text.strip() if elem.text and elem.text.strip() else None - if text: - elem_dict[tag]['#text'] = text - - # Process child elements - for child in elem: - child_dict = elem_to_dict(child) - child_tag = remove_namespace(child.tag) - if child_tag not in elem_dict[tag]: - elem_dict[tag][child_tag] = [] - elem_dict[tag][child_tag].append(child_dict[child_tag]) - - # Simplify structure if there's only text or no children and no attributes - if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]: - return {tag: elem_dict[tag]['#text']} - elif not elem_dict[tag]: - return {tag: ''} - - return elem_dict - - # Convert the root element to dictionary - return elem_to_dict(root) - - -def sha256_hex(input_string: str) -> str: - return hashlib.sha256(input_string.encode()).hexdigest() - - -class ResponseError(RuntimeError): - pass - - -class AuthError(ResponseError): - pass - - -class HikvisionISAPIClient: - def __init__(self, host): - self.host = host - self.cookies = {} - - def auth(self, username: str, password: str): - r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'), - {'username': username}, - headers={ - 'X-Requested-With': 'XMLHttpRequest', - }) - r.raise_for_status() - caps = xml_to_dict(r.text)['SessionLoginCap'] - is_irreversible = caps['isIrreversible'][0].lower() == 'true' - - # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl - # also look into webAuth.js and utils.js - - if 'salt' in caps and is_irreversible: - p = sha256_hex(username + caps['salt'][0] + password) - p = sha256_hex(p + caps['challenge'][0]) - for i in range(int(caps['iterations'][0])-2): - p = sha256_hex(p) - else: - p = sha256_hex(password) + caps['challenge'][0] - for i in range(int(caps['iterations'][0])-1): - p = sha256_hex(p) - - data = '' - data += f'{username}' - data += f'{p}' - data += f'{caps["sessionID"][0]}' - data += 'false' - data += f'{caps["sessionIDVersion"][0]}' - data += '' - - r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={ - 'Accept-Encoding': 'gzip, deflate', - 'If-Modified-Since': '0', - 'X-Requested-With': 'XMLHttpRequest', - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', - }) - r.raise_for_status() - resp = xml_to_dict(r.text)['SessionLogin'] - status_value = int(resp['statusValue'][0]) - status_string = resp['statusString'][0] - if status_value != 200: - raise AuthError(f'{status_value}: {status_string}') - - self.cookies = r.cookies.get_dict() - - def get_ntp_server(self) -> str: - r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'), cookies=self.cookies) - r.raise_for_status() - ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0] - - if ntp_server['addressingFormatType'][0]['#text'] == 'hostname': - ntp_host = ntp_server['hostName'][0] - else: - ntp_host = ntp_server['ipAddress'][0] - - return ntp_host - - def set_timezone(self): - data = '' - data += '' - - r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={ - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', - 'X-Requested-With': 'XMLHttpRequest', - }) - self.isapi_check_put_response(r) - - def set_ntp_server(self, - ntp_host: str, - ntp_port: int = 123): - format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname' - - # test ntp server first - data = f'{format}{ntp_host}{ntp_port}' - r = requests.post(self.isapi_uri('System/time/ntpServers/test'), data=data, headers={ - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', - 'X-Requested-With': 'XMLHttpRequest', - }, cookies=self.cookies) - r.raise_for_status() - - resp = xml_to_dict(r.text)['NTPTestResult'] - - error_code = int(resp['errorCode'][0]) - error_description = resp['errorDescription'][0] - - if error_code != 0 or error_description.lower() != 'ok': - raise ResponseError('response status looks bad') +def process_camera(host: str, + action: Action, + login: str, + password: str, + camera_type: CameraType, + ntp_server: Optional[str] = None): + if camera_type.is_hikvision(): + client = hikvision.ISAPIClient(host) + try: + client.auth(login, password) + if action == Action.GET_NTP: + print(f'[{host}] {client.get_ntp_server()}') + return - # then set it - data = '' - data += f'1{format}{ntp_host}{ntp_port}1440' + client.set_ntp_server(ntp_server) + print(f'[{host}] done') - r = requests.put(self.isapi_uri('System/time/ntpServers/1'), - data=data, - cookies=self.cookies, - headers={ - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', - 'X-Requested-With': 'XMLHttpRequest', - }) - self.isapi_check_put_response(r) + except hikvision.AuthError as e: + print(f'[{host}] ({str(e)})') - def isapi_uri(self, path: str) -> str: - return f'http://{self.host}/ISAPI/{path}' + except hikvision.ResponseError as e: + print(f'[{host}] ({str(e)})') - def isapi_check_put_response(self, r): + elif camera_type.is_ali(): try: - r.raise_for_status() - except requests.exceptions.HTTPError as e: - # print(r.text) - raise e - - resp = xml_to_dict(r.text)['ResponseStatus'] - - status_code = int(resp['statusCode'][0]) - status_string = resp['statusString'][0] + client = alinoname.XMEyeCamera(hostname=host, username=login, password=password) + client.login() - if status_code != 1 or status_string.lower() != 'ok': - raise ResponseError('response status looks bad') + if action == Action.GET_NTP: + print(f'[{host}] {client.get_ntp_server()}') + return + client.set_ntp_server(ntp_server) + print(f'[{host}] done') -def process_hikvision_camera(host: str, - action: Action, - login: str, - password: str, - ntp_server: Optional[str] = None): - client = HikvisionISAPIClient(host) - try: - client.auth(login, password) - if action == Action.GET_NTP: - print(f'[{host}] {client.get_ntp_server()}') - return - client.set_ntp_server(ntp_server) - print(f'[{host}] done') - except AuthError as e: - print(f'[{host}] ({str(e)})') - except ResponseError as e: - print(f'[{host}] ({str(e)})') + except OSError as e: + print(f'[{host}] ({str(e)})') def main(): + camera_types = ['hikvision', 'ali'] parser = ArgumentParser() - parser.add_argument('--host', type=str) + parser.add_argument('--camera', type=str) + parser.add_argument('--camera-type', type=str, choices=camera_types) parser.add_argument('--all', action='store_true') + parser.add_argument('--all-of-type', type=str, choices=camera_types) parser.add_argument('--get-ntp-server', action='store_true') parser.add_argument('--set-ntp-server', type=str) parser.add_argument('--username', type=str) parser.add_argument('--password', type=str) args = parser.parse_args() - if not args.host and not args.all: - raise ArgumentError(None, 'either --all or --host is required') + if args.all and args.all_of_type: + raise ArgumentError(None, 'you can\'t pass both --all and --all-of-type') + + if not args.camera and not args.all and not args.all_of_type: + raise ArgumentError(None, 'either --all, --all-of-type or --camera is required') if not args.get_ntp_server and not args.set_ntp_server: raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required') @@ -247,13 +91,32 @@ def main(): kwargs = {} if args.set_ntp_server: kwargs['ntp_server'] = args.set_ntp_server - if not args.all: - process_hikvision_camera(args.host, action, login, password, **kwargs) + if not args.all and not args.all_of_type: + if not args.camera_type: + raise ArgumentError(None, '--camera-type is required') + + if not ipcam_config.has_camera(int(args.camera)): + raise ArgumentError(None, f'invalid camera {args.camera}') + camera_host = ipcam_config.get_camera_ip(args.camera) + + if args.camera_type == 'hikvision': + camera_type = CameraType.HIKVISION_264 + elif args.camera_type == 'ali': + camera_type = CameraType.ALIEXPRESS_NONAME + else: + raise ValueError('invalid --camera-type') + process_camera(camera_host, action, login, password, camera_type, **kwargs) else: for cam in ipcam_config.get_all_cam_names(): - if not ipcam_config.get_camera_type(cam).is_hikvision(): + if not ipcam_config.is_camera_enabled(cam): + continue + + cam_type = ipcam_config.get_camera_type(cam) + if args.all_of_type == 'hikvision' and not cam_type.is_hikvision(): + continue + if args.all_of_type == 'ali' and not ipcam_config.get_camera_type(cam).is_ali(): continue - process_hikvision_camera(ipcam_config.get_camera_ip(cam), action, login, password, **kwargs) + process_camera(ipcam_config.get_camera_ip(cam), action, login, password, cam_type, **kwargs) if __name__ == '__main__': -- cgit v1.2.3