diff options
-rw-r--r-- | LICENSE | 2 | ||||
-rwxr-xr-x | bin/ipcam_ntp_util.py | 267 | ||||
-rw-r--r-- | include/py/homekit/camera/__init__.py | 2 | ||||
-rw-r--r-- | include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py | 42 | ||||
-rw-r--r-- | include/py/homekit/camera/alinoname/__init__.py | 1 | ||||
-rwxr-xr-x | include/py/homekit/camera/alinoname/nwipcam.py | 326 | ||||
-rw-r--r-- | include/py/homekit/camera/config.py | 9 | ||||
-rw-r--r-- | include/py/homekit/camera/hikvision/__init__.py | 1 | ||||
-rw-r--r-- | include/py/homekit/camera/hikvision/isapi.py | 137 | ||||
-rw-r--r-- | include/py/homekit/camera/hikvision/util.py | 48 | ||||
-rw-r--r-- | include/py/homekit/camera/types.py | 3 | ||||
-rw-r--r-- | include/py/homekit/http/http.py | 1 |
12 files changed, 635 insertions, 204 deletions
@@ -1,4 +1,4 @@ -Copyright 2022, Evgeny Zinoviev +Copyright 2022-2024, Evgeny Zinoviev All rights reserved. Redistribution and use in source and binary forms, with or without modification, diff --git a/bin/ipcam_ntp_util.py b/bin/ipcam_ntp_util.py index b27995c..81f6fe0 100755 --- a/bin/ipcam_ntp_util.py +++ b/bin/ipcam_ntp_util.py @@ -1,16 +1,13 @@ #!/usr/bin/env python3 import __py_include -import requests -import hashlib -import xml.etree.ElementTree as ET +import homekit.camera.hikvision as hikvision +import homekit.camera.alinoname as alinoname from enum import Enum, auto -from time import time from typing import Optional from argparse import ArgumentParser, ArgumentError -from homekit.util import validate_ipv4, validate_ipv4_or_hostname -from homekit.camera import IpcamConfig - +from homekit.util import validate_ipv4_or_hostname +from homekit.camera import IpcamConfig, CameraType ipcam_config = IpcamConfig() @@ -20,216 +17,63 @@ class Action(Enum): SET_NTP = auto() -def xml_to_dict(xml_data: str) -> dict: - # Parse the XML data - root = ET.fromstring(xml_data) - - # Function to remove namespace from the tag name - def remove_namespace(tag): - return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace - - # Function to recursively convert XML elements to a dictionary - def elem_to_dict(elem): - tag = remove_namespace(elem.tag) - elem_dict = {tag: {}} - - # If the element has attributes, add them to the dictionary - elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()}) - - # Handle the element's text content, if present and not just whitespace - text = elem.text.strip() if elem.text and elem.text.strip() else None - if text: - elem_dict[tag]['#text'] = text - - # Process child elements - for child in elem: - child_dict = elem_to_dict(child) - child_tag = remove_namespace(child.tag) - if child_tag not in elem_dict[tag]: - elem_dict[tag][child_tag] = [] - elem_dict[tag][child_tag].append(child_dict[child_tag]) - - # Simplify structure if there's only text or no children and no attributes - if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]: - return {tag: elem_dict[tag]['#text']} - elif not elem_dict[tag]: - return {tag: ''} - - return elem_dict - - # Convert the root element to dictionary - return elem_to_dict(root) - - -def sha256_hex(input_string: str) -> str: - return hashlib.sha256(input_string.encode()).hexdigest() - - -class ResponseError(RuntimeError): - pass - - -class AuthError(ResponseError): - pass - - -class HikvisionISAPIClient: - def __init__(self, host): - self.host = host - self.cookies = {} - - def auth(self, username: str, password: str): - r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'), - {'username': username}, - headers={ - 'X-Requested-With': 'XMLHttpRequest', - }) - r.raise_for_status() - caps = xml_to_dict(r.text)['SessionLoginCap'] - is_irreversible = caps['isIrreversible'][0].lower() == 'true' - - # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl - # also look into webAuth.js and utils.js - - if 'salt' in caps and is_irreversible: - p = sha256_hex(username + caps['salt'][0] + password) - p = sha256_hex(p + caps['challenge'][0]) - for i in range(int(caps['iterations'][0])-2): - p = sha256_hex(p) - else: - p = sha256_hex(password) + caps['challenge'][0] - for i in range(int(caps['iterations'][0])-1): - p = sha256_hex(p) - - data = '<SessionLogin>' - data += f'<userName>{username}</userName>' - data += f'<password>{p}</password>' - data += f'<sessionID>{caps["sessionID"][0]}</sessionID>' - data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>' - data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>' - data += '</SessionLogin>' - - r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={ - 'Accept-Encoding': 'gzip, deflate', - 'If-Modified-Since': '0', - 'X-Requested-With': 'XMLHttpRequest', - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', - }) - r.raise_for_status() - resp = xml_to_dict(r.text)['SessionLogin'] - status_value = int(resp['statusValue'][0]) - status_string = resp['statusString'][0] - if status_value != 200: - raise AuthError(f'{status_value}: {status_string}') - - self.cookies = r.cookies.get_dict() - - def get_ntp_server(self) -> str: - r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'), cookies=self.cookies) - r.raise_for_status() - ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0] - - if ntp_server['addressingFormatType'][0]['#text'] == 'hostname': - ntp_host = ntp_server['hostName'][0] - else: - ntp_host = ntp_server['ipAddress'][0] - - return ntp_host - - def set_timezone(self): - data = '<?xml version="1.0" encoding="UTF-8"?>' - data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>' - - r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={ - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', - 'X-Requested-With': 'XMLHttpRequest', - }) - self.isapi_check_put_response(r) - - def set_ntp_server(self, - ntp_host: str, - ntp_port: int = 123): - format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname' - - # test ntp server first - data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>' - r = requests.post(self.isapi_uri('System/time/ntpServers/test'), data=data, headers={ - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', - 'X-Requested-With': 'XMLHttpRequest', - }, cookies=self.cookies) - r.raise_for_status() - - resp = xml_to_dict(r.text)['NTPTestResult'] - - error_code = int(resp['errorCode'][0]) - error_description = resp['errorDescription'][0] - - if error_code != 0 or error_description.lower() != 'ok': - raise ResponseError('response status looks bad') +def process_camera(host: str, + action: Action, + login: str, + password: str, + camera_type: CameraType, + ntp_server: Optional[str] = None): + if camera_type.is_hikvision(): + client = hikvision.ISAPIClient(host) + try: + client.auth(login, password) + if action == Action.GET_NTP: + print(f'[{host}] {client.get_ntp_server()}') + return - # then set it - data = '<?xml version="1.0" encoding="UTF-8"?>' - data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>' + client.set_ntp_server(ntp_server) + print(f'[{host}] done') - r = requests.put(self.isapi_uri('System/time/ntpServers/1'), - data=data, - cookies=self.cookies, - headers={ - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', - 'X-Requested-With': 'XMLHttpRequest', - }) - self.isapi_check_put_response(r) + except hikvision.AuthError as e: + print(f'[{host}] ({str(e)})') - def isapi_uri(self, path: str) -> str: - return f'http://{self.host}/ISAPI/{path}' + except hikvision.ResponseError as e: + print(f'[{host}] ({str(e)})') - def isapi_check_put_response(self, r): + elif camera_type.is_ali(): try: - r.raise_for_status() - except requests.exceptions.HTTPError as e: - # print(r.text) - raise e - - resp = xml_to_dict(r.text)['ResponseStatus'] - - status_code = int(resp['statusCode'][0]) - status_string = resp['statusString'][0] + client = alinoname.XMEyeCamera(hostname=host, username=login, password=password) + client.login() - if status_code != 1 or status_string.lower() != 'ok': - raise ResponseError('response status looks bad') + if action == Action.GET_NTP: + print(f'[{host}] {client.get_ntp_server()}') + return + client.set_ntp_server(ntp_server) + print(f'[{host}] done') -def process_hikvision_camera(host: str, - action: Action, - login: str, - password: str, - ntp_server: Optional[str] = None): - client = HikvisionISAPIClient(host) - try: - client.auth(login, password) - if action == Action.GET_NTP: - print(f'[{host}] {client.get_ntp_server()}') - return - client.set_ntp_server(ntp_server) - print(f'[{host}] done') - except AuthError as e: - print(f'[{host}] ({str(e)})') - except ResponseError as e: - print(f'[{host}] ({str(e)})') + except OSError as e: + print(f'[{host}] ({str(e)})') def main(): + camera_types = ['hikvision', 'ali'] parser = ArgumentParser() - parser.add_argument('--host', type=str) + parser.add_argument('--camera', type=str) + parser.add_argument('--camera-type', type=str, choices=camera_types) parser.add_argument('--all', action='store_true') + parser.add_argument('--all-of-type', type=str, choices=camera_types) parser.add_argument('--get-ntp-server', action='store_true') parser.add_argument('--set-ntp-server', type=str) parser.add_argument('--username', type=str) parser.add_argument('--password', type=str) args = parser.parse_args() - if not args.host and not args.all: - raise ArgumentError(None, 'either --all or --host <IP> is required') + if args.all and args.all_of_type: + raise ArgumentError(None, 'you can\'t pass both --all and --all-of-type') + + if not args.camera and not args.all and not args.all_of_type: + raise ArgumentError(None, 'either --all, --all-of-type <TYPE> or --camera <NUM> is required') if not args.get_ntp_server and not args.set_ntp_server: raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required') @@ -247,13 +91,32 @@ def main(): kwargs = {} if args.set_ntp_server: kwargs['ntp_server'] = args.set_ntp_server - if not args.all: - process_hikvision_camera(args.host, action, login, password, **kwargs) + if not args.all and not args.all_of_type: + if not args.camera_type: + raise ArgumentError(None, '--camera-type is required') + + if not ipcam_config.has_camera(int(args.camera)): + raise ArgumentError(None, f'invalid camera {args.camera}') + camera_host = ipcam_config.get_camera_ip(args.camera) + + if args.camera_type == 'hikvision': + camera_type = CameraType.HIKVISION_264 + elif args.camera_type == 'ali': + camera_type = CameraType.ALIEXPRESS_NONAME + else: + raise ValueError('invalid --camera-type') + process_camera(camera_host, action, login, password, camera_type, **kwargs) else: for cam in ipcam_config.get_all_cam_names(): - if not ipcam_config.get_camera_type(cam).is_hikvision(): + if not ipcam_config.is_camera_enabled(cam): + continue + + cam_type = ipcam_config.get_camera_type(cam) + if args.all_of_type == 'hikvision' and not cam_type.is_hikvision(): + continue + if args.all_of_type == 'ali' and not ipcam_config.get_camera_type(cam).is_ali(): continue - process_hikvision_camera(ipcam_config.get_camera_ip(cam), action, login, password, **kwargs) + process_camera(ipcam_config.get_camera_ip(cam), action, login, password, cam_type, **kwargs) if __name__ == '__main__': diff --git a/include/py/homekit/camera/__init__.py b/include/py/homekit/camera/__init__.py index 4875031..7f8714b 100644 --- a/include/py/homekit/camera/__init__.py +++ b/include/py/homekit/camera/__init__.py @@ -1,2 +1,2 @@ from .types import CameraType, VideoContainerType, VideoCodecType, CaptureType -from .config import IpcamConfig
\ No newline at end of file +from .config import IpcamConfig diff --git a/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py b/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py new file mode 100644 index 0000000..9423382 --- /dev/null +++ b/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py @@ -0,0 +1,42 @@ +# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera +# Copyright (C) 2019-2019 Johannes Bauer +# +# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam). +# +# numenworld-ipcam is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; this program is ONLY licensed under +# version 3 of the License, later versions are explicitly excluded. +# +# numenworld-ipcam is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with numenworld-ipcam; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Johannes Bauer <JohannesBauer@gmx.de> + +import hashlib + + +class HorrificallyBrokenPasswordFunction(): + @classmethod + def derive(self, passphrase): + alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + assert (len(alphabet) == 62) + passphrase = passphrase.encode("utf-8") + hashval = hashlib.md5(passphrase).digest() + encoded = "" + for i in range(0, 16, 2): + index = (hashval[i] + hashval[i + 1]) % len(alphabet) + char = alphabet[index] + encoded += char + return encoded + + +if __name__ == "__main__": + assert (HorrificallyBrokenPasswordFunction.derive("") == "tlJwpbo6") + assert (HorrificallyBrokenPasswordFunction.derive("abc") == "LkM7s2Ht") diff --git a/include/py/homekit/camera/alinoname/__init__.py b/include/py/homekit/camera/alinoname/__init__.py new file mode 100644 index 0000000..bce9919 --- /dev/null +++ b/include/py/homekit/camera/alinoname/__init__.py @@ -0,0 +1 @@ +from .nwipcam import XMEyeCamera diff --git a/include/py/homekit/camera/alinoname/nwipcam.py b/include/py/homekit/camera/alinoname/nwipcam.py new file mode 100755 index 0000000..e54ec62 --- /dev/null +++ b/include/py/homekit/camera/alinoname/nwipcam.py @@ -0,0 +1,326 @@ +#!/usr/bin/python3 +# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera +# Copyright (C) 2019-2019 Johannes Bauer +# +# Changes and improvements: +# Copyright (C) 2024 Evgeny Zinoviev +# +# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam). +# +# numenworld-ipcam is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; this program is ONLY licensed under +# version 3 of the License, later versions are explicitly excluded. +# +# numenworld-ipcam is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with numenworld-ipcam; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Johannes Bauer <JohannesBauer@gmx.de> + +import collections +import struct +import socket +import enum +import json +import subprocess + +from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction + + +class XMEyeMsgCode(enum.IntEnum): + LoginCmd = 1000 + LoginReply = LoginCmd + 1 + + KeepAliveCmd = 1006 + KeepAliveReply = KeepAliveCmd + 1 + + SetConfigCmd = 1040 + SetConfigReply = SetConfigCmd + 1 + + GetConfigCmd = 1042 + GetConfidReply = GetConfigCmd + 1 + + GetSystemInfoCmd = 1020 + GetSystemInfoReply = GetSystemInfoCmd + 1 + + ChannelTitleCmd = 1048 + ChannelTitleReply = ChannelTitleCmd + 1 + + SystemFunctionCmd = 1360 + SystemFunctionReply = SystemFunctionCmd + 1 + + OPMonitorStartStopCmd = 1410 + OPMonitorStartStopReply = OPMonitorStartStopCmd + 1 + + OPMonitorClaimCmd = 1413 + OPMonitorClaimReply = OPMonitorClaimCmd + 1 + + OPTimeQueryCmd = 1452 + OPTimeQueryReply = OPTimeQueryCmd + 1 + + VideoStreamData = 1412 + + +class AudioVideoDataType(enum.IntEnum): + VideoIncomplete = 0xfc + VideoComplete = 0xfd + AudioComplete = 0xfa + + +class AudioVideoPayload(): + _HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"]) + _HeaderStruct = struct.Struct("< H B B H H") + + def __init__(self, payload, hint=""): + self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:self._HeaderStruct.size])) + print( + "%20s [%5d]: %s %s" % (hint, len(payload), " ".join("%02x" % (c) for c in payload[:8]), str(self._header))) + # if len(payload) != (self._HeaderStruct.size + self._header.length): + # raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload))) + # print(self._header) + self._data = payload[self._HeaderStruct.size:] + + @property + def data(self): + return self._data + + +class XMEyeMessage(): + _HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"]) + _HeaderStruct = struct.Struct("< L L 6s H L") + + def __init__(self, station, session, msgcode, message): + if isinstance(message, (bytes, bytearray)): + self._message = bytes(message) + else: + self._message = json.dumps(message).encode("ascii") + self._header = self._HeaderFields(station=station, + session=session, + unknown=bytes(6), + msgcode=msgcode, + length=len(self._message)) + + @property + def header(self): + return self._header + + @property + def message(self): + return self._message + + @property + def payload(self): + msg = self.message.rstrip(bytes(1)) + try: + data = json.loads(msg) + except (json.JSONDecodeError, UnicodeError): + return self.message + return data + + @property + def length(self): + return len(self.message) + + def __bytes__(self): + header = self._HeaderStruct.pack(*self._header) + return header + self._message + + @classmethod + def deserialize(cls, msg): + header_data = msg[:20] + header = cls._HeaderFields(*cls._HeaderStruct.unpack(header_data)) + payload = msg[20: 20 + header.length] + if len(payload) < header.length: + payload += bytes(header.length - len(payload)) + + try: + msgcode = XMEyeMsgCode(header.msgcode) + except ValueError: + msgcode = header.msgcode + return cls(station=header.station, session=header.session, msgcode=msgcode, message=payload) + + @classmethod + def deserialize_all(cls, msg): + msg = bytearray(msg) + while len(msg) >= 20: + next_msg = cls.deserialize(msg) + yield next_msg + msg = msg[20 + next_msg.length:] + + def dump(self): + print("%s (%d bytes payload):" % (self._header.msgcode, self.length)) + if isinstance(self.payload, bytes): + print(self.payload) + else: + print(json.dumps(self.payload, indent=4, sort_keys=True)) + print() + + def __repr__(self): + return "Msg(%s): %s" % (self.header, str(self.payload)) + + +class XMEyeCamera(): + def __init__(self, hostname, username="admin", password="", port=34567): + self._hostname = hostname + self._conn = socket.create_connection((hostname, port)) + self._session = None + self._username = username + self._password = password + + @property + def derived_password(self): + return HorrificallyBrokenPasswordFunction.derive(self._password) + + @property + def rtsp_uri(self): + rtsp_port = 554 + return "rtsp://%s:%d/user=%s&password=%s&channel=" % ( + self._hostname, rtsp_port, self._username, self.derived_password) + + def _rx_bytes(self, length): + result = bytearray() + while len(result) < length: + remaining_bytes = length - len(result) + rx_data = self._conn.recv(remaining_bytes) + result += rx_data + return result + + def _rx(self): + response_header = self._rx_bytes(20) + header = XMEyeMessage.deserialize(response_header) + payload_data = self._rx_bytes(header.length) + + response_data = response_header + payload_data + response = XMEyeMessage.deserialize(response_data) + # print("<-", response) + return response + + def _tx(self, command): + # print("->", command) + data = bytes(command) + self._conn.send(data) + + def _tx_rx(self, command): + self._tx(command) + return self._rx() + + def login(self): + data = { + "EncryptType": "MD5", + "LoginType": "DVRIP-Web", + "UserName": self._username, + "PassWord": self.derived_password, + } + command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data) + response = self._tx_rx(command) + if int(response.payload["Ret"]) == 100: + # Login successful + self._session = int(response.payload["SessionID"], 16) + else: + raise Exception("Login failed:", response) + + def _generic_cmd(self, name, msgcode): + data = { + "Name": name, + "SessionID": "0x%x" % (self._session,), + } + command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data) + return self._tx_rx(command) + + def get_systeminfo(self): + return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd) + + def get_channel_title(self): + return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd) + + def get_system_function(self): + return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd) + + def get_talk_audio_format(self): + return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd) + + def get_ntp_server(self): + ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd) + return ntp_config.payload['NetWork.NetNTP']['Server']['Name'] + + def set_ntp_server(self, + ntp_host: str, + ntp_port: int = 123) -> None: + data = { + 'Name': 'NetWork.NetNTP', + 'NetWork.NetNTP': { + 'Enable': True, + 'Server': { + 'Address': '0x00000000', + 'Anonymity': False, + 'Name': ntp_host, + 'Password': '', + 'Port': ntp_port, + 'UserName': '' + }, + 'TimeZone': 13, # Moscow time + 'UpdatePeriod': 60 + }, + "SessionID": "0x%x" % (self._session,), + } + command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data) + self._tx_rx(command) + + def _opmonitor_cmd(self, action, msgcode, rx_msg=True): + data = { + "Name": "OPMonitor", + "OPMonitor": { + "Action": action, + "Parameter": { + "Channel": 0, + "CombinMode": "CONNECT_ALL", + "StreamType": "Main", + "TransMode": "TCP", + }, + }, + "SessionID": "0x%x" % (self._session,), + } + command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data) + if rx_msg: + return self._tx_rx(command) + else: + self._tx(command) + + def get_stream(self, packet_callback): + self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd) + self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False) + while True: + rx_pkt = self._rx() + packet_callback(rx_pkt) + + # def playback_stream(self): + # mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE) + # with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f: + # def pkt_callback(pkt): + # if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData): + # avpayload = AudioVideoPayload(pkt.payload, hint="video") + # mplayer_process.stdin.raw.write(pkt.payload) + # video_f.write(pkt.payload) + # elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData: + # # Audio data? + # avpayload = AudioVideoPayload(pkt.payload, hint="audio") + # f.write(avpayload.data) + # elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData: + # print(pkt) + # + # self.get_stream(pkt_callback) + + +if __name__ == '__main__': + cam = XMEyeCamera("192.168.0.47", password="DerPr03ess") + cam.login() + print(cam.get_systeminfo()) + print(cam.get_channel_title()) + # print(cam.get_talk_audio_format()) + # print(cam.get_system_function()) diff --git a/include/py/homekit/camera/config.py b/include/py/homekit/camera/config.py index 0ed75cf..9685cab 100644 --- a/include/py/homekit/camera/config.py +++ b/include/py/homekit/camera/config.py @@ -132,6 +132,9 @@ class IpcamConfig(ConfigUnit): # def get_cam_server_and_disk(self, cam: int) -> tuple[str, int]: # return self['cameras'][cam]['server'], self['cameras'][cam]['disk'] + def has_camera(self, camera: int) -> bool: + return camera in tuple(self['cameras'].keys()) + def get_camera_container(self, camera: int) -> VideoContainerType: return self.get_camera_type(camera).get_container() @@ -143,3 +146,9 @@ class IpcamConfig(ConfigUnit): def get_camera_ip(self, camera: int) -> str: return self['camera_ip_template'] % (str(camera),) + + def is_camera_enabled(self, camera: int) -> bool: + try: + return self['cameras'][camera]['enabled'] + except KeyError: + return True diff --git a/include/py/homekit/camera/hikvision/__init__.py b/include/py/homekit/camera/hikvision/__init__.py new file mode 100644 index 0000000..72d6ae3 --- /dev/null +++ b/include/py/homekit/camera/hikvision/__init__.py @@ -0,0 +1 @@ +from .isapi import ISAPIClient, ResponseError, AuthError diff --git a/include/py/homekit/camera/hikvision/isapi.py b/include/py/homekit/camera/hikvision/isapi.py new file mode 100644 index 0000000..6cc34f8 --- /dev/null +++ b/include/py/homekit/camera/hikvision/isapi.py @@ -0,0 +1,137 @@ +import requests + +from time import time +from .util import xml_to_dict, sha256_hex +from ...util import validate_ipv4 +from ...http import HTTPMethod +from typing import Optional, Union + + +class ResponseError(RuntimeError): + pass + + +class AuthError(ResponseError): + pass + + +class ISAPIClient: + def __init__(self, host): + self.host = host + self.cookies = {} + + def call(self, + path: str, + method: HTTPMethod = HTTPMethod.GET, + data: Optional[Union[dict, str, bytes]] = None, + raise_for_status=True, + check_put_response=False): + f = getattr(requests, method.value.lower()) + + headers = { + 'X-Requested-With': 'XMLHttpRequest', + } + if method in (HTTPMethod.PUT, HTTPMethod.POST): + headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' + + kwargs = {} + if data: + kwargs['data' if method is not HTTPMethod.GET else 'params'] = data + if len(self.cookies) > 0: + kwargs['cookies'] = self.cookies + + r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs) + + if raise_for_status or check_put_response: + r.raise_for_status() + + parsed_xml = None + if check_put_response: + parsed_xml = xml_to_dict(r.text) + resp = parsed_xml['ResponseStatus'] + status_code = int(resp['statusCode'][0]) + status_string = resp['statusString'][0] + if status_code != 1 or status_string.lower() != 'ok': + raise ResponseError('response status looks bad') + + self.cookies.update(r.cookies.get_dict()) + + if parsed_xml is None: + parsed_xml = xml_to_dict(r.text) + + return parsed_xml + + def auth(self, username: str, password: str): + xml = self.call('Security/sessionLogin/capabilities', data={'username': username}) + caps = xml['SessionLoginCap'] + is_irreversible = caps['isIrreversible'][0].lower() == 'true' + + # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl + # also look into webAuth.js and utils.js + + if 'salt' in caps and is_irreversible: + p = sha256_hex(username + caps['salt'][0] + password) + p = sha256_hex(p + caps['challenge'][0]) + for i in range(int(caps['iterations'][0])-2): + p = sha256_hex(p) + else: + p = sha256_hex(password) + caps['challenge'][0] + for i in range(int(caps['iterations'][0])-1): + p = sha256_hex(p) + + data = '<SessionLogin>' + data += f'<userName>{username}</userName>' + data += f'<password>{p}</password>' + data += f'<sessionID>{caps["sessionID"][0]}</sessionID>' + data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>' + data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>' + data += '</SessionLogin>' + + resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin'] + status_value = int(resp['statusValue'][0]) + status_string = resp['statusString'][0] + if status_value != 200: + raise AuthError(f'{status_value}: {status_string}') + + def get_ntp_server(self) -> str: + try: + # works on newer 1080p cams + xml = self.call('System/time/ntpServers/capabilities') + ntp_server = xml['NTPServerList']['NTPServer'][0] + if ntp_server['addressingFormatType'][0]['#text'] == 'hostname': + ntp_host = ntp_server['hostName'][0] + else: + ntp_host = ntp_server['ipAddress'][0] + + except requests.exceptions.HTTPError: + # works on older 720p cams + ntp_server = self.call('System/time/ntpServers/1')['NTPServer'] + if ntp_server['addressingFormatType'][0] == 'hostname': + ntp_host = ntp_server['hostName'][0] + else: + ntp_host = ntp_server['ipAddress'][0] + + return ntp_host + + def set_timezone(self): + data = '<?xml version="1.0" encoding="UTF-8"?>' + data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>' + self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True) + + def set_ntp_server(self, + ntp_host: str, + ntp_port: int = 123): + format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname' + + # test ntp server first + data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>' + resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult'] + error_code = int(resp['errorCode'][0]) + error_description = resp['errorDescription'][0] + if error_code != 0 or error_description.lower() != 'ok': + raise ResponseError('response status looks bad') + + # then set it + data = '<?xml version="1.0" encoding="UTF-8"?>' + data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>' + self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True) diff --git a/include/py/homekit/camera/hikvision/util.py b/include/py/homekit/camera/hikvision/util.py new file mode 100644 index 0000000..581c6ea --- /dev/null +++ b/include/py/homekit/camera/hikvision/util.py @@ -0,0 +1,48 @@ +import requests +import hashlib +import xml.etree.ElementTree as ET + + +def xml_to_dict(xml_data: str) -> dict: + # Parse the XML data + root = ET.fromstring(xml_data) + + # Function to remove namespace from the tag name + def remove_namespace(tag): + return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace + + # Function to recursively convert XML elements to a dictionary + def elem_to_dict(elem): + tag = remove_namespace(elem.tag) + elem_dict = {tag: {}} + + # If the element has attributes, add them to the dictionary + elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()}) + + # Handle the element's text content, if present and not just whitespace + text = elem.text.strip() if elem.text and elem.text.strip() else None + if text: + elem_dict[tag]['#text'] = text + + # Process child elements + for child in elem: + child_dict = elem_to_dict(child) + child_tag = remove_namespace(child.tag) + if child_tag not in elem_dict[tag]: + elem_dict[tag][child_tag] = [] + elem_dict[tag][child_tag].append(child_dict[child_tag]) + + # Simplify structure if there's only text or no children and no attributes + if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]: + return {tag: elem_dict[tag]['#text']} + elif not elem_dict[tag]: + return {tag: ''} + + return elem_dict + + # Convert the root element to dictionary + return elem_to_dict(root) + + +def sha256_hex(input_string: str) -> str: + return hashlib.sha256(input_string.encode()).hexdigest() diff --git a/include/py/homekit/camera/types.py b/include/py/homekit/camera/types.py index ec4663b..0aaadef 100644 --- a/include/py/homekit/camera/types.py +++ b/include/py/homekit/camera/types.py @@ -44,6 +44,9 @@ class CameraType(Enum): def is_hikvision(self) -> bool: return self in (CameraType.HIKVISION_264, CameraType.HIKVISION_265) + def is_ali(self) -> bool: + return self == CameraType.ALIEXPRESS_NONAME + class TimeFilterType(Enum): FIX = 'fix' diff --git a/include/py/homekit/http/http.py b/include/py/homekit/http/http.py index 82c5aae..8819c46 100644 --- a/include/py/homekit/http/http.py +++ b/include/py/homekit/http/http.py @@ -113,3 +113,4 @@ class HTTPServer: class HTTPMethod(Enum): GET = 'GET' POST = 'POST' + PUT = 'PUT' |