From 7058d0f5063dc9b065248d0a906cf874788caecf Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Wed, 13 Sep 2023 09:34:49 +0300 Subject: save --- include/py/homekit/modem/__init__.py | 1 + include/py/homekit/modem/config.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 include/py/homekit/modem/__init__.py create mode 100644 include/py/homekit/modem/config.py (limited to 'include/py/homekit/modem') diff --git a/include/py/homekit/modem/__init__.py b/include/py/homekit/modem/__init__.py new file mode 100644 index 0000000..20e75b7 --- /dev/null +++ b/include/py/homekit/modem/__init__.py @@ -0,0 +1 @@ +from .config import ModemsConfig \ No newline at end of file diff --git a/include/py/homekit/modem/config.py b/include/py/homekit/modem/config.py new file mode 100644 index 0000000..16d1ba0 --- /dev/null +++ b/include/py/homekit/modem/config.py @@ -0,0 +1,29 @@ +from ..config import ConfigUnit, Translation +from typing import Optional + + +class ModemsConfig(ConfigUnit): + NAME = 'modems' + + _strings: Translation + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._strings = Translation('modems') + + @classmethod + def schema(cls) -> Optional[dict]: + return { + 'type': 'dict', + 'schema': { + 'ip': cls._addr_schema(required=True, only_ip=True), + 'gateway_ip': cls._addr_schema(required=False, only_ip=True), + 'legacy_auth': {'type': 'boolean', 'required': True} + } + } + + def getshortname(self, modem: str, lang=Translation.DEFAULT_LANGUAGE): + return self._strings.get(lang)[modem]['short'] + + def getfullname(self, modem: str, lang=Translation.DEFAULT_LANGUAGE): + return self._strings.get(lang)[modem]['full'] \ No newline at end of file -- cgit v1.2.3 From da5db8bc280deab0e2081f39d2f32aabb2372afe Mon Sep 17 00:00:00 2001 From: Evgeny Sorokin Date: Tue, 16 Jan 2024 02:05:00 +0300 Subject: wip --- include/py/homekit/modem/__init__.py | 3 +- include/py/homekit/modem/e3372.py | 253 +++++++++++++++++++++++++++++++++++ 2 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 include/py/homekit/modem/e3372.py (limited to 'include/py/homekit/modem') diff --git a/include/py/homekit/modem/__init__.py b/include/py/homekit/modem/__init__.py index 20e75b7..ea0930e 100644 --- a/include/py/homekit/modem/__init__.py +++ b/include/py/homekit/modem/__init__.py @@ -1 +1,2 @@ -from .config import ModemsConfig \ No newline at end of file +from .config import ModemsConfig +from .e3372 import E3372, MacroNetWorkType diff --git a/include/py/homekit/modem/e3372.py b/include/py/homekit/modem/e3372.py new file mode 100644 index 0000000..f68db5a --- /dev/null +++ b/include/py/homekit/modem/e3372.py @@ -0,0 +1,253 @@ +import requests +import xml.etree.ElementTree as ElementTree + +from ..util import Addr +from enum import Enum +from ..http import HTTPMethod +from typing import Union + + +class Error(Enum): + ERROR_SYSTEM_NO_SUPPORT = 100002 + ERROR_SYSTEM_NO_RIGHTS = 100003 + ERROR_SYSTEM_BUSY = 100004 + ERROR_LOGIN_USERNAME_WRONG = 108001 + ERROR_LOGIN_PASSWORD_WRONG = 108002 + ERROR_LOGIN_ALREADY_LOGIN = 108003 + ERROR_LOGIN_USERNAME_PWD_WRONG = 108006 + ERROR_LOGIN_USERNAME_PWD_ORERRUN = 108007 + ERROR_LOGIN_TOUCH_ALREADY_LOGIN = 108009 + ERROR_VOICE_BUSY = 120001 + ERROR_WRONG_TOKEN = 125001 + ERROR_WRONG_SESSION = 125002 + ERROR_WRONG_SESSION_TOKEN = 125003 + + +class WifiStatus(Enum): + WIFI_CONNECTING = '900' + WIFI_CONNECTED = '901' + WIFI_DISCONNECTED = '902' + WIFI_DISCONNECTING = '903' + + +class Cradle(Enum): + CRADLE_CONNECTING = '900' + CRADLE_CONNECTED = '901' + CRADLE_DISCONNECTED = '902' + CRADLE_DISCONNECTING = '903' + CRADLE_CONNECTFAILED = '904' + CRADLE_CONNECTSTATUSNULL = '905' + CRANDLE_CONNECTSTATUSERRO = '906' + + +class MacroEVDOLevel(Enum): + MACRO_EVDO_LEVEL_ZERO = '0' + MACRO_EVDO_LEVEL_ONE = '1' + MACRO_EVDO_LEVEL_TWO = '2' + MACRO_EVDO_LEVEL_THREE = '3' + MACRO_EVDO_LEVEL_FOUR = '4' + MACRO_EVDO_LEVEL_FIVE = '5' + + +class MacroNetWorkType(Enum): + MACRO_NET_WORK_TYPE_NOSERVICE = 0 + MACRO_NET_WORK_TYPE_GSM = 1 + MACRO_NET_WORK_TYPE_GPRS = 2 + MACRO_NET_WORK_TYPE_EDGE = 3 + MACRO_NET_WORK_TYPE_WCDMA = 4 + MACRO_NET_WORK_TYPE_HSDPA = 5 + MACRO_NET_WORK_TYPE_HSUPA = 6 + MACRO_NET_WORK_TYPE_HSPA = 7 + MACRO_NET_WORK_TYPE_TDSCDMA = 8 + MACRO_NET_WORK_TYPE_HSPA_PLUS = 9 + MACRO_NET_WORK_TYPE_EVDO_REV_0 = 10 + MACRO_NET_WORK_TYPE_EVDO_REV_A = 11 + MACRO_NET_WORK_TYPE_EVDO_REV_B = 12 + MACRO_NET_WORK_TYPE_1xRTT = 13 + MACRO_NET_WORK_TYPE_UMB = 14 + MACRO_NET_WORK_TYPE_1xEVDV = 15 + MACRO_NET_WORK_TYPE_3xRTT = 16 + MACRO_NET_WORK_TYPE_HSPA_PLUS_64QAM = 17 + MACRO_NET_WORK_TYPE_HSPA_PLUS_MIMO = 18 + MACRO_NET_WORK_TYPE_LTE = 19 + MACRO_NET_WORK_TYPE_EX_NOSERVICE = 0 + MACRO_NET_WORK_TYPE_EX_GSM = 1 + MACRO_NET_WORK_TYPE_EX_GPRS = 2 + MACRO_NET_WORK_TYPE_EX_EDGE = 3 + MACRO_NET_WORK_TYPE_EX_IS95A = 21 + MACRO_NET_WORK_TYPE_EX_IS95B = 22 + MACRO_NET_WORK_TYPE_EX_CDMA_1x = 23 + MACRO_NET_WORK_TYPE_EX_EVDO_REV_0 = 24 + MACRO_NET_WORK_TYPE_EX_EVDO_REV_A = 25 + MACRO_NET_WORK_TYPE_EX_EVDO_REV_B = 26 + MACRO_NET_WORK_TYPE_EX_HYBRID_CDMA_1x = 27 + MACRO_NET_WORK_TYPE_EX_HYBRID_EVDO_REV_0 = 28 + MACRO_NET_WORK_TYPE_EX_HYBRID_EVDO_REV_A = 29 + MACRO_NET_WORK_TYPE_EX_HYBRID_EVDO_REV_B = 30 + MACRO_NET_WORK_TYPE_EX_EHRPD_REL_0 = 31 + MACRO_NET_WORK_TYPE_EX_EHRPD_REL_A = 32 + MACRO_NET_WORK_TYPE_EX_EHRPD_REL_B = 33 + MACRO_NET_WORK_TYPE_EX_HYBRID_EHRPD_REL_0 = 34 + MACRO_NET_WORK_TYPE_EX_HYBRID_EHRPD_REL_A = 35 + MACRO_NET_WORK_TYPE_EX_HYBRID_EHRPD_REL_B = 36 + MACRO_NET_WORK_TYPE_EX_WCDMA = 41 + MACRO_NET_WORK_TYPE_EX_HSDPA = 42 + MACRO_NET_WORK_TYPE_EX_HSUPA = 43 + MACRO_NET_WORK_TYPE_EX_HSPA = 44 + MACRO_NET_WORK_TYPE_EX_HSPA_PLUS = 45 + MACRO_NET_WORK_TYPE_EX_DC_HSPA_PLUS = 46 + MACRO_NET_WORK_TYPE_EX_TD_SCDMA = 61 + MACRO_NET_WORK_TYPE_EX_TD_HSDPA = 62 + MACRO_NET_WORK_TYPE_EX_TD_HSUPA = 63 + MACRO_NET_WORK_TYPE_EX_TD_HSPA = 64 + MACRO_NET_WORK_TYPE_EX_TD_HSPA_PLUS = 65 + MACRO_NET_WORK_TYPE_EX_802_16E = 81 + MACRO_NET_WORK_TYPE_EX_LTE = 101 + + +def post_data_to_xml(data: dict, depth: int = 1) -> str: + if depth == 1: + return ''+post_data_to_xml({'request': data}, depth+1) + + items = [] + for k, v in data.items(): + if isinstance(v, dict): + v = post_data_to_xml(v, depth+1) + elif isinstance(v, list): + raise TypeError('list type is unsupported here') + items.append(f'<{k}>{v}') + + return ''.join(items) + + +class E3372: + _addr: Addr + _need_auth: bool + _legacy_token_auth: bool + _get_raw_data: bool + _headers: dict[str, str] + _authorized: bool + + def __init__(self, + addr: Addr, + need_auth: bool = True, + legacy_token_auth: bool = False, + get_raw_data: bool = False): + self._addr = addr + self._need_auth = need_auth + self._legacy_token_auth = legacy_token_auth + self._get_raw_data = get_raw_data + self._authorized = False + self._headers = {} + + @property + def device_information(self): + self.auth() + return self.request('device/information') + + @property + def device_signal(self): + self.auth() + return self.request('device/signal') + + @property + def monitoring_status(self): + self.auth() + return self.request('monitoring/status') + + @property + def notifications(self): + self.auth() + return self.request('monitoring/check-notifications') + + @property + def dialup_connection(self): + self.auth() + return self.request('dialup/connection') + + @property + def traffic_stats(self): + self.auth() + return self.request('monitoring/traffic-statistics') + + @property + def sms_count(self): + self.auth() + return self.request('sms/sms-count') + + def sms_send(self, phone: str, text: str): + self.auth() + return self.request('sms/send-sms', HTTPMethod.POST, { + 'Index': -1, + 'Phones': { + 'Phone': phone + }, + 'Sca': '', + 'Content': text, + 'Length': -1, + 'Reserved': 1, + 'Date': -1 + }) + + def sms_list(self, page: int = 1, count: int = 20, outbox: bool = False): + self.auth() + xml = self.request('sms/sms-list', HTTPMethod.POST, { + 'PageIndex': page, + 'ReadCount': count, + 'BoxType': 1 if not outbox else 2, + 'SortType': 0, + 'Ascending': 0, + 'UnreadPreferred': 1 if not outbox else 0 + }, return_body=True) + + root = ElementTree.fromstring(xml) + messages = [] + for message_elem in root.find('Messages').findall('Message'): + message_dict = {child.tag: child.text for child in message_elem} + messages.append(message_dict) + return messages + + def auth(self): + if self._authorized: + return + + if not self._legacy_token_auth: + data = self.request('webserver/SesTokInfo') + self._headers = { + 'Cookie': data['SesInfo'], + '__RequestVerificationToken': data['TokInfo'], + 'Content-Type': 'text/xml' + } + else: + data = self.request('webserver/token') + self._headers = { + '__RequestVerificationToken': data['token'], + 'Content-Type': 'text/xml' + } + + self._authorized = True + + def request(self, + method: str, + http_method: HTTPMethod = HTTPMethod.GET, + data: dict = {}, + return_body: bool = False) -> Union[str, dict]: + url = f'http://{self._addr}/api/{method}' + if http_method == HTTPMethod.POST: + data = post_data_to_xml(data) + f = requests.post + else: + data = None + f = requests.get + r = f(url, data=data, headers=self._headers) + r.raise_for_status() + r.encoding = 'utf-8' + + if return_body: + return r.text + + root = ElementTree.fromstring(r.text) + data_dict = {} + for elem in root: + data_dict[elem.tag] = elem.text + return data_dict -- cgit v1.2.3