#!/usr/bin/env python3 import __py_include import requests import hashlib import xml.etree.ElementTree as ET from time import time from argparse import ArgumentParser, ArgumentError from homekit.util import validate_ipv4, validate_ipv4_or_hostname from homekit.camera import IpcamConfig def xml_to_dict(xml_data: str) -> dict: # Parse the XML data root = ET.fromstring(xml_data) # Function to remove namespace from the tag name def remove_namespace(tag): return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace # Function to recursively convert XML elements to a dictionary def elem_to_dict(elem): tag = remove_namespace(elem.tag) elem_dict = {tag: {}} # If the element has attributes, add them to the dictionary elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()}) # Handle the element's text content, if present and not just whitespace text = elem.text.strip() if elem.text and elem.text.strip() else None if text: elem_dict[tag]['#text'] = text # Process child elements for child in elem: child_dict = elem_to_dict(child) child_tag = remove_namespace(child.tag) if child_tag not in elem_dict[tag]: elem_dict[tag][child_tag] = [] elem_dict[tag][child_tag].append(child_dict[child_tag]) # Simplify structure if there's only text or no children and no attributes if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]: return {tag: elem_dict[tag]['#text']} elif not elem_dict[tag]: return {tag: ''} return elem_dict # Convert the root element to dictionary return elem_to_dict(root) def sha256_hex(input_string: str) -> str: return hashlib.sha256(input_string.encode()).hexdigest() class ResponseError(RuntimeError): pass class AuthError(ResponseError): pass class HikvisionISAPIClient: def __init__(self, host): self.host = host self.cookies = {} def auth(self, username: str, password: str): r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'), {'username': username}, headers={ 'X-Requested-With': 'XMLHttpRequest', }) r.raise_for_status() caps = xml_to_dict(r.text)['SessionLoginCap'] is_irreversible = caps['isIrreversible'][0].lower() == 'true' # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl # also look into webAuth.js and utils.js if 'salt' in caps and is_irreversible: p = sha256_hex(username + caps['salt'][0] + password) p = sha256_hex(p + caps['challenge'][0]) for i in range(int(caps['iterations'][0])-2): p = sha256_hex(p) else: p = sha256_hex(password) + caps['challenge'][0] for i in range(int(caps['iterations'][0])-1): p = sha256_hex(p) data = '' data += f'{username}' data += f'{p}' data += f'{caps["sessionID"][0]}' data += 'false' data += f'{caps["sessionIDVersion"][0]}' data += '' r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={ 'Accept-Encoding': 'gzip, deflate', 'If-Modified-Since': '0', 'X-Requested-With': 'XMLHttpRequest', 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', }) r.raise_for_status() resp = xml_to_dict(r.text)['SessionLogin'] status_value = int(resp['statusValue'][0]) status_string = resp['statusString'][0] if status_value != 200: raise AuthError(f'{status_value}: {status_string}') self.cookies = r.cookies.get_dict() def get_ntp_server(self) -> str: r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'), cookies=self.cookies) r.raise_for_status() ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0] if ntp_server['addressingFormatType'][0]['#text'] == 'hostname': ntp_host = ntp_server['hostName'][0] else: ntp_host = ntp_server['ipAddress'][0] return ntp_host def set_timezone(self): data = '' data += '' r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={ 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8' }) self.isapi_check_put_response(r) def set_ntp_server(self, ntp_host: str, ntp_port: int = 123): format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname' data = '' data += f'1{format}{ntp_host}{ntp_port}1440' r = requests.put(self.isapi_uri('System/time/ntpServers/1'), data=data, cookies=self.cookies, headers={ 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8' }) self.isapi_check_put_response(r) def isapi_uri(self, path: str) -> str: return f'http://{self.host}/ISAPI/{path}' def isapi_check_put_response(self, r): r.raise_for_status() resp = xml_to_dict(r.text)['ResponseStatus'] status_code = int(resp['statusCode'][0]) status_string = resp['statusString'][0] if status_code != 1 or status_string.lower() != 'ok': raise ResponseError('response status looks bad') def main(): parser = ArgumentParser() parser.add_argument('--host', type=str, required=True) parser.add_argument('--get-ntp-server', action='store_true') parser.add_argument('--set-ntp-server', type=str) parser.add_argument('--username', type=str) parser.add_argument('--password', type=str) args = parser.parse_args() if not args.get_ntp_server and not args.set_ntp_server: raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required') ipcam_config = IpcamConfig() login = args.username if args.username else ipcam_config['web_creds']['login'] password = args.password if args.password else ipcam_config['web_creds']['password'] client = HikvisionISAPIClient(args.host) client.auth(args.username, args.password) if args.get_ntp_server: print(client.get_ntp_server()) return if not args.set_ntp_server: raise ArgumentError(None, '--set-ntp-server is required') if not validate_ipv4_or_hostname(args.set_ntp_server): raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname') client.set_ntp_server(args.set_ntp_server) if __name__ == '__main__': main()