1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
|
#!/usr/bin/env python3
import __py_include
import requests
import hashlib
import xml.etree.ElementTree as ET
from time import time
from argparse import ArgumentParser, ArgumentError
from homekit.util import validate_ipv4, validate_ipv4_or_hostname
from homekit.camera import IpcamConfig
def xml_to_dict(xml_data: str) -> dict:
# Parse the XML data
root = ET.fromstring(xml_data)
# Function to remove namespace from the tag name
def remove_namespace(tag):
return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
# Function to recursively convert XML elements to a dictionary
def elem_to_dict(elem):
tag = remove_namespace(elem.tag)
elem_dict = {tag: {}}
# If the element has attributes, add them to the dictionary
elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
# Handle the element's text content, if present and not just whitespace
text = elem.text.strip() if elem.text and elem.text.strip() else None
if text:
elem_dict[tag]['#text'] = text
# Process child elements
for child in elem:
child_dict = elem_to_dict(child)
child_tag = remove_namespace(child.tag)
if child_tag not in elem_dict[tag]:
elem_dict[tag][child_tag] = []
elem_dict[tag][child_tag].append(child_dict[child_tag])
# Simplify structure if there's only text or no children and no attributes
if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
return {tag: elem_dict[tag]['#text']}
elif not elem_dict[tag]:
return {tag: ''}
return elem_dict
# Convert the root element to dictionary
return elem_to_dict(root)
def sha256_hex(input_string: str) -> str:
return hashlib.sha256(input_string.encode()).hexdigest()
class ResponseError(RuntimeError):
pass
class AuthError(ResponseError):
pass
class HikvisionISAPIClient:
def __init__(self, host):
self.host = host
self.cookies = {}
def auth(self, username: str, password: str):
r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'),
{'username': username},
headers={
'X-Requested-With': 'XMLHttpRequest',
})
r.raise_for_status()
caps = xml_to_dict(r.text)['SessionLoginCap']
is_irreversible = caps['isIrreversible'][0].lower() == 'true'
# https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
# also look into webAuth.js and utils.js
if 'salt' in caps and is_irreversible:
p = sha256_hex(username + caps['salt'][0] + password)
p = sha256_hex(p + caps['challenge'][0])
for i in range(int(caps['iterations'][0])-2):
p = sha256_hex(p)
else:
p = sha256_hex(password) + caps['challenge'][0]
for i in range(int(caps['iterations'][0])-1):
p = sha256_hex(p)
data = '<SessionLogin>'
data += f'<userName>{username}</userName>'
data += f'<password>{p}</password>'
data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
data += '</SessionLogin>'
r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={
'Accept-Encoding': 'gzip, deflate',
'If-Modified-Since': '0',
'X-Requested-With': 'XMLHttpRequest',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
})
r.raise_for_status()
resp = xml_to_dict(r.text)['SessionLogin']
status_value = int(resp['statusValue'][0])
status_string = resp['statusString'][0]
if status_value != 200:
raise AuthError(f'{status_value}: {status_string}')
self.cookies = r.cookies.get_dict()
def get_ntp_server(self) -> str:
r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'), cookies=self.cookies)
r.raise_for_status()
ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0]
if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
ntp_host = ntp_server['hostName'][0]
else:
ntp_host = ntp_server['ipAddress'][0]
return ntp_host
def set_timezone(self):
data = '<?xml version="1.0" encoding="UTF-8"?>'
data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
})
self.isapi_check_put_response(r)
def set_ntp_server(self, ntp_host: str, ntp_port: int = 123):
format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
data = '<?xml version="1.0" encoding="UTF-8"?>'
data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
r = requests.put(self.isapi_uri('System/time/ntpServers/1'),
data=data,
cookies=self.cookies,
headers={
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
})
self.isapi_check_put_response(r)
def isapi_uri(self, path: str) -> str:
return f'http://{self.host}/ISAPI/{path}'
def isapi_check_put_response(self, r):
r.raise_for_status()
resp = xml_to_dict(r.text)['ResponseStatus']
status_code = int(resp['statusCode'][0])
status_string = resp['statusString'][0]
if status_code != 1 or status_string.lower() != 'ok':
raise ResponseError('response status looks bad')
def main():
parser = ArgumentParser()
parser.add_argument('--host', type=str, required=True)
parser.add_argument('--get-ntp-server', action='store_true')
parser.add_argument('--set-ntp-server', type=str)
parser.add_argument('--username', type=str)
parser.add_argument('--password', type=str)
args = parser.parse_args()
if not args.get_ntp_server and not args.set_ntp_server:
raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required')
ipcam_config = IpcamConfig()
login = args.username if args.username else ipcam_config['web_creds']['login']
password = args.password if args.password else ipcam_config['web_creds']['password']
client = HikvisionISAPIClient(args.host)
client.auth(args.username, args.password)
if args.get_ntp_server:
print(client.get_ntp_server())
return
if not args.set_ntp_server:
raise ArgumentError(None, '--set-ntp-server is required')
if not validate_ipv4_or_hostname(args.set_ntp_server):
raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname')
client.set_ntp_server(args.set_ntp_server)
if __name__ == '__main__':
main()
|