diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2024-02-25 23:07:46 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2024-02-25 23:07:46 +0300 |
commit | c4f87ddad4058c0f331446fdfd8d762b8fc26c18 (patch) | |
tree | 4c64e4b685670939c6401b4bdfdbeb6e4ed6a9fd /include/py/homekit/camera | |
parent | d43ca74063d8d931325c4498b02f40bb03e9e104 (diff) |
homekit: move hikvision and xmeye api stuff out of homekit package
Diffstat (limited to 'include/py/homekit/camera')
-rw-r--r-- | include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py | 42 | ||||
-rw-r--r-- | include/py/homekit/camera/alinoname/__init__.py | 1 | ||||
-rwxr-xr-x | include/py/homekit/camera/alinoname/nwipcam.py | 326 | ||||
-rw-r--r-- | include/py/homekit/camera/hikvision/__init__.py | 1 | ||||
-rw-r--r-- | include/py/homekit/camera/hikvision/isapi.py | 137 | ||||
-rw-r--r-- | include/py/homekit/camera/hikvision/util.py | 48 |
6 files changed, 0 insertions, 555 deletions
diff --git a/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py b/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py deleted file mode 100644 index 9423382..0000000 --- a/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py +++ /dev/null @@ -1,42 +0,0 @@ -# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera -# Copyright (C) 2019-2019 Johannes Bauer -# -# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam). -# -# numenworld-ipcam is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; this program is ONLY licensed under -# version 3 of the License, later versions are explicitly excluded. -# -# numenworld-ipcam is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with numenworld-ipcam; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# Johannes Bauer <JohannesBauer@gmx.de> - -import hashlib - - -class HorrificallyBrokenPasswordFunction(): - @classmethod - def derive(self, passphrase): - alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" - assert (len(alphabet) == 62) - passphrase = passphrase.encode("utf-8") - hashval = hashlib.md5(passphrase).digest() - encoded = "" - for i in range(0, 16, 2): - index = (hashval[i] + hashval[i + 1]) % len(alphabet) - char = alphabet[index] - encoded += char - return encoded - - -if __name__ == "__main__": - assert (HorrificallyBrokenPasswordFunction.derive("") == "tlJwpbo6") - assert (HorrificallyBrokenPasswordFunction.derive("abc") == "LkM7s2Ht") diff --git a/include/py/homekit/camera/alinoname/__init__.py b/include/py/homekit/camera/alinoname/__init__.py deleted file mode 100644 index bce9919..0000000 --- a/include/py/homekit/camera/alinoname/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .nwipcam import XMEyeCamera diff --git a/include/py/homekit/camera/alinoname/nwipcam.py b/include/py/homekit/camera/alinoname/nwipcam.py deleted file mode 100755 index e54ec62..0000000 --- a/include/py/homekit/camera/alinoname/nwipcam.py +++ /dev/null @@ -1,326 +0,0 @@ -#!/usr/bin/python3 -# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera -# Copyright (C) 2019-2019 Johannes Bauer -# -# Changes and improvements: -# Copyright (C) 2024 Evgeny Zinoviev -# -# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam). -# -# numenworld-ipcam is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; this program is ONLY licensed under -# version 3 of the License, later versions are explicitly excluded. -# -# numenworld-ipcam is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with numenworld-ipcam; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# Johannes Bauer <JohannesBauer@gmx.de> - -import collections -import struct -import socket -import enum -import json -import subprocess - -from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction - - -class XMEyeMsgCode(enum.IntEnum): - LoginCmd = 1000 - LoginReply = LoginCmd + 1 - - KeepAliveCmd = 1006 - KeepAliveReply = KeepAliveCmd + 1 - - SetConfigCmd = 1040 - SetConfigReply = SetConfigCmd + 1 - - GetConfigCmd = 1042 - GetConfidReply = GetConfigCmd + 1 - - GetSystemInfoCmd = 1020 - GetSystemInfoReply = GetSystemInfoCmd + 1 - - ChannelTitleCmd = 1048 - ChannelTitleReply = ChannelTitleCmd + 1 - - SystemFunctionCmd = 1360 - SystemFunctionReply = SystemFunctionCmd + 1 - - OPMonitorStartStopCmd = 1410 - OPMonitorStartStopReply = OPMonitorStartStopCmd + 1 - - OPMonitorClaimCmd = 1413 - OPMonitorClaimReply = OPMonitorClaimCmd + 1 - - OPTimeQueryCmd = 1452 - OPTimeQueryReply = OPTimeQueryCmd + 1 - - VideoStreamData = 1412 - - -class AudioVideoDataType(enum.IntEnum): - VideoIncomplete = 0xfc - VideoComplete = 0xfd - AudioComplete = 0xfa - - -class AudioVideoPayload(): - _HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"]) - _HeaderStruct = struct.Struct("< H B B H H") - - def __init__(self, payload, hint=""): - self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:self._HeaderStruct.size])) - print( - "%20s [%5d]: %s %s" % (hint, len(payload), " ".join("%02x" % (c) for c in payload[:8]), str(self._header))) - # if len(payload) != (self._HeaderStruct.size + self._header.length): - # raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload))) - # print(self._header) - self._data = payload[self._HeaderStruct.size:] - - @property - def data(self): - return self._data - - -class XMEyeMessage(): - _HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"]) - _HeaderStruct = struct.Struct("< L L 6s H L") - - def __init__(self, station, session, msgcode, message): - if isinstance(message, (bytes, bytearray)): - self._message = bytes(message) - else: - self._message = json.dumps(message).encode("ascii") - self._header = self._HeaderFields(station=station, - session=session, - unknown=bytes(6), - msgcode=msgcode, - length=len(self._message)) - - @property - def header(self): - return self._header - - @property - def message(self): - return self._message - - @property - def payload(self): - msg = self.message.rstrip(bytes(1)) - try: - data = json.loads(msg) - except (json.JSONDecodeError, UnicodeError): - return self.message - return data - - @property - def length(self): - return len(self.message) - - def __bytes__(self): - header = self._HeaderStruct.pack(*self._header) - return header + self._message - - @classmethod - def deserialize(cls, msg): - header_data = msg[:20] - header = cls._HeaderFields(*cls._HeaderStruct.unpack(header_data)) - payload = msg[20: 20 + header.length] - if len(payload) < header.length: - payload += bytes(header.length - len(payload)) - - try: - msgcode = XMEyeMsgCode(header.msgcode) - except ValueError: - msgcode = header.msgcode - return cls(station=header.station, session=header.session, msgcode=msgcode, message=payload) - - @classmethod - def deserialize_all(cls, msg): - msg = bytearray(msg) - while len(msg) >= 20: - next_msg = cls.deserialize(msg) - yield next_msg - msg = msg[20 + next_msg.length:] - - def dump(self): - print("%s (%d bytes payload):" % (self._header.msgcode, self.length)) - if isinstance(self.payload, bytes): - print(self.payload) - else: - print(json.dumps(self.payload, indent=4, sort_keys=True)) - print() - - def __repr__(self): - return "Msg(%s): %s" % (self.header, str(self.payload)) - - -class XMEyeCamera(): - def __init__(self, hostname, username="admin", password="", port=34567): - self._hostname = hostname - self._conn = socket.create_connection((hostname, port)) - self._session = None - self._username = username - self._password = password - - @property - def derived_password(self): - return HorrificallyBrokenPasswordFunction.derive(self._password) - - @property - def rtsp_uri(self): - rtsp_port = 554 - return "rtsp://%s:%d/user=%s&password=%s&channel=" % ( - self._hostname, rtsp_port, self._username, self.derived_password) - - def _rx_bytes(self, length): - result = bytearray() - while len(result) < length: - remaining_bytes = length - len(result) - rx_data = self._conn.recv(remaining_bytes) - result += rx_data - return result - - def _rx(self): - response_header = self._rx_bytes(20) - header = XMEyeMessage.deserialize(response_header) - payload_data = self._rx_bytes(header.length) - - response_data = response_header + payload_data - response = XMEyeMessage.deserialize(response_data) - # print("<-", response) - return response - - def _tx(self, command): - # print("->", command) - data = bytes(command) - self._conn.send(data) - - def _tx_rx(self, command): - self._tx(command) - return self._rx() - - def login(self): - data = { - "EncryptType": "MD5", - "LoginType": "DVRIP-Web", - "UserName": self._username, - "PassWord": self.derived_password, - } - command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data) - response = self._tx_rx(command) - if int(response.payload["Ret"]) == 100: - # Login successful - self._session = int(response.payload["SessionID"], 16) - else: - raise Exception("Login failed:", response) - - def _generic_cmd(self, name, msgcode): - data = { - "Name": name, - "SessionID": "0x%x" % (self._session,), - } - command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data) - return self._tx_rx(command) - - def get_systeminfo(self): - return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd) - - def get_channel_title(self): - return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd) - - def get_system_function(self): - return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd) - - def get_talk_audio_format(self): - return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd) - - def get_ntp_server(self): - ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd) - return ntp_config.payload['NetWork.NetNTP']['Server']['Name'] - - def set_ntp_server(self, - ntp_host: str, - ntp_port: int = 123) -> None: - data = { - 'Name': 'NetWork.NetNTP', - 'NetWork.NetNTP': { - 'Enable': True, - 'Server': { - 'Address': '0x00000000', - 'Anonymity': False, - 'Name': ntp_host, - 'Password': '', - 'Port': ntp_port, - 'UserName': '' - }, - 'TimeZone': 13, # Moscow time - 'UpdatePeriod': 60 - }, - "SessionID": "0x%x" % (self._session,), - } - command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data) - self._tx_rx(command) - - def _opmonitor_cmd(self, action, msgcode, rx_msg=True): - data = { - "Name": "OPMonitor", - "OPMonitor": { - "Action": action, - "Parameter": { - "Channel": 0, - "CombinMode": "CONNECT_ALL", - "StreamType": "Main", - "TransMode": "TCP", - }, - }, - "SessionID": "0x%x" % (self._session,), - } - command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data) - if rx_msg: - return self._tx_rx(command) - else: - self._tx(command) - - def get_stream(self, packet_callback): - self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd) - self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False) - while True: - rx_pkt = self._rx() - packet_callback(rx_pkt) - - # def playback_stream(self): - # mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE) - # with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f: - # def pkt_callback(pkt): - # if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData): - # avpayload = AudioVideoPayload(pkt.payload, hint="video") - # mplayer_process.stdin.raw.write(pkt.payload) - # video_f.write(pkt.payload) - # elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData: - # # Audio data? - # avpayload = AudioVideoPayload(pkt.payload, hint="audio") - # f.write(avpayload.data) - # elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData: - # print(pkt) - # - # self.get_stream(pkt_callback) - - -if __name__ == '__main__': - cam = XMEyeCamera("192.168.0.47", password="DerPr03ess") - cam.login() - print(cam.get_systeminfo()) - print(cam.get_channel_title()) - # print(cam.get_talk_audio_format()) - # print(cam.get_system_function()) diff --git a/include/py/homekit/camera/hikvision/__init__.py b/include/py/homekit/camera/hikvision/__init__.py deleted file mode 100644 index 72d6ae3..0000000 --- a/include/py/homekit/camera/hikvision/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .isapi import ISAPIClient, ResponseError, AuthError diff --git a/include/py/homekit/camera/hikvision/isapi.py b/include/py/homekit/camera/hikvision/isapi.py deleted file mode 100644 index 6cc34f8..0000000 --- a/include/py/homekit/camera/hikvision/isapi.py +++ /dev/null @@ -1,137 +0,0 @@ -import requests - -from time import time -from .util import xml_to_dict, sha256_hex -from ...util import validate_ipv4 -from ...http import HTTPMethod -from typing import Optional, Union - - -class ResponseError(RuntimeError): - pass - - -class AuthError(ResponseError): - pass - - -class ISAPIClient: - def __init__(self, host): - self.host = host - self.cookies = {} - - def call(self, - path: str, - method: HTTPMethod = HTTPMethod.GET, - data: Optional[Union[dict, str, bytes]] = None, - raise_for_status=True, - check_put_response=False): - f = getattr(requests, method.value.lower()) - - headers = { - 'X-Requested-With': 'XMLHttpRequest', - } - if method in (HTTPMethod.PUT, HTTPMethod.POST): - headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' - - kwargs = {} - if data: - kwargs['data' if method is not HTTPMethod.GET else 'params'] = data - if len(self.cookies) > 0: - kwargs['cookies'] = self.cookies - - r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs) - - if raise_for_status or check_put_response: - r.raise_for_status() - - parsed_xml = None - if check_put_response: - parsed_xml = xml_to_dict(r.text) - resp = parsed_xml['ResponseStatus'] - status_code = int(resp['statusCode'][0]) - status_string = resp['statusString'][0] - if status_code != 1 or status_string.lower() != 'ok': - raise ResponseError('response status looks bad') - - self.cookies.update(r.cookies.get_dict()) - - if parsed_xml is None: - parsed_xml = xml_to_dict(r.text) - - return parsed_xml - - def auth(self, username: str, password: str): - xml = self.call('Security/sessionLogin/capabilities', data={'username': username}) - caps = xml['SessionLoginCap'] - is_irreversible = caps['isIrreversible'][0].lower() == 'true' - - # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl - # also look into webAuth.js and utils.js - - if 'salt' in caps and is_irreversible: - p = sha256_hex(username + caps['salt'][0] + password) - p = sha256_hex(p + caps['challenge'][0]) - for i in range(int(caps['iterations'][0])-2): - p = sha256_hex(p) - else: - p = sha256_hex(password) + caps['challenge'][0] - for i in range(int(caps['iterations'][0])-1): - p = sha256_hex(p) - - data = '<SessionLogin>' - data += f'<userName>{username}</userName>' - data += f'<password>{p}</password>' - data += f'<sessionID>{caps["sessionID"][0]}</sessionID>' - data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>' - data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>' - data += '</SessionLogin>' - - resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin'] - status_value = int(resp['statusValue'][0]) - status_string = resp['statusString'][0] - if status_value != 200: - raise AuthError(f'{status_value}: {status_string}') - - def get_ntp_server(self) -> str: - try: - # works on newer 1080p cams - xml = self.call('System/time/ntpServers/capabilities') - ntp_server = xml['NTPServerList']['NTPServer'][0] - if ntp_server['addressingFormatType'][0]['#text'] == 'hostname': - ntp_host = ntp_server['hostName'][0] - else: - ntp_host = ntp_server['ipAddress'][0] - - except requests.exceptions.HTTPError: - # works on older 720p cams - ntp_server = self.call('System/time/ntpServers/1')['NTPServer'] - if ntp_server['addressingFormatType'][0] == 'hostname': - ntp_host = ntp_server['hostName'][0] - else: - ntp_host = ntp_server['ipAddress'][0] - - return ntp_host - - def set_timezone(self): - data = '<?xml version="1.0" encoding="UTF-8"?>' - data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>' - self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True) - - def set_ntp_server(self, - ntp_host: str, - ntp_port: int = 123): - format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname' - - # test ntp server first - data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>' - resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult'] - error_code = int(resp['errorCode'][0]) - error_description = resp['errorDescription'][0] - if error_code != 0 or error_description.lower() != 'ok': - raise ResponseError('response status looks bad') - - # then set it - data = '<?xml version="1.0" encoding="UTF-8"?>' - data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>' - self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True) diff --git a/include/py/homekit/camera/hikvision/util.py b/include/py/homekit/camera/hikvision/util.py deleted file mode 100644 index 581c6ea..0000000 --- a/include/py/homekit/camera/hikvision/util.py +++ /dev/null @@ -1,48 +0,0 @@ -import requests -import hashlib -import xml.etree.ElementTree as ET - - -def xml_to_dict(xml_data: str) -> dict: - # Parse the XML data - root = ET.fromstring(xml_data) - - # Function to remove namespace from the tag name - def remove_namespace(tag): - return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace - - # Function to recursively convert XML elements to a dictionary - def elem_to_dict(elem): - tag = remove_namespace(elem.tag) - elem_dict = {tag: {}} - - # If the element has attributes, add them to the dictionary - elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()}) - - # Handle the element's text content, if present and not just whitespace - text = elem.text.strip() if elem.text and elem.text.strip() else None - if text: - elem_dict[tag]['#text'] = text - - # Process child elements - for child in elem: - child_dict = elem_to_dict(child) - child_tag = remove_namespace(child.tag) - if child_tag not in elem_dict[tag]: - elem_dict[tag][child_tag] = [] - elem_dict[tag][child_tag].append(child_dict[child_tag]) - - # Simplify structure if there's only text or no children and no attributes - if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]: - return {tag: elem_dict[tag]['#text']} - elif not elem_dict[tag]: - return {tag: ''} - - return elem_dict - - # Convert the root element to dictionary - return elem_to_dict(root) - - -def sha256_hex(input_string: str) -> str: - return hashlib.sha256(input_string.encode()).hexdigest() |