diff options
-rwxr-xr-x | bin/ipcam_ntp_util.py | 199 | ||||
-rw-r--r-- | include/py/homekit/camera/config.py | 9 | ||||
-rw-r--r-- | include/py/homekit/config/_configs.py | 1 | ||||
-rw-r--r-- | include/py/homekit/util.py | 9 |
4 files changed, 218 insertions, 0 deletions
diff --git a/bin/ipcam_ntp_util.py b/bin/ipcam_ntp_util.py new file mode 100755 index 0000000..98639bd --- /dev/null +++ b/bin/ipcam_ntp_util.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +import __py_include +import requests +import hashlib +import xml.etree.ElementTree as ET + +from time import time +from argparse import ArgumentParser, ArgumentError +from homekit.util import validate_ipv4, validate_ipv4_or_hostname +from homekit.camera import IpcamConfig + + +def xml_to_dict(xml_data: str) -> dict: + # Parse the XML data + root = ET.fromstring(xml_data) + + # Function to remove namespace from the tag name + def remove_namespace(tag): + return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace + + # Function to recursively convert XML elements to a dictionary + def elem_to_dict(elem): + tag = remove_namespace(elem.tag) + elem_dict = {tag: {}} + + # If the element has attributes, add them to the dictionary + elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()}) + + # Handle the element's text content, if present and not just whitespace + text = elem.text.strip() if elem.text and elem.text.strip() else None + if text: + elem_dict[tag]['#text'] = text + + # Process child elements + for child in elem: + child_dict = elem_to_dict(child) + child_tag = remove_namespace(child.tag) + if child_tag not in elem_dict[tag]: + elem_dict[tag][child_tag] = [] + elem_dict[tag][child_tag].append(child_dict[child_tag]) + + # Simplify structure if there's only text or no children and no attributes + if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]: + return {tag: elem_dict[tag]['#text']} + elif not elem_dict[tag]: + return {tag: ''} + + return elem_dict + + # Convert the root element to dictionary + return elem_to_dict(root) + + +def sha256_hex(input_string: str) -> str: + return hashlib.sha256(input_string.encode()).hexdigest() + + +class ResponseError(RuntimeError): + pass + + +class AuthError(ResponseError): + pass + + +class HikvisionISAPIClient: + def __init__(self, host): + self.host = host + self.cookies = {} + + def auth(self, username: str, password: str): + r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'), + {'username': username}, + headers={ + 'X-Requested-With': 'XMLHttpRequest', + }) + r.raise_for_status() + caps = xml_to_dict(r.text)['SessionLoginCap'] + is_irreversible = caps['isIrreversible'][0].lower() == 'true' + + # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl + # also look into webAuth.js and utils.js + + if 'salt' in caps and is_irreversible: + p = sha256_hex(username + caps['salt'][0] + password) + p = sha256_hex(p + caps['challenge'][0]) + for i in range(int(caps['iterations'][0])-2): + p = sha256_hex(p) + else: + p = sha256_hex(password) + caps['challenge'][0] + for i in range(int(caps['iterations'][0])-1): + p = sha256_hex(p) + + data = '<SessionLogin>' + data += f'<userName>{username}</userName>' + data += f'<password>{p}</password>' + data += f'<sessionID>{caps["sessionID"][0]}</sessionID>' + data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>' + data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>' + data += '</SessionLogin>' + + r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={ + 'Accept-Encoding': 'gzip, deflate', + 'If-Modified-Since': '0', + 'X-Requested-With': 'XMLHttpRequest', + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', + }) + r.raise_for_status() + resp = xml_to_dict(r.text)['SessionLogin'] + status_value = int(resp['statusValue'][0]) + status_string = resp['statusString'][0] + if status_value != 200: + raise AuthError(f'{status_value}: {status_string}') + + self.cookies = r.cookies.get_dict() + + def get_ntp_server(self) -> str: + r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'), cookies=self.cookies) + r.raise_for_status() + ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0] + + if ntp_server['addressingFormatType'][0]['#text'] == 'hostname': + ntp_host = ntp_server['hostName'][0] + else: + ntp_host = ntp_server['ipAddress'][0] + + return ntp_host + + def set_timezone(self): + data = '<?xml version="1.0" encoding="UTF-8"?>' + data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>' + + r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={ + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8' + }) + self.isapi_check_put_response(r) + + def set_ntp_server(self, ntp_host: str, ntp_port: int = 123): + format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname' + + data = '<?xml version="1.0" encoding="UTF-8"?>' + data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>' + + r = requests.put(self.isapi_uri('System/time/ntpServers/1'), + data=data, + cookies=self.cookies, + headers={ + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8' + }) + self.isapi_check_put_response(r) + + def isapi_uri(self, path: str) -> str: + return f'http://{self.host}/ISAPI/{path}' + + def isapi_check_put_response(self, r): + r.raise_for_status() + resp = xml_to_dict(r.text)['ResponseStatus'] + + status_code = int(resp['statusCode'][0]) + status_string = resp['statusString'][0] + + if status_code != 1 or status_string.lower() != 'ok': + raise ResponseError('response status looks bad') + + +def main(): + parser = ArgumentParser() + parser.add_argument('--host', type=str, required=True) + parser.add_argument('--get-ntp-server', action='store_true') + parser.add_argument('--set-ntp-server', type=str) + parser.add_argument('--username', type=str) + parser.add_argument('--password', type=str) + args = parser.parse_args() + + if not args.get_ntp_server and not args.set_ntp_server: + raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required') + + ipcam_config = IpcamConfig() + login = args.username if args.username else ipcam_config['web_creds']['login'] + password = args.password if args.password else ipcam_config['web_creds']['password'] + + client = HikvisionISAPIClient(args.host) + client.auth(args.username, args.password) + + if args.get_ntp_server: + print(client.get_ntp_server()) + return + + if not args.set_ntp_server: + raise ArgumentError(None, '--set-ntp-server is required') + + if not validate_ipv4_or_hostname(args.set_ntp_server): + raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname') + + client.set_ntp_server(args.set_ntp_server) + + +if __name__ == '__main__': + main()
\ No newline at end of file diff --git a/include/py/homekit/camera/config.py b/include/py/homekit/camera/config.py index 8e9bfd5..8aeb392 100644 --- a/include/py/homekit/camera/config.py +++ b/include/py/homekit/camera/config.py @@ -73,6 +73,15 @@ class IpcamConfig(ConfigUnit): 'login': {'type': 'string', 'required': True}, 'password': {'type': 'string', 'required': True}, } + }, + + 'web_creds': { + 'required': True, + 'type': 'dict', + 'schema': { + 'login': {'type': 'string', 'required': True}, + 'password': {'type': 'string', 'required': True}, + } } } diff --git a/include/py/homekit/config/_configs.py b/include/py/homekit/config/_configs.py index f88c8ea..2cd2aca 100644 --- a/include/py/homekit/config/_configs.py +++ b/include/py/homekit/config/_configs.py @@ -26,6 +26,7 @@ class LinuxBoardsConfig(ConfigUnit): 'schema': { 'mdns': {'type': 'string', 'required': True}, 'board': {'type': 'string', 'required': True}, + 'location': {'type': 'string', 'required': True}, 'network': { 'type': 'list', 'required': True, diff --git a/include/py/homekit/util.py b/include/py/homekit/util.py index 78a78a0..2b06600 100644 --- a/include/py/homekit/util.py +++ b/include/py/homekit/util.py @@ -10,6 +10,7 @@ import string import random import re import os +import ipaddress from enum import Enum from datetime import datetime @@ -37,6 +38,14 @@ def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bo return False +def validate_ipv4(address: str) -> bool: + try: + ipaddress.IPv6Address(address) + return True + except ipaddress.AddressValueError: + return False + + def validate_mac_address(mac_address: str) -> bool: mac_pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$' if re.match(mac_pattern, mac_address): |