summaryrefslogtreecommitdiff
path: root/include/py/hikvision/isapi.py
diff options
context:
space:
mode:
Diffstat (limited to 'include/py/hikvision/isapi.py')
-rw-r--r--include/py/hikvision/isapi.py137
1 files changed, 137 insertions, 0 deletions
diff --git a/include/py/hikvision/isapi.py b/include/py/hikvision/isapi.py
new file mode 100644
index 0000000..6cc34f8
--- /dev/null
+++ b/include/py/hikvision/isapi.py
@@ -0,0 +1,137 @@
+import requests
+
+from time import time
+from .util import xml_to_dict, sha256_hex
+from ...util import validate_ipv4
+from ...http import HTTPMethod
+from typing import Optional, Union
+
+
+class ResponseError(RuntimeError):
+ pass
+
+
+class AuthError(ResponseError):
+ pass
+
+
+class ISAPIClient:
+ def __init__(self, host):
+ self.host = host
+ self.cookies = {}
+
+ def call(self,
+ path: str,
+ method: HTTPMethod = HTTPMethod.GET,
+ data: Optional[Union[dict, str, bytes]] = None,
+ raise_for_status=True,
+ check_put_response=False):
+ f = getattr(requests, method.value.lower())
+
+ headers = {
+ 'X-Requested-With': 'XMLHttpRequest',
+ }
+ if method in (HTTPMethod.PUT, HTTPMethod.POST):
+ headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
+
+ kwargs = {}
+ if data:
+ kwargs['data' if method is not HTTPMethod.GET else 'params'] = data
+ if len(self.cookies) > 0:
+ kwargs['cookies'] = self.cookies
+
+ r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs)
+
+ if raise_for_status or check_put_response:
+ r.raise_for_status()
+
+ parsed_xml = None
+ if check_put_response:
+ parsed_xml = xml_to_dict(r.text)
+ resp = parsed_xml['ResponseStatus']
+ status_code = int(resp['statusCode'][0])
+ status_string = resp['statusString'][0]
+ if status_code != 1 or status_string.lower() != 'ok':
+ raise ResponseError('response status looks bad')
+
+ self.cookies.update(r.cookies.get_dict())
+
+ if parsed_xml is None:
+ parsed_xml = xml_to_dict(r.text)
+
+ return parsed_xml
+
+ def auth(self, username: str, password: str):
+ xml = self.call('Security/sessionLogin/capabilities', data={'username': username})
+ caps = xml['SessionLoginCap']
+ is_irreversible = caps['isIrreversible'][0].lower() == 'true'
+
+ # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
+ # also look into webAuth.js and utils.js
+
+ if 'salt' in caps and is_irreversible:
+ p = sha256_hex(username + caps['salt'][0] + password)
+ p = sha256_hex(p + caps['challenge'][0])
+ for i in range(int(caps['iterations'][0])-2):
+ p = sha256_hex(p)
+ else:
+ p = sha256_hex(password) + caps['challenge'][0]
+ for i in range(int(caps['iterations'][0])-1):
+ p = sha256_hex(p)
+
+ data = '<SessionLogin>'
+ data += f'<userName>{username}</userName>'
+ data += f'<password>{p}</password>'
+ data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
+ data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
+ data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
+ data += '</SessionLogin>'
+
+ resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin']
+ status_value = int(resp['statusValue'][0])
+ status_string = resp['statusString'][0]
+ if status_value != 200:
+ raise AuthError(f'{status_value}: {status_string}')
+
+ def get_ntp_server(self) -> str:
+ try:
+ # works on newer 1080p cams
+ xml = self.call('System/time/ntpServers/capabilities')
+ ntp_server = xml['NTPServerList']['NTPServer'][0]
+ if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
+ ntp_host = ntp_server['hostName'][0]
+ else:
+ ntp_host = ntp_server['ipAddress'][0]
+
+ except requests.exceptions.HTTPError:
+ # works on older 720p cams
+ ntp_server = self.call('System/time/ntpServers/1')['NTPServer']
+ if ntp_server['addressingFormatType'][0] == 'hostname':
+ ntp_host = ntp_server['hostName'][0]
+ else:
+ ntp_host = ntp_server['ipAddress'][0]
+
+ return ntp_host
+
+ def set_timezone(self):
+ data = '<?xml version="1.0" encoding="UTF-8"?>'
+ data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
+ self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True)
+
+ def set_ntp_server(self,
+ ntp_host: str,
+ ntp_port: int = 123):
+ format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
+
+ # test ntp server first
+ data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>'
+ resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult']
+ error_code = int(resp['errorCode'][0])
+ error_description = resp['errorDescription'][0]
+ if error_code != 0 or error_description.lower() != 'ok':
+ raise ResponseError('response status looks bad')
+
+ # then set it
+ data = '<?xml version="1.0" encoding="UTF-8"?>'
+ data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
+ self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)