summaryrefslogtreecommitdiff
path: root/include/py/homekit/camera/alinoname/nwipcam.py
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2024-02-25 23:07:46 +0300
committerEvgeny Zinoviev <me@ch1p.io>2024-02-25 23:07:46 +0300
commitc4f87ddad4058c0f331446fdfd8d762b8fc26c18 (patch)
tree4c64e4b685670939c6401b4bdfdbeb6e4ed6a9fd /include/py/homekit/camera/alinoname/nwipcam.py
parentd43ca74063d8d931325c4498b02f40bb03e9e104 (diff)
homekit: move hikvision and xmeye api stuff out of homekit package
Diffstat (limited to 'include/py/homekit/camera/alinoname/nwipcam.py')
-rwxr-xr-xinclude/py/homekit/camera/alinoname/nwipcam.py326
1 files changed, 0 insertions, 326 deletions
diff --git a/include/py/homekit/camera/alinoname/nwipcam.py b/include/py/homekit/camera/alinoname/nwipcam.py
deleted file mode 100755
index e54ec62..0000000
--- a/include/py/homekit/camera/alinoname/nwipcam.py
+++ /dev/null
@@ -1,326 +0,0 @@
-#!/usr/bin/python3
-# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
-# Copyright (C) 2019-2019 Johannes Bauer
-#
-# Changes and improvements:
-# Copyright (C) 2024 Evgeny Zinoviev
-#
-# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
-#
-# numenworld-ipcam is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; this program is ONLY licensed under
-# version 3 of the License, later versions are explicitly excluded.
-#
-# numenworld-ipcam is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with numenworld-ipcam; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-# Johannes Bauer <JohannesBauer@gmx.de>
-
-import collections
-import struct
-import socket
-import enum
-import json
-import subprocess
-
-from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction
-
-
-class XMEyeMsgCode(enum.IntEnum):
- LoginCmd = 1000
- LoginReply = LoginCmd + 1
-
- KeepAliveCmd = 1006
- KeepAliveReply = KeepAliveCmd + 1
-
- SetConfigCmd = 1040
- SetConfigReply = SetConfigCmd + 1
-
- GetConfigCmd = 1042
- GetConfidReply = GetConfigCmd + 1
-
- GetSystemInfoCmd = 1020
- GetSystemInfoReply = GetSystemInfoCmd + 1
-
- ChannelTitleCmd = 1048
- ChannelTitleReply = ChannelTitleCmd + 1
-
- SystemFunctionCmd = 1360
- SystemFunctionReply = SystemFunctionCmd + 1
-
- OPMonitorStartStopCmd = 1410
- OPMonitorStartStopReply = OPMonitorStartStopCmd + 1
-
- OPMonitorClaimCmd = 1413
- OPMonitorClaimReply = OPMonitorClaimCmd + 1
-
- OPTimeQueryCmd = 1452
- OPTimeQueryReply = OPTimeQueryCmd + 1
-
- VideoStreamData = 1412
-
-
-class AudioVideoDataType(enum.IntEnum):
- VideoIncomplete = 0xfc
- VideoComplete = 0xfd
- AudioComplete = 0xfa
-
-
-class AudioVideoPayload():
- _HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"])
- _HeaderStruct = struct.Struct("< H B B H H")
-
- def __init__(self, payload, hint=""):
- self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:self._HeaderStruct.size]))
- print(
- "%20s [%5d]: %s %s" % (hint, len(payload), " ".join("%02x" % (c) for c in payload[:8]), str(self._header)))
- # if len(payload) != (self._HeaderStruct.size + self._header.length):
- # raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload)))
- # print(self._header)
- self._data = payload[self._HeaderStruct.size:]
-
- @property
- def data(self):
- return self._data
-
-
-class XMEyeMessage():
- _HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"])
- _HeaderStruct = struct.Struct("< L L 6s H L")
-
- def __init__(self, station, session, msgcode, message):
- if isinstance(message, (bytes, bytearray)):
- self._message = bytes(message)
- else:
- self._message = json.dumps(message).encode("ascii")
- self._header = self._HeaderFields(station=station,
- session=session,
- unknown=bytes(6),
- msgcode=msgcode,
- length=len(self._message))
-
- @property
- def header(self):
- return self._header
-
- @property
- def message(self):
- return self._message
-
- @property
- def payload(self):
- msg = self.message.rstrip(bytes(1))
- try:
- data = json.loads(msg)
- except (json.JSONDecodeError, UnicodeError):
- return self.message
- return data
-
- @property
- def length(self):
- return len(self.message)
-
- def __bytes__(self):
- header = self._HeaderStruct.pack(*self._header)
- return header + self._message
-
- @classmethod
- def deserialize(cls, msg):
- header_data = msg[:20]
- header = cls._HeaderFields(*cls._HeaderStruct.unpack(header_data))
- payload = msg[20: 20 + header.length]
- if len(payload) < header.length:
- payload += bytes(header.length - len(payload))
-
- try:
- msgcode = XMEyeMsgCode(header.msgcode)
- except ValueError:
- msgcode = header.msgcode
- return cls(station=header.station, session=header.session, msgcode=msgcode, message=payload)
-
- @classmethod
- def deserialize_all(cls, msg):
- msg = bytearray(msg)
- while len(msg) >= 20:
- next_msg = cls.deserialize(msg)
- yield next_msg
- msg = msg[20 + next_msg.length:]
-
- def dump(self):
- print("%s (%d bytes payload):" % (self._header.msgcode, self.length))
- if isinstance(self.payload, bytes):
- print(self.payload)
- else:
- print(json.dumps(self.payload, indent=4, sort_keys=True))
- print()
-
- def __repr__(self):
- return "Msg(%s): %s" % (self.header, str(self.payload))
-
-
-class XMEyeCamera():
- def __init__(self, hostname, username="admin", password="", port=34567):
- self._hostname = hostname
- self._conn = socket.create_connection((hostname, port))
- self._session = None
- self._username = username
- self._password = password
-
- @property
- def derived_password(self):
- return HorrificallyBrokenPasswordFunction.derive(self._password)
-
- @property
- def rtsp_uri(self):
- rtsp_port = 554
- return "rtsp://%s:%d/user=%s&password=%s&channel=" % (
- self._hostname, rtsp_port, self._username, self.derived_password)
-
- def _rx_bytes(self, length):
- result = bytearray()
- while len(result) < length:
- remaining_bytes = length - len(result)
- rx_data = self._conn.recv(remaining_bytes)
- result += rx_data
- return result
-
- def _rx(self):
- response_header = self._rx_bytes(20)
- header = XMEyeMessage.deserialize(response_header)
- payload_data = self._rx_bytes(header.length)
-
- response_data = response_header + payload_data
- response = XMEyeMessage.deserialize(response_data)
- # print("<-", response)
- return response
-
- def _tx(self, command):
- # print("->", command)
- data = bytes(command)
- self._conn.send(data)
-
- def _tx_rx(self, command):
- self._tx(command)
- return self._rx()
-
- def login(self):
- data = {
- "EncryptType": "MD5",
- "LoginType": "DVRIP-Web",
- "UserName": self._username,
- "PassWord": self.derived_password,
- }
- command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data)
- response = self._tx_rx(command)
- if int(response.payload["Ret"]) == 100:
- # Login successful
- self._session = int(response.payload["SessionID"], 16)
- else:
- raise Exception("Login failed:", response)
-
- def _generic_cmd(self, name, msgcode):
- data = {
- "Name": name,
- "SessionID": "0x%x" % (self._session,),
- }
- command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
- return self._tx_rx(command)
-
- def get_systeminfo(self):
- return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd)
-
- def get_channel_title(self):
- return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd)
-
- def get_system_function(self):
- return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd)
-
- def get_talk_audio_format(self):
- return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd)
-
- def get_ntp_server(self):
- ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd)
- return ntp_config.payload['NetWork.NetNTP']['Server']['Name']
-
- def set_ntp_server(self,
- ntp_host: str,
- ntp_port: int = 123) -> None:
- data = {
- 'Name': 'NetWork.NetNTP',
- 'NetWork.NetNTP': {
- 'Enable': True,
- 'Server': {
- 'Address': '0x00000000',
- 'Anonymity': False,
- 'Name': ntp_host,
- 'Password': '',
- 'Port': ntp_port,
- 'UserName': ''
- },
- 'TimeZone': 13, # Moscow time
- 'UpdatePeriod': 60
- },
- "SessionID": "0x%x" % (self._session,),
- }
- command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data)
- self._tx_rx(command)
-
- def _opmonitor_cmd(self, action, msgcode, rx_msg=True):
- data = {
- "Name": "OPMonitor",
- "OPMonitor": {
- "Action": action,
- "Parameter": {
- "Channel": 0,
- "CombinMode": "CONNECT_ALL",
- "StreamType": "Main",
- "TransMode": "TCP",
- },
- },
- "SessionID": "0x%x" % (self._session,),
- }
- command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
- if rx_msg:
- return self._tx_rx(command)
- else:
- self._tx(command)
-
- def get_stream(self, packet_callback):
- self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd)
- self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False)
- while True:
- rx_pkt = self._rx()
- packet_callback(rx_pkt)
-
- # def playback_stream(self):
- # mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE)
- # with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f:
- # def pkt_callback(pkt):
- # if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData):
- # avpayload = AudioVideoPayload(pkt.payload, hint="video")
- # mplayer_process.stdin.raw.write(pkt.payload)
- # video_f.write(pkt.payload)
- # elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData:
- # # Audio data?
- # avpayload = AudioVideoPayload(pkt.payload, hint="audio")
- # f.write(avpayload.data)
- # elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData:
- # print(pkt)
- #
- # self.get_stream(pkt_callback)
-
-
-if __name__ == '__main__':
- cam = XMEyeCamera("192.168.0.47", password="DerPr03ess")
- cam.login()
- print(cam.get_systeminfo())
- print(cam.get_channel_title())
- # print(cam.get_talk_audio_format())
- # print(cam.get_system_function())