import requests from time import time from .util import xml_to_dict, sha256_hex from ...util import validate_ipv4 from ...http import HTTPMethod from typing import Optional, Union class ResponseError(RuntimeError): pass class AuthError(ResponseError): pass class ISAPIClient: def __init__(self, host): self.host = host self.cookies = {} def call(self, path: str, method: HTTPMethod = HTTPMethod.GET, data: Optional[Union[dict, str, bytes]] = None, raise_for_status=True, check_put_response=False): f = getattr(requests, method.value.lower()) headers = { 'X-Requested-With': 'XMLHttpRequest', } if method in (HTTPMethod.PUT, HTTPMethod.POST): headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' kwargs = {} if data: kwargs['data' if method is not HTTPMethod.GET else 'params'] = data if len(self.cookies) > 0: kwargs['cookies'] = self.cookies r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs) if raise_for_status or check_put_response: r.raise_for_status() parsed_xml = None if check_put_response: parsed_xml = xml_to_dict(r.text) resp = parsed_xml['ResponseStatus'] status_code = int(resp['statusCode'][0]) status_string = resp['statusString'][0] if status_code != 1 or status_string.lower() != 'ok': raise ResponseError('response status looks bad') self.cookies.update(r.cookies.get_dict()) if parsed_xml is None: parsed_xml = xml_to_dict(r.text) return parsed_xml def auth(self, username: str, password: str): xml = self.call('Security/sessionLogin/capabilities', data={'username': username}) caps = xml['SessionLoginCap'] is_irreversible = caps['isIrreversible'][0].lower() == 'true' # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl # also look into webAuth.js and utils.js if 'salt' in caps and is_irreversible: p = sha256_hex(username + caps['salt'][0] + password) p = sha256_hex(p + caps['challenge'][0]) for i in range(int(caps['iterations'][0])-2): p = sha256_hex(p) else: p = sha256_hex(password) + caps['challenge'][0] for i in range(int(caps['iterations'][0])-1): p = sha256_hex(p) data = '' data += f'{username}' data += f'{p}' data += f'{caps["sessionID"][0]}' data += 'false' data += f'{caps["sessionIDVersion"][0]}' data += '' resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin'] status_value = int(resp['statusValue'][0]) status_string = resp['statusString'][0] if status_value != 200: raise AuthError(f'{status_value}: {status_string}') def get_ntp_server(self) -> str: try: # works on newer 1080p cams xml = self.call('System/time/ntpServers/capabilities') ntp_server = xml['NTPServerList']['NTPServer'][0] if ntp_server['addressingFormatType'][0]['#text'] == 'hostname': ntp_host = ntp_server['hostName'][0] else: ntp_host = ntp_server['ipAddress'][0] except requests.exceptions.HTTPError: # works on older 720p cams ntp_server = self.call('System/time/ntpServers/1')['NTPServer'] if ntp_server['addressingFormatType'][0] == 'hostname': ntp_host = ntp_server['hostName'][0] else: ntp_host = ntp_server['ipAddress'][0] return ntp_host def set_timezone(self): data = '' data += '' self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True) def set_ntp_server(self, ntp_host: str, ntp_port: int = 123): format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname' # test ntp server first data = f'{format}{ntp_host}{ntp_port}' resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult'] error_code = int(resp['errorCode'][0]) error_description = resp['errorDescription'][0] if error_code != 0 or error_description.lower() != 'ok': raise ResponseError('response status looks bad') # then set it data = '' data += f'1{format}{ntp_host}{ntp_port}1440' self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)