summaryrefslogtreecommitdiff
path: root/include/py/homekit/camera/alinoname/nwipcam.py
diff options
context:
space:
mode:
Diffstat (limited to 'include/py/homekit/camera/alinoname/nwipcam.py')
-rwxr-xr-xinclude/py/homekit/camera/alinoname/nwipcam.py326
1 files changed, 326 insertions, 0 deletions
diff --git a/include/py/homekit/camera/alinoname/nwipcam.py b/include/py/homekit/camera/alinoname/nwipcam.py
new file mode 100755
index 0000000..e54ec62
--- /dev/null
+++ b/include/py/homekit/camera/alinoname/nwipcam.py
@@ -0,0 +1,326 @@
+#!/usr/bin/python3
+# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
+# Copyright (C) 2019-2019 Johannes Bauer
+#
+# Changes and improvements:
+# Copyright (C) 2024 Evgeny Zinoviev
+#
+# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
+#
+# numenworld-ipcam is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; this program is ONLY licensed under
+# version 3 of the License, later versions are explicitly excluded.
+#
+# numenworld-ipcam is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with numenworld-ipcam; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Johannes Bauer <JohannesBauer@gmx.de>
+
+import collections
+import struct
+import socket
+import enum
+import json
+import subprocess
+
+from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction
+
+
+class XMEyeMsgCode(enum.IntEnum):
+ LoginCmd = 1000
+ LoginReply = LoginCmd + 1
+
+ KeepAliveCmd = 1006
+ KeepAliveReply = KeepAliveCmd + 1
+
+ SetConfigCmd = 1040
+ SetConfigReply = SetConfigCmd + 1
+
+ GetConfigCmd = 1042
+ GetConfidReply = GetConfigCmd + 1
+
+ GetSystemInfoCmd = 1020
+ GetSystemInfoReply = GetSystemInfoCmd + 1
+
+ ChannelTitleCmd = 1048
+ ChannelTitleReply = ChannelTitleCmd + 1
+
+ SystemFunctionCmd = 1360
+ SystemFunctionReply = SystemFunctionCmd + 1
+
+ OPMonitorStartStopCmd = 1410
+ OPMonitorStartStopReply = OPMonitorStartStopCmd + 1
+
+ OPMonitorClaimCmd = 1413
+ OPMonitorClaimReply = OPMonitorClaimCmd + 1
+
+ OPTimeQueryCmd = 1452
+ OPTimeQueryReply = OPTimeQueryCmd + 1
+
+ VideoStreamData = 1412
+
+
+class AudioVideoDataType(enum.IntEnum):
+ VideoIncomplete = 0xfc
+ VideoComplete = 0xfd
+ AudioComplete = 0xfa
+
+
+class AudioVideoPayload():
+ _HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"])
+ _HeaderStruct = struct.Struct("< H B B H H")
+
+ def __init__(self, payload, hint=""):
+ self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:self._HeaderStruct.size]))
+ print(
+ "%20s [%5d]: %s %s" % (hint, len(payload), " ".join("%02x" % (c) for c in payload[:8]), str(self._header)))
+ # if len(payload) != (self._HeaderStruct.size + self._header.length):
+ # raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload)))
+ # print(self._header)
+ self._data = payload[self._HeaderStruct.size:]
+
+ @property
+ def data(self):
+ return self._data
+
+
+class XMEyeMessage():
+ _HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"])
+ _HeaderStruct = struct.Struct("< L L 6s H L")
+
+ def __init__(self, station, session, msgcode, message):
+ if isinstance(message, (bytes, bytearray)):
+ self._message = bytes(message)
+ else:
+ self._message = json.dumps(message).encode("ascii")
+ self._header = self._HeaderFields(station=station,
+ session=session,
+ unknown=bytes(6),
+ msgcode=msgcode,
+ length=len(self._message))
+
+ @property
+ def header(self):
+ return self._header
+
+ @property
+ def message(self):
+ return self._message
+
+ @property
+ def payload(self):
+ msg = self.message.rstrip(bytes(1))
+ try:
+ data = json.loads(msg)
+ except (json.JSONDecodeError, UnicodeError):
+ return self.message
+ return data
+
+ @property
+ def length(self):
+ return len(self.message)
+
+ def __bytes__(self):
+ header = self._HeaderStruct.pack(*self._header)
+ return header + self._message
+
+ @classmethod
+ def deserialize(cls, msg):
+ header_data = msg[:20]
+ header = cls._HeaderFields(*cls._HeaderStruct.unpack(header_data))
+ payload = msg[20: 20 + header.length]
+ if len(payload) < header.length:
+ payload += bytes(header.length - len(payload))
+
+ try:
+ msgcode = XMEyeMsgCode(header.msgcode)
+ except ValueError:
+ msgcode = header.msgcode
+ return cls(station=header.station, session=header.session, msgcode=msgcode, message=payload)
+
+ @classmethod
+ def deserialize_all(cls, msg):
+ msg = bytearray(msg)
+ while len(msg) >= 20:
+ next_msg = cls.deserialize(msg)
+ yield next_msg
+ msg = msg[20 + next_msg.length:]
+
+ def dump(self):
+ print("%s (%d bytes payload):" % (self._header.msgcode, self.length))
+ if isinstance(self.payload, bytes):
+ print(self.payload)
+ else:
+ print(json.dumps(self.payload, indent=4, sort_keys=True))
+ print()
+
+ def __repr__(self):
+ return "Msg(%s): %s" % (self.header, str(self.payload))
+
+
+class XMEyeCamera():
+ def __init__(self, hostname, username="admin", password="", port=34567):
+ self._hostname = hostname
+ self._conn = socket.create_connection((hostname, port))
+ self._session = None
+ self._username = username
+ self._password = password
+
+ @property
+ def derived_password(self):
+ return HorrificallyBrokenPasswordFunction.derive(self._password)
+
+ @property
+ def rtsp_uri(self):
+ rtsp_port = 554
+ return "rtsp://%s:%d/user=%s&password=%s&channel=" % (
+ self._hostname, rtsp_port, self._username, self.derived_password)
+
+ def _rx_bytes(self, length):
+ result = bytearray()
+ while len(result) < length:
+ remaining_bytes = length - len(result)
+ rx_data = self._conn.recv(remaining_bytes)
+ result += rx_data
+ return result
+
+ def _rx(self):
+ response_header = self._rx_bytes(20)
+ header = XMEyeMessage.deserialize(response_header)
+ payload_data = self._rx_bytes(header.length)
+
+ response_data = response_header + payload_data
+ response = XMEyeMessage.deserialize(response_data)
+ # print("<-", response)
+ return response
+
+ def _tx(self, command):
+ # print("->", command)
+ data = bytes(command)
+ self._conn.send(data)
+
+ def _tx_rx(self, command):
+ self._tx(command)
+ return self._rx()
+
+ def login(self):
+ data = {
+ "EncryptType": "MD5",
+ "LoginType": "DVRIP-Web",
+ "UserName": self._username,
+ "PassWord": self.derived_password,
+ }
+ command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data)
+ response = self._tx_rx(command)
+ if int(response.payload["Ret"]) == 100:
+ # Login successful
+ self._session = int(response.payload["SessionID"], 16)
+ else:
+ raise Exception("Login failed:", response)
+
+ def _generic_cmd(self, name, msgcode):
+ data = {
+ "Name": name,
+ "SessionID": "0x%x" % (self._session,),
+ }
+ command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
+ return self._tx_rx(command)
+
+ def get_systeminfo(self):
+ return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd)
+
+ def get_channel_title(self):
+ return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd)
+
+ def get_system_function(self):
+ return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd)
+
+ def get_talk_audio_format(self):
+ return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd)
+
+ def get_ntp_server(self):
+ ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd)
+ return ntp_config.payload['NetWork.NetNTP']['Server']['Name']
+
+ def set_ntp_server(self,
+ ntp_host: str,
+ ntp_port: int = 123) -> None:
+ data = {
+ 'Name': 'NetWork.NetNTP',
+ 'NetWork.NetNTP': {
+ 'Enable': True,
+ 'Server': {
+ 'Address': '0x00000000',
+ 'Anonymity': False,
+ 'Name': ntp_host,
+ 'Password': '',
+ 'Port': ntp_port,
+ 'UserName': ''
+ },
+ 'TimeZone': 13, # Moscow time
+ 'UpdatePeriod': 60
+ },
+ "SessionID": "0x%x" % (self._session,),
+ }
+ command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data)
+ self._tx_rx(command)
+
+ def _opmonitor_cmd(self, action, msgcode, rx_msg=True):
+ data = {
+ "Name": "OPMonitor",
+ "OPMonitor": {
+ "Action": action,
+ "Parameter": {
+ "Channel": 0,
+ "CombinMode": "CONNECT_ALL",
+ "StreamType": "Main",
+ "TransMode": "TCP",
+ },
+ },
+ "SessionID": "0x%x" % (self._session,),
+ }
+ command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
+ if rx_msg:
+ return self._tx_rx(command)
+ else:
+ self._tx(command)
+
+ def get_stream(self, packet_callback):
+ self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd)
+ self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False)
+ while True:
+ rx_pkt = self._rx()
+ packet_callback(rx_pkt)
+
+ # def playback_stream(self):
+ # mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE)
+ # with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f:
+ # def pkt_callback(pkt):
+ # if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData):
+ # avpayload = AudioVideoPayload(pkt.payload, hint="video")
+ # mplayer_process.stdin.raw.write(pkt.payload)
+ # video_f.write(pkt.payload)
+ # elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData:
+ # # Audio data?
+ # avpayload = AudioVideoPayload(pkt.payload, hint="audio")
+ # f.write(avpayload.data)
+ # elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData:
+ # print(pkt)
+ #
+ # self.get_stream(pkt_callback)
+
+
+if __name__ == '__main__':
+ cam = XMEyeCamera("192.168.0.47", password="DerPr03ess")
+ cam.login()
+ print(cam.get_systeminfo())
+ print(cam.get_channel_title())
+ # print(cam.get_talk_audio_format())
+ # print(cam.get_system_function())