aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2024-02-17 23:20:49 +0300
committerEvgeny Zinoviev <me@ch1p.io>2024-02-17 23:20:49 +0300
commit70b4a4f044cac8052bb0af7c585572e54489ea2f (patch)
tree5611bfb19af4d847b017df5d79f89b4bbb50e9f1
parent77b80dd9b3500539b24b7dc6258c02c23fd4d015 (diff)
ipcam_ntp_util: support chinese noname cameras
-rw-r--r--LICENSE2
-rwxr-xr-xbin/ipcam_ntp_util.py267
-rw-r--r--include/py/homekit/camera/__init__.py2
-rw-r--r--include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py42
-rw-r--r--include/py/homekit/camera/alinoname/__init__.py1
-rwxr-xr-xinclude/py/homekit/camera/alinoname/nwipcam.py326
-rw-r--r--include/py/homekit/camera/config.py9
-rw-r--r--include/py/homekit/camera/hikvision/__init__.py1
-rw-r--r--include/py/homekit/camera/hikvision/isapi.py137
-rw-r--r--include/py/homekit/camera/hikvision/util.py48
-rw-r--r--include/py/homekit/camera/types.py3
-rw-r--r--include/py/homekit/http/http.py1
12 files changed, 635 insertions, 204 deletions
diff --git a/LICENSE b/LICENSE
index 0de44f6..589a31b 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright 2022, Evgeny Zinoviev
+Copyright 2022-2024, Evgeny Zinoviev
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
diff --git a/bin/ipcam_ntp_util.py b/bin/ipcam_ntp_util.py
index b27995c..81f6fe0 100755
--- a/bin/ipcam_ntp_util.py
+++ b/bin/ipcam_ntp_util.py
@@ -1,16 +1,13 @@
#!/usr/bin/env python3
import __py_include
-import requests
-import hashlib
-import xml.etree.ElementTree as ET
+import homekit.camera.hikvision as hikvision
+import homekit.camera.alinoname as alinoname
from enum import Enum, auto
-from time import time
from typing import Optional
from argparse import ArgumentParser, ArgumentError
-from homekit.util import validate_ipv4, validate_ipv4_or_hostname
-from homekit.camera import IpcamConfig
-
+from homekit.util import validate_ipv4_or_hostname
+from homekit.camera import IpcamConfig, CameraType
ipcam_config = IpcamConfig()
@@ -20,216 +17,63 @@ class Action(Enum):
SET_NTP = auto()
-def xml_to_dict(xml_data: str) -> dict:
- # Parse the XML data
- root = ET.fromstring(xml_data)
-
- # Function to remove namespace from the tag name
- def remove_namespace(tag):
- return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
-
- # Function to recursively convert XML elements to a dictionary
- def elem_to_dict(elem):
- tag = remove_namespace(elem.tag)
- elem_dict = {tag: {}}
-
- # If the element has attributes, add them to the dictionary
- elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
-
- # Handle the element's text content, if present and not just whitespace
- text = elem.text.strip() if elem.text and elem.text.strip() else None
- if text:
- elem_dict[tag]['#text'] = text
-
- # Process child elements
- for child in elem:
- child_dict = elem_to_dict(child)
- child_tag = remove_namespace(child.tag)
- if child_tag not in elem_dict[tag]:
- elem_dict[tag][child_tag] = []
- elem_dict[tag][child_tag].append(child_dict[child_tag])
-
- # Simplify structure if there's only text or no children and no attributes
- if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
- return {tag: elem_dict[tag]['#text']}
- elif not elem_dict[tag]:
- return {tag: ''}
-
- return elem_dict
-
- # Convert the root element to dictionary
- return elem_to_dict(root)
-
-
-def sha256_hex(input_string: str) -> str:
- return hashlib.sha256(input_string.encode()).hexdigest()
-
-
-class ResponseError(RuntimeError):
- pass
-
-
-class AuthError(ResponseError):
- pass
-
-
-class HikvisionISAPIClient:
- def __init__(self, host):
- self.host = host
- self.cookies = {}
-
- def auth(self, username: str, password: str):
- r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'),
- {'username': username},
- headers={
- 'X-Requested-With': 'XMLHttpRequest',
- })
- r.raise_for_status()
- caps = xml_to_dict(r.text)['SessionLoginCap']
- is_irreversible = caps['isIrreversible'][0].lower() == 'true'
-
- # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
- # also look into webAuth.js and utils.js
-
- if 'salt' in caps and is_irreversible:
- p = sha256_hex(username + caps['salt'][0] + password)
- p = sha256_hex(p + caps['challenge'][0])
- for i in range(int(caps['iterations'][0])-2):
- p = sha256_hex(p)
- else:
- p = sha256_hex(password) + caps['challenge'][0]
- for i in range(int(caps['iterations'][0])-1):
- p = sha256_hex(p)
-
- data = '<SessionLogin>'
- data += f'<userName>{username}</userName>'
- data += f'<password>{p}</password>'
- data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
- data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
- data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
- data += '</SessionLogin>'
-
- r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={
- 'Accept-Encoding': 'gzip, deflate',
- 'If-Modified-Since': '0',
- 'X-Requested-With': 'XMLHttpRequest',
- 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
- })
- r.raise_for_status()
- resp = xml_to_dict(r.text)['SessionLogin']
- status_value = int(resp['statusValue'][0])
- status_string = resp['statusString'][0]
- if status_value != 200:
- raise AuthError(f'{status_value}: {status_string}')
-
- self.cookies = r.cookies.get_dict()
-
- def get_ntp_server(self) -> str:
- r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'), cookies=self.cookies)
- r.raise_for_status()
- ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0]
-
- if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
- ntp_host = ntp_server['hostName'][0]
- else:
- ntp_host = ntp_server['ipAddress'][0]
-
- return ntp_host
-
- def set_timezone(self):
- data = '<?xml version="1.0" encoding="UTF-8"?>'
- data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
-
- r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={
- 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
- 'X-Requested-With': 'XMLHttpRequest',
- })
- self.isapi_check_put_response(r)
-
- def set_ntp_server(self,
- ntp_host: str,
- ntp_port: int = 123):
- format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
-
- # test ntp server first
- data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>'
- r = requests.post(self.isapi_uri('System/time/ntpServers/test'), data=data, headers={
- 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
- 'X-Requested-With': 'XMLHttpRequest',
- }, cookies=self.cookies)
- r.raise_for_status()
-
- resp = xml_to_dict(r.text)['NTPTestResult']
-
- error_code = int(resp['errorCode'][0])
- error_description = resp['errorDescription'][0]
-
- if error_code != 0 or error_description.lower() != 'ok':
- raise ResponseError('response status looks bad')
+def process_camera(host: str,
+ action: Action,
+ login: str,
+ password: str,
+ camera_type: CameraType,
+ ntp_server: Optional[str] = None):
+ if camera_type.is_hikvision():
+ client = hikvision.ISAPIClient(host)
+ try:
+ client.auth(login, password)
+ if action == Action.GET_NTP:
+ print(f'[{host}] {client.get_ntp_server()}')
+ return
- # then set it
- data = '<?xml version="1.0" encoding="UTF-8"?>'
- data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
+ client.set_ntp_server(ntp_server)
+ print(f'[{host}] done')
- r = requests.put(self.isapi_uri('System/time/ntpServers/1'),
- data=data,
- cookies=self.cookies,
- headers={
- 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
- 'X-Requested-With': 'XMLHttpRequest',
- })
- self.isapi_check_put_response(r)
+ except hikvision.AuthError as e:
+ print(f'[{host}] ({str(e)})')
- def isapi_uri(self, path: str) -> str:
- return f'http://{self.host}/ISAPI/{path}'
+ except hikvision.ResponseError as e:
+ print(f'[{host}] ({str(e)})')
- def isapi_check_put_response(self, r):
+ elif camera_type.is_ali():
try:
- r.raise_for_status()
- except requests.exceptions.HTTPError as e:
- # print(r.text)
- raise e
-
- resp = xml_to_dict(r.text)['ResponseStatus']
-
- status_code = int(resp['statusCode'][0])
- status_string = resp['statusString'][0]
+ client = alinoname.XMEyeCamera(hostname=host, username=login, password=password)
+ client.login()
- if status_code != 1 or status_string.lower() != 'ok':
- raise ResponseError('response status looks bad')
+ if action == Action.GET_NTP:
+ print(f'[{host}] {client.get_ntp_server()}')
+ return
+ client.set_ntp_server(ntp_server)
+ print(f'[{host}] done')
-def process_hikvision_camera(host: str,
- action: Action,
- login: str,
- password: str,
- ntp_server: Optional[str] = None):
- client = HikvisionISAPIClient(host)
- try:
- client.auth(login, password)
- if action == Action.GET_NTP:
- print(f'[{host}] {client.get_ntp_server()}')
- return
- client.set_ntp_server(ntp_server)
- print(f'[{host}] done')
- except AuthError as e:
- print(f'[{host}] ({str(e)})')
- except ResponseError as e:
- print(f'[{host}] ({str(e)})')
+ except OSError as e:
+ print(f'[{host}] ({str(e)})')
def main():
+ camera_types = ['hikvision', 'ali']
parser = ArgumentParser()
- parser.add_argument('--host', type=str)
+ parser.add_argument('--camera', type=str)
+ parser.add_argument('--camera-type', type=str, choices=camera_types)
parser.add_argument('--all', action='store_true')
+ parser.add_argument('--all-of-type', type=str, choices=camera_types)
parser.add_argument('--get-ntp-server', action='store_true')
parser.add_argument('--set-ntp-server', type=str)
parser.add_argument('--username', type=str)
parser.add_argument('--password', type=str)
args = parser.parse_args()
- if not args.host and not args.all:
- raise ArgumentError(None, 'either --all or --host <IP> is required')
+ if args.all and args.all_of_type:
+ raise ArgumentError(None, 'you can\'t pass both --all and --all-of-type')
+
+ if not args.camera and not args.all and not args.all_of_type:
+ raise ArgumentError(None, 'either --all, --all-of-type <TYPE> or --camera <NUM> is required')
if not args.get_ntp_server and not args.set_ntp_server:
raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required')
@@ -247,13 +91,32 @@ def main():
kwargs = {}
if args.set_ntp_server:
kwargs['ntp_server'] = args.set_ntp_server
- if not args.all:
- process_hikvision_camera(args.host, action, login, password, **kwargs)
+ if not args.all and not args.all_of_type:
+ if not args.camera_type:
+ raise ArgumentError(None, '--camera-type is required')
+
+ if not ipcam_config.has_camera(int(args.camera)):
+ raise ArgumentError(None, f'invalid camera {args.camera}')
+ camera_host = ipcam_config.get_camera_ip(args.camera)
+
+ if args.camera_type == 'hikvision':
+ camera_type = CameraType.HIKVISION_264
+ elif args.camera_type == 'ali':
+ camera_type = CameraType.ALIEXPRESS_NONAME
+ else:
+ raise ValueError('invalid --camera-type')
+ process_camera(camera_host, action, login, password, camera_type, **kwargs)
else:
for cam in ipcam_config.get_all_cam_names():
- if not ipcam_config.get_camera_type(cam).is_hikvision():
+ if not ipcam_config.is_camera_enabled(cam):
+ continue
+
+ cam_type = ipcam_config.get_camera_type(cam)
+ if args.all_of_type == 'hikvision' and not cam_type.is_hikvision():
+ continue
+ if args.all_of_type == 'ali' and not ipcam_config.get_camera_type(cam).is_ali():
continue
- process_hikvision_camera(ipcam_config.get_camera_ip(cam), action, login, password, **kwargs)
+ process_camera(ipcam_config.get_camera_ip(cam), action, login, password, cam_type, **kwargs)
if __name__ == '__main__':
diff --git a/include/py/homekit/camera/__init__.py b/include/py/homekit/camera/__init__.py
index 4875031..7f8714b 100644
--- a/include/py/homekit/camera/__init__.py
+++ b/include/py/homekit/camera/__init__.py
@@ -1,2 +1,2 @@
from .types import CameraType, VideoContainerType, VideoCodecType, CaptureType
-from .config import IpcamConfig \ No newline at end of file
+from .config import IpcamConfig
diff --git a/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py b/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py
new file mode 100644
index 0000000..9423382
--- /dev/null
+++ b/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py
@@ -0,0 +1,42 @@
+# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
+# Copyright (C) 2019-2019 Johannes Bauer
+#
+# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
+#
+# numenworld-ipcam is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; this program is ONLY licensed under
+# version 3 of the License, later versions are explicitly excluded.
+#
+# numenworld-ipcam is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with numenworld-ipcam; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Johannes Bauer <JohannesBauer@gmx.de>
+
+import hashlib
+
+
+class HorrificallyBrokenPasswordFunction():
+ @classmethod
+ def derive(self, passphrase):
+ alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+ assert (len(alphabet) == 62)
+ passphrase = passphrase.encode("utf-8")
+ hashval = hashlib.md5(passphrase).digest()
+ encoded = ""
+ for i in range(0, 16, 2):
+ index = (hashval[i] + hashval[i + 1]) % len(alphabet)
+ char = alphabet[index]
+ encoded += char
+ return encoded
+
+
+if __name__ == "__main__":
+ assert (HorrificallyBrokenPasswordFunction.derive("") == "tlJwpbo6")
+ assert (HorrificallyBrokenPasswordFunction.derive("abc") == "LkM7s2Ht")
diff --git a/include/py/homekit/camera/alinoname/__init__.py b/include/py/homekit/camera/alinoname/__init__.py
new file mode 100644
index 0000000..bce9919
--- /dev/null
+++ b/include/py/homekit/camera/alinoname/__init__.py
@@ -0,0 +1 @@
+from .nwipcam import XMEyeCamera
diff --git a/include/py/homekit/camera/alinoname/nwipcam.py b/include/py/homekit/camera/alinoname/nwipcam.py
new file mode 100755
index 0000000..e54ec62
--- /dev/null
+++ b/include/py/homekit/camera/alinoname/nwipcam.py
@@ -0,0 +1,326 @@
+#!/usr/bin/python3
+# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
+# Copyright (C) 2019-2019 Johannes Bauer
+#
+# Changes and improvements:
+# Copyright (C) 2024 Evgeny Zinoviev
+#
+# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
+#
+# numenworld-ipcam is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; this program is ONLY licensed under
+# version 3 of the License, later versions are explicitly excluded.
+#
+# numenworld-ipcam is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with numenworld-ipcam; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Johannes Bauer <JohannesBauer@gmx.de>
+
+import collections
+import struct
+import socket
+import enum
+import json
+import subprocess
+
+from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction
+
+
+class XMEyeMsgCode(enum.IntEnum):
+ LoginCmd = 1000
+ LoginReply = LoginCmd + 1
+
+ KeepAliveCmd = 1006
+ KeepAliveReply = KeepAliveCmd + 1
+
+ SetConfigCmd = 1040
+ SetConfigReply = SetConfigCmd + 1
+
+ GetConfigCmd = 1042
+ GetConfidReply = GetConfigCmd + 1
+
+ GetSystemInfoCmd = 1020
+ GetSystemInfoReply = GetSystemInfoCmd + 1
+
+ ChannelTitleCmd = 1048
+ ChannelTitleReply = ChannelTitleCmd + 1
+
+ SystemFunctionCmd = 1360
+ SystemFunctionReply = SystemFunctionCmd + 1
+
+ OPMonitorStartStopCmd = 1410
+ OPMonitorStartStopReply = OPMonitorStartStopCmd + 1
+
+ OPMonitorClaimCmd = 1413
+ OPMonitorClaimReply = OPMonitorClaimCmd + 1
+
+ OPTimeQueryCmd = 1452
+ OPTimeQueryReply = OPTimeQueryCmd + 1
+
+ VideoStreamData = 1412
+
+
+class AudioVideoDataType(enum.IntEnum):
+ VideoIncomplete = 0xfc
+ VideoComplete = 0xfd
+ AudioComplete = 0xfa
+
+
+class AudioVideoPayload():
+ _HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"])
+ _HeaderStruct = struct.Struct("< H B B H H")
+
+ def __init__(self, payload, hint=""):
+ self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:self._HeaderStruct.size]))
+ print(
+ "%20s [%5d]: %s %s" % (hint, len(payload), " ".join("%02x" % (c) for c in payload[:8]), str(self._header)))
+ # if len(payload) != (self._HeaderStruct.size + self._header.length):
+ # raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload)))
+ # print(self._header)
+ self._data = payload[self._HeaderStruct.size:]
+
+ @property
+ def data(self):
+ return self._data
+
+
+class XMEyeMessage():
+ _HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"])
+ _HeaderStruct = struct.Struct("< L L 6s H L")
+
+ def __init__(self, station, session, msgcode, message):
+ if isinstance(message, (bytes, bytearray)):
+ self._message = bytes(message)
+ else:
+ self._message = json.dumps(message).encode("ascii")
+ self._header = self._HeaderFields(station=station,
+ session=session,
+ unknown=bytes(6),
+ msgcode=msgcode,
+ length=len(self._message))
+
+ @property
+ def header(self):
+ return self._header
+
+ @property
+ def message(self):
+ return self._message
+
+ @property
+ def payload(self):
+ msg = self.message.rstrip(bytes(1))
+ try:
+ data = json.loads(msg)
+ except (json.JSONDecodeError, UnicodeError):
+ return self.message
+ return data
+
+ @property
+ def length(self):
+ return len(self.message)
+
+ def __bytes__(self):
+ header = self._HeaderStruct.pack(*self._header)
+ return header + self._message
+
+ @classmethod
+ def deserialize(cls, msg):
+ header_data = msg[:20]
+ header = cls._HeaderFields(*cls._HeaderStruct.unpack(header_data))
+ payload = msg[20: 20 + header.length]
+ if len(payload) < header.length:
+ payload += bytes(header.length - len(payload))
+
+ try:
+ msgcode = XMEyeMsgCode(header.msgcode)
+ except ValueError:
+ msgcode = header.msgcode
+ return cls(station=header.station, session=header.session, msgcode=msgcode, message=payload)
+
+ @classmethod
+ def deserialize_all(cls, msg):
+ msg = bytearray(msg)
+ while len(msg) >= 20:
+ next_msg = cls.deserialize(msg)
+ yield next_msg
+ msg = msg[20 + next_msg.length:]
+
+ def dump(self):
+ print("%s (%d bytes payload):" % (self._header.msgcode, self.length))
+ if isinstance(self.payload, bytes):
+ print(self.payload)
+ else:
+ print(json.dumps(self.payload, indent=4, sort_keys=True))
+ print()
+
+ def __repr__(self):
+ return "Msg(%s): %s" % (self.header, str(self.payload))
+
+
+class XMEyeCamera():
+ def __init__(self, hostname, username="admin", password="", port=34567):
+ self._hostname = hostname
+ self._conn = socket.create_connection((hostname, port))
+ self._session = None
+ self._username = username
+ self._password = password
+
+ @property
+ def derived_password(self):
+ return HorrificallyBrokenPasswordFunction.derive(self._password)
+
+ @property
+ def rtsp_uri(self):
+ rtsp_port = 554
+ return "rtsp://%s:%d/user=%s&password=%s&channel=" % (
+ self._hostname, rtsp_port, self._username, self.derived_password)
+
+ def _rx_bytes(self, length):
+ result = bytearray()
+ while len(result) < length:
+ remaining_bytes = length - len(result)
+ rx_data = self._conn.recv(remaining_bytes)
+ result += rx_data
+ return result
+
+ def _rx(self):
+ response_header = self._rx_bytes(20)
+ header = XMEyeMessage.deserialize(response_header)
+ payload_data = self._rx_bytes(header.length)
+
+ response_data = response_header + payload_data
+ response = XMEyeMessage.deserialize(response_data)
+ # print("<-", response)
+ return response
+
+ def _tx(self, command):
+ # print("->", command)
+ data = bytes(command)
+ self._conn.send(data)
+
+ def _tx_rx(self, command):
+ self._tx(command)
+ return self._rx()
+
+ def login(self):
+ data = {
+ "EncryptType": "MD5",
+ "LoginType": "DVRIP-Web",
+ "UserName": self._username,
+ "PassWord": self.derived_password,
+ }
+ command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data)
+ response = self._tx_rx(command)
+ if int(response.payload["Ret"]) == 100:
+ # Login successful
+ self._session = int(response.payload["SessionID"], 16)
+ else:
+ raise Exception("Login failed:", response)
+
+ def _generic_cmd(self, name, msgcode):
+ data = {
+ "Name": name,
+ "SessionID": "0x%x" % (self._session,),
+ }
+ command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
+ return self._tx_rx(command)
+
+ def get_systeminfo(self):
+ return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd)
+
+ def get_channel_title(self):
+ return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd)
+
+ def get_system_function(self):
+ return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd)
+
+ def get_talk_audio_format(self):
+ return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd)
+
+ def get_ntp_server(self):
+ ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd)
+ return ntp_config.payload['NetWork.NetNTP']['Server']['Name']
+
+ def set_ntp_server(self,
+ ntp_host: str,
+ ntp_port: int = 123) -> None:
+ data = {
+ 'Name': 'NetWork.NetNTP',
+ 'NetWork.NetNTP': {
+ 'Enable': True,
+ 'Server': {
+ 'Address': '0x00000000',
+ 'Anonymity': False,
+ 'Name': ntp_host,
+ 'Password': '',
+ 'Port': ntp_port,
+ 'UserName': ''
+ },
+ 'TimeZone': 13, # Moscow time
+ 'UpdatePeriod': 60
+ },
+ "SessionID": "0x%x" % (self._session,),
+ }
+ command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data)
+ self._tx_rx(command)
+
+ def _opmonitor_cmd(self, action, msgcode, rx_msg=True):
+ data = {
+ "Name": "OPMonitor",
+ "OPMonitor": {
+ "Action": action,
+ "Parameter": {
+ "Channel": 0,
+ "CombinMode": "CONNECT_ALL",
+ "StreamType": "Main",
+ "TransMode": "TCP",
+ },
+ },
+ "SessionID": "0x%x" % (self._session,),
+ }
+ command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
+ if rx_msg:
+ return self._tx_rx(command)
+ else:
+ self._tx(command)
+
+ def get_stream(self, packet_callback):
+ self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd)
+ self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False)
+ while True:
+ rx_pkt = self._rx()
+ packet_callback(rx_pkt)
+
+ # def playback_stream(self):
+ # mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE)
+ # with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f:
+ # def pkt_callback(pkt):
+ # if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData):
+ # avpayload = AudioVideoPayload(pkt.payload, hint="video")
+ # mplayer_process.stdin.raw.write(pkt.payload)
+ # video_f.write(pkt.payload)
+ # elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData:
+ # # Audio data?
+ # avpayload = AudioVideoPayload(pkt.payload, hint="audio")
+ # f.write(avpayload.data)
+ # elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData:
+ # print(pkt)
+ #
+ # self.get_stream(pkt_callback)
+
+
+if __name__ == '__main__':
+ cam = XMEyeCamera("192.168.0.47", password="DerPr03ess")
+ cam.login()
+ print(cam.get_systeminfo())
+ print(cam.get_channel_title())
+ # print(cam.get_talk_audio_format())
+ # print(cam.get_system_function())
diff --git a/include/py/homekit/camera/config.py b/include/py/homekit/camera/config.py
index 0ed75cf..9685cab 100644
--- a/include/py/homekit/camera/config.py
+++ b/include/py/homekit/camera/config.py
@@ -132,6 +132,9 @@ class IpcamConfig(ConfigUnit):
# def get_cam_server_and_disk(self, cam: int) -> tuple[str, int]:
# return self['cameras'][cam]['server'], self['cameras'][cam]['disk']
+ def has_camera(self, camera: int) -> bool:
+ return camera in tuple(self['cameras'].keys())
+
def get_camera_container(self, camera: int) -> VideoContainerType:
return self.get_camera_type(camera).get_container()
@@ -143,3 +146,9 @@ class IpcamConfig(ConfigUnit):
def get_camera_ip(self, camera: int) -> str:
return self['camera_ip_template'] % (str(camera),)
+
+ def is_camera_enabled(self, camera: int) -> bool:
+ try:
+ return self['cameras'][camera]['enabled']
+ except KeyError:
+ return True
diff --git a/include/py/homekit/camera/hikvision/__init__.py b/include/py/homekit/camera/hikvision/__init__.py
new file mode 100644
index 0000000..72d6ae3
--- /dev/null
+++ b/include/py/homekit/camera/hikvision/__init__.py
@@ -0,0 +1 @@
+from .isapi import ISAPIClient, ResponseError, AuthError
diff --git a/include/py/homekit/camera/hikvision/isapi.py b/include/py/homekit/camera/hikvision/isapi.py
new file mode 100644
index 0000000..6cc34f8
--- /dev/null
+++ b/include/py/homekit/camera/hikvision/isapi.py
@@ -0,0 +1,137 @@
+import requests
+
+from time import time
+from .util import xml_to_dict, sha256_hex
+from ...util import validate_ipv4
+from ...http import HTTPMethod
+from typing import Optional, Union
+
+
+class ResponseError(RuntimeError):
+ pass
+
+
+class AuthError(ResponseError):
+ pass
+
+
+class ISAPIClient:
+ def __init__(self, host):
+ self.host = host
+ self.cookies = {}
+
+ def call(self,
+ path: str,
+ method: HTTPMethod = HTTPMethod.GET,
+ data: Optional[Union[dict, str, bytes]] = None,
+ raise_for_status=True,
+ check_put_response=False):
+ f = getattr(requests, method.value.lower())
+
+ headers = {
+ 'X-Requested-With': 'XMLHttpRequest',
+ }
+ if method in (HTTPMethod.PUT, HTTPMethod.POST):
+ headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
+
+ kwargs = {}
+ if data:
+ kwargs['data' if method is not HTTPMethod.GET else 'params'] = data
+ if len(self.cookies) > 0:
+ kwargs['cookies'] = self.cookies
+
+ r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs)
+
+ if raise_for_status or check_put_response:
+ r.raise_for_status()
+
+ parsed_xml = None
+ if check_put_response:
+ parsed_xml = xml_to_dict(r.text)
+ resp = parsed_xml['ResponseStatus']
+ status_code = int(resp['statusCode'][0])
+ status_string = resp['statusString'][0]
+ if status_code != 1 or status_string.lower() != 'ok':
+ raise ResponseError('response status looks bad')
+
+ self.cookies.update(r.cookies.get_dict())
+
+ if parsed_xml is None:
+ parsed_xml = xml_to_dict(r.text)
+
+ return parsed_xml
+
+ def auth(self, username: str, password: str):
+ xml = self.call('Security/sessionLogin/capabilities', data={'username': username})
+ caps = xml['SessionLoginCap']
+ is_irreversible = caps['isIrreversible'][0].lower() == 'true'
+
+ # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
+ # also look into webAuth.js and utils.js
+
+ if 'salt' in caps and is_irreversible:
+ p = sha256_hex(username + caps['salt'][0] + password)
+ p = sha256_hex(p + caps['challenge'][0])
+ for i in range(int(caps['iterations'][0])-2):
+ p = sha256_hex(p)
+ else:
+ p = sha256_hex(password) + caps['challenge'][0]
+ for i in range(int(caps['iterations'][0])-1):
+ p = sha256_hex(p)
+
+ data = '<SessionLogin>'
+ data += f'<userName>{username}</userName>'
+ data += f'<password>{p}</password>'
+ data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
+ data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
+ data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
+ data += '</SessionLogin>'
+
+ resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin']
+ status_value = int(resp['statusValue'][0])
+ status_string = resp['statusString'][0]
+ if status_value != 200:
+ raise AuthError(f'{status_value}: {status_string}')
+
+ def get_ntp_server(self) -> str:
+ try:
+ # works on newer 1080p cams
+ xml = self.call('System/time/ntpServers/capabilities')
+ ntp_server = xml['NTPServerList']['NTPServer'][0]
+ if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
+ ntp_host = ntp_server['hostName'][0]
+ else:
+ ntp_host = ntp_server['ipAddress'][0]
+
+ except requests.exceptions.HTTPError:
+ # works on older 720p cams
+ ntp_server = self.call('System/time/ntpServers/1')['NTPServer']
+ if ntp_server['addressingFormatType'][0] == 'hostname':
+ ntp_host = ntp_server['hostName'][0]
+ else:
+ ntp_host = ntp_server['ipAddress'][0]
+
+ return ntp_host
+
+ def set_timezone(self):
+ data = '<?xml version="1.0" encoding="UTF-8"?>'
+ data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
+ self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True)
+
+ def set_ntp_server(self,
+ ntp_host: str,
+ ntp_port: int = 123):
+ format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
+
+ # test ntp server first
+ data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>'
+ resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult']
+ error_code = int(resp['errorCode'][0])
+ error_description = resp['errorDescription'][0]
+ if error_code != 0 or error_description.lower() != 'ok':
+ raise ResponseError('response status looks bad')
+
+ # then set it
+ data = '<?xml version="1.0" encoding="UTF-8"?>'
+ data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
+ self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)
diff --git a/include/py/homekit/camera/hikvision/util.py b/include/py/homekit/camera/hikvision/util.py
new file mode 100644
index 0000000..581c6ea
--- /dev/null
+++ b/include/py/homekit/camera/hikvision/util.py
@@ -0,0 +1,48 @@
+import requests
+import hashlib
+import xml.etree.ElementTree as ET
+
+
+def xml_to_dict(xml_data: str) -> dict:
+ # Parse the XML data
+ root = ET.fromstring(xml_data)
+
+ # Function to remove namespace from the tag name
+ def remove_namespace(tag):
+ return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
+
+ # Function to recursively convert XML elements to a dictionary
+ def elem_to_dict(elem):
+ tag = remove_namespace(elem.tag)
+ elem_dict = {tag: {}}
+
+ # If the element has attributes, add them to the dictionary
+ elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
+
+ # Handle the element's text content, if present and not just whitespace
+ text = elem.text.strip() if elem.text and elem.text.strip() else None
+ if text:
+ elem_dict[tag]['#text'] = text
+
+ # Process child elements
+ for child in elem:
+ child_dict = elem_to_dict(child)
+ child_tag = remove_namespace(child.tag)
+ if child_tag not in elem_dict[tag]:
+ elem_dict[tag][child_tag] = []
+ elem_dict[tag][child_tag].append(child_dict[child_tag])
+
+ # Simplify structure if there's only text or no children and no attributes
+ if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
+ return {tag: elem_dict[tag]['#text']}
+ elif not elem_dict[tag]:
+ return {tag: ''}
+
+ return elem_dict
+
+ # Convert the root element to dictionary
+ return elem_to_dict(root)
+
+
+def sha256_hex(input_string: str) -> str:
+ return hashlib.sha256(input_string.encode()).hexdigest()
diff --git a/include/py/homekit/camera/types.py b/include/py/homekit/camera/types.py
index ec4663b..0aaadef 100644
--- a/include/py/homekit/camera/types.py
+++ b/include/py/homekit/camera/types.py
@@ -44,6 +44,9 @@ class CameraType(Enum):
def is_hikvision(self) -> bool:
return self in (CameraType.HIKVISION_264, CameraType.HIKVISION_265)
+ def is_ali(self) -> bool:
+ return self == CameraType.ALIEXPRESS_NONAME
+
class TimeFilterType(Enum):
FIX = 'fix'
diff --git a/include/py/homekit/http/http.py b/include/py/homekit/http/http.py
index 82c5aae..8819c46 100644
--- a/include/py/homekit/http/http.py
+++ b/include/py/homekit/http/http.py
@@ -113,3 +113,4 @@ class HTTPServer:
class HTTPMethod(Enum):
GET = 'GET'
POST = 'POST'
+ PUT = 'PUT'