From 70b4a4f044cac8052bb0af7c585572e54489ea2f Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sat, 17 Feb 2024 23:20:49 +0300 Subject: ipcam_ntp_util: support chinese noname cameras --- include/py/homekit/camera/alinoname/nwipcam.py | 326 +++++++++++++++++++++++++ 1 file changed, 326 insertions(+) create mode 100755 include/py/homekit/camera/alinoname/nwipcam.py (limited to 'include/py/homekit/camera/alinoname/nwipcam.py') diff --git a/include/py/homekit/camera/alinoname/nwipcam.py b/include/py/homekit/camera/alinoname/nwipcam.py new file mode 100755 index 0000000..e54ec62 --- /dev/null +++ b/include/py/homekit/camera/alinoname/nwipcam.py @@ -0,0 +1,326 @@ +#!/usr/bin/python3 +# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera +# Copyright (C) 2019-2019 Johannes Bauer +# +# Changes and improvements: +# Copyright (C) 2024 Evgeny Zinoviev +# +# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam). +# +# numenworld-ipcam is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; this program is ONLY licensed under +# version 3 of the License, later versions are explicitly excluded. +# +# numenworld-ipcam is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with numenworld-ipcam; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Johannes Bauer + +import collections +import struct +import socket +import enum +import json +import subprocess + +from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction + + +class XMEyeMsgCode(enum.IntEnum): + LoginCmd = 1000 + LoginReply = LoginCmd + 1 + + KeepAliveCmd = 1006 + KeepAliveReply = KeepAliveCmd + 1 + + SetConfigCmd = 1040 + SetConfigReply = SetConfigCmd + 1 + + GetConfigCmd = 1042 + GetConfidReply = GetConfigCmd + 1 + + GetSystemInfoCmd = 1020 + GetSystemInfoReply = GetSystemInfoCmd + 1 + + ChannelTitleCmd = 1048 + ChannelTitleReply = ChannelTitleCmd + 1 + + SystemFunctionCmd = 1360 + SystemFunctionReply = SystemFunctionCmd + 1 + + OPMonitorStartStopCmd = 1410 + OPMonitorStartStopReply = OPMonitorStartStopCmd + 1 + + OPMonitorClaimCmd = 1413 + OPMonitorClaimReply = OPMonitorClaimCmd + 1 + + OPTimeQueryCmd = 1452 + OPTimeQueryReply = OPTimeQueryCmd + 1 + + VideoStreamData = 1412 + + +class AudioVideoDataType(enum.IntEnum): + VideoIncomplete = 0xfc + VideoComplete = 0xfd + AudioComplete = 0xfa + + +class AudioVideoPayload(): + _HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"]) + _HeaderStruct = struct.Struct("< H B B H H") + + def __init__(self, payload, hint=""): + self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:self._HeaderStruct.size])) + print( + "%20s [%5d]: %s %s" % (hint, len(payload), " ".join("%02x" % (c) for c in payload[:8]), str(self._header))) + # if len(payload) != (self._HeaderStruct.size + self._header.length): + # raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload))) + # print(self._header) + self._data = payload[self._HeaderStruct.size:] + + @property + def data(self): + return self._data + + +class XMEyeMessage(): + _HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"]) + _HeaderStruct = struct.Struct("< L L 6s H L") + + def __init__(self, station, session, msgcode, message): + if isinstance(message, (bytes, bytearray)): + self._message = bytes(message) + else: + self._message = json.dumps(message).encode("ascii") + self._header = self._HeaderFields(station=station, + session=session, + unknown=bytes(6), + msgcode=msgcode, + length=len(self._message)) + + @property + def header(self): + return self._header + + @property + def message(self): + return self._message + + @property + def payload(self): + msg = self.message.rstrip(bytes(1)) + try: + data = json.loads(msg) + except (json.JSONDecodeError, UnicodeError): + return self.message + return data + + @property + def length(self): + return len(self.message) + + def __bytes__(self): + header = self._HeaderStruct.pack(*self._header) + return header + self._message + + @classmethod + def deserialize(cls, msg): + header_data = msg[:20] + header = cls._HeaderFields(*cls._HeaderStruct.unpack(header_data)) + payload = msg[20: 20 + header.length] + if len(payload) < header.length: + payload += bytes(header.length - len(payload)) + + try: + msgcode = XMEyeMsgCode(header.msgcode) + except ValueError: + msgcode = header.msgcode + return cls(station=header.station, session=header.session, msgcode=msgcode, message=payload) + + @classmethod + def deserialize_all(cls, msg): + msg = bytearray(msg) + while len(msg) >= 20: + next_msg = cls.deserialize(msg) + yield next_msg + msg = msg[20 + next_msg.length:] + + def dump(self): + print("%s (%d bytes payload):" % (self._header.msgcode, self.length)) + if isinstance(self.payload, bytes): + print(self.payload) + else: + print(json.dumps(self.payload, indent=4, sort_keys=True)) + print() + + def __repr__(self): + return "Msg(%s): %s" % (self.header, str(self.payload)) + + +class XMEyeCamera(): + def __init__(self, hostname, username="admin", password="", port=34567): + self._hostname = hostname + self._conn = socket.create_connection((hostname, port)) + self._session = None + self._username = username + self._password = password + + @property + def derived_password(self): + return HorrificallyBrokenPasswordFunction.derive(self._password) + + @property + def rtsp_uri(self): + rtsp_port = 554 + return "rtsp://%s:%d/user=%s&password=%s&channel=" % ( + self._hostname, rtsp_port, self._username, self.derived_password) + + def _rx_bytes(self, length): + result = bytearray() + while len(result) < length: + remaining_bytes = length - len(result) + rx_data = self._conn.recv(remaining_bytes) + result += rx_data + return result + + def _rx(self): + response_header = self._rx_bytes(20) + header = XMEyeMessage.deserialize(response_header) + payload_data = self._rx_bytes(header.length) + + response_data = response_header + payload_data + response = XMEyeMessage.deserialize(response_data) + # print("<-", response) + return response + + def _tx(self, command): + # print("->", command) + data = bytes(command) + self._conn.send(data) + + def _tx_rx(self, command): + self._tx(command) + return self._rx() + + def login(self): + data = { + "EncryptType": "MD5", + "LoginType": "DVRIP-Web", + "UserName": self._username, + "PassWord": self.derived_password, + } + command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data) + response = self._tx_rx(command) + if int(response.payload["Ret"]) == 100: + # Login successful + self._session = int(response.payload["SessionID"], 16) + else: + raise Exception("Login failed:", response) + + def _generic_cmd(self, name, msgcode): + data = { + "Name": name, + "SessionID": "0x%x" % (self._session,), + } + command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data) + return self._tx_rx(command) + + def get_systeminfo(self): + return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd) + + def get_channel_title(self): + return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd) + + def get_system_function(self): + return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd) + + def get_talk_audio_format(self): + return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd) + + def get_ntp_server(self): + ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd) + return ntp_config.payload['NetWork.NetNTP']['Server']['Name'] + + def set_ntp_server(self, + ntp_host: str, + ntp_port: int = 123) -> None: + data = { + 'Name': 'NetWork.NetNTP', + 'NetWork.NetNTP': { + 'Enable': True, + 'Server': { + 'Address': '0x00000000', + 'Anonymity': False, + 'Name': ntp_host, + 'Password': '', + 'Port': ntp_port, + 'UserName': '' + }, + 'TimeZone': 13, # Moscow time + 'UpdatePeriod': 60 + }, + "SessionID": "0x%x" % (self._session,), + } + command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data) + self._tx_rx(command) + + def _opmonitor_cmd(self, action, msgcode, rx_msg=True): + data = { + "Name": "OPMonitor", + "OPMonitor": { + "Action": action, + "Parameter": { + "Channel": 0, + "CombinMode": "CONNECT_ALL", + "StreamType": "Main", + "TransMode": "TCP", + }, + }, + "SessionID": "0x%x" % (self._session,), + } + command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data) + if rx_msg: + return self._tx_rx(command) + else: + self._tx(command) + + def get_stream(self, packet_callback): + self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd) + self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False) + while True: + rx_pkt = self._rx() + packet_callback(rx_pkt) + + # def playback_stream(self): + # mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE) + # with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f: + # def pkt_callback(pkt): + # if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData): + # avpayload = AudioVideoPayload(pkt.payload, hint="video") + # mplayer_process.stdin.raw.write(pkt.payload) + # video_f.write(pkt.payload) + # elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData: + # # Audio data? + # avpayload = AudioVideoPayload(pkt.payload, hint="audio") + # f.write(avpayload.data) + # elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData: + # print(pkt) + # + # self.get_stream(pkt_callback) + + +if __name__ == '__main__': + cam = XMEyeCamera("192.168.0.47", password="DerPr03ess") + cam.login() + print(cam.get_systeminfo()) + print(cam.get_channel_title()) + # print(cam.get_talk_audio_format()) + # print(cam.get_system_function()) -- cgit v1.2.3