1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
from time import time
from typing import Optional, Union
from xml.sax.saxutils import escape

import requests

from .util import xml_to_dict, sha256_hex
from ...util import validate_ipv4
from ...http import HTTPMethod
class ResponseError(RuntimeError):
    """Raised when the device answers, but the XML payload reports a failure."""
class AuthError(ResponseError):
    """Raised when the session login is rejected by the device."""
class ISAPIClient:
    """Small client for the XML-over-HTTP ``/ISAPI/`` interface of a camera.

    Requests are sent to ``http://<host>/ISAPI/<path>``. Cookies returned by
    the device are remembered on the instance and sent with every later
    request, which is how the session established by :meth:`auth` is kept.
    """

    # Class-level default so that instances created without running
    # ``__init__`` (e.g. by a subclass) still have the attribute.
    timeout: Optional[float] = None

    def __init__(self, host, timeout: Optional[float] = None):
        """Create a client.

        :param host: host (optionally ``host:port``) placed into the URL.
        :param timeout: value passed as ``timeout=`` to ``requests``; ``None``
            (the default) sends no timeout, i.e. a call may block forever.
        """
        self.host = host
        self.timeout = timeout
        self.cookies = {}

    def call(self,
             path: str,
             method: HTTPMethod = HTTPMethod.GET,
             data: Optional[Union[dict, str, bytes]] = None,
             raise_for_status=True,
             check_put_response=False):
        """Perform one ISAPI request and return the parsed XML response.

        :param path: path below ``/ISAPI/`` (may already contain a query).
        :param method: HTTP method; its ``value`` selects the ``requests``
            function to use.
        :param data: sent as query parameters for GET and as the request
            body for every other method; ignored when falsy.
        :param raise_for_status: raise on 4xx/5xx HTTP status.
        :param check_put_response: additionally require a ``ResponseStatus``
            element with ``statusCode`` 1 and ``statusString`` "ok". HTTP
            status is always checked when this is set.
        :return: whatever ``xml_to_dict`` produces for the response text.
        :raises requests.exceptions.HTTPError: on a bad HTTP status.
        :raises ResponseError: when ``check_put_response`` is set and the
            ``ResponseStatus`` element does not report success.
        """
        send = getattr(requests, method.value.lower())

        headers = {
            'X-Requested-With': 'XMLHttpRequest',
        }
        if method in (HTTPMethod.PUT, HTTPMethod.POST):
            headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'

        kwargs = {}
        if data:
            kwargs['data' if method is not HTTPMethod.GET else 'params'] = data
        if self.cookies:
            kwargs['cookies'] = self.cookies
        if self.timeout is not None:
            kwargs['timeout'] = self.timeout

        r = send(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs)
        if raise_for_status or check_put_response:
            r.raise_for_status()

        parsed_xml = None
        if check_put_response:
            parsed_xml = xml_to_dict(r.text)
            resp = parsed_xml['ResponseStatus']
            status_code = int(resp['statusCode'][0])
            status_string = resp['statusString'][0]
            if status_code != 1 or status_string.lower() != 'ok':
                raise ResponseError('response status looks bad')

        # Remember session cookies only once the response has been accepted.
        self.cookies.update(r.cookies.get_dict())

        if parsed_xml is None:
            parsed_xml = xml_to_dict(r.text)
        return parsed_xml

    def auth(self, username: str, password: str):
        """Log in and keep the resulting session cookies on this client.

        Fetches the login capabilities (challenge, iteration count and
        optionally a salt), derives the iterated SHA-256 password digest from
        them and posts it to ``Security/sessionLogin``.

        :raises AuthError: when the device answers with a ``statusValue``
            other than 200.
        """
        xml = self.call('Security/sessionLogin/capabilities', data={'username': username})
        caps = xml['SessionLoginCap']
        is_irreversible = caps['isIrreversible'][0].lower() == 'true'
        challenge = caps['challenge'][0]
        iterations = int(caps['iterations'][0])

        # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
        # also look into webAuth.js and utils.js
        if 'salt' in caps and is_irreversible:
            # Two hashing rounds are spent here, the rest happen below.
            p = sha256_hex(username + caps['salt'][0] + password)
            p = sha256_hex(p + challenge)
            remaining_rounds = iterations - 2
        else:
            # The challenge is appended to the digest, not hashed with it;
            # only one round has been spent so far.
            p = sha256_hex(password) + challenge
            remaining_rounds = iterations - 1
        for _ in range(remaining_rounds):
            p = sha256_hex(p)

        # Text values are XML-escaped so that characters such as '&' or '<'
        # in the user name cannot break (or inject into) the document.
        data = ''.join((
            '<SessionLogin>',
            f'<userName>{escape(username)}</userName>',
            f'<password>{p}</password>',
            f'<sessionID>{escape(str(caps["sessionID"][0]))}</sessionID>',
            '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>',
            f'<sessionIDVersion>{escape(str(caps["sessionIDVersion"][0]))}</sessionIDVersion>',
            '</SessionLogin>',
        ))

        resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin']
        status_value = int(resp['statusValue'][0])
        status_string = resp['statusString'][0]
        if status_value != 200:
            raise AuthError(f'{status_value}: {status_string}')

    def get_ntp_server(self) -> str:
        """Return the host name or IP address of the configured NTP server.

        Tries the ``capabilities`` endpoint first and falls back to
        ``ntpServers/1`` when that request fails with an HTTP error.
        """
        try:
            # works on newer 1080p cams
            xml = self.call('System/time/ntpServers/capabilities')
            ntp_server = xml['NTPServerList']['NTPServer'][0]
            # here the element is a dict, the value lives under '#text'
            format_type = ntp_server['addressingFormatType'][0]['#text']
        except requests.exceptions.HTTPError:
            # works on older 720p cams
            ntp_server = self.call('System/time/ntpServers/1')['NTPServer']
            format_type = ntp_server['addressingFormatType'][0]

        host_key = 'hostName' if format_type == 'hostname' else 'ipAddress'
        return ntp_server[host_key][0]

    def set_timezone(self, timezone: str = 'CST-3:00:00'):
        """Switch the device to NTP time mode and set its time zone.

        :param timezone: value written into ``<timeZone>``. The default is
            the value this method has always sent (presumably a POSIX-style
            TZ string; confirm against the device documentation).
        :raises ResponseError: when the device does not acknowledge with OK.
        """
        data = '<?xml version="1.0" encoding="UTF-8"?>'
        data += f'<Time><timeMode>NTP</timeMode><timeZone>{escape(timezone)}</timeZone></Time>'
        self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True)

    def set_ntp_server(self,
                       ntp_host: str,
                       ntp_port: int = 123):
        """Test an NTP server on the device and, if it works, configure it.

        :param ntp_host: IPv4 address or host name of the NTP server.
        :param ntp_port: NTP port.
        :raises ResponseError: when the device-side test fails or the
            configuration request is not acknowledged with OK.
        """
        if validate_ipv4(ntp_host):
            addr_format, addr_tag = 'ipaddress', 'ipAddress'
        else:
            # A host name goes into <hostName>, the element get_ntp_server()
            # reads back for this addressing type, not into <ipAddress>.
            addr_format, addr_tag = 'hostname', 'hostName'

        address_xml = (
            f'<addressingFormatType>{addr_format}</addressingFormatType>'
            f'<{addr_tag}>{escape(ntp_host)}</{addr_tag}>'
            f'<portNo>{ntp_port}</portNo>'
        )

        # test ntp server first
        data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription>{address_xml}</NTPTestDescription>'
        resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult']
        error_code = int(resp['errorCode'][0])
        error_description = resp['errorDescription'][0]
        if error_code != 0 or error_description.lower() != 'ok':
            raise ResponseError('response status looks bad')

        # then set it
        data = '<?xml version="1.0" encoding="UTF-8"?>'
        data += f'<NTPServer><id>1</id>{address_xml}<synchronizeInterval>1440</synchronizeInterval></NTPServer>'
        self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)
|