summaryrefslogtreecommitdiff
path: root/bin
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2024-02-17 23:20:49 +0300
committerEvgeny Zinoviev <me@ch1p.io>2024-02-17 23:20:49 +0300
commit70b4a4f044cac8052bb0af7c585572e54489ea2f (patch)
tree5611bfb19af4d847b017df5d79f89b4bbb50e9f1 /bin
parent77b80dd9b3500539b24b7dc6258c02c23fd4d015 (diff)
ipcam_ntp_util: support chinese noname cameras
Diffstat (limited to 'bin')
-rwxr-xr-xbin/ipcam_ntp_util.py267
1 files changed, 65 insertions, 202 deletions
diff --git a/bin/ipcam_ntp_util.py b/bin/ipcam_ntp_util.py
index b27995c..81f6fe0 100755
--- a/bin/ipcam_ntp_util.py
+++ b/bin/ipcam_ntp_util.py
@@ -1,16 +1,13 @@
#!/usr/bin/env python3
import __py_include
-import requests
-import hashlib
-import xml.etree.ElementTree as ET
+import homekit.camera.hikvision as hikvision
+import homekit.camera.alinoname as alinoname
from enum import Enum, auto
-from time import time
from typing import Optional
from argparse import ArgumentParser, ArgumentError
-from homekit.util import validate_ipv4, validate_ipv4_or_hostname
-from homekit.camera import IpcamConfig
-
+from homekit.util import validate_ipv4_or_hostname
+from homekit.camera import IpcamConfig, CameraType
ipcam_config = IpcamConfig()
@@ -20,216 +17,63 @@ class Action(Enum):
SET_NTP = auto()
-def xml_to_dict(xml_data: str) -> dict:
- # Parse the XML data
- root = ET.fromstring(xml_data)
-
- # Function to remove namespace from the tag name
- def remove_namespace(tag):
- return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
-
- # Function to recursively convert XML elements to a dictionary
- def elem_to_dict(elem):
- tag = remove_namespace(elem.tag)
- elem_dict = {tag: {}}
-
- # If the element has attributes, add them to the dictionary
- elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
-
- # Handle the element's text content, if present and not just whitespace
- text = elem.text.strip() if elem.text and elem.text.strip() else None
- if text:
- elem_dict[tag]['#text'] = text
-
- # Process child elements
- for child in elem:
- child_dict = elem_to_dict(child)
- child_tag = remove_namespace(child.tag)
- if child_tag not in elem_dict[tag]:
- elem_dict[tag][child_tag] = []
- elem_dict[tag][child_tag].append(child_dict[child_tag])
-
- # Simplify structure if there's only text or no children and no attributes
- if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
- return {tag: elem_dict[tag]['#text']}
- elif not elem_dict[tag]:
- return {tag: ''}
-
- return elem_dict
-
- # Convert the root element to dictionary
- return elem_to_dict(root)
-
-
-def sha256_hex(input_string: str) -> str:
- return hashlib.sha256(input_string.encode()).hexdigest()
-
-
-class ResponseError(RuntimeError):
- pass
-
-
-class AuthError(ResponseError):
- pass
-
-
-class HikvisionISAPIClient:
- def __init__(self, host):
- self.host = host
- self.cookies = {}
-
- def auth(self, username: str, password: str):
- r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'),
- {'username': username},
- headers={
- 'X-Requested-With': 'XMLHttpRequest',
- })
- r.raise_for_status()
- caps = xml_to_dict(r.text)['SessionLoginCap']
- is_irreversible = caps['isIrreversible'][0].lower() == 'true'
-
- # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
- # also look into webAuth.js and utils.js
-
- if 'salt' in caps and is_irreversible:
- p = sha256_hex(username + caps['salt'][0] + password)
- p = sha256_hex(p + caps['challenge'][0])
- for i in range(int(caps['iterations'][0])-2):
- p = sha256_hex(p)
- else:
- p = sha256_hex(password) + caps['challenge'][0]
- for i in range(int(caps['iterations'][0])-1):
- p = sha256_hex(p)
-
- data = '<SessionLogin>'
- data += f'<userName>{username}</userName>'
- data += f'<password>{p}</password>'
- data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
- data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
- data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
- data += '</SessionLogin>'
-
- r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={
- 'Accept-Encoding': 'gzip, deflate',
- 'If-Modified-Since': '0',
- 'X-Requested-With': 'XMLHttpRequest',
- 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
- })
- r.raise_for_status()
- resp = xml_to_dict(r.text)['SessionLogin']
- status_value = int(resp['statusValue'][0])
- status_string = resp['statusString'][0]
- if status_value != 200:
- raise AuthError(f'{status_value}: {status_string}')
-
- self.cookies = r.cookies.get_dict()
-
- def get_ntp_server(self) -> str:
- r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'), cookies=self.cookies)
- r.raise_for_status()
- ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0]
-
- if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
- ntp_host = ntp_server['hostName'][0]
- else:
- ntp_host = ntp_server['ipAddress'][0]
-
- return ntp_host
-
- def set_timezone(self):
- data = '<?xml version="1.0" encoding="UTF-8"?>'
- data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
-
- r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={
- 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
- 'X-Requested-With': 'XMLHttpRequest',
- })
- self.isapi_check_put_response(r)
-
- def set_ntp_server(self,
- ntp_host: str,
- ntp_port: int = 123):
- format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
-
- # test ntp server first
- data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>'
- r = requests.post(self.isapi_uri('System/time/ntpServers/test'), data=data, headers={
- 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
- 'X-Requested-With': 'XMLHttpRequest',
- }, cookies=self.cookies)
- r.raise_for_status()
-
- resp = xml_to_dict(r.text)['NTPTestResult']
-
- error_code = int(resp['errorCode'][0])
- error_description = resp['errorDescription'][0]
-
- if error_code != 0 or error_description.lower() != 'ok':
- raise ResponseError('response status looks bad')
+def process_camera(host: str,
+ action: Action,
+ login: str,
+ password: str,
+ camera_type: CameraType,
+ ntp_server: Optional[str] = None):
+ if camera_type.is_hikvision():
+ client = hikvision.ISAPIClient(host)
+ try:
+ client.auth(login, password)
+ if action == Action.GET_NTP:
+ print(f'[{host}] {client.get_ntp_server()}')
+ return
- # then set it
- data = '<?xml version="1.0" encoding="UTF-8"?>'
- data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
+ client.set_ntp_server(ntp_server)
+ print(f'[{host}] done')
- r = requests.put(self.isapi_uri('System/time/ntpServers/1'),
- data=data,
- cookies=self.cookies,
- headers={
- 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
- 'X-Requested-With': 'XMLHttpRequest',
- })
- self.isapi_check_put_response(r)
+ except hikvision.AuthError as e:
+ print(f'[{host}] ({str(e)})')
- def isapi_uri(self, path: str) -> str:
- return f'http://{self.host}/ISAPI/{path}'
+ except hikvision.ResponseError as e:
+ print(f'[{host}] ({str(e)})')
- def isapi_check_put_response(self, r):
+ elif camera_type.is_ali():
try:
- r.raise_for_status()
- except requests.exceptions.HTTPError as e:
- # print(r.text)
- raise e
-
- resp = xml_to_dict(r.text)['ResponseStatus']
-
- status_code = int(resp['statusCode'][0])
- status_string = resp['statusString'][0]
+ client = alinoname.XMEyeCamera(hostname=host, username=login, password=password)
+ client.login()
- if status_code != 1 or status_string.lower() != 'ok':
- raise ResponseError('response status looks bad')
+ if action == Action.GET_NTP:
+ print(f'[{host}] {client.get_ntp_server()}')
+ return
+ client.set_ntp_server(ntp_server)
+ print(f'[{host}] done')
-def process_hikvision_camera(host: str,
- action: Action,
- login: str,
- password: str,
- ntp_server: Optional[str] = None):
- client = HikvisionISAPIClient(host)
- try:
- client.auth(login, password)
- if action == Action.GET_NTP:
- print(f'[{host}] {client.get_ntp_server()}')
- return
- client.set_ntp_server(ntp_server)
- print(f'[{host}] done')
- except AuthError as e:
- print(f'[{host}] ({str(e)})')
- except ResponseError as e:
- print(f'[{host}] ({str(e)})')
+ except OSError as e:
+ print(f'[{host}] ({str(e)})')
def main():
+ camera_types = ['hikvision', 'ali']
parser = ArgumentParser()
- parser.add_argument('--host', type=str)
+ parser.add_argument('--camera', type=str)
+ parser.add_argument('--camera-type', type=str, choices=camera_types)
parser.add_argument('--all', action='store_true')
+ parser.add_argument('--all-of-type', type=str, choices=camera_types)
parser.add_argument('--get-ntp-server', action='store_true')
parser.add_argument('--set-ntp-server', type=str)
parser.add_argument('--username', type=str)
parser.add_argument('--password', type=str)
args = parser.parse_args()
- if not args.host and not args.all:
- raise ArgumentError(None, 'either --all or --host <IP> is required')
+ if args.all and args.all_of_type:
+ raise ArgumentError(None, 'you can\'t pass both --all and --all-of-type')
+
+ if not args.camera and not args.all and not args.all_of_type:
+ raise ArgumentError(None, 'either --all, --all-of-type <TYPE> or --camera <NUM> is required')
if not args.get_ntp_server and not args.set_ntp_server:
raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required')
@@ -247,13 +91,32 @@ def main():
kwargs = {}
if args.set_ntp_server:
kwargs['ntp_server'] = args.set_ntp_server
- if not args.all:
- process_hikvision_camera(args.host, action, login, password, **kwargs)
+ if not args.all and not args.all_of_type:
+ if not args.camera_type:
+ raise ArgumentError(None, '--camera-type is required')
+
+ if not ipcam_config.has_camera(int(args.camera)):
+ raise ArgumentError(None, f'invalid camera {args.camera}')
+ camera_host = ipcam_config.get_camera_ip(args.camera)
+
+ if args.camera_type == 'hikvision':
+ camera_type = CameraType.HIKVISION_264
+ elif args.camera_type == 'ali':
+ camera_type = CameraType.ALIEXPRESS_NONAME
+ else:
+ raise ValueError('invalid --camera-type')
+ process_camera(camera_host, action, login, password, camera_type, **kwargs)
else:
for cam in ipcam_config.get_all_cam_names():
- if not ipcam_config.get_camera_type(cam).is_hikvision():
+ if not ipcam_config.is_camera_enabled(cam):
+ continue
+
+ cam_type = ipcam_config.get_camera_type(cam)
+ if args.all_of_type == 'hikvision' and not cam_type.is_hikvision():
+ continue
+ if args.all_of_type == 'ali' and not ipcam_config.get_camera_type(cam).is_ali():
continue
- process_hikvision_camera(ipcam_config.get_camera_ip(cam), action, login, password, **kwargs)
+ process_camera(ipcam_config.get_camera_ip(cam), action, login, password, cam_type, **kwargs)
if __name__ == '__main__':