summaryrefslogtreecommitdiff
path: root/include/py/homekit
diff options
context:
space:
mode:
Diffstat (limited to 'include/py/homekit')
-rw-r--r--include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py42
-rw-r--r--include/py/homekit/camera/alinoname/__init__.py1
-rwxr-xr-xinclude/py/homekit/camera/alinoname/nwipcam.py326
-rw-r--r--include/py/homekit/camera/hikvision/__init__.py1
-rw-r--r--include/py/homekit/camera/hikvision/isapi.py137
-rw-r--r--include/py/homekit/camera/hikvision/util.py48
6 files changed, 0 insertions, 555 deletions
diff --git a/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py b/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py
deleted file mode 100644
index 9423382..0000000
--- a/include/py/homekit/camera/alinoname/HorrificallyBrokenPasswordFunction.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
-# Copyright (C) 2019-2019 Johannes Bauer
-#
-# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
-#
-# numenworld-ipcam is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; this program is ONLY licensed under
-# version 3 of the License, later versions are explicitly excluded.
-#
-# numenworld-ipcam is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with numenworld-ipcam; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-# Johannes Bauer <JohannesBauer@gmx.de>
-
-import hashlib
-
-
-class HorrificallyBrokenPasswordFunction():
- @classmethod
- def derive(self, passphrase):
- alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
- assert (len(alphabet) == 62)
- passphrase = passphrase.encode("utf-8")
- hashval = hashlib.md5(passphrase).digest()
- encoded = ""
- for i in range(0, 16, 2):
- index = (hashval[i] + hashval[i + 1]) % len(alphabet)
- char = alphabet[index]
- encoded += char
- return encoded
-
-
-if __name__ == "__main__":
- assert (HorrificallyBrokenPasswordFunction.derive("") == "tlJwpbo6")
- assert (HorrificallyBrokenPasswordFunction.derive("abc") == "LkM7s2Ht")
diff --git a/include/py/homekit/camera/alinoname/__init__.py b/include/py/homekit/camera/alinoname/__init__.py
deleted file mode 100644
index bce9919..0000000
--- a/include/py/homekit/camera/alinoname/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from .nwipcam import XMEyeCamera
diff --git a/include/py/homekit/camera/alinoname/nwipcam.py b/include/py/homekit/camera/alinoname/nwipcam.py
deleted file mode 100755
index e54ec62..0000000
--- a/include/py/homekit/camera/alinoname/nwipcam.py
+++ /dev/null
@@ -1,326 +0,0 @@
-#!/usr/bin/python3
-# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
-# Copyright (C) 2019-2019 Johannes Bauer
-#
-# Changes and improvements:
-# Copyright (C) 2024 Evgeny Zinoviev
-#
-# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
-#
-# numenworld-ipcam is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; this program is ONLY licensed under
-# version 3 of the License, later versions are explicitly excluded.
-#
-# numenworld-ipcam is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with numenworld-ipcam; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-# Johannes Bauer <JohannesBauer@gmx.de>
-
-import collections
-import struct
-import socket
-import enum
-import json
-import subprocess
-
-from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction
-
-
-class XMEyeMsgCode(enum.IntEnum):
- LoginCmd = 1000
- LoginReply = LoginCmd + 1
-
- KeepAliveCmd = 1006
- KeepAliveReply = KeepAliveCmd + 1
-
- SetConfigCmd = 1040
- SetConfigReply = SetConfigCmd + 1
-
- GetConfigCmd = 1042
- GetConfidReply = GetConfigCmd + 1
-
- GetSystemInfoCmd = 1020
- GetSystemInfoReply = GetSystemInfoCmd + 1
-
- ChannelTitleCmd = 1048
- ChannelTitleReply = ChannelTitleCmd + 1
-
- SystemFunctionCmd = 1360
- SystemFunctionReply = SystemFunctionCmd + 1
-
- OPMonitorStartStopCmd = 1410
- OPMonitorStartStopReply = OPMonitorStartStopCmd + 1
-
- OPMonitorClaimCmd = 1413
- OPMonitorClaimReply = OPMonitorClaimCmd + 1
-
- OPTimeQueryCmd = 1452
- OPTimeQueryReply = OPTimeQueryCmd + 1
-
- VideoStreamData = 1412
-
-
-class AudioVideoDataType(enum.IntEnum):
- VideoIncomplete = 0xfc
- VideoComplete = 0xfd
- AudioComplete = 0xfa
-
-
-class AudioVideoPayload():
- _HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"])
- _HeaderStruct = struct.Struct("< H B B H H")
-
- def __init__(self, payload, hint=""):
- self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:self._HeaderStruct.size]))
- print(
- "%20s [%5d]: %s %s" % (hint, len(payload), " ".join("%02x" % (c) for c in payload[:8]), str(self._header)))
- # if len(payload) != (self._HeaderStruct.size + self._header.length):
- # raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload)))
- # print(self._header)
- self._data = payload[self._HeaderStruct.size:]
-
- @property
- def data(self):
- return self._data
-
-
-class XMEyeMessage():
- _HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"])
- _HeaderStruct = struct.Struct("< L L 6s H L")
-
- def __init__(self, station, session, msgcode, message):
- if isinstance(message, (bytes, bytearray)):
- self._message = bytes(message)
- else:
- self._message = json.dumps(message).encode("ascii")
- self._header = self._HeaderFields(station=station,
- session=session,
- unknown=bytes(6),
- msgcode=msgcode,
- length=len(self._message))
-
- @property
- def header(self):
- return self._header
-
- @property
- def message(self):
- return self._message
-
- @property
- def payload(self):
- msg = self.message.rstrip(bytes(1))
- try:
- data = json.loads(msg)
- except (json.JSONDecodeError, UnicodeError):
- return self.message
- return data
-
- @property
- def length(self):
- return len(self.message)
-
- def __bytes__(self):
- header = self._HeaderStruct.pack(*self._header)
- return header + self._message
-
- @classmethod
- def deserialize(cls, msg):
- header_data = msg[:20]
- header = cls._HeaderFields(*cls._HeaderStruct.unpack(header_data))
- payload = msg[20: 20 + header.length]
- if len(payload) < header.length:
- payload += bytes(header.length - len(payload))
-
- try:
- msgcode = XMEyeMsgCode(header.msgcode)
- except ValueError:
- msgcode = header.msgcode
- return cls(station=header.station, session=header.session, msgcode=msgcode, message=payload)
-
- @classmethod
- def deserialize_all(cls, msg):
- msg = bytearray(msg)
- while len(msg) >= 20:
- next_msg = cls.deserialize(msg)
- yield next_msg
- msg = msg[20 + next_msg.length:]
-
- def dump(self):
- print("%s (%d bytes payload):" % (self._header.msgcode, self.length))
- if isinstance(self.payload, bytes):
- print(self.payload)
- else:
- print(json.dumps(self.payload, indent=4, sort_keys=True))
- print()
-
- def __repr__(self):
- return "Msg(%s): %s" % (self.header, str(self.payload))
-
-
-class XMEyeCamera():
- def __init__(self, hostname, username="admin", password="", port=34567):
- self._hostname = hostname
- self._conn = socket.create_connection((hostname, port))
- self._session = None
- self._username = username
- self._password = password
-
- @property
- def derived_password(self):
- return HorrificallyBrokenPasswordFunction.derive(self._password)
-
- @property
- def rtsp_uri(self):
- rtsp_port = 554
- return "rtsp://%s:%d/user=%s&password=%s&channel=" % (
- self._hostname, rtsp_port, self._username, self.derived_password)
-
- def _rx_bytes(self, length):
- result = bytearray()
- while len(result) < length:
- remaining_bytes = length - len(result)
- rx_data = self._conn.recv(remaining_bytes)
- result += rx_data
- return result
-
- def _rx(self):
- response_header = self._rx_bytes(20)
- header = XMEyeMessage.deserialize(response_header)
- payload_data = self._rx_bytes(header.length)
-
- response_data = response_header + payload_data
- response = XMEyeMessage.deserialize(response_data)
- # print("<-", response)
- return response
-
- def _tx(self, command):
- # print("->", command)
- data = bytes(command)
- self._conn.send(data)
-
- def _tx_rx(self, command):
- self._tx(command)
- return self._rx()
-
- def login(self):
- data = {
- "EncryptType": "MD5",
- "LoginType": "DVRIP-Web",
- "UserName": self._username,
- "PassWord": self.derived_password,
- }
- command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data)
- response = self._tx_rx(command)
- if int(response.payload["Ret"]) == 100:
- # Login successful
- self._session = int(response.payload["SessionID"], 16)
- else:
- raise Exception("Login failed:", response)
-
- def _generic_cmd(self, name, msgcode):
- data = {
- "Name": name,
- "SessionID": "0x%x" % (self._session,),
- }
- command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
- return self._tx_rx(command)
-
- def get_systeminfo(self):
- return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd)
-
- def get_channel_title(self):
- return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd)
-
- def get_system_function(self):
- return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd)
-
- def get_talk_audio_format(self):
- return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd)
-
- def get_ntp_server(self):
- ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd)
- return ntp_config.payload['NetWork.NetNTP']['Server']['Name']
-
- def set_ntp_server(self,
- ntp_host: str,
- ntp_port: int = 123) -> None:
- data = {
- 'Name': 'NetWork.NetNTP',
- 'NetWork.NetNTP': {
- 'Enable': True,
- 'Server': {
- 'Address': '0x00000000',
- 'Anonymity': False,
- 'Name': ntp_host,
- 'Password': '',
- 'Port': ntp_port,
- 'UserName': ''
- },
- 'TimeZone': 13, # Moscow time
- 'UpdatePeriod': 60
- },
- "SessionID": "0x%x" % (self._session,),
- }
- command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data)
- self._tx_rx(command)
-
- def _opmonitor_cmd(self, action, msgcode, rx_msg=True):
- data = {
- "Name": "OPMonitor",
- "OPMonitor": {
- "Action": action,
- "Parameter": {
- "Channel": 0,
- "CombinMode": "CONNECT_ALL",
- "StreamType": "Main",
- "TransMode": "TCP",
- },
- },
- "SessionID": "0x%x" % (self._session,),
- }
- command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
- if rx_msg:
- return self._tx_rx(command)
- else:
- self._tx(command)
-
- def get_stream(self, packet_callback):
- self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd)
- self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False)
- while True:
- rx_pkt = self._rx()
- packet_callback(rx_pkt)
-
- # def playback_stream(self):
- # mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE)
- # with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f:
- # def pkt_callback(pkt):
- # if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData):
- # avpayload = AudioVideoPayload(pkt.payload, hint="video")
- # mplayer_process.stdin.raw.write(pkt.payload)
- # video_f.write(pkt.payload)
- # elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData:
- # # Audio data?
- # avpayload = AudioVideoPayload(pkt.payload, hint="audio")
- # f.write(avpayload.data)
- # elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData:
- # print(pkt)
- #
- # self.get_stream(pkt_callback)
-
-
-if __name__ == '__main__':
- cam = XMEyeCamera("192.168.0.47", password="DerPr03ess")
- cam.login()
- print(cam.get_systeminfo())
- print(cam.get_channel_title())
- # print(cam.get_talk_audio_format())
- # print(cam.get_system_function())
diff --git a/include/py/homekit/camera/hikvision/__init__.py b/include/py/homekit/camera/hikvision/__init__.py
deleted file mode 100644
index 72d6ae3..0000000
--- a/include/py/homekit/camera/hikvision/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from .isapi import ISAPIClient, ResponseError, AuthError
diff --git a/include/py/homekit/camera/hikvision/isapi.py b/include/py/homekit/camera/hikvision/isapi.py
deleted file mode 100644
index 6cc34f8..0000000
--- a/include/py/homekit/camera/hikvision/isapi.py
+++ /dev/null
@@ -1,137 +0,0 @@
-import requests
-
-from time import time
-from .util import xml_to_dict, sha256_hex
-from ...util import validate_ipv4
-from ...http import HTTPMethod
-from typing import Optional, Union
-
-
-class ResponseError(RuntimeError):
- pass
-
-
-class AuthError(ResponseError):
- pass
-
-
-class ISAPIClient:
- def __init__(self, host):
- self.host = host
- self.cookies = {}
-
- def call(self,
- path: str,
- method: HTTPMethod = HTTPMethod.GET,
- data: Optional[Union[dict, str, bytes]] = None,
- raise_for_status=True,
- check_put_response=False):
- f = getattr(requests, method.value.lower())
-
- headers = {
- 'X-Requested-With': 'XMLHttpRequest',
- }
- if method in (HTTPMethod.PUT, HTTPMethod.POST):
- headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
-
- kwargs = {}
- if data:
- kwargs['data' if method is not HTTPMethod.GET else 'params'] = data
- if len(self.cookies) > 0:
- kwargs['cookies'] = self.cookies
-
- r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs)
-
- if raise_for_status or check_put_response:
- r.raise_for_status()
-
- parsed_xml = None
- if check_put_response:
- parsed_xml = xml_to_dict(r.text)
- resp = parsed_xml['ResponseStatus']
- status_code = int(resp['statusCode'][0])
- status_string = resp['statusString'][0]
- if status_code != 1 or status_string.lower() != 'ok':
- raise ResponseError('response status looks bad')
-
- self.cookies.update(r.cookies.get_dict())
-
- if parsed_xml is None:
- parsed_xml = xml_to_dict(r.text)
-
- return parsed_xml
-
- def auth(self, username: str, password: str):
- xml = self.call('Security/sessionLogin/capabilities', data={'username': username})
- caps = xml['SessionLoginCap']
- is_irreversible = caps['isIrreversible'][0].lower() == 'true'
-
- # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
- # also look into webAuth.js and utils.js
-
- if 'salt' in caps and is_irreversible:
- p = sha256_hex(username + caps['salt'][0] + password)
- p = sha256_hex(p + caps['challenge'][0])
- for i in range(int(caps['iterations'][0])-2):
- p = sha256_hex(p)
- else:
- p = sha256_hex(password) + caps['challenge'][0]
- for i in range(int(caps['iterations'][0])-1):
- p = sha256_hex(p)
-
- data = '<SessionLogin>'
- data += f'<userName>{username}</userName>'
- data += f'<password>{p}</password>'
- data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
- data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
- data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
- data += '</SessionLogin>'
-
- resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin']
- status_value = int(resp['statusValue'][0])
- status_string = resp['statusString'][0]
- if status_value != 200:
- raise AuthError(f'{status_value}: {status_string}')
-
- def get_ntp_server(self) -> str:
- try:
- # works on newer 1080p cams
- xml = self.call('System/time/ntpServers/capabilities')
- ntp_server = xml['NTPServerList']['NTPServer'][0]
- if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
- ntp_host = ntp_server['hostName'][0]
- else:
- ntp_host = ntp_server['ipAddress'][0]
-
- except requests.exceptions.HTTPError:
- # works on older 720p cams
- ntp_server = self.call('System/time/ntpServers/1')['NTPServer']
- if ntp_server['addressingFormatType'][0] == 'hostname':
- ntp_host = ntp_server['hostName'][0]
- else:
- ntp_host = ntp_server['ipAddress'][0]
-
- return ntp_host
-
- def set_timezone(self):
- data = '<?xml version="1.0" encoding="UTF-8"?>'
- data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
- self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True)
-
- def set_ntp_server(self,
- ntp_host: str,
- ntp_port: int = 123):
- format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
-
- # test ntp server first
- data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>'
- resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult']
- error_code = int(resp['errorCode'][0])
- error_description = resp['errorDescription'][0]
- if error_code != 0 or error_description.lower() != 'ok':
- raise ResponseError('response status looks bad')
-
- # then set it
- data = '<?xml version="1.0" encoding="UTF-8"?>'
- data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
- self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)
diff --git a/include/py/homekit/camera/hikvision/util.py b/include/py/homekit/camera/hikvision/util.py
deleted file mode 100644
index 581c6ea..0000000
--- a/include/py/homekit/camera/hikvision/util.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import requests
-import hashlib
-import xml.etree.ElementTree as ET
-
-
-def xml_to_dict(xml_data: str) -> dict:
- # Parse the XML data
- root = ET.fromstring(xml_data)
-
- # Function to remove namespace from the tag name
- def remove_namespace(tag):
- return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
-
- # Function to recursively convert XML elements to a dictionary
- def elem_to_dict(elem):
- tag = remove_namespace(elem.tag)
- elem_dict = {tag: {}}
-
- # If the element has attributes, add them to the dictionary
- elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
-
- # Handle the element's text content, if present and not just whitespace
- text = elem.text.strip() if elem.text and elem.text.strip() else None
- if text:
- elem_dict[tag]['#text'] = text
-
- # Process child elements
- for child in elem:
- child_dict = elem_to_dict(child)
- child_tag = remove_namespace(child.tag)
- if child_tag not in elem_dict[tag]:
- elem_dict[tag][child_tag] = []
- elem_dict[tag][child_tag].append(child_dict[child_tag])
-
- # Simplify structure if there's only text or no children and no attributes
- if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
- return {tag: elem_dict[tag]['#text']}
- elif not elem_dict[tag]:
- return {tag: ''}
-
- return elem_dict
-
- # Convert the root element to dictionary
- return elem_to_dict(root)
-
-
-def sha256_hex(input_string: str) -> str:
- return hashlib.sha256(input_string.encode()).hexdigest()