aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2024-02-17 02:41:37 +0300
committerEvgeny Zinoviev <me@ch1p.io>2024-02-17 02:41:37 +0300
commit05c85757b8e2340441057d9ddfde2e9649ae8676 (patch)
treefa03fc1c81511bdbc3c464b403b13f6a4fb23514
parentd237e81873a9e043f579e7f6a979f00510ddce08 (diff)
save
-rwxr-xr-xbin/ipcam_ntp_util.py199
-rw-r--r--include/py/homekit/camera/config.py9
-rw-r--r--include/py/homekit/config/_configs.py1
-rw-r--r--include/py/homekit/util.py9
4 files changed, 218 insertions, 0 deletions
diff --git a/bin/ipcam_ntp_util.py b/bin/ipcam_ntp_util.py
new file mode 100755
index 0000000..98639bd
--- /dev/null
+++ b/bin/ipcam_ntp_util.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python3
+import __py_include
+import requests
+import hashlib
+import xml.etree.ElementTree as ET
+
+from time import time
+from argparse import ArgumentParser, ArgumentError
+from homekit.util import validate_ipv4, validate_ipv4_or_hostname
+from homekit.camera import IpcamConfig
+
+
+def xml_to_dict(xml_data: str) -> dict:
+ # Parse the XML data
+ root = ET.fromstring(xml_data)
+
+ # Function to remove namespace from the tag name
+ def remove_namespace(tag):
+ return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
+
+ # Function to recursively convert XML elements to a dictionary
+ def elem_to_dict(elem):
+ tag = remove_namespace(elem.tag)
+ elem_dict = {tag: {}}
+
+ # If the element has attributes, add them to the dictionary
+ elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
+
+ # Handle the element's text content, if present and not just whitespace
+ text = elem.text.strip() if elem.text and elem.text.strip() else None
+ if text:
+ elem_dict[tag]['#text'] = text
+
+ # Process child elements
+ for child in elem:
+ child_dict = elem_to_dict(child)
+ child_tag = remove_namespace(child.tag)
+ if child_tag not in elem_dict[tag]:
+ elem_dict[tag][child_tag] = []
+ elem_dict[tag][child_tag].append(child_dict[child_tag])
+
+ # Simplify structure if there's only text or no children and no attributes
+ if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
+ return {tag: elem_dict[tag]['#text']}
+ elif not elem_dict[tag]:
+ return {tag: ''}
+
+ return elem_dict
+
+ # Convert the root element to dictionary
+ return elem_to_dict(root)
+
+
+def sha256_hex(input_string: str) -> str:
+ return hashlib.sha256(input_string.encode()).hexdigest()
+
+
+class ResponseError(RuntimeError):
+ pass
+
+
+class AuthError(ResponseError):
+ pass
+
+
+class HikvisionISAPIClient:
+ def __init__(self, host):
+ self.host = host
+ self.cookies = {}
+
+ def auth(self, username: str, password: str):
+ r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'),
+ {'username': username},
+ headers={
+ 'X-Requested-With': 'XMLHttpRequest',
+ })
+ r.raise_for_status()
+ caps = xml_to_dict(r.text)['SessionLoginCap']
+ is_irreversible = caps['isIrreversible'][0].lower() == 'true'
+
+ # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
+ # also look into webAuth.js and utils.js
+
+ if 'salt' in caps and is_irreversible:
+ p = sha256_hex(username + caps['salt'][0] + password)
+ p = sha256_hex(p + caps['challenge'][0])
+ for i in range(int(caps['iterations'][0])-2):
+ p = sha256_hex(p)
+ else:
+ p = sha256_hex(password) + caps['challenge'][0]
+ for i in range(int(caps['iterations'][0])-1):
+ p = sha256_hex(p)
+
+ data = '<SessionLogin>'
+ data += f'<userName>{username}</userName>'
+ data += f'<password>{p}</password>'
+ data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
+ data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
+ data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
+ data += '</SessionLogin>'
+
+ r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={
+ 'Accept-Encoding': 'gzip, deflate',
+ 'If-Modified-Since': '0',
+ 'X-Requested-With': 'XMLHttpRequest',
+ 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
+ })
+ r.raise_for_status()
+ resp = xml_to_dict(r.text)['SessionLogin']
+ status_value = int(resp['statusValue'][0])
+ status_string = resp['statusString'][0]
+ if status_value != 200:
+ raise AuthError(f'{status_value}: {status_string}')
+
+ self.cookies = r.cookies.get_dict()
+
+ def get_ntp_server(self) -> str:
+ r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'), cookies=self.cookies)
+ r.raise_for_status()
+ ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0]
+
+ if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
+ ntp_host = ntp_server['hostName'][0]
+ else:
+ ntp_host = ntp_server['ipAddress'][0]
+
+ return ntp_host
+
+ def set_timezone(self):
+ data = '<?xml version="1.0" encoding="UTF-8"?>'
+ data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
+
+ r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={
+ 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
+ })
+ self.isapi_check_put_response(r)
+
+ def set_ntp_server(self, ntp_host: str, ntp_port: int = 123):
+ format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
+
+ data = '<?xml version="1.0" encoding="UTF-8"?>'
+ data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
+
+ r = requests.put(self.isapi_uri('System/time/ntpServers/1'),
+ data=data,
+ cookies=self.cookies,
+ headers={
+ 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
+ })
+ self.isapi_check_put_response(r)
+
+ def isapi_uri(self, path: str) -> str:
+ return f'http://{self.host}/ISAPI/{path}'
+
+ def isapi_check_put_response(self, r):
+ r.raise_for_status()
+ resp = xml_to_dict(r.text)['ResponseStatus']
+
+ status_code = int(resp['statusCode'][0])
+ status_string = resp['statusString'][0]
+
+ if status_code != 1 or status_string.lower() != 'ok':
+ raise ResponseError('response status looks bad')
+
+
+def main():
+ parser = ArgumentParser()
+ parser.add_argument('--host', type=str, required=True)
+ parser.add_argument('--get-ntp-server', action='store_true')
+ parser.add_argument('--set-ntp-server', type=str)
+ parser.add_argument('--username', type=str)
+ parser.add_argument('--password', type=str)
+ args = parser.parse_args()
+
+ if not args.get_ntp_server and not args.set_ntp_server:
+ raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required')
+
+ ipcam_config = IpcamConfig()
+ login = args.username if args.username else ipcam_config['web_creds']['login']
+ password = args.password if args.password else ipcam_config['web_creds']['password']
+
+ client = HikvisionISAPIClient(args.host)
+ client.auth(args.username, args.password)
+
+ if args.get_ntp_server:
+ print(client.get_ntp_server())
+ return
+
+ if not args.set_ntp_server:
+ raise ArgumentError(None, '--set-ntp-server is required')
+
+ if not validate_ipv4_or_hostname(args.set_ntp_server):
+ raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname')
+
+ client.set_ntp_server(args.set_ntp_server)
+
+
+if __name__ == '__main__':
+ main() \ No newline at end of file
diff --git a/include/py/homekit/camera/config.py b/include/py/homekit/camera/config.py
index 8e9bfd5..8aeb392 100644
--- a/include/py/homekit/camera/config.py
+++ b/include/py/homekit/camera/config.py
@@ -73,6 +73,15 @@ class IpcamConfig(ConfigUnit):
'login': {'type': 'string', 'required': True},
'password': {'type': 'string', 'required': True},
}
+ },
+
+ 'web_creds': {
+ 'required': True,
+ 'type': 'dict',
+ 'schema': {
+ 'login': {'type': 'string', 'required': True},
+ 'password': {'type': 'string', 'required': True},
+ }
}
}
diff --git a/include/py/homekit/config/_configs.py b/include/py/homekit/config/_configs.py
index f88c8ea..2cd2aca 100644
--- a/include/py/homekit/config/_configs.py
+++ b/include/py/homekit/config/_configs.py
@@ -26,6 +26,7 @@ class LinuxBoardsConfig(ConfigUnit):
'schema': {
'mdns': {'type': 'string', 'required': True},
'board': {'type': 'string', 'required': True},
+ 'location': {'type': 'string', 'required': True},
'network': {
'type': 'list',
'required': True,
diff --git a/include/py/homekit/util.py b/include/py/homekit/util.py
index 78a78a0..2b06600 100644
--- a/include/py/homekit/util.py
+++ b/include/py/homekit/util.py
@@ -10,6 +10,7 @@ import string
import random
import re
import os
+import ipaddress
from enum import Enum
from datetime import datetime
@@ -37,6 +38,14 @@ def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bo
return False
+def validate_ipv4(address: str) -> bool:
+ try:
+ ipaddress.IPv6Address(address)
+ return True
+ except ipaddress.AddressValueError:
+ return False
+
+
def validate_mac_address(mac_address: str) -> bool:
mac_pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$'
if re.match(mac_pattern, mac_address):