import requests
from time import time
from .util import xml_to_dict, sha256_hex
from ...util import validate_ipv4
from ...http import HTTPMethod
from typing import Optional, Union
class ResponseError(RuntimeError):
pass
class AuthError(ResponseError):
pass
class ISAPIClient:
def __init__(self, host):
self.host = host
self.cookies = {}
def call(self,
path: str,
method: HTTPMethod = HTTPMethod.GET,
data: Optional[Union[dict, str, bytes]] = None,
raise_for_status=True,
check_put_response=False):
f = getattr(requests, method.value.lower())
headers = {
'X-Requested-With': 'XMLHttpRequest',
}
if method in (HTTPMethod.PUT, HTTPMethod.POST):
headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
kwargs = {}
if data:
kwargs['data' if method is not HTTPMethod.GET else 'params'] = data
if len(self.cookies) > 0:
kwargs['cookies'] = self.cookies
r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs)
if raise_for_status or check_put_response:
r.raise_for_status()
parsed_xml = None
if check_put_response:
parsed_xml = xml_to_dict(r.text)
resp = parsed_xml['ResponseStatus']
status_code = int(resp['statusCode'][0])
status_string = resp['statusString'][0]
if status_code != 1 or status_string.lower() != 'ok':
raise ResponseError('response status looks bad')
self.cookies.update(r.cookies.get_dict())
if parsed_xml is None:
parsed_xml = xml_to_dict(r.text)
return parsed_xml
def auth(self, username: str, password: str):
xml = self.call('Security/sessionLogin/capabilities', data={'username': username})
caps = xml['SessionLoginCap']
is_irreversible = caps['isIrreversible'][0].lower() == 'true'
# https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
# also look into webAuth.js and utils.js
if 'salt' in caps and is_irreversible:
p = sha256_hex(username + caps['salt'][0] + password)
p = sha256_hex(p + caps['challenge'][0])
for i in range(int(caps['iterations'][0])-2):
p = sha256_hex(p)
else:
p = sha256_hex(password) + caps['challenge'][0]
for i in range(int(caps['iterations'][0])-1):
p = sha256_hex(p)
data = ''
data += f'{username}'
data += f'{p}'
data += f'{caps["sessionID"][0]}'
data += 'false'
data += f'{caps["sessionIDVersion"][0]}'
data += ''
resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin']
status_value = int(resp['statusValue'][0])
status_string = resp['statusString'][0]
if status_value != 200:
raise AuthError(f'{status_value}: {status_string}')
def get_ntp_server(self) -> str:
try:
# works on newer 1080p cams
xml = self.call('System/time/ntpServers/capabilities')
ntp_server = xml['NTPServerList']['NTPServer'][0]
if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
ntp_host = ntp_server['hostName'][0]
else:
ntp_host = ntp_server['ipAddress'][0]
except requests.exceptions.HTTPError:
# works on older 720p cams
ntp_server = self.call('System/time/ntpServers/1')['NTPServer']
if ntp_server['addressingFormatType'][0] == 'hostname':
ntp_host = ntp_server['hostName'][0]
else:
ntp_host = ntp_server['ipAddress'][0]
return ntp_host
def set_timezone(self):
data = ''
data += ''
self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True)
def set_ntp_server(self,
ntp_host: str,
ntp_port: int = 123):
format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
# test ntp server first
data = f'{format}{ntp_host}{ntp_port}'
resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult']
error_code = int(resp['errorCode'][0])
error_description = resp['errorDescription'][0]
if error_code != 0 or error_description.lower() != 'ok':
raise ResponseError('response status looks bad')
# then set it
data = ''
data += f'1{format}{ntp_host}{ntp_port}1440'
self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)