summaryrefslogtreecommitdiff
path: root/include/py/homekit/openwrt
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2024-02-19 01:44:02 +0300
committerEvgeny Zinoviev <me@ch1p.io>2024-02-19 01:44:11 +0300
commit3741f7cf78a288e967415ccb6736c888a21c211b (patch)
treea48d8331c9936d6c108de4d0f9179a089b1e56e6 /include/py/homekit/openwrt
parentd79309e498cdc1358c81367ce2a93a5731e517d1 (diff)
web_kbn: almost completely ported lws to python
Diffstat (limited to 'include/py/homekit/openwrt')
-rw-r--r--include/py/homekit/openwrt/__init__.py9
-rw-r--r--include/py/homekit/openwrt/config.py14
-rw-r--r--include/py/homekit/openwrt/openwrt.py90
3 files changed, 113 insertions, 0 deletions
diff --git a/include/py/homekit/openwrt/__init__.py b/include/py/homekit/openwrt/__init__.py
new file mode 100644
index 0000000..b233b00
--- /dev/null
+++ b/include/py/homekit/openwrt/__init__.py
@@ -0,0 +1,9 @@
+from .config import OpenwrtConfig
+from .openwrt import (
+ ipset_list_all,
+ ipset_add,
+ ipset_del,
+ set_upstream,
+ get_default_route,
+ get_dhcp_leases
+)
diff --git a/include/py/homekit/openwrt/config.py b/include/py/homekit/openwrt/config.py
new file mode 100644
index 0000000..bd75d1c
--- /dev/null
+++ b/include/py/homekit/openwrt/config.py
@@ -0,0 +1,14 @@
+from typing import Optional
+
+from homekit.config import ConfigUnit
+
+
+class OpenwrtConfig(ConfigUnit):
+ NAME = 'openwrt'
+
+ @classmethod
+ def schema(cls) -> Optional[dict]:
+ return {
+ 'ip': cls._addr_schema(only_ip=True, required=True),
+ 'command_id': {'type': 'string', 'required': True}
+ } \ No newline at end of file
diff --git a/include/py/homekit/openwrt/openwrt.py b/include/py/homekit/openwrt/openwrt.py
new file mode 100644
index 0000000..d5f949c
--- /dev/null
+++ b/include/py/homekit/openwrt/openwrt.py
@@ -0,0 +1,90 @@
+import requests
+import logging
+
+from datetime import datetime
+from collections import namedtuple
+from urllib.parse import quote_plus
+from .config import OpenwrtConfig
+from ..modem.config import ModemsConfig
+
+DHCPLease = namedtuple('DHCPLease', 'time, time_s, mac, ip, hostname')
+_config = OpenwrtConfig()
+_modems_config = ModemsConfig()
+_logger = logging.getLogger(__name__)
+
+
+def ipset_list_all() -> list:
+ args = ['ipset-list-all']
+ args += _modems_config.keys()
+ lines = _to_list(_call(args))
+ sets = {}
+ cur_set = None
+ for line in lines:
+ if line.startswith('>'):
+ cur_set = line[1:]
+ if cur_set not in sets:
+ sets[cur_set] = []
+ continue
+
+ if cur_set is None:
+ _logger.error('ipset_list_all: cur_set is not set')
+ continue
+
+ sets[cur_set].append(line)
+
+ return sets
+
+
+def ipset_add(set_name: str, ip: str):
+ return _call(['ipset-add', set_name, ip])
+
+
+def ipset_del(set_name: str, ip: str):
+ return _call(['ipset-del', set_name, ip])
+
+
+def set_upstream(ip: str):
+ return _call(['homekit-set-default-upstream', ip])
+
+
+def get_default_route():
+ return _call(['get-default-route'])
+
+
+def get_dhcp_leases() -> list[DHCPLease]:
+ return list(map(lambda item: _to_dhcp_lease(item), _to_list(_call(['dhcp-leases']))))
+
+
+def _call(arguments: list[str]) -> str:
+ url = _get_link(arguments)
+ r = requests.get(url)
+ r.raise_for_status()
+ return r.text.strip()
+
+
+def _get_link(arguments: list[str]) -> str:
+ url = f'http://{_config["ip"]}/cgi-bin/luci/command/{_config["command_id"]}'
+ if arguments:
+ url += '/'
+ url += '%20'.join(list(map(lambda arg: quote_plus(arg.replace('/', '_')), arguments)))
+ return url
+
+
+def _to_list(s: str) -> list:
+ return [] if s == '' else s.split('\n')
+
+
+def _to_dhcp_lease(s: str) -> DHCPLease:
+ words = s.split(' ')
+ time = int(words.pop(0))
+ mac = words.pop(0)
+ ip = words.pop(0)
+ words.pop()
+ hostname = (' '.join(words)).strip()
+ if not hostname or hostname == '*':
+ hostname = '?'
+ return DHCPLease(time=time,
+ time_s=datetime.fromtimestamp(time).strftime('%d %b, %H:%M:%S'),
+ mac=mac,
+ ip=ip,
+ hostname=hostname) \ No newline at end of file