diff options
Diffstat (limited to 'include/py/hikvision/isapi.py')
-rw-r--r-- | include/py/hikvision/isapi.py | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/include/py/hikvision/isapi.py b/include/py/hikvision/isapi.py new file mode 100644 index 0000000..6cc34f8 --- /dev/null +++ b/include/py/hikvision/isapi.py @@ -0,0 +1,137 @@ +import requests + +from time import time +from .util import xml_to_dict, sha256_hex +from ...util import validate_ipv4 +from ...http import HTTPMethod +from typing import Optional, Union + + +class ResponseError(RuntimeError): + pass + + +class AuthError(ResponseError): + pass + + +class ISAPIClient: + def __init__(self, host): + self.host = host + self.cookies = {} + + def call(self, + path: str, + method: HTTPMethod = HTTPMethod.GET, + data: Optional[Union[dict, str, bytes]] = None, + raise_for_status=True, + check_put_response=False): + f = getattr(requests, method.value.lower()) + + headers = { + 'X-Requested-With': 'XMLHttpRequest', + } + if method in (HTTPMethod.PUT, HTTPMethod.POST): + headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' + + kwargs = {} + if data: + kwargs['data' if method is not HTTPMethod.GET else 'params'] = data + if len(self.cookies) > 0: + kwargs['cookies'] = self.cookies + + r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs) + + if raise_for_status or check_put_response: + r.raise_for_status() + + parsed_xml = None + if check_put_response: + parsed_xml = xml_to_dict(r.text) + resp = parsed_xml['ResponseStatus'] + status_code = int(resp['statusCode'][0]) + status_string = resp['statusString'][0] + if status_code != 1 or status_string.lower() != 'ok': + raise ResponseError('response status looks bad') + + self.cookies.update(r.cookies.get_dict()) + + if parsed_xml is None: + parsed_xml = xml_to_dict(r.text) + + return parsed_xml + + def auth(self, username: str, password: str): + xml = self.call('Security/sessionLogin/capabilities', data={'username': username}) + caps = xml['SessionLoginCap'] + is_irreversible = caps['isIrreversible'][0].lower() == 'true' + + # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl + # also look into webAuth.js and utils.js + + if 'salt' in caps and is_irreversible: + p = sha256_hex(username + caps['salt'][0] + password) + p = sha256_hex(p + caps['challenge'][0]) + for i in range(int(caps['iterations'][0])-2): + p = sha256_hex(p) + else: + p = sha256_hex(password) + caps['challenge'][0] + for i in range(int(caps['iterations'][0])-1): + p = sha256_hex(p) + + data = '<SessionLogin>' + data += f'<userName>{username}</userName>' + data += f'<password>{p}</password>' + data += f'<sessionID>{caps["sessionID"][0]}</sessionID>' + data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>' + data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>' + data += '</SessionLogin>' + + resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin'] + status_value = int(resp['statusValue'][0]) + status_string = resp['statusString'][0] + if status_value != 200: + raise AuthError(f'{status_value}: {status_string}') + + def get_ntp_server(self) -> str: + try: + # works on newer 1080p cams + xml = self.call('System/time/ntpServers/capabilities') + ntp_server = xml['NTPServerList']['NTPServer'][0] + if ntp_server['addressingFormatType'][0]['#text'] == 'hostname': + ntp_host = ntp_server['hostName'][0] + else: + ntp_host = ntp_server['ipAddress'][0] + + except requests.exceptions.HTTPError: + # works on older 720p cams + ntp_server = self.call('System/time/ntpServers/1')['NTPServer'] + if ntp_server['addressingFormatType'][0] == 'hostname': + ntp_host = ntp_server['hostName'][0] + else: + ntp_host = ntp_server['ipAddress'][0] + + return ntp_host + + def set_timezone(self): + data = '<?xml version="1.0" encoding="UTF-8"?>' + data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>' + self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True) + + def set_ntp_server(self, + ntp_host: str, + ntp_port: int = 123): + format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname' + + # test ntp server first + data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>' + resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult'] + error_code = int(resp['errorCode'][0]) + error_description = resp['errorDescription'][0] + if error_code != 0 or error_description.lower() != 'ok': + raise ResponseError('response status looks bad') + + # then set it + data = '<?xml version="1.0" encoding="UTF-8"?>' + data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>' + self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True) |