#!/usr/bin/env python3 import include_homekit import asyncio import logging import jinja2 import aiohttp_jinja2 import json import re import inverterd import phonenumbers import time import os.path from io import StringIO from aiohttp import web from typing import Optional, Union from urllib.parse import quote_plus from contextvars import ContextVar from homekit.config import config, AppConfigUnit, is_development_mode, Translation, Language from homekit.camera import IpcamConfig from homekit.util import homekit_path, filesize_fmt, seconds_to_human_readable_string, json_serial, validate_ipv4 from homekit.modem import E3372, ModemsConfig, MacroNetWorkType from homekit.inverter.config import InverterdConfig from homekit.relay.sunxi_h3_client import RelayClient from homekit import openwrt, http class WebKbnConfig(AppConfigUnit): NAME = 'web_kbn' @classmethod def schema(cls) -> Optional[dict]: return { 'listen_addr': cls._addr_schema(required=True), 'assets_public_path': {'type': 'string'}, 'pump_addr': cls._addr_schema(required=True), 'hls_local_host': cls._addr_schema(required=True, only_ip=True), 'inverter_grafana_url': {'type': 'string'}, 'sensors_grafana_url': {'type': 'string'}, } # files marked with + at the beginning are included by default common_static_files = { '+bootstrap.min.css': 1, '+bootstrap.bundle.min.js': 1, '+polyfills.js': 1, '+app.js': 8, '+app.css': 6, 'hls.js': 1 } routes = web.RouteTableDef() logger = logging.getLogger(__name__) lang_context_var = ContextVar('lang', default=Translation.DEFAULT_LANGUAGE) def get_js_link(file, version) -> str: if is_development_mode(): version = int(time.time()) file += f'?version={version}' return f'' def get_css_link(file, version) -> str: if is_development_mode(): version = int(time.time()) file += f'?version={version}' return f'' def get_head_static(additional_files=None) -> str: buf = StringIO() if additional_files is None: additional_files = [] for file, version in common_static_files.items(): enabled_by_default = file.startswith('+') if not enabled_by_default and file not in additional_files: continue if enabled_by_default: file = file[1:] if file.endswith('.js'): buf.write(get_js_link(file, version)) else: buf.write(get_css_link(file, version)) return buf.getvalue() def get_modem_client(modem_cfg: dict) -> E3372: return E3372(modem_cfg['ip'], legacy_token_auth=modem_cfg['legacy_auth']) def get_modem_data(modem_cfg: dict, get_raw=False) -> Union[dict, tuple]: cl = get_modem_client(modem_cfg) signal = cl.device_signal status = cl.monitoring_status traffic = cl.traffic_stats if get_raw: device_info = cl.device_information dialup_conn = cl.dialup_connection return signal, status, traffic, device_info, dialup_conn else: network_type_label = re.sub('^MACRO_NET_WORK_TYPE(_EX)?_', '', MacroNetWorkType(int(status['CurrentNetworkType'])).name) return { 'type': network_type_label, 'level': int(status['SignalIcon']) if 'SignalIcon' in status else 0, 'rssi': signal['rssi'], 'sinr': signal['sinr'], 'connected_time': seconds_to_human_readable_string(int(traffic['CurrentConnectTime'])), 'downloaded': filesize_fmt(int(traffic['CurrentDownload'])), 'uploaded': filesize_fmt(int(traffic['CurrentUpload'])) } def get_pump_client() -> RelayClient: addr = config.app_config['pump_addr'] cl = RelayClient(host=addr.host, port=addr.port) cl.connect() return cl def get_inverter_client() -> inverterd.Client: cl = inverterd.Client(host=InverterdConfig()['remote_addr'].host) cl.connect() cl.format(inverterd.Format.JSON) return cl def get_inverter_data() -> tuple: cl = get_inverter_client() status = json.loads(cl.exec('get-status'))['data'] rated = json.loads(cl.exec('get-rated'))['data'] power_direction = status['battery_power_direction'].lower() power_direction = re.sub('ge$', 'ging', power_direction) charging_rate = '' if power_direction == 'charging': charging_rate = ' @ %s %s' % ( status['battery_charge_current']['value'], status['battery_charge_current']['unit']) elif power_direction == 'discharging': charging_rate = ' @ %s %s' % ( status['battery_discharge_current']['value'], status['battery_discharge_current']['unit']) html = 'Battery: %s %s' % ( status['battery_voltage']['value'], status['battery_voltage']['unit']) html += ' (%s%s, ' % ( status['battery_capacity']['value'], status['battery_capacity']['unit']) html += '%s%s)' % (power_direction, charging_rate) html += "\n" html += 'Load: %s %s' % ( status['ac_output_active_power']['value'], status['ac_output_active_power']['unit']) html += ' (%s%%)' % (status['output_load_percent']['value'],) if status['pv1_input_power']['value'] > 0: html += "\n" html += 'Input power: %s %s' % ( status['pv1_input_power']['value'], status['pv1_input_power']['unit']) if status['grid_voltage']['value'] > 0 or status['grid_freq']['value'] > 0: html += "\n" html += 'AC input: %s %s' % ( status['grid_voltage']['value'], status['grid_voltage']['unit']) html += ', %s %s' % ( status['grid_freq']['value'], status['grid_freq']['unit']) html += "\n" html += 'Priority: %s' % (rated['output_source_priority'],) html = html.replace("\n", '
') return status, rated, html def get_current_upstream() -> str: r = openwrt.get_default_route() logger.info(f'default route: {r}') mc = ModemsConfig() for k, v in mc.items(): if 'gateway_ip' in v and v['gateway_ip'] == r: r = v['ip'] break upstream = None for k, v in mc.items(): if r == v['ip']: upstream = k if not upstream: raise RuntimeError('failed to determine current upstream!') return upstream def get_preferred_lang(req: web.Request) -> Language: lang_cookie = req.cookies.get('lang', None) if lang_cookie is None: return Translation.DEFAULT_LANGUAGE try: return Language(lang_cookie) except ValueError: logger.debug(f"unsupported lang_cookie value: {lang_cookie}") return Translation.DEFAULT_LANGUAGE @web.middleware async def language_middleware(request, handler): lang_context_var.set(get_preferred_lang(request)) return await handler(request) def lang(key, unit='web_kbn'): strings = Translation(unit) if isinstance(key, str) and '.' in key: return strings.get(lang_context_var.get()).get(key) else: return strings.get(lang_context_var.get())[key] async def render(req: web.Request, template_name: str, title: Optional[str] = None, context: Optional[dict] = None, assets: Optional[list] = None): if context is None: context = {} context = { **context, 'head_static': get_head_static(assets), 'user_lang': lang_context_var.get().value } if title is not None: context['title'] = title response = aiohttp_jinja2.render_template(template_name+'.j2', req, context=context) return response @routes.get('/') async def index0(req: web.Request): raise web.HTTPFound('main.cgi') @routes.get('/main.cgi') async def index(req: web.Request): tabs = ['zones', 'list'] tab = req.query.get('tab', None) if tab and (tab not in tabs or tab == tabs[0]): raise web.HTTPFound('main.cgi') if tab is None: tab = tabs[0] ctx = {} for k in 'inverter', 'sensors': ctx[f'{k}_grafana_url'] = config.app_config[f'{k}_grafana_url'] cc = IpcamConfig() ctx['camzones'] = cc['zones'].keys() ctx['allcams'] = cc.get_all_cam_names() ctx['lang_enum'] = Language ctx['lang_selected'] = lang_context_var.get() ctx['tab_selected'] = tab ctx['tabs'] = tabs return await render(req, 'index', title=lang('sitename'), context=ctx) @routes.get('/modems.cgi') async def modems(req: web.Request): return await render(req, 'modems', title=lang('modem_statuses'), context=dict(modems=ModemsConfig())) @routes.get('/modems_info.ajx') async def modems_ajx(req: web.Request): mc = ModemsConfig() modem = req.query.get('id', None) if modem not in mc.keys(): raise ValueError('invalid modem id') modem_cfg = mc.get(modem) loop = asyncio.get_event_loop() modem_data = await loop.run_in_executor(None, lambda: get_modem_data(modem_cfg)) html = aiohttp_jinja2.render_string('modem_data.j2', req, context=dict( modem_data=modem_data, modem=modem )) return http.ajax_ok({'html': html}) @routes.get('/modems_verbose.cgi') async def modems_verbose(req: web.Request): modem = req.query.get('id', None) if modem not in ModemsConfig().keys(): raise ValueError('invalid modem id') modem_cfg = ModemsConfig().get(modem) loop = asyncio.get_event_loop() signal, status, traffic, device, dialup_conn = await loop.run_in_executor(None, lambda: get_modem_data(modem_cfg, True)) data = [ ['Signal', signal], ['Connection', status], ['Traffic', traffic], ['Device info', device], ['Dialup connection', dialup_conn] ] modem_name = Translation('modems').get(lang_context_var.get())[modem]['full'] return await render(req, 'modem_verbose', title=lang('modem_verbose_info_about_modem') % (modem_name,), context=dict(data=data, modem_name=modem_name)) @routes.get('/sms.cgi') async def sms(req: web.Request): modem = req.query.get('id', list(ModemsConfig().keys())[0]) is_outbox = int(req.query.get('outbox', 0)) == 1 error = req.query.get('error', None) sent = int(req.query.get('sent', 0)) == 1 cl = get_modem_client(ModemsConfig()[modem]) messages = cl.sms_list(1, 20, is_outbox) return await render(req, 'sms', title=lang('sms_page_title') % (lang('sms_outbox') if is_outbox else lang('sms_inbox'), modem), context=dict( modems=ModemsConfig(), selected_modem=modem, is_outbox=is_outbox, error=error, is_sent=sent, messages=messages )) @routes.post('/sms.cgi') async def sms_post(req: web.Request): modem = req.query.get('id', list(ModemsConfig().keys())[0]) is_outbox = int(req.query.get('outbox', 0)) == 1 fd = await req.post() phone = fd.get('phone', None) text = fd.get('text', None) return_url = f'sms.cgi?id={modem}&outbox={int(is_outbox)}' phone = re.sub('\s+', '', phone) if len(phone) > 4: country = None if not phone.startswith('+'): country = 'RU' number = phonenumbers.parse(phone, country) if not phonenumbers.is_valid_number(number): raise web.HTTPFound(f'{return_url}&error=Неверный+номер') phone = phonenumbers.format_number(number, phonenumbers.PhoneNumberFormat.E164) cl = get_modem_client(ModemsConfig()[modem]) cl.sms_send(phone, text) raise web.HTTPFound(return_url) @routes.get('/inverter.cgi') async def inverter(req: web.Request): action = req.query.get('do', None) if action == 'set-osp': val = req.query.get('value') if val not in ('sub', 'sbu'): raise ValueError('invalid osp value') cl = get_inverter_client() cl.exec('set-output-source-priority', arguments=(val.upper(),)) raise web.HTTPFound('inverter.cgi') status, rated, html = await asyncio.get_event_loop().run_in_executor(None, get_inverter_data) return await render(req, 'inverter', title=lang('inverter'), context=dict(status=status, rated=rated, html=html)) @routes.get('/inverter.ajx') async def inverter_ajx(req: web.Request): status, rated, html = await asyncio.get_event_loop().run_in_executor(None, get_inverter_data) return http.ajax_ok({'html': html}) @routes.get('/pump.cgi') async def pump(req: web.Request): # TODO # these are blocking calls # should be rewritten using aio cl = get_pump_client() action = req.query.get('set', None) if action in ('on', 'off'): getattr(cl, action)() raise web.HTTPFound('pump.cgi') status = cl.status() return await render(req, 'pump', title=lang('pump'), context=dict(status=status)) @routes.get('/cams.cgi') async def cams(req: web.Request): cc = IpcamConfig() cam = req.query.get('id', None) zone = req.query.get('zone', None) debug_hls = bool(req.query.get('debug_hls', False)) debug_video_events = bool(req.query.get('debug_video_events', False)) if cam is not None: if not cc.has_camera(int(cam)): raise ValueError('invalid camera id') cams = [int(cam)] mode = {'type': 'single', 'cam': int(cam)} elif zone is not None: if not cc.has_zone(zone): raise ValueError('invalid zone') cams = cc['zones'][zone] mode = {'type': 'zone', 'zone': zone} else: cams = cc.get_all_cam_names() mode = {'type': 'all'} if req.headers.get('Host').endswith('.manor.id'): hls_pfx = 'https://'+req.headers.get('Host') hls_pfx += re.sub(r'/home/?$', '/ipcam/', os.path.dirname(req.headers.get('X-Real-URI'))) else: hls_pfx = 'http://'+config.app_config['hls_local_host']+'/ipcam/' js_config = { 'pfx': hls_pfx, # 'host': config.app_config['hls_local_host'], # 'proto': 'http', 'cams': cams, 'hlsConfig': { 'opts': { 'startPosition': -1, # https://github.com/video-dev/hls.js/issues/3884#issuecomment-842380784 'liveSyncDuration': 2, 'liveMaxLatencyDuration': 3, 'maxLiveSyncPlaybackRate': 2, 'liveDurationInfinity': True }, 'debugVideoEvents': debug_video_events, 'debug': debug_hls } } return await render(req, 'cams', title=lang('cams'), assets=['hls.js'], context=dict( mode=mode, js_config=js_config, )) @routes.get('/routing_main.cgi') async def routing_main(req: web.Request): upstream = get_current_upstream() set_upstream_to = req.query.get('set-upstream-to', None) mc = ModemsConfig() if set_upstream_to and set_upstream_to in mc and set_upstream_to != upstream: modem = mc[set_upstream_to] new_upstream = str(modem['gateway_ip'] if 'gateway_ip' in modem else modem['ip']) openwrt.set_upstream(new_upstream) raise web.HTTPFound('routing_main.cgi') context = dict( upstream=upstream, selected_tab='main', modems=mc.keys() ) return await render(req, 'routing_main', title=lang('routing'), context=context) @routes.get('/routing_rules.cgi') async def routing_rules(req: web.Request): mc = ModemsConfig() action = req.query.get('action', None) error = req.query.get('error', None) set_name = req.query.get('set', None) ip = req.query.get('ip', None) def validate_input(): # validate set if not set_name or set_name not in mc: raise ValueError(f'invalid set \'{set_name}\'') # validate ip if not isinstance(ip, str): raise ValueError('invalid ip') slash_pos = None try: slash_pos = ip.index('/') except ValueError: pass if slash_pos is not None: ip_without_mask = ip[0:slash_pos] else: ip_without_mask = ip if not validate_ipv4(ip_without_mask): raise ValueError(f'invalid ip \'{ip}\'') base_url = 'routing_rules.cgi' if action in ('add', 'del'): try: validate_input() except ValueError as e: raise web.HTTPFound(f'{base_url}?error='+quote_plus(str(e))) f = getattr(openwrt, f'ipset_{action}') output = f(set_name, ip) url = base_url if output != '': url += '?error='+quote_plus(output) raise web.HTTPFound(url) ipsets = openwrt.ipset_list_all() context = dict( sets=ipsets, selected_tab='rules', error=error ) return await render(req, 'routing_rules', title=lang('routing') + ' // ' + lang('routing_rules'), context=context) @routes.get('/routing_dhcp.cgi') async def routing_dhcp(req: web.Request): leases = openwrt.get_dhcp_leases() return await render(req, 'routing_dhcp', title=lang('routing') + ' // DHCP', context=dict(leases=leases, selected_tab='dhcp')) @routes.get('/debug.cgi') async def debug(req: web.Request): info = dict( headers=dict(req.headers), host=req.headers.get('Host'), url=str(req.url), method=req.method, ) return http.ajax_ok(info) def init_web_app(app: web.Application): app.middlewares.append(language_middleware) aiohttp_jinja2.setup( app, loader=jinja2.FileSystemLoader(homekit_path('web', 'kbn_templates')), autoescape=jinja2.select_autoescape(['html', 'xml']), ) env = aiohttp_jinja2.get_env(app) # @pass_context is used only to prevent jinja2 from caching the result of lang filter results of constant values. # as of now i don't know a better way of doing it @jinja2.pass_context def filter_lang(ctx, key, unit='web_kbn'): return lang(key, unit) env.filters['tojson'] = lambda obj: json.dumps(obj, separators=(',', ':'), default=json_serial) env.filters['lang'] = filter_lang app.router.add_static('/assets/', path=homekit_path('web', 'kbn_assets')) if __name__ == '__main__': config.load_app(WebKbnConfig) http.serve(addr=config.app_config['listen_addr'], routes=routes, before_start=init_web_app)