#!/usr/bin/python3
# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
# Copyright (C) 2019-2019 Johannes Bauer
#
# Changes and improvements:
# Copyright (C) 2024 Evgeny Zinoviev
#
# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
#
# numenworld-ipcam is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; this program is ONLY licensed under
# version 3 of the License, later versions are explicitly excluded.
#
# numenworld-ipcam is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with numenworld-ipcam; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Johannes Bauer

import collections
import struct
import socket
import enum
import json
import subprocess
from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction


class XMEyeMsgCode(enum.IntEnum):
    """Message codes carried in the header of XMEye protocol messages.

    Each ``*Reply`` member is defined as the matching ``*Cmd`` value plus one.
    """

    LoginCmd = 1000
    LoginReply = LoginCmd + 1
    KeepAliveCmd = 1006
    KeepAliveReply = KeepAliveCmd + 1
    SetConfigCmd = 1040
    SetConfigReply = SetConfigCmd + 1
    GetConfigCmd = 1042
    # Historical (misspelled) name; it stays the canonical member so existing
    # callers and printed names do not change.
    GetConfidReply = GetConfigCmd + 1
    # Correctly spelled alias for GetConfidReply (same value).
    GetConfigReply = GetConfigCmd + 1
    GetSystemInfoCmd = 1020
    GetSystemInfoReply = GetSystemInfoCmd + 1
    ChannelTitleCmd = 1048
    ChannelTitleReply = ChannelTitleCmd + 1
    SystemFunctionCmd = 1360
    SystemFunctionReply = SystemFunctionCmd + 1
    OPMonitorStartStopCmd = 1410
    OPMonitorStartStopReply = OPMonitorStartStopCmd + 1
    OPMonitorClaimCmd = 1413
    OPMonitorClaimReply = OPMonitorClaimCmd + 1
    OPTimeQueryCmd = 1452
    OPTimeQueryReply = OPTimeQueryCmd + 1
    VideoStreamData = 1412


class AudioVideoDataType(enum.IntEnum):
    """Data type tags for audio/video payloads."""

    VideoIncomplete = 0xfc
    VideoComplete = 0xfd
    AudioComplete = 0xfa


class AudioVideoPayload():
    """An audio/video payload split into an 8-byte header and the data after it."""

    _HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"])
    _HeaderStruct = struct.Struct("< H B B H H")

    def __init__(self, payload, hint=""):
        """Parse *payload* and print a one-line debug summary.

        payload -- bytes-like object; must hold at least the header
                   (struct.error is raised otherwise).
        hint    -- free-form label included in the printed summary.
        """
        header_size = self._HeaderStruct.size
        self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:header_size]))
        leading_hex = " ".join("%02x" % (c) for c in payload[:8])
        print("%20s [%5d]: %s %s" % (hint, len(payload), leading_hex, str(self._header)))
        # NOTE: validation of the header's length field is currently disabled:
        # if len(payload) != (self._HeaderStruct.size + self._header.length):
        #     raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload)))
        # print(self._header)
        self._data = payload[header_size:]

    @property
    def data(self):
        """Everything in the payload after the header."""
        return self._data


class XMEyeMessage():
    """A single XMEye protocol message: a 20-byte little-endian header plus a body."""

    _HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"])
    _HeaderStruct = struct.Struct("< L L 6s H L")

    def __init__(self, station, session, msgcode, message):
        """Build a message.

        station, session, msgcode -- values stored in the header.
        message -- bytes/bytearray used verbatim as the body, or any other
                   object, which is serialized with json.dumps().
        """
        if isinstance(message, (bytes, bytearray)):
            self._message = bytes(message)
        else:
            self._message = json.dumps(message).encode("ascii")
        self._header = self._HeaderFields(station=station, session=session, unknown=bytes(6), msgcode=msgcode, length=len(self._message))

    @property
    def header(self):
        """The header as a namedtuple (station, session, unknown, msgcode, length)."""
        return self._header

    @property
    def message(self):
        """The raw body bytes."""
        return self._message

    @property
    def payload(self):
        """The body decoded as JSON, or the raw body bytes if it is not valid JSON.

        Trailing NUL bytes are stripped before the JSON decoding attempt; the
        raw fallback is returned unstripped.
        """
        stripped = self.message.rstrip(bytes(1))
        try:
            return json.loads(stripped)
        except (json.JSONDecodeError, UnicodeError):
            return self.message

    @property
    def length(self):
        """Length of the raw body in bytes."""
        return len(self.message)

    def __bytes__(self):
        """Serialize header and body to their wire representation."""
        return self._HeaderStruct.pack(*self._header) + self._message

    @classmethod
    def parse_header(cls, data):
        """Unpack only the header found at the start of *data*.

        Returns the header namedtuple; raises struct.error if *data* is
        shorter than the header.
        """
        return cls._HeaderFields(*cls._HeaderStruct.unpack(data[:cls._HeaderStruct.size]))

    @classmethod
    def deserialize(cls, msg):
        """Parse one message from the start of the bytes-like *msg*.

        If fewer body bytes are present than the header announces, the body
        is padded with NUL bytes up to the announced length. A message code
        that is not a known XMEyeMsgCode is kept as a plain integer.
        """
        header_size = cls._HeaderStruct.size
        header = cls.parse_header(msg)
        # bytes() so that padding also works for bytearray/memoryview input.
        body = bytes(msg[header_size:header_size + header.length])
        missing = header.length - len(body)
        if missing > 0:
            body += bytes(missing)
        try:
            msgcode = XMEyeMsgCode(header.msgcode)
        except ValueError:
            msgcode = header.msgcode
        return cls(station=header.station, session=header.session, msgcode=msgcode, message=body)

    @classmethod
    def deserialize_all(cls, msg):
        """Yield every message contained back-to-back in *msg*.

        Stops as soon as fewer bytes than one header remain.
        """
        header_size = cls._HeaderStruct.size
        # Snapshot the input once, then walk it with an offset instead of
        # re-slicing (and thereby copying) the remainder for every message.
        view = memoryview(bytes(msg))
        offset = 0
        while len(view) - offset >= header_size:
            next_msg = cls.deserialize(view[offset:])
            yield next_msg
            offset += header_size + next_msg.length

    def dump(self):
        """Print the message code, body length and body (JSON pretty-printed when possible)."""
        print("%s (%d bytes payload):" % (self._header.msgcode, self.length))
        payload = self.payload  # decode once; the property re-parses on every access
        if isinstance(payload, bytes):
            print(payload)
        else:
            print(json.dumps(payload, indent=4, sort_keys=True))
        print()

    def __repr__(self):
        return "Msg(%s): %s" % (self.header, str(self.payload))


class XMEyeCamera():
    """Client for a camera speaking the XMEye protocol over TCP.

    The connection is opened in the constructor. Call close() when done, or
    use the instance as a context manager.
    """

    def __init__(self, hostname, username="admin", password="", port=34567):
        """Connect to *hostname*:*port*; credentials are stored for login()."""
        self._hostname = hostname
        self._conn = socket.create_connection((hostname, port))
        self._session = None
        self._username = username
        self._password = password

    def close(self):
        """Close the underlying socket."""
        self._conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @property
    def derived_password(self):
        """The password as transformed by HorrificallyBrokenPasswordFunction.derive()."""
        return HorrificallyBrokenPasswordFunction.derive(self._password)

    @property
    def rtsp_uri(self):
        """RTSP URI prefix (port 554) ending in ``channel=``."""
        rtsp_port = 554
        return "rtsp://%s:%d/user=%s&password=%s&channel=" % (
            self._hostname, rtsp_port, self._username, self.derived_password)

    @property
    def _session_id_str(self):
        """Session id formatted as the hexadecimal string used in request bodies.

        Raises TypeError while no session is established (before login()).
        """
        return "0x%x" % (self._session,)

    def _rx_bytes(self, length):
        """Receive exactly *length* bytes and return them as a bytearray.

        Raises ConnectionError if the peer closes the connection first;
        recv() then returns an empty result and waiting longer cannot succeed.
        """
        result = bytearray()
        while len(result) < length:
            rx_data = self._conn.recv(length - len(result))
            if not rx_data:
                raise ConnectionError("Connection closed by peer after %d of %d expected bytes." % (len(result), length))
            result += rx_data
        return result

    def _rx(self):
        """Receive one complete message (header, then the announced body length)."""
        response_header = self._rx_bytes(XMEyeMessage._HeaderStruct.size)
        header = XMEyeMessage.parse_header(response_header)
        payload_data = self._rx_bytes(header.length)
        response = XMEyeMessage.deserialize(response_header + payload_data)
        # print("<-", response)
        return response

    def _tx(self, command):
        """Send *command* (anything convertible with bytes()) in full."""
        # print("->", command)
        # sendall(): plain send() may transmit only part of the buffer.
        self._conn.sendall(bytes(command))

    def _tx_rx(self, command):
        """Send *command* and return the next received message."""
        self._tx(command)
        return self._rx()

    def login(self):
        """Log in with the stored credentials and remember the session id.

        Raises Exception if the reply's "Ret" value is not 100.
        """
        data = {
            "EncryptType": "MD5",
            "LoginType": "DVRIP-Web",
            "UserName": self._username,
            "PassWord": self.derived_password,
        }
        command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data)
        response = self._tx_rx(command)
        reply = response.payload
        if int(reply["Ret"]) != 100:
            raise Exception("Login failed:", response)
        # Login successful; the session id arrives as a hexadecimal string.
        self._session = int(reply["SessionID"], 16)

    def _generic_cmd(self, name, msgcode):
        """Send a request carrying only "Name" and "SessionID"; return the reply."""
        data = {
            "Name": name,
            "SessionID": self._session_id_str,
        }
        command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
        return self._tx_rx(command)

    def get_systeminfo(self):
        """Request "SystemInfo" and return the reply message."""
        return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd)

    def get_channel_title(self):
        """Request "ChannelTitle" and return the reply message."""
        return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd)

    def get_system_function(self):
        """Request "SystemFunction" and return the reply message."""
        return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd)

    def get_talk_audio_format(self):
        """Request "TalkAudioFormat" (sent with the SystemFunctionCmd code) and return the reply."""
        return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd)

    def get_ntp_server(self):
        """Return the server name from the camera's "NetWork.NetNTP" configuration."""
        ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd)
        return ntp_config.payload['NetWork.NetNTP']['Server']['Name']

    def set_ntp_server(self, ntp_host: str, ntp_port: int = 123, timezone: int = 13) -> None:
        """Enable NTP and point the camera at *ntp_host*:*ntp_port*.

        timezone -- value sent as "TimeZone". The default 13 was labelled
                    "Moscow time" by the original author; the meaning of other
                    values is not visible in this file.

        The camera's reply is read but not inspected.
        """
        data = {
            'Name': 'NetWork.NetNTP',
            'NetWork.NetNTP': {
                'Enable': True,
                'Server': {
                    'Address': '0x00000000',
                    'Anonymity': False,
                    'Name': ntp_host,
                    'Password': '',
                    'Port': ntp_port,
                    'UserName': ''
                },
                'TimeZone': timezone,
                'UpdatePeriod': 60
            },
            "SessionID": self._session_id_str,
        }
        command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data)
        self._tx_rx(command)

    def _opmonitor_cmd(self, action, msgcode, rx_msg=True):
        """Send an "OPMonitor" request for channel 0, main stream, over TCP.

        Returns the reply when *rx_msg* is true; otherwise only sends the
        request and returns None.
        """
        data = {
            "Name": "OPMonitor",
            "OPMonitor": {
                "Action": action,
                "Parameter": {
                    "Channel": 0,
                    "CombinMode": "CONNECT_ALL",
                    "StreamType": "Main",
                    "TransMode": "TCP",
                },
            },
            "SessionID": self._session_id_str,
        }
        command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
        if rx_msg:
            return self._tx_rx(command)
        self._tx(command)
        return None

    def get_stream(self, packet_callback):
        """Claim and start the monitor stream, then pass every received message to *packet_callback*.

        Never returns normally; the loop ends only through an exception
        (for example ConnectionError once the camera closes the connection).
        """
        self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd)
        self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False)
        while True:
            rx_pkt = self._rx()
            packet_callback(rx_pkt)

#    def playback_stream(self):
#        mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE)
#        with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f:
#            def pkt_callback(pkt):
#                if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData):
#                    avpayload = AudioVideoPayload(pkt.payload, hint="video")
#                    mplayer_process.stdin.raw.write(pkt.payload)
#                    video_f.write(pkt.payload)
#                elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData:
#                    # Audio data?
#                    avpayload = AudioVideoPayload(pkt.payload, hint="audio")
#                    f.write(avpayload.data)
#                elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData:
#                    print(pkt)
#
#            self.get_stream(pkt_callback)


if __name__ == '__main__':
    cam = XMEyeCamera("192.168.0.47", password="DerPr03ess")
    cam.login()
    print(cam.get_systeminfo())
    print(cam.get_channel_title())
    # print(cam.get_talk_audio_format())
    # print(cam.get_system_function())