summaryrefslogtreecommitdiff
path: root/bin/web_kbn.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/web_kbn.py')
-rw-r--r--bin/web_kbn.py354
1 files changed, 354 insertions, 0 deletions
diff --git a/bin/web_kbn.py b/bin/web_kbn.py
new file mode 100644
index 0000000..c21269b
--- /dev/null
+++ b/bin/web_kbn.py
@@ -0,0 +1,354 @@
+#!/usr/bin/env python3
+import asyncio
+import jinja2
+import aiohttp_jinja2
+import json
+import re
+import inverterd
+import phonenumbers
+import __py_include
+
+from io import StringIO
+from aiohttp.web import HTTPFound
+from typing import Optional, Union
+from homekit.config import config, AppConfigUnit
+from homekit.util import homekit_path, filesize_fmt, seconds_to_human_readable_string
+from homekit.modem import E3372, ModemsConfig, MacroNetWorkType
+from homekit.inverter.config import InverterdConfig
+from homekit.relay.sunxi_h3_client import RelayClient
+from homekit import http
+
+
+class WebKbnConfig(AppConfigUnit):
+ NAME = 'web_kbn'
+
+ @classmethod
+ def schema(cls) -> Optional[dict]:
+ return {
+ 'listen_addr': cls._addr_schema(required=True),
+ 'assets_public_path': {'type': 'string'},
+ 'pump_addr': cls._addr_schema(required=True),
+ 'inverter_grafana_url': {'type': 'string'},
+ 'sensors_grafana_url': {'type': 'string'},
+ }
+
+
+STATIC_FILES = [
+ 'bootstrap.min.css',
+ 'bootstrap.min.js',
+ 'polyfills.js',
+ 'app.js',
+ 'app.css'
+]
+
+
+def get_js_link(file, version) -> str:
+ if version:
+ file += f'?version={version}'
+ return f'<script src="{config.app_config["assets_public_path"]}/{file}" type="text/javascript"></script>'
+
+
+def get_css_link(file, version) -> str:
+ if version:
+ file += f'?version={version}'
+ return f'<link rel="stylesheet" type="text/css" href="{config.app_config["assets_public_path"]}/{file}">'
+
+
+def get_head_static() -> str:
+ buf = StringIO()
+ for file in STATIC_FILES:
+ v = 2
+ try:
+ q_ind = file.index('?')
+ v = file[q_ind+1:]
+ file = file[:file.index('?')]
+ except ValueError:
+ pass
+
+ if file.endswith('.js'):
+ buf.write(get_js_link(file, v))
+ else:
+ buf.write(get_css_link(file, v))
+ return buf.getvalue()
+
+
+def get_modem_client(modem_cfg: dict) -> E3372:
+ return E3372(modem_cfg['ip'], legacy_token_auth=modem_cfg['legacy_auth'])
+
+
+def get_modem_data(modem_cfg: dict, get_raw=False) -> Union[dict, tuple]:
+ cl = get_modem_client(modem_cfg)
+
+ signal = cl.device_signal
+ status = cl.monitoring_status
+ traffic = cl.traffic_stats
+
+ if get_raw:
+ device_info = cl.device_information
+ dialup_conn = cl.dialup_connection
+ return signal, status, traffic, device_info, dialup_conn
+ else:
+ network_type_label = re.sub('^MACRO_NET_WORK_TYPE(_EX)?_', '', MacroNetWorkType(int(status['CurrentNetworkType'])).name)
+ return {
+ 'type': network_type_label,
+ 'level': int(status['SignalIcon']) if 'SignalIcon' in status else 0,
+ 'rssi': signal['rssi'],
+ 'sinr': signal['sinr'],
+ 'connected_time': seconds_to_human_readable_string(int(traffic['CurrentConnectTime'])),
+ 'downloaded': filesize_fmt(int(traffic['CurrentDownload'])),
+ 'uploaded': filesize_fmt(int(traffic['CurrentUpload']))
+ }
+
+
+def get_pump_client() -> RelayClient:
+ addr = config.app_config['pump_addr']
+ cl = RelayClient(host=addr.host, port=addr.port)
+ cl.connect()
+ return cl
+
+
+def get_inverter_client() -> inverterd.Client:
+ cl = inverterd.Client(host=InverterdConfig()['remote_addr'].host)
+ cl.connect()
+ cl.format(inverterd.Format.JSON)
+ return cl
+
+
+def get_inverter_data() -> tuple:
+ cl = get_inverter_client()
+
+ status = json.loads(cl.exec('get-status'))['data']
+ rated = json.loads(cl.exec('get-rated'))['data']
+
+ power_direction = status['battery_power_direction'].lower()
+ power_direction = re.sub('ge$', 'ging', power_direction)
+
+ charging_rate = ''
+ if power_direction == 'charging':
+ charging_rate = ' @ %s %s' % (
+ status['battery_charge_current']['value'],
+ status['battery_charge_current']['unit'])
+ elif power_direction == 'discharging':
+ charging_rate = ' @ %s %s' % (
+ status['battery_discharge_current']['value'],
+ status['battery_discharge_current']['unit'])
+
+ html = '<b>Battery:</b> %s %s' % (
+ status['battery_voltage']['value'],
+ status['battery_voltage']['unit'])
+ html += ' (%s%s, ' % (
+ status['battery_capacity']['value'],
+ status['battery_capacity']['unit'])
+ html += '%s%s)' % (power_direction, charging_rate)
+
+ html += "\n"
+ html += '<b>Load:</b> %s %s' % (
+ status['ac_output_active_power']['value'],
+ status['ac_output_active_power']['unit'])
+ html += ' (%s%%)' % (status['output_load_percent']['value'],)
+
+ if status['pv1_input_power']['value'] > 0:
+ html += "\n"
+ html += '<b>Input power:</b> %s %s' % (
+ status['pv1_input_power']['value'],
+ status['pv1_input_power']['unit'])
+
+ if status['grid_voltage']['value'] > 0 or status['grid_freq']['value'] > 0:
+ html += "\n"
+ html += '<b>AC input:</b> %s %s' % (
+ status['grid_voltage']['value'],
+ status['grid_voltage']['unit'])
+ html += ', %s %s' % (
+ status['grid_freq']['value'],
+ status['grid_freq']['unit'])
+
+ html += "\n"
+ html += '<b>Priority:</b> %s' % (rated['output_source_priority'],)
+
+ html = html.replace("\n", '<br>')
+
+ return status, rated, html
+
+
+class WebSite(http.HTTPServer):
+ _modems_config: ModemsConfig
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self._modems_config = ModemsConfig()
+
+ aiohttp_jinja2.setup(
+ self.app,
+ loader=jinja2.FileSystemLoader(homekit_path('web', 'kbn_templates')),
+ autoescape=jinja2.select_autoescape(['html', 'xml']),
+ )
+ env = aiohttp_jinja2.get_env(self.app)
+ env.filters['tojson'] = lambda obj: json.dumps(obj, separators=(',', ':'))
+
+ self.app.router.add_static('/assets/', path=homekit_path('web', 'kbn_assets'))
+
+ self.get('/main.cgi', self.index)
+
+ self.get('/modems.cgi', self.modems)
+ self.get('/modems/info.ajx', self.modems_ajx)
+ self.get('/modems/verbose.cgi', self.modems_verbose)
+
+ self.get('/inverter.cgi', self.inverter)
+ self.get('/inverter.ajx', self.inverter_ajx)
+ self.get('/pump.cgi', self.pump)
+ self.get('/sms.cgi', self.sms)
+ self.post('/sms.cgi', self.sms_post)
+
+ async def render_page(self,
+ req: http.Request,
+ template_name: str,
+ title: Optional[str] = None,
+ context: Optional[dict] = None):
+ if context is None:
+ context = {}
+ context = {
+ **context,
+ 'head_static': get_head_static()
+ }
+ if title is not None:
+ context['title'] = title
+ response = aiohttp_jinja2.render_template(template_name+'.j2', req, context=context)
+ return response
+
+ async def index(self, req: http.Request):
+ ctx = {}
+ for k in 'inverter', 'sensors':
+ ctx[f'{k}_grafana_url'] = config.app_config[f'{k}_grafana_url']
+ return await self.render_page(req, 'index',
+ title="Home web site",
+ context=ctx)
+
+ async def modems(self, req: http.Request):
+ return await self.render_page(req, 'modems',
+ title='Состояние модемов',
+ context=dict(modems=self._modems_config))
+
+ async def modems_ajx(self, req: http.Request):
+ modem = req.query.get('id', None)
+ if modem not in self._modems_config.keys():
+ raise ValueError('invalid modem id')
+
+ modem_cfg = self._modems_config.get(modem)
+ loop = asyncio.get_event_loop()
+ modem_data = await loop.run_in_executor(None, lambda: get_modem_data(modem_cfg))
+
+ html = aiohttp_jinja2.render_string('modem_data.j2', req, context=dict(
+ modem_data=modem_data,
+ modem=modem
+ ))
+
+ return self.ok({'html': html})
+
+ async def modems_verbose(self, req: http.Request):
+ modem = req.query.get('id', None)
+ if modem not in self._modems_config.keys():
+ raise ValueError('invalid modem id')
+
+ modem_cfg = self._modems_config.get(modem)
+ loop = asyncio.get_event_loop()
+ signal, status, traffic, device, dialup_conn = await loop.run_in_executor(None, lambda: get_modem_data(modem_cfg, True))
+ data = [
+ ['Signal', signal],
+ ['Connection', status],
+ ['Traffic', traffic],
+ ['Device info', device],
+ ['Dialup connection', dialup_conn]
+ ]
+
+ modem_name = self._modems_config.getfullname(modem)
+ return await self.render_page(req, 'modem_verbose',
+ title=f'Подробная информация о модеме "{modem_name}"',
+ context=dict(data=data, modem_name=modem_name))
+
+ async def sms(self, req: http.Request):
+ modem = req.query.get('id', list(self._modems_config.keys())[0])
+ is_outbox = int(req.query.get('outbox', 0)) == 1
+ error = req.query.get('error', None)
+ sent = int(req.query.get('sent', 0)) == 1
+
+ cl = get_modem_client(self._modems_config[modem])
+ messages = cl.sms_list(1, 20, is_outbox)
+ return await self.render_page(req, 'sms',
+ title=f"SMS-сообщения ({'исходящие' if is_outbox else 'входящие'}, {modem})",
+ context=dict(
+ modems=self._modems_config,
+ selected_modem=modem,
+ is_outbox=is_outbox,
+ error=error,
+ is_sent=sent,
+ messages=messages
+ ))
+
+ async def sms_post(self, req: http.Request):
+ modem = req.query.get('id', list(self._modems_config.keys())[0])
+ is_outbox = int(req.query.get('outbox', 0)) == 1
+
+ fd = await req.post()
+ phone = fd.get('phone', None)
+ text = fd.get('text', None)
+
+ return_url = f'/sms.cgi?id={modem}&outbox={int(is_outbox)}'
+ phone = re.sub('\s+', '', phone)
+
+ if len(phone) > 4:
+ country = None
+ if not phone.startswith('+'):
+ country = 'RU'
+ number = phonenumbers.parse(phone, country)
+ if not phonenumbers.is_valid_number(number):
+ raise HTTPFound(f'{return_url}&error=Неверный+номер')
+ phone = phonenumbers.format_number(number, phonenumbers.PhoneNumberFormat.E164)
+
+ cl = get_modem_client(self._modems_config[modem])
+ cl.sms_send(phone, text)
+ raise HTTPFound(return_url)
+
+ async def inverter(self, req: http.Request):
+ action = req.query.get('do', None)
+ if action == 'set-osp':
+ val = req.query.get('value')
+ if val not in ('sub', 'sbu'):
+ raise ValueError('invalid osp value')
+ cl = get_inverter_client()
+ cl.exec('set-output-source-priority',
+ arguments=(val.upper(),))
+ raise HTTPFound('/inverter.cgi')
+
+ status, rated, html = await asyncio.get_event_loop().run_in_executor(None, get_inverter_data)
+ return await self.render_page(req, 'inverter',
+ title='Инвертор',
+ context=dict(status=status, rated=rated, html=html))
+
+ async def inverter_ajx(self, req: http.Request):
+ status, rated, html = await asyncio.get_event_loop().run_in_executor(None, get_inverter_data)
+ return self.ok({'html': html})
+
+ async def pump(self, req: http.Request):
+ # TODO
+ # these are blocking calls
+ # should be rewritten using aio
+
+ cl = get_pump_client()
+
+ action = req.query.get('set', None)
+ if action in ('on', 'off'):
+ getattr(cl, action)()
+ raise HTTPFound('/pump.cgi')
+
+ status = cl.status()
+ return await self.render_page(req, 'pump',
+ title='Насос',
+ context=dict(status=status))
+
+
+if __name__ == '__main__':
+ config.load_app(WebKbnConfig)
+
+ server = WebSite(config.app_config['listen_addr'])
+ server.run()