#!/usr/bin/env python3
import __py_include
import requests
import hashlib
import xml.etree.ElementTree as ET
from enum import Enum, auto
from time import time
from typing import Optional
from argparse import ArgumentParser, ArgumentError
from homekit.util import validate_ipv4, validate_ipv4_or_hostname
from homekit.camera import IpcamConfig
# Shared camera configuration, instantiated once at import time; main() reads
# the fallback web credentials and the list of cameras from it.
ipcam_config = IpcamConfig()
class Action(Enum):
    """Operation requested on the command line."""

    # Read the NTP server currently configured on the camera.
    GET_NTP = 1
    # Write a new NTP server to the camera.
    SET_NTP = 2
def xml_to_dict(xml_data: str) -> dict:
    """Parse an XML document into nested dicts keyed by namespace-free tag names.

    The result is ``{root_tag: value}`` where ``value`` is:

    * ``''`` for an element with no attributes, text or children;
    * the stripped text for an element that has only text;
    * otherwise a dict holding ``'@name'`` entries for attributes, ``'#text'``
      for non-blank text, and one list per child tag with the converted
      children in document order.
    """

    def local_name(qualified: str) -> str:
        # ElementTree spells namespaced names as '{uri}name'; keep only 'name'.
        return qualified.rpartition('}')[2]

    def convert(node) -> dict:
        name = local_name(node.tag)

        # Attributes come first so they precede text and children in the dict.
        content = {'@' + local_name(key): value for key, value in node.attrib.items()}

        stripped = (node.text or '').strip()
        if stripped:
            content['#text'] = stripped

        # Children are always collected into lists, even when a tag occurs once.
        for child in node:
            child_name = local_name(child.tag)
            content.setdefault(child_name, []).append(convert(child)[child_name])

        if not content:
            return {name: ''}
        if len(content) == 1 and '#text' in content:
            # Text-only element collapses to its plain string.
            return {name: content['#text']}
        return {name: content}

    return convert(ET.fromstring(xml_data))
def sha256_hex(input_string: str) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of ``input_string``.

    The string is encoded with ``str.encode()``'s default encoding before
    hashing.
    """
    digest = hashlib.sha256()
    digest.update(input_string.encode())
    return digest.hexdigest()
class ResponseError(RuntimeError):
    """Raised when a camera reply carries an unexpected status."""
class AuthError(ResponseError):
    """Raised when the session login reply has a status value other than 200."""
class HikvisionISAPIClient:
    """Small HTTP client for the ISAPI endpoints of a Hikvision camera.

    ``auth()`` performs the session login and stores the returned cookies;
    the other methods send those cookies with their requests.
    """

    def __init__(self, host, timeout: float = 30.0):
        """
        :param host: camera address, inserted verbatim into the request URLs
        :param timeout: seconds to wait for each HTTP request; ``requests``
            has no default timeout, so without one an unresponsive camera
            would block the caller forever
        """
        self.host = host
        self.timeout = timeout
        self.cookies = {}

    def auth(self, username: str, password: str):
        """Log in and remember the session cookies.

        Fetches the login capabilities (challenge, iteration count and, when
        offered, salt), derives the password hash from them and posts the
        login request.

        :raises requests.exceptions.RequestException: on transport/HTTP errors
        :raises AuthError: if the reply's ``statusValue`` is not 200
        """
        r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'),
                         {'username': username},
                         headers={
                             'X-Requested-With': 'XMLHttpRequest',
                         },
                         timeout=self.timeout)
        r.raise_for_status()

        caps = xml_to_dict(r.text)['SessionLoginCap']
        is_irreversible = caps['isIrreversible'][0].lower() == 'true'
        challenge = caps['challenge'][0]
        iterations = int(caps['iterations'][0])

        # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
        # also look into webAuth.js and utils.js
        if 'salt' in caps and is_irreversible:
            # Salted scheme: two hashing rounds are spent here already.
            p = sha256_hex(username + caps['salt'][0] + password)
            p = sha256_hex(p + challenge)
            remaining_rounds = iterations - 2
        else:
            # Unsalted scheme: the challenge is appended to the first digest
            # and only hashed in by the following rounds.
            p = sha256_hex(password) + challenge
            remaining_rounds = iterations - 1
        for _ in range(remaining_rounds):
            p = sha256_hex(p)

        # NOTE(review): the pieces below are concatenated without any element
        # markup around them. The login body is presumably meant to be an XML
        # document, so the tags may have been lost from this file - confirm
        # against the device's ISAPI documentation.
        data = ''.join((
            '',
            f'{username}',
            f'{p}',
            f'{caps["sessionID"][0]}',
            'false',
            f'{caps["sessionIDVersion"][0]}',
            '',
        ))

        r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={
            'Accept-Encoding': 'gzip, deflate',
            'If-Modified-Since': '0',
            'X-Requested-With': 'XMLHttpRequest',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        }, timeout=self.timeout)
        r.raise_for_status()

        resp = xml_to_dict(r.text)['SessionLogin']
        status_value = int(resp['statusValue'][0])
        status_string = resp['statusString'][0]
        if status_value != 200:
            raise AuthError(f'{status_value}: {status_string}')

        self.cookies = r.cookies.get_dict()

    def get_ntp_server(self) -> str:
        """Return the host name or IP address of the first listed NTP server.

        :raises requests.exceptions.RequestException: on transport/HTTP errors
        """
        r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'),
                         cookies=self.cookies,
                         timeout=self.timeout)
        r.raise_for_status()

        ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0]
        # The addressing type tells which of the two address fields is in use.
        if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
            return ntp_server['hostName'][0]
        return ntp_server['ipAddress'][0]

    def set_timezone(self):
        """PUT the time settings to ``System/time``.

        :raises requests.exceptions.RequestException: on transport/HTTP errors
        :raises ResponseError: if the reply status is not 1/"ok"
        """
        # NOTE(review): the request body built here is an empty string, which
        # looks like lost content rather than intent - confirm what payload
        # this endpoint is supposed to receive.
        data = ''
        data += ''

        r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'X-Requested-With': 'XMLHttpRequest',
        }, timeout=self.timeout)
        self.isapi_check_put_response(r)

    def set_ntp_server(self,
                       ntp_host: str,
                       ntp_port: int = 123):
        """Ask the camera to test the NTP server, then store it as server 1.

        :param ntp_host: IPv4 address or host name of the NTP server
        :param ntp_port: port of the NTP server
        :raises requests.exceptions.RequestException: on transport/HTTP errors
        :raises ResponseError: if the test or the update is not reported as ok
        """
        address_format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'

        # NOTE(review): both payloads below are bare values concatenated with
        # no element markup; presumably XML tags were lost - confirm against
        # the device's ISAPI documentation.

        # test ntp server first
        data = f'{address_format}{ntp_host}{ntp_port}'
        r = requests.post(self.isapi_uri('System/time/ntpServers/test'), data=data, headers={
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'X-Requested-With': 'XMLHttpRequest',
        }, cookies=self.cookies, timeout=self.timeout)
        r.raise_for_status()

        resp = xml_to_dict(r.text)['NTPTestResult']
        error_code = int(resp['errorCode'][0])
        error_description = resp['errorDescription'][0]
        if error_code != 0 or error_description.lower() != 'ok':
            raise ResponseError('response status looks bad')

        # then set it
        data = ''
        data += f'1{address_format}{ntp_host}{ntp_port}1440'
        r = requests.put(self.isapi_uri('System/time/ntpServers/1'),
                         data=data,
                         cookies=self.cookies,
                         headers={
                             'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
                             'X-Requested-With': 'XMLHttpRequest',
                         },
                         timeout=self.timeout)
        self.isapi_check_put_response(r)

    def isapi_uri(self, path: str) -> str:
        """Return the absolute ``http://`` URL of an ISAPI ``path`` on this host."""
        return f'http://{self.host}/ISAPI/{path}'

    def isapi_check_put_response(self, r):
        """Validate the reply of a PUT request.

        :param r: the ``requests`` response object to check
        :raises requests.exceptions.HTTPError: on a 4xx/5xx HTTP status
        :raises ResponseError: unless ``statusCode`` is 1 and ``statusString``
            is "ok" (case-insensitive)
        """
        r.raise_for_status()

        resp = xml_to_dict(r.text)['ResponseStatus']
        status_code = int(resp['statusCode'][0])
        status_string = resp['statusString'][0]
        if status_code != 1 or status_string.lower() != 'ok':
            raise ResponseError('response status looks bad')
def process_hikvision_camera(host: str,
                             action: Action,
                             login: str,
                             password: str,
                             ntp_server: Optional[str] = None):
    """Log in to one camera and run ``action`` on it, printing the outcome.

    Failures are printed as ``[host] (message)`` instead of being raised, so
    that a caller looping over several cameras can carry on with the next one.

    :param host: camera address
    :param action: ``Action.GET_NTP`` prints the configured NTP server; any
        other value sets ``ntp_server`` on the camera
    :param login: user name for the session login
    :param password: password for the session login
    :param ntp_server: NTP server to set; only used when setting
    """
    client = HikvisionISAPIClient(host)
    try:
        client.auth(login, password)
        if action == Action.GET_NTP:
            print(f'[{host}] {client.get_ntp_server()}')
            return

        client.set_ntp_server(ntp_server)
        print(f'[{host}] done')

    # AuthError derives from ResponseError, so one clause covers both.
    # Transport and HTTP-status failures raised by ``requests`` are reported
    # the same way; otherwise one unreachable camera would abort a whole run.
    except (ResponseError, requests.exceptions.RequestException) as e:
        print(f'[{host}] ({str(e)})')
def main():
    """Command-line entry point: get or set the NTP server of Hikvision cameras.

    Requires a target (``--host`` or ``--all``) and an operation
    (``--get-ntp-server`` or ``--set-ntp-server VALUE``). Invalid usage is
    reported through ``parser.error``, which prints the usage text and exits
    with status 2. Credentials fall back to the ``web_creds`` entry of the
    camera config when ``--username`` / ``--password`` are not given.
    """
    parser = ArgumentParser()
    parser.add_argument('--host', type=str)
    parser.add_argument('--all', action='store_true')
    parser.add_argument('--get-ntp-server', action='store_true')
    parser.add_argument('--set-ntp-server', type=str)
    parser.add_argument('--username', type=str)
    parser.add_argument('--password', type=str)
    args = parser.parse_args()

    if not args.host and not args.all:
        parser.error('either --all or --host is required')

    if not args.get_ntp_server and not args.set_ntp_server:
        parser.error('either --get-ntp-server or --set-ntp-server is required')

    # --get-ntp-server wins when both operations are given.
    action = Action.GET_NTP if args.get_ntp_server else Action.SET_NTP

    login = args.username if args.username else ipcam_config['web_creds']['login']
    password = args.password if args.password else ipcam_config['web_creds']['password']

    # Reaching SET_NTP implies --set-ntp-server is non-empty (checked above),
    # so only its format still needs validating.
    if action == Action.SET_NTP and not validate_ipv4_or_hostname(args.set_ntp_server):
        parser.error('input ntp server is neither ip address nor a valid hostname')

    kwargs = {}
    if args.set_ntp_server:
        kwargs['ntp_server'] = args.set_ntp_server

    if not args.all:
        process_hikvision_camera(args.host, action, login, password, **kwargs)
        return

    for cam in ipcam_config.get_all_cam_names():
        if not ipcam_config.get_camera_type(cam).is_hikvision():
            continue
        process_hikvision_camera(ipcam_config.get_camera_ip(cam), action, login, password, **kwargs)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()