diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2024-02-19 01:44:02 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2024-02-19 01:44:11 +0300 |
commit | 3741f7cf78a288e967415ccb6736c888a21c211b (patch) | |
tree | a48d8331c9936d6c108de4d0f9179a089b1e56e6 /include/py/homekit/openwrt | |
parent | d79309e498cdc1358c81367ce2a93a5731e517d1 (diff) |
web_kbn: almost completely ported lws to python
Diffstat (limited to 'include/py/homekit/openwrt')
-rw-r--r-- | include/py/homekit/openwrt/__init__.py | 9 | ||||
-rw-r--r-- | include/py/homekit/openwrt/config.py | 14 | ||||
-rw-r--r-- | include/py/homekit/openwrt/openwrt.py | 90 |
3 files changed, 113 insertions, 0 deletions
diff --git a/include/py/homekit/openwrt/__init__.py b/include/py/homekit/openwrt/__init__.py new file mode 100644 index 0000000..b233b00 --- /dev/null +++ b/include/py/homekit/openwrt/__init__.py @@ -0,0 +1,9 @@ +from .config import OpenwrtConfig +from .openwrt import ( + ipset_list_all, + ipset_add, + ipset_del, + set_upstream, + get_default_route, + get_dhcp_leases +) diff --git a/include/py/homekit/openwrt/config.py b/include/py/homekit/openwrt/config.py new file mode 100644 index 0000000..bd75d1c --- /dev/null +++ b/include/py/homekit/openwrt/config.py @@ -0,0 +1,14 @@ +from typing import Optional + +from homekit.config import ConfigUnit + + +class OpenwrtConfig(ConfigUnit): + NAME = 'openwrt' + + @classmethod + def schema(cls) -> Optional[dict]: + return { + 'ip': cls._addr_schema(only_ip=True, required=True), + 'command_id': {'type': 'string', 'required': True} + }
\ No newline at end of file diff --git a/include/py/homekit/openwrt/openwrt.py b/include/py/homekit/openwrt/openwrt.py new file mode 100644 index 0000000..d5f949c --- /dev/null +++ b/include/py/homekit/openwrt/openwrt.py @@ -0,0 +1,90 @@ +import requests +import logging + +from datetime import datetime +from collections import namedtuple +from urllib.parse import quote_plus +from .config import OpenwrtConfig +from ..modem.config import ModemsConfig + +DHCPLease = namedtuple('DHCPLease', 'time, time_s, mac, ip, hostname') +_config = OpenwrtConfig() +_modems_config = ModemsConfig() +_logger = logging.getLogger(__name__) + + +def ipset_list_all() -> list: + args = ['ipset-list-all'] + args += _modems_config.keys() + lines = _to_list(_call(args)) + sets = {} + cur_set = None + for line in lines: + if line.startswith('>'): + cur_set = line[1:] + if cur_set not in sets: + sets[cur_set] = [] + continue + + if cur_set is None: + _logger.error('ipset_list_all: cur_set is not set') + continue + + sets[cur_set].append(line) + + return sets + + +def ipset_add(set_name: str, ip: str): + return _call(['ipset-add', set_name, ip]) + + +def ipset_del(set_name: str, ip: str): + return _call(['ipset-del', set_name, ip]) + + +def set_upstream(ip: str): + return _call(['homekit-set-default-upstream', ip]) + + +def get_default_route(): + return _call(['get-default-route']) + + +def get_dhcp_leases() -> list[DHCPLease]: + return list(map(lambda item: _to_dhcp_lease(item), _to_list(_call(['dhcp-leases'])))) + + +def _call(arguments: list[str]) -> str: + url = _get_link(arguments) + r = requests.get(url) + r.raise_for_status() + return r.text.strip() + + +def _get_link(arguments: list[str]) -> str: + url = f'http://{_config["ip"]}/cgi-bin/luci/command/{_config["command_id"]}' + if arguments: + url += '/' + url += '%20'.join(list(map(lambda arg: quote_plus(arg.replace('/', '_')), arguments))) + return url + + +def _to_list(s: str) -> list: + return [] if s == '' else s.split('\n') + + +def _to_dhcp_lease(s: str) -> DHCPLease: + words = s.split(' ') + time = int(words.pop(0)) + mac = words.pop(0) + ip = words.pop(0) + words.pop() + hostname = (' '.join(words)).strip() + if not hostname or hostname == '*': + hostname = '?' + return DHCPLease(time=time, + time_s=datetime.fromtimestamp(time).strftime('%d %b, %H:%M:%S'), + mac=mac, + ip=ip, + hostname=hostname)
\ No newline at end of file |