diff options
-rw-r--r-- | Makefile | 3 | ||||
-rw-r--r-- | requirements.txt | 1 | ||||
-rw-r--r-- | src/home/api/web_api_client.py | 6 | ||||
-rw-r--r-- | src/home/config/config.py | 2 | ||||
-rw-r--r-- | src/home/database/clickhouse.py | 29 | ||||
-rw-r--r-- | src/home/database/inverter.py | 31 | ||||
-rw-r--r-- | src/home/database/sensors.py | 8 | ||||
-rw-r--r-- | src/home/http/__init__.py | 2 | ||||
-rw-r--r-- | src/home/web_api/__init__.py | 1 | ||||
-rw-r--r-- | src/home/web_api/web_api.py | 213 | ||||
-rwxr-xr-x | src/web_api.py | 199 | ||||
-rwxr-xr-x | src/web_api_uwsgi.py | 8 |
12 files changed, 236 insertions, 267 deletions
@@ -7,6 +7,7 @@ else USER_PREFIX = $(HOME)/.local endif +# TODO drop or rewrite PROGRAMS = admin_bot inverter_bot pump_bot sensors_bot PROGRAMS += inverter_mqtt_receiver inverter_mqtt_sender PROGRAMS += sensors_mqtt_receiver sensors_mqtt_sender @@ -29,7 +30,7 @@ venv: . ./venv/bin/activate && pip3 install -r requirements.txt web-api-dev: - . ./venv/bin/activate && FLASK_ENV=development python3 src/web_api.py + . ./venv/bin/activate && HK_MODE=dev python3 src/web_api.py install: check-root for name in @(PROGRAMS); do ln -s src/${name}.py $(USER_PREFIX)/bin/$name; done diff --git a/requirements.txt b/requirements.txt index 1a58e8c..dd5d185 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,6 @@ paho-mqtt~=1.5.1 inverterd~=1.0.3 clickhouse-driver~=0.2.0 toml~=0.10.2 -Flask~=2.0.2 mysql-connector-python~=8.0.27 Werkzeug~=2.0.2 uwsgi~=2.0.20 diff --git a/src/home/api/web_api_client.py b/src/home/api/web_api_client.py index d6c9dc7..299bb6e 100644 --- a/src/home/api/web_api_client.py +++ b/src/home/api/web_api_client.py @@ -59,7 +59,7 @@ class WebAPIClient: bot: BotType, user_id: int, message: str): - return self._post('logs/bot-request/', { + return self._post('log/bot-request/', { 'bot': bot.value, 'user_id': str(user_id), 'message': message @@ -67,7 +67,7 @@ class WebAPIClient: def log_openwrt(self, lines: List[Tuple[int, str]]): - return self._post('logs/openwrt', { + return self._post('log/openwrt/', { 'logs': stringify(lines) }) @@ -159,7 +159,7 @@ class WebAPIClient: kwargs['files'] = fd try: - r = f(f'https://{domain}/api/{name}', + r = f(f'https://{domain}/{name}', headers={'X-Token': self.token}, timeout=self.timeout, **kwargs) diff --git a/src/home/config/config.py b/src/home/config/config.py index 9882bfa..7d18f99 100644 --- a/src/home/config/config.py +++ b/src/home/config/config.py @@ -113,7 +113,7 @@ config = ConfigStore() def is_development_mode() -> bool: - if 'FLASK_ENV' in os.environ and os.environ['FLASK_ENV'] == 'development': + if 'HK_MODE' in os.environ and os.environ['HK_MODE'] == 'dev': return True return ('logging' in config) and ('verbose' in config['logging']) and (config['logging']['verbose'] is True) diff --git a/src/home/database/clickhouse.py b/src/home/database/clickhouse.py index 4a2a247..ca81628 100644 --- a/src/home/database/clickhouse.py +++ b/src/home/database/clickhouse.py @@ -1,4 +1,9 @@ +import logging + +from zoneinfo import ZoneInfo +from datetime import datetime, timedelta from clickhouse_driver import Client as ClickhouseClient +from ..config import is_development_mode _links = {} @@ -8,3 +13,27 @@ def get_clickhouse(db: str) -> ClickhouseClient: _links[db] = ClickhouseClient.from_url(f'clickhouse://localhost/{db}') return _links[db] + + +class ClickhouseDatabase: + def __init__(self, db: str): + self.db = get_clickhouse(db) + + self.server_timezone = self.db.execute('SELECT timezone()')[0][0] + self.logger = logging.getLogger(self.__class__.__name__) + + def query(self, *args, **kwargs): + settings = {'use_client_time_zone': True} + kwargs['settings'] = settings + + if 'no_tz_fix' not in kwargs and len(args) > 1 and isinstance(args[1], dict): + for k, v in args[1].items(): + if isinstance(v, datetime): + args[1][k] = v.astimezone(tz=ZoneInfo(self.server_timezone)) + + result = self.db.execute(*args, **kwargs) + + if is_development_mode(): + self.logger.debug(args[0] if len(args) == 1 else args[0] % args[1]) + + return result diff --git a/src/home/database/inverter.py b/src/home/database/inverter.py index 756186c..1e967c4 100644 --- a/src/home/database/inverter.py +++ b/src/home/database/inverter.py @@ -1,40 +1,17 @@ -import logging - -from zoneinfo import ZoneInfo from time import time -from datetime import datetime, timedelta +from datetime import datetime from typing import Optional from collections import namedtuple -from ..config import is_development_mode -from .clickhouse import get_clickhouse +from .clickhouse import ClickhouseDatabase IntervalList = list[list[Optional[datetime]]] -class InverterDatabase: +class InverterDatabase(ClickhouseDatabase): def __init__(self): - self.db = get_clickhouse('solarmon') - self.server_timezone = self.query('SELECT timezone()')[0][0] - - self.logger = logging.getLogger(self.__class__.__name__) - - def query(self, *args, **kwargs): - settings = {'use_client_time_zone': True} - kwargs['settings'] = settings - - if 'no_tz_fix' not in kwargs and len(args) > 1 and isinstance(args[1], dict): - for k, v in args[1].items(): - if isinstance(v, datetime): - args[1][k] = v.astimezone(tz=ZoneInfo(self.server_timezone)) - - result = self.db.execute(*args, **kwargs) - - if is_development_mode(): - self.logger.debug(args[0] if len(args) == 1 else args[0] % args[1]) - - return result + super().__init__('solarmon') def add_generation(self, home_id: int, client_time: int, watts: int) -> None: self.db.execute( diff --git a/src/home/database/sensors.py b/src/home/database/sensors.py index 4cfaa08..8155108 100644 --- a/src/home/database/sensors.py +++ b/src/home/database/sensors.py @@ -1,7 +1,7 @@ from time import time from datetime import datetime from typing import Tuple, List -from .clickhouse import get_clickhouse +from .clickhouse import ClickhouseDatabase from ..api.types import TemperatureSensorLocation @@ -25,9 +25,9 @@ def get_temperature_table(sensor: TemperatureSensorLocation) -> str: return 'temp_spb1' -class SensorsDatabase: +class SensorsDatabase(ClickhouseDatabase): def __init__(self): - self.db = get_clickhouse('home') + super().__init__('home') def add_temperature(self, home_id: int, @@ -62,7 +62,7 @@ class SensorsDatabase: ORDER BY ClientTime""" dt_from, dt_to = time_range - data = self.db.execute(sql, { + data = self.query(sql, { 'from': dt_from, 'to': dt_to }) diff --git a/src/home/http/__init__.py b/src/home/http/__init__.py index 963e13c..6030e95 100644 --- a/src/home/http/__init__.py +++ b/src/home/http/__init__.py @@ -1,2 +1,2 @@ from .http import serve, ok, routes, HTTPServer -from aiohttp.web import FileResponse, StreamResponse, Request
\ No newline at end of file +from aiohttp.web import FileResponse, StreamResponse, Request, Response diff --git a/src/home/web_api/__init__.py b/src/home/web_api/__init__.py deleted file mode 100644 index 20655da..0000000 --- a/src/home/web_api/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .web_api import get_app
\ No newline at end of file diff --git a/src/home/web_api/web_api.py b/src/home/web_api/web_api.py deleted file mode 100644 index 6b8c54e..0000000 --- a/src/home/web_api/web_api.py +++ /dev/null @@ -1,213 +0,0 @@ -import logging -import json -import os.path - -from datetime import datetime, timedelta -from typing import Optional - -from werkzeug.exceptions import HTTPException -from flask import Flask, request, Response - -from ..config import config, is_development_mode -from ..database import BotsDatabase, SensorsDatabase -from ..util import stringify, format_tb -from ..api.types import BotType, TemperatureSensorLocation, SoundSensorLocation -from ..media import SoundRecordStorage - -db: Optional[BotsDatabase] = None -sensors_db: Optional[SensorsDatabase] = None -app = Flask(__name__) -logger = logging.getLogger(__name__) - - -class AuthError(Exception): - def __init__(self, message: str): - super().__init__() - self.message = message - - -# api methods -# ----------- - -@app.route("/") -def hello(): - message = "nothing here, keep lurking" - if is_development_mode(): - message += ' (dev mode)' - return message - - -@app.route('/api/sensors/data/', methods=['GET']) -def sensors_data(): - hours = request.args.get('hours', type=int, default=1) - sensor = TemperatureSensorLocation(request.args.get('sensor', type=int)) - - if hours < 1 or hours > 24: - raise ValueError('invalid hours value') - - dt_to = datetime.now() - dt_from = dt_to - timedelta(hours=hours) - - data = sensors_db.get_temperature_recordings(sensor, (dt_from, dt_to)) - return ok(data) - - -@app.route('/api/sound_sensors/hits/', methods=['GET']) -def get_sound_sensors_hits(): - location = SoundSensorLocation(request.args.get('location', type=int)) - - after = request.args.get('after', type=int) - kwargs = {} - if after is None: - last = request.args.get('last', type=int) - if last is None: - raise ValueError('you must pass `after` or `last` params') - else: - if not 0 < last < 100: - raise ValueError('invalid last value: must be between 0 and 100') - kwargs['last'] = last - else: - kwargs['after'] = datetime.fromtimestamp(after) - - data = db.get_sound_hits(location, **kwargs) - return ok(data) - - -@app.route('/api/sound_sensors/hits/', methods=['POST']) -def post_sound_sensors_hits(): - hits = [] - for hit, count in json.loads(request.form.get('hits', type=str)): - if not hasattr(SoundSensorLocation, hit.upper()): - raise ValueError('invalid sensor location') - if count < 1: - raise ValueError(f'invalid count: {count}') - hits.append((SoundSensorLocation[hit.upper()], count)) - - db.add_sound_hits(hits, datetime.now()) - return ok() - - -@app.route('/api/logs/bot-request/', methods=['POST']) -def log_bot_request(): - user_id = request.form.get('user_id', type=int, default=0) - message = request.form.get('message', type=str, default='') - bot = BotType(request.form.get('bot', type=int)) - - # validate message - if message.strip() == '': - raise ValueError('message can\'t be empty') - - # add record to the database - db.add_request(bot, user_id, message) - - return ok() - - -@app.route('/api/logs/openwrt/', methods=['POST']) -def log_openwrt(): - logs = request.form.get('logs', type=str, default='') - - # validate it - logs = json.loads(logs) - assert type(logs) is list, "invalid json data (list expected)" - - lines = [] - for line in logs: - assert type(line) is list, "invalid line type (list expected)" - assert len(line) == 2, f"expected 2 items in line, got {len(line)}" - assert type(line[0]) is int, "invalid line[0] type (int expected)" - assert type(line[1]) is str, "invalid line[1] type (str expected)" - - lines.append(( - datetime.fromtimestamp(line[0]), - line[1] - )) - - db.add_openwrt_logs(lines) - return ok() - - -@app.route('/api/recordings/list/', methods=['GET']) -def recordings_list(): - extended = request.args.get('extended', type=bool, default=False) - node = request.args.get('node', type=str) - - root = os.path.join(config['recordings']['directory'], node) - if not os.path.isdir(root): - raise ValueError(f'invalid node {node}: no such directory') - - storage = SoundRecordStorage(root) - files = storage.getfiles(as_objects=extended) - if extended: - files = list(map(lambda file: file.__dict__(), files)) - - return ok(files) - - -# internal functions -# ------------------ - -def ok(data=None) -> Response: - response = {'result': 'ok'} - if data is not None: - response['data'] = data - return Response(stringify(response), - mimetype='application/json') - - -def err(e) -> Response: - error = { - 'type': e.__class__.__name__, - 'message': e.message if hasattr(e, 'message') else str(e) - } - if is_development_mode(): - tb = format_tb(e) - if tb: - error['stacktrace'] = tb - data = { - 'result': 'error', - 'error': error - } - return Response(stringify(data), mimetype='application/json') - - -def get_token() -> Optional[str]: - name = 'X-Token' - if name in request.headers: - return request.headers[name] - - token = request.args.get('token', default='', type=str) - if token != '': - return token - - return None - - -@app.errorhandler(Exception) -def handle_exception(e): - if isinstance(e, HTTPException): - return e - return err(e), 500 - - -@app.before_request -def validate_token() -> None: - if request.path.startswith('/api/') and not is_development_mode(): - token = get_token() - if not token: - raise AuthError(f'token is missing') - - if token != config['api']['token']: - raise AuthError('invalid token') - - -def get_app(): - global db, sensors_db - - config.load('web_api') - app.config.from_mapping(**config['flask']) - - db = BotsDatabase() - sensors_db = SensorsDatabase() - - return app diff --git a/src/web_api.py b/src/web_api.py index beaab57..2a3dfcd 100755 --- a/src/web_api.py +++ b/src/web_api.py @@ -1,13 +1,198 @@ #!/usr/bin/env python3 -from home.web_api import get_app -from typing import Optional -from flask import Flask +import asyncio +import json +import os -app: Optional[Flask] = None +from datetime import datetime, timedelta +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from aiohttp import web +from home import http +from home.util import parse_addr +from home.config import config, is_development_mode +from home.database import BotsDatabase, SensorsDatabase, InverterDatabase +from home.api.types import BotType, TemperatureSensorLocation, SoundSensorLocation +from home.media import SoundRecordStorage -if __name__ in ('__main__', 'app'): - app = get_app() + +class AuthError(Exception): + def __init__(self, message: str): + super().__init__() + self.message = message + + +class WebAPIServer(http.HTTPServer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.app.middlewares.append(self.validate_auth) + + self.get('/', self.get_index) + self.get('/sensors/data/', self.GET_sensors_data) + self.get('/sound-sensors/hits/', self.GET_sound_sensors_hits) + self.post('/sound-sensors/hits/', self.POST_sound_sensors_hits) + + self.post('/log/bot-request/', self.POST_bot_request_log) + self.post('/log/openwrt/', self.POST_openwrt_log) + + self.get('/recordings/list/', self.GET_recordings_list) + + @staticmethod + @web.middleware + async def validate_auth(req: http.Request, handler): + def get_token() -> str: + name = 'X-Token' + if name in req.headers: + return req.headers[name] + + return req.query['token'] + + try: + token = get_token() + except KeyError: + raise AuthError('no token') + + if token != config['api']['token']: + raise AuthError('invalid token') + + return await handler(req) + + @staticmethod + async def get_index(req: http.Request): + message = "nothing here, keep lurking" + if is_development_mode(): + message += ' (dev mode)' + return http.Response(text=message, content_type='text/plain') + + async def GET_sensors_data(self, req: http.Request): + try: + hours = int(req.query['hours']) + if hours < 1 or hours > 24: + raise ValueError('invalid hours value') + except KeyError: + hours = 1 + + sensor = TemperatureSensorLocation(int(req.query['sensor'])) + + dt_to = datetime.now() + dt_from = dt_to - timedelta(hours=hours) + + db = SensorsDatabase() + data = db.get_temperature_recordings(sensor, (dt_from, dt_to)) + return self.ok(data) + + async def GET_sound_sensors_hits(self, req: http.Request): + location = SoundSensorLocation(int(req.query['location'])) + + after = int(req.query['after']) + kwargs = {} + if after is None: + last = int(req.query['last']) + if last is None: + raise ValueError('you must pass `after` or `last` params') + else: + if not 0 < last < 100: + raise ValueError('invalid last value: must be between 0 and 100') + kwargs['last'] = last + else: + kwargs['after'] = datetime.fromtimestamp(after) + + data = BotsDatabase().get_sound_hits(location, **kwargs) + return self.ok(data) + + async def POST_sound_sensors_hits(self, req: http.Request): + hits = [] + data = await req.post() + for hit, count in json.loads(data['hits']): + if not hasattr(SoundSensorLocation, hit.upper()): + raise ValueError('invalid sensor location') + if count < 1: + raise ValueError(f'invalid count: {count}') + hits.append((SoundSensorLocation[hit.upper()], count)) + + BotsDatabase().add_sound_hits(hits, datetime.now()) + return self.ok() + + async def POST_bot_request_log(self, req: http.Request): + data = await req.post() + + try: + user_id = int(data['user_id']) + except KeyError: + user_id = 0 + + try: + message = data['message'] + except KeyError: + message = '' + + bot = BotType(int(data['bot'])) + + # validate message + if message.strip() == '': + raise ValueError('message can\'t be empty') + + # add record to the database + BotsDatabase().add_request(bot, user_id, message) + + return self.ok() + + async def POST_openwrt_log(self, req: http.Request): + data = await req.post() + + try: + logs = data['logs'] + except KeyError: + logs = '' + + # validate it + logs = json.loads(logs) + assert type(logs) is list, "invalid json data (list expected)" + + lines = [] + for line in logs: + assert type(line) is list, "invalid line type (list expected)" + assert len(line) == 2, f"expected 2 items in line, got {len(line)}" + assert type(line[0]) is int, "invalid line[0] type (int expected)" + assert type(line[1]) is str, "invalid line[1] type (str expected)" + + lines.append(( + datetime.fromtimestamp(line[0]), + line[1] + )) + + BotsDatabase().add_openwrt_logs(lines) + return self.ok() + + async def GET_recordings_list(self, req: http.Request): + data = await req.post() + + try: + extended = bool(int(data['extended'])) + except KeyError: + extended = False + + node = data['node'] + + root = os.path.join(config['recordings']['directory'], node) + if not os.path.isdir(root): + raise ValueError(f'invalid node {node}: no such directory') + + storage = SoundRecordStorage(root) + files = storage.getfiles(as_objects=extended) + if extended: + files = list(map(lambda file: file.__dict__(), files)) + + return self.ok(files) + + +# start of the program +# -------------------- if __name__ == '__main__': - app.run(host='0.0.0.0') + config.load('web_api') + + loop = asyncio.get_event_loop() + + server = WebAPIServer(parse_addr(config['server']['listen'])) + server.run() diff --git a/src/web_api_uwsgi.py b/src/web_api_uwsgi.py deleted file mode 100755 index e46f518..0000000 --- a/src/web_api_uwsgi.py +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env python3 -from home.web_api import get_app - -app = get_app() - - -if __name__ == '__main__': - app.run() |