#!/usr/bin/env python3
import asyncio
import jinja2
import aiohttp_jinja2
import json
import re
import inverterd
import phonenumbers
import __py_include
from io import StringIO
from aiohttp.web import HTTPFound
from typing import Optional, Union
from homekit.config import config, AppConfigUnit
from homekit.util import homekit_path, filesize_fmt, seconds_to_human_readable_string
from homekit.modem import E3372, ModemsConfig, MacroNetWorkType
from homekit.inverter.config import InverterdConfig
from homekit.relay.sunxi_h3_client import RelayClient
from homekit import http
class WebKbnConfig(AppConfigUnit):
NAME = 'web_kbn'
@classmethod
def schema(cls) -> Optional[dict]:
return {
'listen_addr': cls._addr_schema(required=True),
'assets_public_path': {'type': 'string'},
'pump_addr': cls._addr_schema(required=True),
'inverter_grafana_url': {'type': 'string'},
'sensors_grafana_url': {'type': 'string'},
}
STATIC_FILES = [
'bootstrap.min.css',
'bootstrap.min.js',
'polyfills.js',
'app.js',
'app.css'
]
def get_js_link(file, version) -> str:
if version:
file += f'?version={version}'
return f''
def get_css_link(file, version) -> str:
if version:
file += f'?version={version}'
return f''
def get_head_static() -> str:
buf = StringIO()
for file in STATIC_FILES:
v = 2
try:
q_ind = file.index('?')
v = file[q_ind+1:]
file = file[:file.index('?')]
except ValueError:
pass
if file.endswith('.js'):
buf.write(get_js_link(file, v))
else:
buf.write(get_css_link(file, v))
return buf.getvalue()
def get_modem_client(modem_cfg: dict) -> E3372:
return E3372(modem_cfg['ip'], legacy_token_auth=modem_cfg['legacy_auth'])
def get_modem_data(modem_cfg: dict, get_raw=False) -> Union[dict, tuple]:
cl = get_modem_client(modem_cfg)
signal = cl.device_signal
status = cl.monitoring_status
traffic = cl.traffic_stats
if get_raw:
device_info = cl.device_information
dialup_conn = cl.dialup_connection
return signal, status, traffic, device_info, dialup_conn
else:
network_type_label = re.sub('^MACRO_NET_WORK_TYPE(_EX)?_', '', MacroNetWorkType(int(status['CurrentNetworkType'])).name)
return {
'type': network_type_label,
'level': int(status['SignalIcon']) if 'SignalIcon' in status else 0,
'rssi': signal['rssi'],
'sinr': signal['sinr'],
'connected_time': seconds_to_human_readable_string(int(traffic['CurrentConnectTime'])),
'downloaded': filesize_fmt(int(traffic['CurrentDownload'])),
'uploaded': filesize_fmt(int(traffic['CurrentUpload']))
}
def get_pump_client() -> RelayClient:
addr = config.app_config['pump_addr']
cl = RelayClient(host=addr.host, port=addr.port)
cl.connect()
return cl
def get_inverter_client() -> inverterd.Client:
cl = inverterd.Client(host=InverterdConfig()['remote_addr'].host)
cl.connect()
cl.format(inverterd.Format.JSON)
return cl
def get_inverter_data() -> tuple:
cl = get_inverter_client()
status = json.loads(cl.exec('get-status'))['data']
rated = json.loads(cl.exec('get-rated'))['data']
power_direction = status['battery_power_direction'].lower()
power_direction = re.sub('ge$', 'ging', power_direction)
charging_rate = ''
if power_direction == 'charging':
charging_rate = ' @ %s %s' % (
status['battery_charge_current']['value'],
status['battery_charge_current']['unit'])
elif power_direction == 'discharging':
charging_rate = ' @ %s %s' % (
status['battery_discharge_current']['value'],
status['battery_discharge_current']['unit'])
html = 'Battery: %s %s' % (
status['battery_voltage']['value'],
status['battery_voltage']['unit'])
html += ' (%s%s, ' % (
status['battery_capacity']['value'],
status['battery_capacity']['unit'])
html += '%s%s)' % (power_direction, charging_rate)
html += "\n"
html += 'Load: %s %s' % (
status['ac_output_active_power']['value'],
status['ac_output_active_power']['unit'])
html += ' (%s%%)' % (status['output_load_percent']['value'],)
if status['pv1_input_power']['value'] > 0:
html += "\n"
html += 'Input power: %s %s' % (
status['pv1_input_power']['value'],
status['pv1_input_power']['unit'])
if status['grid_voltage']['value'] > 0 or status['grid_freq']['value'] > 0:
html += "\n"
html += 'AC input: %s %s' % (
status['grid_voltage']['value'],
status['grid_voltage']['unit'])
html += ', %s %s' % (
status['grid_freq']['value'],
status['grid_freq']['unit'])
html += "\n"
html += 'Priority: %s' % (rated['output_source_priority'],)
html = html.replace("\n", '
')
return status, rated, html
class WebSite(http.HTTPServer):
_modems_config: ModemsConfig
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._modems_config = ModemsConfig()
aiohttp_jinja2.setup(
self.app,
loader=jinja2.FileSystemLoader(homekit_path('web', 'kbn_templates')),
autoescape=jinja2.select_autoescape(['html', 'xml']),
)
env = aiohttp_jinja2.get_env(self.app)
env.filters['tojson'] = lambda obj: json.dumps(obj, separators=(',', ':'))
self.app.router.add_static('/assets/', path=homekit_path('web', 'kbn_assets'))
self.get('/main.cgi', self.index)
self.get('/modems.cgi', self.modems)
self.get('/modems/info.ajx', self.modems_ajx)
self.get('/modems/verbose.cgi', self.modems_verbose)
self.get('/inverter.cgi', self.inverter)
self.get('/inverter.ajx', self.inverter_ajx)
self.get('/pump.cgi', self.pump)
self.get('/sms.cgi', self.sms)
self.post('/sms.cgi', self.sms_post)
async def render_page(self,
req: http.Request,
template_name: str,
title: Optional[str] = None,
context: Optional[dict] = None):
if context is None:
context = {}
context = {
**context,
'head_static': get_head_static()
}
if title is not None:
context['title'] = title
response = aiohttp_jinja2.render_template(template_name+'.j2', req, context=context)
return response
async def index(self, req: http.Request):
ctx = {}
for k in 'inverter', 'sensors':
ctx[f'{k}_grafana_url'] = config.app_config[f'{k}_grafana_url']
return await self.render_page(req, 'index',
title="Home web site",
context=ctx)
async def modems(self, req: http.Request):
return await self.render_page(req, 'modems',
title='Состояние модемов',
context=dict(modems=self._modems_config))
async def modems_ajx(self, req: http.Request):
modem = req.query.get('id', None)
if modem not in self._modems_config.keys():
raise ValueError('invalid modem id')
modem_cfg = self._modems_config.get(modem)
loop = asyncio.get_event_loop()
modem_data = await loop.run_in_executor(None, lambda: get_modem_data(modem_cfg))
html = aiohttp_jinja2.render_string('modem_data.j2', req, context=dict(
modem_data=modem_data,
modem=modem
))
return self.ok({'html': html})
async def modems_verbose(self, req: http.Request):
modem = req.query.get('id', None)
if modem not in self._modems_config.keys():
raise ValueError('invalid modem id')
modem_cfg = self._modems_config.get(modem)
loop = asyncio.get_event_loop()
signal, status, traffic, device, dialup_conn = await loop.run_in_executor(None, lambda: get_modem_data(modem_cfg, True))
data = [
['Signal', signal],
['Connection', status],
['Traffic', traffic],
['Device info', device],
['Dialup connection', dialup_conn]
]
modem_name = self._modems_config.getfullname(modem)
return await self.render_page(req, 'modem_verbose',
title=f'Подробная информация о модеме "{modem_name}"',
context=dict(data=data, modem_name=modem_name))
async def sms(self, req: http.Request):
modem = req.query.get('id', list(self._modems_config.keys())[0])
is_outbox = int(req.query.get('outbox', 0)) == 1
error = req.query.get('error', None)
sent = int(req.query.get('sent', 0)) == 1
cl = get_modem_client(self._modems_config[modem])
messages = cl.sms_list(1, 20, is_outbox)
return await self.render_page(req, 'sms',
title=f"SMS-сообщения ({'исходящие' if is_outbox else 'входящие'}, {modem})",
context=dict(
modems=self._modems_config,
selected_modem=modem,
is_outbox=is_outbox,
error=error,
is_sent=sent,
messages=messages
))
async def sms_post(self, req: http.Request):
modem = req.query.get('id', list(self._modems_config.keys())[0])
is_outbox = int(req.query.get('outbox', 0)) == 1
fd = await req.post()
phone = fd.get('phone', None)
text = fd.get('text', None)
return_url = f'/sms.cgi?id={modem}&outbox={int(is_outbox)}'
phone = re.sub('\s+', '', phone)
if len(phone) > 4:
country = None
if not phone.startswith('+'):
country = 'RU'
number = phonenumbers.parse(phone, country)
if not phonenumbers.is_valid_number(number):
raise HTTPFound(f'{return_url}&error=Неверный+номер')
phone = phonenumbers.format_number(number, phonenumbers.PhoneNumberFormat.E164)
cl = get_modem_client(self._modems_config[modem])
cl.sms_send(phone, text)
raise HTTPFound(return_url)
async def inverter(self, req: http.Request):
action = req.query.get('do', None)
if action == 'set-osp':
val = req.query.get('value')
if val not in ('sub', 'sbu'):
raise ValueError('invalid osp value')
cl = get_inverter_client()
cl.exec('set-output-source-priority',
arguments=(val.upper(),))
raise HTTPFound('/inverter.cgi')
status, rated, html = await asyncio.get_event_loop().run_in_executor(None, get_inverter_data)
return await self.render_page(req, 'inverter',
title='Инвертор',
context=dict(status=status, rated=rated, html=html))
async def inverter_ajx(self, req: http.Request):
status, rated, html = await asyncio.get_event_loop().run_in_executor(None, get_inverter_data)
return self.ok({'html': html})
async def pump(self, req: http.Request):
# TODO
# these are blocking calls
# should be rewritten using aio
cl = get_pump_client()
action = req.query.get('set', None)
if action in ('on', 'off'):
getattr(cl, action)()
raise HTTPFound('/pump.cgi')
status = cl.status()
return await self.render_page(req, 'pump',
title='Насос',
context=dict(status=status))
if __name__ == '__main__':
config.load_app(WebKbnConfig)
server = WebSite(config.app_config['listen_addr'])
server.run()