summaryrefslogtreecommitdiff
path: root/include/py/hikvision
diff options
context:
space:
mode:
Diffstat (limited to 'include/py/hikvision')
-rw-r--r--include/py/hikvision/__init__.py1
-rw-r--r--include/py/hikvision/isapi.py137
-rw-r--r--include/py/hikvision/util.py48
3 files changed, 186 insertions, 0 deletions
diff --git a/include/py/hikvision/__init__.py b/include/py/hikvision/__init__.py
new file mode 100644
index 0000000..72d6ae3
--- /dev/null
+++ b/include/py/hikvision/__init__.py
@@ -0,0 +1 @@
+from .isapi import ISAPIClient, ResponseError, AuthError
diff --git a/include/py/hikvision/isapi.py b/include/py/hikvision/isapi.py
new file mode 100644
index 0000000..6cc34f8
--- /dev/null
+++ b/include/py/hikvision/isapi.py
@@ -0,0 +1,137 @@
+import requests
+
+from time import time
+from .util import xml_to_dict, sha256_hex
+from ...util import validate_ipv4
+from ...http import HTTPMethod
+from typing import Optional, Union
+
+
+class ResponseError(RuntimeError):
+ pass
+
+
+class AuthError(ResponseError):
+ pass
+
+
+class ISAPIClient:
+ def __init__(self, host):
+ self.host = host
+ self.cookies = {}
+
+ def call(self,
+ path: str,
+ method: HTTPMethod = HTTPMethod.GET,
+ data: Optional[Union[dict, str, bytes]] = None,
+ raise_for_status=True,
+ check_put_response=False):
+ f = getattr(requests, method.value.lower())
+
+ headers = {
+ 'X-Requested-With': 'XMLHttpRequest',
+ }
+ if method in (HTTPMethod.PUT, HTTPMethod.POST):
+ headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
+
+ kwargs = {}
+ if data:
+ kwargs['data' if method is not HTTPMethod.GET else 'params'] = data
+ if len(self.cookies) > 0:
+ kwargs['cookies'] = self.cookies
+
+ r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs)
+
+ if raise_for_status or check_put_response:
+ r.raise_for_status()
+
+ parsed_xml = None
+ if check_put_response:
+ parsed_xml = xml_to_dict(r.text)
+ resp = parsed_xml['ResponseStatus']
+ status_code = int(resp['statusCode'][0])
+ status_string = resp['statusString'][0]
+ if status_code != 1 or status_string.lower() != 'ok':
+ raise ResponseError('response status looks bad')
+
+ self.cookies.update(r.cookies.get_dict())
+
+ if parsed_xml is None:
+ parsed_xml = xml_to_dict(r.text)
+
+ return parsed_xml
+
+ def auth(self, username: str, password: str):
+ xml = self.call('Security/sessionLogin/capabilities', data={'username': username})
+ caps = xml['SessionLoginCap']
+ is_irreversible = caps['isIrreversible'][0].lower() == 'true'
+
+ # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
+ # also look into webAuth.js and utils.js
+
+ if 'salt' in caps and is_irreversible:
+ p = sha256_hex(username + caps['salt'][0] + password)
+ p = sha256_hex(p + caps['challenge'][0])
+ for i in range(int(caps['iterations'][0])-2):
+ p = sha256_hex(p)
+ else:
+ p = sha256_hex(password) + caps['challenge'][0]
+ for i in range(int(caps['iterations'][0])-1):
+ p = sha256_hex(p)
+
+ data = '<SessionLogin>'
+ data += f'<userName>{username}</userName>'
+ data += f'<password>{p}</password>'
+ data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
+ data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
+ data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
+ data += '</SessionLogin>'
+
+ resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin']
+ status_value = int(resp['statusValue'][0])
+ status_string = resp['statusString'][0]
+ if status_value != 200:
+ raise AuthError(f'{status_value}: {status_string}')
+
+ def get_ntp_server(self) -> str:
+ try:
+ # works on newer 1080p cams
+ xml = self.call('System/time/ntpServers/capabilities')
+ ntp_server = xml['NTPServerList']['NTPServer'][0]
+ if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
+ ntp_host = ntp_server['hostName'][0]
+ else:
+ ntp_host = ntp_server['ipAddress'][0]
+
+ except requests.exceptions.HTTPError:
+ # works on older 720p cams
+ ntp_server = self.call('System/time/ntpServers/1')['NTPServer']
+ if ntp_server['addressingFormatType'][0] == 'hostname':
+ ntp_host = ntp_server['hostName'][0]
+ else:
+ ntp_host = ntp_server['ipAddress'][0]
+
+ return ntp_host
+
+ def set_timezone(self):
+ data = '<?xml version="1.0" encoding="UTF-8"?>'
+ data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
+ self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True)
+
+ def set_ntp_server(self,
+ ntp_host: str,
+ ntp_port: int = 123):
+ format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
+
+ # test ntp server first
+ data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>'
+ resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult']
+ error_code = int(resp['errorCode'][0])
+ error_description = resp['errorDescription'][0]
+ if error_code != 0 or error_description.lower() != 'ok':
+ raise ResponseError('response status looks bad')
+
+ # then set it
+ data = '<?xml version="1.0" encoding="UTF-8"?>'
+ data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
+ self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)
diff --git a/include/py/hikvision/util.py b/include/py/hikvision/util.py
new file mode 100644
index 0000000..581c6ea
--- /dev/null
+++ b/include/py/hikvision/util.py
@@ -0,0 +1,48 @@
+import requests
+import hashlib
+import xml.etree.ElementTree as ET
+
+
+def xml_to_dict(xml_data: str) -> dict:
+ # Parse the XML data
+ root = ET.fromstring(xml_data)
+
+ # Function to remove namespace from the tag name
+ def remove_namespace(tag):
+ return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
+
+ # Function to recursively convert XML elements to a dictionary
+ def elem_to_dict(elem):
+ tag = remove_namespace(elem.tag)
+ elem_dict = {tag: {}}
+
+ # If the element has attributes, add them to the dictionary
+ elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
+
+ # Handle the element's text content, if present and not just whitespace
+ text = elem.text.strip() if elem.text and elem.text.strip() else None
+ if text:
+ elem_dict[tag]['#text'] = text
+
+ # Process child elements
+ for child in elem:
+ child_dict = elem_to_dict(child)
+ child_tag = remove_namespace(child.tag)
+ if child_tag not in elem_dict[tag]:
+ elem_dict[tag][child_tag] = []
+ elem_dict[tag][child_tag].append(child_dict[child_tag])
+
+ # Simplify structure if there's only text or no children and no attributes
+ if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
+ return {tag: elem_dict[tag]['#text']}
+ elif not elem_dict[tag]:
+ return {tag: ''}
+
+ return elem_dict
+
+ # Convert the root element to dictionary
+ return elem_to_dict(root)
+
+
+def sha256_hex(input_string: str) -> str:
+ return hashlib.sha256(input_string.encode()).hexdigest()