summaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2024-02-17 23:20:49 +0300
committerEvgeny Zinoviev <me@ch1p.io>2024-02-17 23:20:49 +0300
commit70b4a4f044cac8052bb0af7c585572e54489ea2f (patch)
tree5611bfb19af4d847b017df5d79f89b4bbb50e9f1 /include
parent77b80dd9b3500539b24b7dc6258c02c23fd4d015 (diff)
ipcam_ntp_util: support chinese noname cameras
Diffstat (limited to 'include')
-rw-r--r--include/py/homekit/camera/__init__.py2
-rw-r--r--include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py42
-rw-r--r--include/py/homekit/camera/alinoname/__init__.py1
-rwxr-xr-xinclude/py/homekit/camera/alinoname/nwipcam.py326
-rw-r--r--include/py/homekit/camera/config.py9
-rw-r--r--include/py/homekit/camera/hikvision/__init__.py1
-rw-r--r--include/py/homekit/camera/hikvision/isapi.py137
-rw-r--r--include/py/homekit/camera/hikvision/util.py48
-rw-r--r--include/py/homekit/camera/types.py3
-rw-r--r--include/py/homekit/http/http.py1
10 files changed, 569 insertions, 1 deletions
diff --git a/include/py/homekit/camera/__init__.py b/include/py/homekit/camera/__init__.py
index 4875031..7f8714b 100644
--- a/include/py/homekit/camera/__init__.py
+++ b/include/py/homekit/camera/__init__.py
@@ -1,2 +1,2 @@
from .types import CameraType, VideoContainerType, VideoCodecType, CaptureType
-from .config import IpcamConfig \ No newline at end of file
+from .config import IpcamConfig
diff --git a/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py b/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py
new file mode 100644
index 0000000..9423382
--- /dev/null
+++ b/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py
@@ -0,0 +1,42 @@
+# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
+# Copyright (C) 2019-2019 Johannes Bauer
+#
+# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
+#
+# numenworld-ipcam is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; this program is ONLY licensed under
+# version 3 of the License, later versions are explicitly excluded.
+#
+# numenworld-ipcam is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with numenworld-ipcam; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Johannes Bauer <JohannesBauer@gmx.de>
+
+import hashlib
+
+
+class HorrificallyBrokenPasswordFunction():
+ @classmethod
+ def derive(self, passphrase):
+ alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+ assert (len(alphabet) == 62)
+ passphrase = passphrase.encode("utf-8")
+ hashval = hashlib.md5(passphrase).digest()
+ encoded = ""
+ for i in range(0, 16, 2):
+ index = (hashval[i] + hashval[i + 1]) % len(alphabet)
+ char = alphabet[index]
+ encoded += char
+ return encoded
+
+
+if __name__ == "__main__":
+ assert (HorrificallyBrokenPasswordFunction.derive("") == "tlJwpbo6")
+ assert (HorrificallyBrokenPasswordFunction.derive("abc") == "LkM7s2Ht")
diff --git a/include/py/homekit/camera/alinoname/__init__.py b/include/py/homekit/camera/alinoname/__init__.py
new file mode 100644
index 0000000..bce9919
--- /dev/null
+++ b/include/py/homekit/camera/alinoname/__init__.py
@@ -0,0 +1 @@
+from .nwipcam import XMEyeCamera
diff --git a/include/py/homekit/camera/alinoname/nwipcam.py b/include/py/homekit/camera/alinoname/nwipcam.py
new file mode 100755
index 0000000..e54ec62
--- /dev/null
+++ b/include/py/homekit/camera/alinoname/nwipcam.py
@@ -0,0 +1,326 @@
+#!/usr/bin/python3
+# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
+# Copyright (C) 2019-2019 Johannes Bauer
+#
+# Changes and improvements:
+# Copyright (C) 2024 Evgeny Zinoviev
+#
+# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
+#
+# numenworld-ipcam is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; this program is ONLY licensed under
+# version 3 of the License, later versions are explicitly excluded.
+#
+# numenworld-ipcam is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with numenworld-ipcam; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Johannes Bauer <JohannesBauer@gmx.de>
+
+import collections
+import struct
+import socket
+import enum
+import json
+import subprocess
+
+from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction
+
+
+class XMEyeMsgCode(enum.IntEnum):
+ LoginCmd = 1000
+ LoginReply = LoginCmd + 1
+
+ KeepAliveCmd = 1006
+ KeepAliveReply = KeepAliveCmd + 1
+
+ SetConfigCmd = 1040
+ SetConfigReply = SetConfigCmd + 1
+
+ GetConfigCmd = 1042
+ GetConfidReply = GetConfigCmd + 1
+
+ GetSystemInfoCmd = 1020
+ GetSystemInfoReply = GetSystemInfoCmd + 1
+
+ ChannelTitleCmd = 1048
+ ChannelTitleReply = ChannelTitleCmd + 1
+
+ SystemFunctionCmd = 1360
+ SystemFunctionReply = SystemFunctionCmd + 1
+
+ OPMonitorStartStopCmd = 1410
+ OPMonitorStartStopReply = OPMonitorStartStopCmd + 1
+
+ OPMonitorClaimCmd = 1413
+ OPMonitorClaimReply = OPMonitorClaimCmd + 1
+
+ OPTimeQueryCmd = 1452
+ OPTimeQueryReply = OPTimeQueryCmd + 1
+
+ VideoStreamData = 1412
+
+
+class AudioVideoDataType(enum.IntEnum):
+ VideoIncomplete = 0xfc
+ VideoComplete = 0xfd
+ AudioComplete = 0xfa
+
+
+class AudioVideoPayload():
+ _HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"])
+ _HeaderStruct = struct.Struct("< H B B H H")
+
+ def __init__(self, payload, hint=""):
+ self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:self._HeaderStruct.size]))
+ print(
+ "%20s [%5d]: %s %s" % (hint, len(payload), " ".join("%02x" % (c) for c in payload[:8]), str(self._header)))
+ # if len(payload) != (self._HeaderStruct.size + self._header.length):
+ # raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload)))
+ # print(self._header)
+ self._data = payload[self._HeaderStruct.size:]
+
+ @property
+ def data(self):
+ return self._data
+
+
+class XMEyeMessage():
+ _HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"])
+ _HeaderStruct = struct.Struct("< L L 6s H L")
+
+ def __init__(self, station, session, msgcode, message):
+ if isinstance(message, (bytes, bytearray)):
+ self._message = bytes(message)
+ else:
+ self._message = json.dumps(message).encode("ascii")
+ self._header = self._HeaderFields(station=station,
+ session=session,
+ unknown=bytes(6),
+ msgcode=msgcode,
+ length=len(self._message))
+
+ @property
+ def header(self):
+ return self._header
+
+ @property
+ def message(self):
+ return self._message
+
+ @property
+ def payload(self):
+ msg = self.message.rstrip(bytes(1))
+ try:
+ data = json.loads(msg)
+ except (json.JSONDecodeError, UnicodeError):
+ return self.message
+ return data
+
+ @property
+ def length(self):
+ return len(self.message)
+
+ def __bytes__(self):
+ header = self._HeaderStruct.pack(*self._header)
+ return header + self._message
+
+ @classmethod
+ def deserialize(cls, msg):
+ header_data = msg[:20]
+ header = cls._HeaderFields(*cls._HeaderStruct.unpack(header_data))
+ payload = msg[20: 20 + header.length]
+ if len(payload) < header.length:
+ payload += bytes(header.length - len(payload))
+
+ try:
+ msgcode = XMEyeMsgCode(header.msgcode)
+ except ValueError:
+ msgcode = header.msgcode
+ return cls(station=header.station, session=header.session, msgcode=msgcode, message=payload)
+
+ @classmethod
+ def deserialize_all(cls, msg):
+ msg = bytearray(msg)
+ while len(msg) >= 20:
+ next_msg = cls.deserialize(msg)
+ yield next_msg
+ msg = msg[20 + next_msg.length:]
+
+ def dump(self):
+ print("%s (%d bytes payload):" % (self._header.msgcode, self.length))
+ if isinstance(self.payload, bytes):
+ print(self.payload)
+ else:
+ print(json.dumps(self.payload, indent=4, sort_keys=True))
+ print()
+
+ def __repr__(self):
+ return "Msg(%s): %s" % (self.header, str(self.payload))
+
+
+class XMEyeCamera():
+ def __init__(self, hostname, username="admin", password="", port=34567):
+ self._hostname = hostname
+ self._conn = socket.create_connection((hostname, port))
+ self._session = None
+ self._username = username
+ self._password = password
+
+ @property
+ def derived_password(self):
+ return HorrificallyBrokenPasswordFunction.derive(self._password)
+
+ @property
+ def rtsp_uri(self):
+ rtsp_port = 554
+ return "rtsp://%s:%d/user=%s&password=%s&channel=" % (
+ self._hostname, rtsp_port, self._username, self.derived_password)
+
+ def _rx_bytes(self, length):
+ result = bytearray()
+ while len(result) < length:
+ remaining_bytes = length - len(result)
+ rx_data = self._conn.recv(remaining_bytes)
+ result += rx_data
+ return result
+
+ def _rx(self):
+ response_header = self._rx_bytes(20)
+ header = XMEyeMessage.deserialize(response_header)
+ payload_data = self._rx_bytes(header.length)
+
+ response_data = response_header + payload_data
+ response = XMEyeMessage.deserialize(response_data)
+ # print("<-", response)
+ return response
+
+ def _tx(self, command):
+ # print("->", command)
+ data = bytes(command)
+ self._conn.send(data)
+
+ def _tx_rx(self, command):
+ self._tx(command)
+ return self._rx()
+
+ def login(self):
+ data = {
+ "EncryptType": "MD5",
+ "LoginType": "DVRIP-Web",
+ "UserName": self._username,
+ "PassWord": self.derived_password,
+ }
+ command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data)
+ response = self._tx_rx(command)
+ if int(response.payload["Ret"]) == 100:
+ # Login successful
+ self._session = int(response.payload["SessionID"], 16)
+ else:
+ raise Exception("Login failed:", response)
+
+ def _generic_cmd(self, name, msgcode):
+ data = {
+ "Name": name,
+ "SessionID": "0x%x" % (self._session,),
+ }
+ command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
+ return self._tx_rx(command)
+
+ def get_systeminfo(self):
+ return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd)
+
+ def get_channel_title(self):
+ return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd)
+
+ def get_system_function(self):
+ return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd)
+
+ def get_talk_audio_format(self):
+ return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd)
+
+ def get_ntp_server(self):
+ ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd)
+ return ntp_config.payload['NetWork.NetNTP']['Server']['Name']
+
+ def set_ntp_server(self,
+ ntp_host: str,
+ ntp_port: int = 123) -> None:
+ data = {
+ 'Name': 'NetWork.NetNTP',
+ 'NetWork.NetNTP': {
+ 'Enable': True,
+ 'Server': {
+ 'Address': '0x00000000',
+ 'Anonymity': False,
+ 'Name': ntp_host,
+ 'Password': '',
+ 'Port': ntp_port,
+ 'UserName': ''
+ },
+ 'TimeZone': 13, # Moscow time
+ 'UpdatePeriod': 60
+ },
+ "SessionID": "0x%x" % (self._session,),
+ }
+ command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data)
+ self._tx_rx(command)
+
+ def _opmonitor_cmd(self, action, msgcode, rx_msg=True):
+ data = {
+ "Name": "OPMonitor",
+ "OPMonitor": {
+ "Action": action,
+ "Parameter": {
+ "Channel": 0,
+ "CombinMode": "CONNECT_ALL",
+ "StreamType": "Main",
+ "TransMode": "TCP",
+ },
+ },
+ "SessionID": "0x%x" % (self._session,),
+ }
+ command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
+ if rx_msg:
+ return self._tx_rx(command)
+ else:
+ self._tx(command)
+
+ def get_stream(self, packet_callback):
+ self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd)
+ self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False)
+ while True:
+ rx_pkt = self._rx()
+ packet_callback(rx_pkt)
+
+ # def playback_stream(self):
+ # mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE)
+ # with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f:
+ # def pkt_callback(pkt):
+ # if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData):
+ # avpayload = AudioVideoPayload(pkt.payload, hint="video")
+ # mplayer_process.stdin.raw.write(pkt.payload)
+ # video_f.write(pkt.payload)
+ # elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData:
+ # # Audio data?
+ # avpayload = AudioVideoPayload(pkt.payload, hint="audio")
+ # f.write(avpayload.data)
+ # elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData:
+ # print(pkt)
+ #
+ # self.get_stream(pkt_callback)
+
+
+if __name__ == '__main__':
+ cam = XMEyeCamera("192.168.0.47", password="DerPr03ess")
+ cam.login()
+ print(cam.get_systeminfo())
+ print(cam.get_channel_title())
+ # print(cam.get_talk_audio_format())
+ # print(cam.get_system_function())
diff --git a/include/py/homekit/camera/config.py b/include/py/homekit/camera/config.py
index 0ed75cf..9685cab 100644
--- a/include/py/homekit/camera/config.py
+++ b/include/py/homekit/camera/config.py
@@ -132,6 +132,9 @@ class IpcamConfig(ConfigUnit):
# def get_cam_server_and_disk(self, cam: int) -> tuple[str, int]:
# return self['cameras'][cam]['server'], self['cameras'][cam]['disk']
+ def has_camera(self, camera: int) -> bool:
+ return camera in tuple(self['cameras'].keys())
+
def get_camera_container(self, camera: int) -> VideoContainerType:
return self.get_camera_type(camera).get_container()
@@ -143,3 +146,9 @@ class IpcamConfig(ConfigUnit):
def get_camera_ip(self, camera: int) -> str:
return self['camera_ip_template'] % (str(camera),)
+
+ def is_camera_enabled(self, camera: int) -> bool:
+ try:
+ return self['cameras'][camera]['enabled']
+ except KeyError:
+ return True
diff --git a/include/py/homekit/camera/hikvision/__init__.py b/include/py/homekit/camera/hikvision/__init__.py
new file mode 100644
index 0000000..72d6ae3
--- /dev/null
+++ b/include/py/homekit/camera/hikvision/__init__.py
@@ -0,0 +1 @@
+from .isapi import ISAPIClient, ResponseError, AuthError
diff --git a/include/py/homekit/camera/hikvision/isapi.py b/include/py/homekit/camera/hikvision/isapi.py
new file mode 100644
index 0000000..6cc34f8
--- /dev/null
+++ b/include/py/homekit/camera/hikvision/isapi.py
@@ -0,0 +1,137 @@
+import requests
+
+from time import time
+from .util import xml_to_dict, sha256_hex
+from ...util import validate_ipv4
+from ...http import HTTPMethod
+from typing import Optional, Union
+
+
+class ResponseError(RuntimeError):
+ pass
+
+
+class AuthError(ResponseError):
+ pass
+
+
+class ISAPIClient:
+ def __init__(self, host):
+ self.host = host
+ self.cookies = {}
+
+ def call(self,
+ path: str,
+ method: HTTPMethod = HTTPMethod.GET,
+ data: Optional[Union[dict, str, bytes]] = None,
+ raise_for_status=True,
+ check_put_response=False):
+ f = getattr(requests, method.value.lower())
+
+ headers = {
+ 'X-Requested-With': 'XMLHttpRequest',
+ }
+ if method in (HTTPMethod.PUT, HTTPMethod.POST):
+ headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
+
+ kwargs = {}
+ if data:
+ kwargs['data' if method is not HTTPMethod.GET else 'params'] = data
+ if len(self.cookies) > 0:
+ kwargs['cookies'] = self.cookies
+
+ r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs)
+
+ if raise_for_status or check_put_response:
+ r.raise_for_status()
+
+ parsed_xml = None
+ if check_put_response:
+ parsed_xml = xml_to_dict(r.text)
+ resp = parsed_xml['ResponseStatus']
+ status_code = int(resp['statusCode'][0])
+ status_string = resp['statusString'][0]
+ if status_code != 1 or status_string.lower() != 'ok':
+ raise ResponseError('response status looks bad')
+
+ self.cookies.update(r.cookies.get_dict())
+
+ if parsed_xml is None:
+ parsed_xml = xml_to_dict(r.text)
+
+ return parsed_xml
+
+ def auth(self, username: str, password: str):
+ xml = self.call('Security/sessionLogin/capabilities', data={'username': username})
+ caps = xml['SessionLoginCap']
+ is_irreversible = caps['isIrreversible'][0].lower() == 'true'
+
+ # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
+ # also look into webAuth.js and utils.js
+
+ if 'salt' in caps and is_irreversible:
+ p = sha256_hex(username + caps['salt'][0] + password)
+ p = sha256_hex(p + caps['challenge'][0])
+ for i in range(int(caps['iterations'][0])-2):
+ p = sha256_hex(p)
+ else:
+ p = sha256_hex(password) + caps['challenge'][0]
+ for i in range(int(caps['iterations'][0])-1):
+ p = sha256_hex(p)
+
+ data = '<SessionLogin>'
+ data += f'<userName>{username}</userName>'
+ data += f'<password>{p}</password>'
+ data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
+ data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
+ data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
+ data += '</SessionLogin>'
+
+ resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin']
+ status_value = int(resp['statusValue'][0])
+ status_string = resp['statusString'][0]
+ if status_value != 200:
+ raise AuthError(f'{status_value}: {status_string}')
+
+ def get_ntp_server(self) -> str:
+ try:
+ # works on newer 1080p cams
+ xml = self.call('System/time/ntpServers/capabilities')
+ ntp_server = xml['NTPServerList']['NTPServer'][0]
+ if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
+ ntp_host = ntp_server['hostName'][0]
+ else:
+ ntp_host = ntp_server['ipAddress'][0]
+
+ except requests.exceptions.HTTPError:
+ # works on older 720p cams
+ ntp_server = self.call('System/time/ntpServers/1')['NTPServer']
+ if ntp_server['addressingFormatType'][0] == 'hostname':
+ ntp_host = ntp_server['hostName'][0]
+ else:
+ ntp_host = ntp_server['ipAddress'][0]
+
+ return ntp_host
+
+ def set_timezone(self):
+ data = '<?xml version="1.0" encoding="UTF-8"?>'
+ data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
+ self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True)
+
+ def set_ntp_server(self,
+ ntp_host: str,
+ ntp_port: int = 123):
+ format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
+
+ # test ntp server first
+ data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>'
+ resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult']
+ error_code = int(resp['errorCode'][0])
+ error_description = resp['errorDescription'][0]
+ if error_code != 0 or error_description.lower() != 'ok':
+ raise ResponseError('response status looks bad')
+
+ # then set it
+ data = '<?xml version="1.0" encoding="UTF-8"?>'
+ data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
+ self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)
diff --git a/include/py/homekit/camera/hikvision/util.py b/include/py/homekit/camera/hikvision/util.py
new file mode 100644
index 0000000..581c6ea
--- /dev/null
+++ b/include/py/homekit/camera/hikvision/util.py
@@ -0,0 +1,48 @@
+import requests
+import hashlib
+import xml.etree.ElementTree as ET
+
+
+def xml_to_dict(xml_data: str) -> dict:
+ # Parse the XML data
+ root = ET.fromstring(xml_data)
+
+ # Function to remove namespace from the tag name
+ def remove_namespace(tag):
+ return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
+
+ # Function to recursively convert XML elements to a dictionary
+ def elem_to_dict(elem):
+ tag = remove_namespace(elem.tag)
+ elem_dict = {tag: {}}
+
+ # If the element has attributes, add them to the dictionary
+ elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
+
+ # Handle the element's text content, if present and not just whitespace
+ text = elem.text.strip() if elem.text and elem.text.strip() else None
+ if text:
+ elem_dict[tag]['#text'] = text
+
+ # Process child elements
+ for child in elem:
+ child_dict = elem_to_dict(child)
+ child_tag = remove_namespace(child.tag)
+ if child_tag not in elem_dict[tag]:
+ elem_dict[tag][child_tag] = []
+ elem_dict[tag][child_tag].append(child_dict[child_tag])
+
+ # Simplify structure if there's only text or no children and no attributes
+ if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
+ return {tag: elem_dict[tag]['#text']}
+ elif not elem_dict[tag]:
+ return {tag: ''}
+
+ return elem_dict
+
+ # Convert the root element to dictionary
+ return elem_to_dict(root)
+
+
+def sha256_hex(input_string: str) -> str:
+ return hashlib.sha256(input_string.encode()).hexdigest()
diff --git a/include/py/homekit/camera/types.py b/include/py/homekit/camera/types.py
index ec4663b..0aaadef 100644
--- a/include/py/homekit/camera/types.py
+++ b/include/py/homekit/camera/types.py
@@ -44,6 +44,9 @@ class CameraType(Enum):
def is_hikvision(self) -> bool:
return self in (CameraType.HIKVISION_264, CameraType.HIKVISION_265)
+ def is_ali(self) -> bool:
+ return self == CameraType.ALIEXPRESS_NONAME
+
class TimeFilterType(Enum):
FIX = 'fix'
diff --git a/include/py/homekit/http/http.py b/include/py/homekit/http/http.py
index 82c5aae..8819c46 100644
--- a/include/py/homekit/http/http.py
+++ b/include/py/homekit/http/http.py
@@ -113,3 +113,4 @@ class HTTPServer:
class HTTPMethod(Enum):
GET = 'GET'
POST = 'POST'
+ PUT = 'PUT'