#!/usr/bin/env python3
"""Read or change the NTP server configured on Hikvision cameras via ISAPI.

The script logs in to a camera's ISAPI HTTP interface and then either prints
the configured NTP server (``--get-ntp-server``) or tests and stores a new
one (``--set-ntp-server HOST``).  It works on one camera (``--host``) or on
every camera from ``IpcamConfig`` that reports itself as Hikvision
(``--all``).
"""
# NOTE(review): presumably prepares sys.path so that the ``homekit`` package
# can be imported; keep it ahead of the homekit imports -- TODO confirm.
import __py_include
import requests
import hashlib
import xml.etree.ElementTree as ET

from enum import Enum, auto
from time import time
from typing import Optional
from argparse import ArgumentParser, ArgumentError
from homekit.util import validate_ipv4, validate_ipv4_or_hostname
from homekit.camera import IpcamConfig


ipcam_config = IpcamConfig()


class Action(Enum):
    """Operation selected on the command line."""

    GET_NTP = auto()
    SET_NTP = auto()


def xml_to_dict(xml_data: str) -> dict:
    """Convert an XML document into nested dicts, dropping namespaces.

    The result is ``{root_tag: content}``.  For each element:

    * attributes are stored under ``'@' + name``;
    * non-blank text is stored under ``'#text'``;
    * children are grouped per tag name into **lists**, so a child is always
      reached as ``content['child'][0]`` even when it occurs only once;
    * an element that has only text collapses to that (stripped) string, and
      an element with no text, attributes or children collapses to ``''``.

    Example: ``<a><b>1</b><b x="y">2</b></a>`` becomes
    ``{'a': {'b': ['1', {'@x': 'y', '#text': '2'}]}}``.

    Raises:
        xml.etree.ElementTree.ParseError: if ``xml_data`` is not valid XML.
    """
    root = ET.fromstring(xml_data)

    def remove_namespace(tag):
        # ElementTree spells namespaced names as '{uri}local'; keep 'local'.
        return tag.split('}')[-1]

    def elem_to_dict(elem):
        tag = remove_namespace(elem.tag)
        content = {'@' + remove_namespace(k): v for k, v in elem.attrib.items()}

        text = elem.text.strip() if elem.text else ''
        if text:
            content['#text'] = text

        for child in elem:
            child_tag = remove_namespace(child.tag)
            child_value = elem_to_dict(child)[child_tag]
            content.setdefault(child_tag, []).append(child_value)

        # Simplify leaves: text-only elements become the string itself and
        # completely empty elements become ''.
        if len(content) == 1 and '#text' in content:
            return {tag: content['#text']}
        if not content:
            return {tag: ''}
        return {tag: content}

    return elem_to_dict(root)


def sha256_hex(input_string: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``input_string``."""
    return hashlib.sha256(input_string.encode()).hexdigest()


class ResponseError(RuntimeError):
    """The camera answered, but the response body reports a failure."""


class AuthError(ResponseError):
    """The session login was rejected by the camera."""


class HikvisionISAPIClient:
    """Minimal client for the ISAPI endpoints this script needs.

    NOTE(review): every request body built in this class consists only of the
    interpolated values (and, in places, empty strings).  The responses are
    parsed as XML, so the bodies presumably were XML documents whose markup
    has been lost from this copy of the file.  The strings are reproduced
    exactly as found -- restore them from the original source before relying
    on this client.  TODO confirm.
    """

    # Headers shared by the requests that carry a body.
    _BODY_HEADERS = {
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'X-Requested-With': 'XMLHttpRequest',
    }

    def __init__(self, host, timeout: float = 10.0):
        """Create a client for ``host``.

        Args:
            host: address used to build ``http://{host}/ISAPI/...`` URLs.
            timeout: seconds passed as ``timeout=`` to every HTTP request, so
                an unreachable camera fails instead of blocking forever.
        """
        self.host = host
        self.timeout = timeout
        self.cookies = {}

    @staticmethod
    def _encode_password(username: str, password: str, caps: dict) -> str:
        """Derive the login password value from the session capabilities.

        ``caps`` is the ``SessionLoginCap`` dict produced by ``xml_to_dict``.
        """
        # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
        # also look into webAuth.js and utils.js
        is_irreversible = caps['isIrreversible'][0].lower() == 'true'
        challenge = caps['challenge'][0]
        iterations = int(caps['iterations'][0])

        if 'salt' in caps and is_irreversible:
            # Two hash rounds are spent here, hence "iterations - 2" below.
            p = sha256_hex(username + caps['salt'][0] + password)
            p = sha256_hex(p + challenge)
            remaining = iterations - 2
        else:
            # The challenge is appended to the digest, not hashed with the
            # password; the first loop round below hashes the combination.
            p = sha256_hex(password) + challenge
            remaining = iterations - 1

        for _ in range(remaining):
            p = sha256_hex(p)
        return p

    def auth(self, username: str, password: str):
        """Log in and remember the session cookies for later requests.

        Raises:
            requests.exceptions.RequestException: on transport/HTTP errors.
            AuthError: if the login response's status value is not 200.
        """
        r = requests.get(
            self.isapi_uri('Security/sessionLogin/capabilities'),
            params={'username': username},
            headers={'X-Requested-With': 'XMLHttpRequest'},
            timeout=self.timeout,
        )
        r.raise_for_status()
        caps = xml_to_dict(r.text)['SessionLoginCap']

        p = self._encode_password(username, password, caps)

        # See the NOTE(review) in the class docstring about these fragments.
        data = ''.join([
            '',
            f'{username}',
            f'{p}',
            f'{caps["sessionID"][0]}',
            'false',
            f'{caps["sessionIDVersion"][0]}',
            '',
        ])

        r = requests.post(
            self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'),
            data=data,
            headers={
                'Accept-Encoding': 'gzip, deflate',
                'If-Modified-Since': '0',
                'X-Requested-With': 'XMLHttpRequest',
                'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            },
            timeout=self.timeout,
        )
        r.raise_for_status()

        resp = xml_to_dict(r.text)['SessionLogin']
        status_value = int(resp['statusValue'][0])
        status_string = resp['statusString'][0]
        if status_value != 200:
            raise AuthError(f'{status_value}: {status_string}')

        self.cookies = r.cookies.get_dict()

    def get_ntp_server(self) -> str:
        """Return the host name or IP address of the first listed NTP server.

        Raises:
            requests.exceptions.RequestException: on transport/HTTP errors.
        """
        r = requests.get(
            self.isapi_uri('System/time/ntpServers/capabilities'),
            cookies=self.cookies,
            timeout=self.timeout,
        )
        r.raise_for_status()

        ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0]
        # This lookup expects addressingFormatType to be a dict (i.e. the
        # element carries attributes next to its text).
        if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
            return ntp_server['hostName'][0]
        return ntp_server['ipAddress'][0]

    def set_timezone(self):
        """PUT to ``System/time`` and validate the response status.

        NOTE(review): the body is an empty string in this copy of the file;
        see the class docstring.
        """
        data = ''.join(['', ''])
        r = requests.put(
            self.isapi_uri('System/time'),
            cookies=self.cookies,
            data=data,
            headers=dict(self._BODY_HEADERS),
            timeout=self.timeout,
        )
        self.isapi_check_put_response(r)

    def set_ntp_server(self, ntp_host: str, ntp_port: int = 123):
        """Ask the camera to test an NTP server and, on success, store it.

        Args:
            ntp_host: IPv4 address or host name of the NTP server.
            ntp_port: port of the NTP server.

        Raises:
            requests.exceptions.RequestException: on transport/HTTP errors.
            ResponseError: if the test result or the PUT response is not OK.
        """
        addr_format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'

        # test ntp server first
        data = f'{addr_format}{ntp_host}{ntp_port}'
        r = requests.post(
            self.isapi_uri('System/time/ntpServers/test'),
            data=data,
            headers=dict(self._BODY_HEADERS),
            cookies=self.cookies,
            timeout=self.timeout,
        )
        r.raise_for_status()

        resp = xml_to_dict(r.text)['NTPTestResult']
        error_code = int(resp['errorCode'][0])
        error_description = resp['errorDescription'][0]
        if error_code != 0 or error_description.lower() != 'ok':
            raise ResponseError('response status looks bad')

        # then set it
        data = ''.join([
            '',
            f'1{addr_format}{ntp_host}{ntp_port}1440',
        ])
        r = requests.put(
            self.isapi_uri('System/time/ntpServers/1'),
            data=data,
            cookies=self.cookies,
            headers=dict(self._BODY_HEADERS),
            timeout=self.timeout,
        )
        self.isapi_check_put_response(r)

    def isapi_uri(self, path: str) -> str:
        """Return the full ISAPI URL for ``path`` on this client's host."""
        return f'http://{self.host}/ISAPI/{path}'

    def isapi_check_put_response(self, r):
        """Validate the ``ResponseStatus`` document returned by a PUT.

        Raises:
            requests.exceptions.HTTPError: if the HTTP status is an error.
            ResponseError: unless statusCode is 1 and statusString is "ok"
                (case-insensitive).
        """
        r.raise_for_status()

        resp = xml_to_dict(r.text)['ResponseStatus']
        status_code = int(resp['statusCode'][0])
        status_string = resp['statusString'][0]
        if status_code != 1 or status_string.lower() != 'ok':
            raise ResponseError('response status looks bad')


def process_hikvision_camera(host: str,
                             action: Action,
                             login: str,
                             password: str,
                             ntp_server: Optional[str] = None):
    """Run ``action`` against one camera and print a ``[host] ...`` line.

    Camera-side failures (rejected login, bad response status, transport or
    HTTP errors) are printed for that host rather than raised, so a caller
    iterating over several cameras keeps going.

    Args:
        host: camera address.
        action: ``Action.GET_NTP`` prints the current server; any other value
            sets ``ntp_server``.
        login: ISAPI user name.
        password: ISAPI password.
        ntp_server: server to configure; required unless ``action`` is
            ``Action.GET_NTP``.

    Raises:
        ValueError: if a server must be set but ``ntp_server`` is ``None``.
    """
    if action != Action.GET_NTP and ntp_server is None:
        # Fail before any network traffic instead of handing None to the client.
        raise ValueError('ntp_server is required to set the NTP server')

    client = HikvisionISAPIClient(host)
    try:
        client.auth(login, password)
        if action == Action.GET_NTP:
            print(f'[{host}] {client.get_ntp_server()}')
            return

        client.set_ntp_server(ntp_server)
        print(f'[{host}] done')

    # AuthError is a ResponseError, so one clause covers both.
    except (ResponseError, requests.exceptions.RequestException) as e:
        print(f'[{host}] ({str(e)})')


def main():
    """Parse the command line and process the selected camera(s)."""
    parser = ArgumentParser()
    parser.add_argument('--host', type=str)
    parser.add_argument('--all', action='store_true')
    parser.add_argument('--get-ntp-server', action='store_true')
    parser.add_argument('--set-ntp-server', type=str)
    parser.add_argument('--username', type=str)
    parser.add_argument('--password', type=str)
    args = parser.parse_args()

    # parser.error() prints the usage text plus the message and exits with
    # status 2, instead of surfacing a traceback.
    if not args.host and not args.all:
        parser.error('either --all or --host is required')

    if not args.get_ntp_server and not args.set_ntp_server:
        parser.error('either --get-ntp-server or --set-ntp-server is required')

    # --get-ntp-server wins when both actions are given.
    action = Action.GET_NTP if args.get_ntp_server else Action.SET_NTP

    login = args.username if args.username else ipcam_config['web_creds']['login']
    password = args.password if args.password else ipcam_config['web_creds']['password']

    # SET_NTP is only chosen when --set-ntp-server was supplied, so only its
    # value needs checking here.
    if action == Action.SET_NTP and not validate_ipv4_or_hostname(args.set_ntp_server):
        parser.error('input ntp server is neither ip address nor a valid hostname')

    kwargs = {}
    if args.set_ntp_server:
        kwargs['ntp_server'] = args.set_ntp_server

    if not args.all:
        process_hikvision_camera(args.host, action, login, password, **kwargs)
        return

    for cam in ipcam_config.get_all_cam_names():
        if not ipcam_config.get_camera_type(cam).is_hikvision():
            continue
        process_hikvision_camera(ipcam_config.get_camera_ip(cam), action, login, password, **kwargs)


if __name__ == '__main__':
    main()