summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2022-10-25 02:41:46 +0300
committerEvgeny Zinoviev <me@ch1p.io>2022-10-30 01:37:35 +0300
commitdb784dc98ba0c4e15ee7b501d909425c79c825fb (patch)
tree68270179f77c95dc49ec4ab62f9305aa651c4cd4
parent0fce6c52516aba239acc81fd528dcb5051c04f68 (diff)
web_api: rewrite to aiohttp, drop flask
-rw-r--r--Makefile3
-rw-r--r--requirements.txt1
-rw-r--r--src/home/api/web_api_client.py6
-rw-r--r--src/home/config/config.py2
-rw-r--r--src/home/database/clickhouse.py29
-rw-r--r--src/home/database/inverter.py31
-rw-r--r--src/home/database/sensors.py8
-rw-r--r--src/home/http/__init__.py2
-rw-r--r--src/home/web_api/__init__.py1
-rw-r--r--src/home/web_api/web_api.py213
-rwxr-xr-xsrc/web_api.py199
-rwxr-xr-xsrc/web_api_uwsgi.py8
12 files changed, 236 insertions, 267 deletions
diff --git a/Makefile b/Makefile
index 7045001..0c8d26f 100644
--- a/Makefile
+++ b/Makefile
@@ -7,6 +7,7 @@ else
USER_PREFIX = $(HOME)/.local
endif
+# TODO drop or rewrite
PROGRAMS = admin_bot inverter_bot pump_bot sensors_bot
PROGRAMS += inverter_mqtt_receiver inverter_mqtt_sender
PROGRAMS += sensors_mqtt_receiver sensors_mqtt_sender
@@ -29,7 +30,7 @@ venv:
. ./venv/bin/activate && pip3 install -r requirements.txt
web-api-dev:
- . ./venv/bin/activate && FLASK_ENV=development python3 src/web_api.py
+ . ./venv/bin/activate && HK_MODE=dev python3 src/web_api.py
install: check-root
for name in @(PROGRAMS); do ln -s src/${name}.py $(USER_PREFIX)/bin/$name; done
diff --git a/requirements.txt b/requirements.txt
index 1a58e8c..dd5d185 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,7 +2,6 @@ paho-mqtt~=1.5.1
inverterd~=1.0.3
clickhouse-driver~=0.2.0
toml~=0.10.2
-Flask~=2.0.2
mysql-connector-python~=8.0.27
Werkzeug~=2.0.2
uwsgi~=2.0.20
diff --git a/src/home/api/web_api_client.py b/src/home/api/web_api_client.py
index d6c9dc7..299bb6e 100644
--- a/src/home/api/web_api_client.py
+++ b/src/home/api/web_api_client.py
@@ -59,7 +59,7 @@ class WebAPIClient:
bot: BotType,
user_id: int,
message: str):
- return self._post('logs/bot-request/', {
+ return self._post('log/bot-request/', {
'bot': bot.value,
'user_id': str(user_id),
'message': message
@@ -67,7 +67,7 @@ class WebAPIClient:
def log_openwrt(self,
lines: List[Tuple[int, str]]):
- return self._post('logs/openwrt', {
+ return self._post('log/openwrt/', {
'logs': stringify(lines)
})
@@ -159,7 +159,7 @@ class WebAPIClient:
kwargs['files'] = fd
try:
- r = f(f'https://{domain}/api/{name}',
+ r = f(f'https://{domain}/{name}',
headers={'X-Token': self.token},
timeout=self.timeout,
**kwargs)
diff --git a/src/home/config/config.py b/src/home/config/config.py
index 9882bfa..7d18f99 100644
--- a/src/home/config/config.py
+++ b/src/home/config/config.py
@@ -113,7 +113,7 @@ config = ConfigStore()
def is_development_mode() -> bool:
- if 'FLASK_ENV' in os.environ and os.environ['FLASK_ENV'] == 'development':
+ if 'HK_MODE' in os.environ and os.environ['HK_MODE'] == 'dev':
return True
return ('logging' in config) and ('verbose' in config['logging']) and (config['logging']['verbose'] is True)
diff --git a/src/home/database/clickhouse.py b/src/home/database/clickhouse.py
index 4a2a247..ca81628 100644
--- a/src/home/database/clickhouse.py
+++ b/src/home/database/clickhouse.py
@@ -1,4 +1,9 @@
+import logging
+
+from zoneinfo import ZoneInfo
+from datetime import datetime, timedelta
from clickhouse_driver import Client as ClickhouseClient
+from ..config import is_development_mode
_links = {}
@@ -8,3 +13,27 @@ def get_clickhouse(db: str) -> ClickhouseClient:
_links[db] = ClickhouseClient.from_url(f'clickhouse://localhost/{db}')
return _links[db]
+
+
+class ClickhouseDatabase:
+ def __init__(self, db: str):
+ self.db = get_clickhouse(db)
+
+ self.server_timezone = self.db.execute('SELECT timezone()')[0][0]
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def query(self, *args, **kwargs):
+ settings = {'use_client_time_zone': True}
+ kwargs['settings'] = settings
+
+ if 'no_tz_fix' not in kwargs and len(args) > 1 and isinstance(args[1], dict):
+ for k, v in args[1].items():
+ if isinstance(v, datetime):
+ args[1][k] = v.astimezone(tz=ZoneInfo(self.server_timezone))
+
+ result = self.db.execute(*args, **kwargs)
+
+ if is_development_mode():
+ self.logger.debug(args[0] if len(args) == 1 else args[0] % args[1])
+
+ return result
diff --git a/src/home/database/inverter.py b/src/home/database/inverter.py
index 756186c..1e967c4 100644
--- a/src/home/database/inverter.py
+++ b/src/home/database/inverter.py
@@ -1,40 +1,17 @@
-import logging
-
-from zoneinfo import ZoneInfo
from time import time
-from datetime import datetime, timedelta
+from datetime import datetime
from typing import Optional
from collections import namedtuple
-from ..config import is_development_mode
-from .clickhouse import get_clickhouse
+from .clickhouse import ClickhouseDatabase
IntervalList = list[list[Optional[datetime]]]
-class InverterDatabase:
+class InverterDatabase(ClickhouseDatabase):
def __init__(self):
- self.db = get_clickhouse('solarmon')
- self.server_timezone = self.query('SELECT timezone()')[0][0]
-
- self.logger = logging.getLogger(self.__class__.__name__)
-
- def query(self, *args, **kwargs):
- settings = {'use_client_time_zone': True}
- kwargs['settings'] = settings
-
- if 'no_tz_fix' not in kwargs and len(args) > 1 and isinstance(args[1], dict):
- for k, v in args[1].items():
- if isinstance(v, datetime):
- args[1][k] = v.astimezone(tz=ZoneInfo(self.server_timezone))
-
- result = self.db.execute(*args, **kwargs)
-
- if is_development_mode():
- self.logger.debug(args[0] if len(args) == 1 else args[0] % args[1])
-
- return result
+ super().__init__('solarmon')
def add_generation(self, home_id: int, client_time: int, watts: int) -> None:
self.db.execute(
diff --git a/src/home/database/sensors.py b/src/home/database/sensors.py
index 4cfaa08..8155108 100644
--- a/src/home/database/sensors.py
+++ b/src/home/database/sensors.py
@@ -1,7 +1,7 @@
from time import time
from datetime import datetime
from typing import Tuple, List
-from .clickhouse import get_clickhouse
+from .clickhouse import ClickhouseDatabase
from ..api.types import TemperatureSensorLocation
@@ -25,9 +25,9 @@ def get_temperature_table(sensor: TemperatureSensorLocation) -> str:
return 'temp_spb1'
-class SensorsDatabase:
+class SensorsDatabase(ClickhouseDatabase):
def __init__(self):
- self.db = get_clickhouse('home')
+ super().__init__('home')
def add_temperature(self,
home_id: int,
@@ -62,7 +62,7 @@ class SensorsDatabase:
ORDER BY ClientTime"""
dt_from, dt_to = time_range
- data = self.db.execute(sql, {
+ data = self.query(sql, {
'from': dt_from,
'to': dt_to
})
diff --git a/src/home/http/__init__.py b/src/home/http/__init__.py
index 963e13c..6030e95 100644
--- a/src/home/http/__init__.py
+++ b/src/home/http/__init__.py
@@ -1,2 +1,2 @@
from .http import serve, ok, routes, HTTPServer
-from aiohttp.web import FileResponse, StreamResponse, Request \ No newline at end of file
+from aiohttp.web import FileResponse, StreamResponse, Request, Response
diff --git a/src/home/web_api/__init__.py b/src/home/web_api/__init__.py
deleted file mode 100644
index 20655da..0000000
--- a/src/home/web_api/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from .web_api import get_app \ No newline at end of file
diff --git a/src/home/web_api/web_api.py b/src/home/web_api/web_api.py
deleted file mode 100644
index 6b8c54e..0000000
--- a/src/home/web_api/web_api.py
+++ /dev/null
@@ -1,213 +0,0 @@
-import logging
-import json
-import os.path
-
-from datetime import datetime, timedelta
-from typing import Optional
-
-from werkzeug.exceptions import HTTPException
-from flask import Flask, request, Response
-
-from ..config import config, is_development_mode
-from ..database import BotsDatabase, SensorsDatabase
-from ..util import stringify, format_tb
-from ..api.types import BotType, TemperatureSensorLocation, SoundSensorLocation
-from ..media import SoundRecordStorage
-
-db: Optional[BotsDatabase] = None
-sensors_db: Optional[SensorsDatabase] = None
-app = Flask(__name__)
-logger = logging.getLogger(__name__)
-
-
-class AuthError(Exception):
- def __init__(self, message: str):
- super().__init__()
- self.message = message
-
-
-# api methods
-# -----------
-
-@app.route("/")
-def hello():
- message = "nothing here, keep lurking"
- if is_development_mode():
- message += ' (dev mode)'
- return message
-
-
-@app.route('/api/sensors/data/', methods=['GET'])
-def sensors_data():
- hours = request.args.get('hours', type=int, default=1)
- sensor = TemperatureSensorLocation(request.args.get('sensor', type=int))
-
- if hours < 1 or hours > 24:
- raise ValueError('invalid hours value')
-
- dt_to = datetime.now()
- dt_from = dt_to - timedelta(hours=hours)
-
- data = sensors_db.get_temperature_recordings(sensor, (dt_from, dt_to))
- return ok(data)
-
-
-@app.route('/api/sound_sensors/hits/', methods=['GET'])
-def get_sound_sensors_hits():
- location = SoundSensorLocation(request.args.get('location', type=int))
-
- after = request.args.get('after', type=int)
- kwargs = {}
- if after is None:
- last = request.args.get('last', type=int)
- if last is None:
- raise ValueError('you must pass `after` or `last` params')
- else:
- if not 0 < last < 100:
- raise ValueError('invalid last value: must be between 0 and 100')
- kwargs['last'] = last
- else:
- kwargs['after'] = datetime.fromtimestamp(after)
-
- data = db.get_sound_hits(location, **kwargs)
- return ok(data)
-
-
-@app.route('/api/sound_sensors/hits/', methods=['POST'])
-def post_sound_sensors_hits():
- hits = []
- for hit, count in json.loads(request.form.get('hits', type=str)):
- if not hasattr(SoundSensorLocation, hit.upper()):
- raise ValueError('invalid sensor location')
- if count < 1:
- raise ValueError(f'invalid count: {count}')
- hits.append((SoundSensorLocation[hit.upper()], count))
-
- db.add_sound_hits(hits, datetime.now())
- return ok()
-
-
-@app.route('/api/logs/bot-request/', methods=['POST'])
-def log_bot_request():
- user_id = request.form.get('user_id', type=int, default=0)
- message = request.form.get('message', type=str, default='')
- bot = BotType(request.form.get('bot', type=int))
-
- # validate message
- if message.strip() == '':
- raise ValueError('message can\'t be empty')
-
- # add record to the database
- db.add_request(bot, user_id, message)
-
- return ok()
-
-
-@app.route('/api/logs/openwrt/', methods=['POST'])
-def log_openwrt():
- logs = request.form.get('logs', type=str, default='')
-
- # validate it
- logs = json.loads(logs)
- assert type(logs) is list, "invalid json data (list expected)"
-
- lines = []
- for line in logs:
- assert type(line) is list, "invalid line type (list expected)"
- assert len(line) == 2, f"expected 2 items in line, got {len(line)}"
- assert type(line[0]) is int, "invalid line[0] type (int expected)"
- assert type(line[1]) is str, "invalid line[1] type (str expected)"
-
- lines.append((
- datetime.fromtimestamp(line[0]),
- line[1]
- ))
-
- db.add_openwrt_logs(lines)
- return ok()
-
-
-@app.route('/api/recordings/list/', methods=['GET'])
-def recordings_list():
- extended = request.args.get('extended', type=bool, default=False)
- node = request.args.get('node', type=str)
-
- root = os.path.join(config['recordings']['directory'], node)
- if not os.path.isdir(root):
- raise ValueError(f'invalid node {node}: no such directory')
-
- storage = SoundRecordStorage(root)
- files = storage.getfiles(as_objects=extended)
- if extended:
- files = list(map(lambda file: file.__dict__(), files))
-
- return ok(files)
-
-
-# internal functions
-# ------------------
-
-def ok(data=None) -> Response:
- response = {'result': 'ok'}
- if data is not None:
- response['data'] = data
- return Response(stringify(response),
- mimetype='application/json')
-
-
-def err(e) -> Response:
- error = {
- 'type': e.__class__.__name__,
- 'message': e.message if hasattr(e, 'message') else str(e)
- }
- if is_development_mode():
- tb = format_tb(e)
- if tb:
- error['stacktrace'] = tb
- data = {
- 'result': 'error',
- 'error': error
- }
- return Response(stringify(data), mimetype='application/json')
-
-
-def get_token() -> Optional[str]:
- name = 'X-Token'
- if name in request.headers:
- return request.headers[name]
-
- token = request.args.get('token', default='', type=str)
- if token != '':
- return token
-
- return None
-
-
-@app.errorhandler(Exception)
-def handle_exception(e):
- if isinstance(e, HTTPException):
- return e
- return err(e), 500
-
-
-@app.before_request
-def validate_token() -> None:
- if request.path.startswith('/api/') and not is_development_mode():
- token = get_token()
- if not token:
- raise AuthError(f'token is missing')
-
- if token != config['api']['token']:
- raise AuthError('invalid token')
-
-
-def get_app():
- global db, sensors_db
-
- config.load('web_api')
- app.config.from_mapping(**config['flask'])
-
- db = BotsDatabase()
- sensors_db = SensorsDatabase()
-
- return app
diff --git a/src/web_api.py b/src/web_api.py
index beaab57..2a3dfcd 100755
--- a/src/web_api.py
+++ b/src/web_api.py
@@ -1,13 +1,198 @@
#!/usr/bin/env python3
-from home.web_api import get_app
-from typing import Optional
-from flask import Flask
+import asyncio
+import json
+import os
-app: Optional[Flask] = None
+from datetime import datetime, timedelta
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from aiohttp import web
+from home import http
+from home.util import parse_addr
+from home.config import config, is_development_mode
+from home.database import BotsDatabase, SensorsDatabase, InverterDatabase
+from home.api.types import BotType, TemperatureSensorLocation, SoundSensorLocation
+from home.media import SoundRecordStorage
-if __name__ in ('__main__', 'app'):
- app = get_app()
+
+class AuthError(Exception):
+ def __init__(self, message: str):
+ super().__init__()
+ self.message = message
+
+
+class WebAPIServer(http.HTTPServer):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self.app.middlewares.append(self.validate_auth)
+
+ self.get('/', self.get_index)
+ self.get('/sensors/data/', self.GET_sensors_data)
+ self.get('/sound-sensors/hits/', self.GET_sound_sensors_hits)
+ self.post('/sound-sensors/hits/', self.POST_sound_sensors_hits)
+
+ self.post('/log/bot-request/', self.POST_bot_request_log)
+ self.post('/log/openwrt/', self.POST_openwrt_log)
+
+ self.get('/recordings/list/', self.GET_recordings_list)
+
+ @staticmethod
+ @web.middleware
+ async def validate_auth(req: http.Request, handler):
+ def get_token() -> str:
+ name = 'X-Token'
+ if name in req.headers:
+ return req.headers[name]
+
+ return req.query['token']
+
+ try:
+ token = get_token()
+ except KeyError:
+ raise AuthError('no token')
+
+ if token != config['api']['token']:
+ raise AuthError('invalid token')
+
+ return await handler(req)
+
+ @staticmethod
+ async def get_index(req: http.Request):
+ message = "nothing here, keep lurking"
+ if is_development_mode():
+ message += ' (dev mode)'
+ return http.Response(text=message, content_type='text/plain')
+
+ async def GET_sensors_data(self, req: http.Request):
+ try:
+ hours = int(req.query['hours'])
+ if hours < 1 or hours > 24:
+ raise ValueError('invalid hours value')
+ except KeyError:
+ hours = 1
+
+ sensor = TemperatureSensorLocation(int(req.query['sensor']))
+
+ dt_to = datetime.now()
+ dt_from = dt_to - timedelta(hours=hours)
+
+ db = SensorsDatabase()
+ data = db.get_temperature_recordings(sensor, (dt_from, dt_to))
+ return self.ok(data)
+
+ async def GET_sound_sensors_hits(self, req: http.Request):
+ location = SoundSensorLocation(int(req.query['location']))
+
+ after = int(req.query['after'])
+ kwargs = {}
+ if after is None:
+ last = int(req.query['last'])
+ if last is None:
+ raise ValueError('you must pass `after` or `last` params')
+ else:
+ if not 0 < last < 100:
+ raise ValueError('invalid last value: must be between 0 and 100')
+ kwargs['last'] = last
+ else:
+ kwargs['after'] = datetime.fromtimestamp(after)
+
+ data = BotsDatabase().get_sound_hits(location, **kwargs)
+ return self.ok(data)
+
+ async def POST_sound_sensors_hits(self, req: http.Request):
+ hits = []
+ data = await req.post()
+ for hit, count in json.loads(data['hits']):
+ if not hasattr(SoundSensorLocation, hit.upper()):
+ raise ValueError('invalid sensor location')
+ if count < 1:
+ raise ValueError(f'invalid count: {count}')
+ hits.append((SoundSensorLocation[hit.upper()], count))
+
+ BotsDatabase().add_sound_hits(hits, datetime.now())
+ return self.ok()
+
+ async def POST_bot_request_log(self, req: http.Request):
+ data = await req.post()
+
+ try:
+ user_id = int(data['user_id'])
+ except KeyError:
+ user_id = 0
+
+ try:
+ message = data['message']
+ except KeyError:
+ message = ''
+
+ bot = BotType(int(data['bot']))
+
+ # validate message
+ if message.strip() == '':
+ raise ValueError('message can\'t be empty')
+
+ # add record to the database
+ BotsDatabase().add_request(bot, user_id, message)
+
+ return self.ok()
+
+ async def POST_openwrt_log(self, req: http.Request):
+ data = await req.post()
+
+ try:
+ logs = data['logs']
+ except KeyError:
+ logs = ''
+
+ # validate it
+ logs = json.loads(logs)
+ assert type(logs) is list, "invalid json data (list expected)"
+
+ lines = []
+ for line in logs:
+ assert type(line) is list, "invalid line type (list expected)"
+ assert len(line) == 2, f"expected 2 items in line, got {len(line)}"
+ assert type(line[0]) is int, "invalid line[0] type (int expected)"
+ assert type(line[1]) is str, "invalid line[1] type (str expected)"
+
+ lines.append((
+ datetime.fromtimestamp(line[0]),
+ line[1]
+ ))
+
+ BotsDatabase().add_openwrt_logs(lines)
+ return self.ok()
+
+ async def GET_recordings_list(self, req: http.Request):
+ data = await req.post()
+
+ try:
+ extended = bool(int(data['extended']))
+ except KeyError:
+ extended = False
+
+ node = data['node']
+
+ root = os.path.join(config['recordings']['directory'], node)
+ if not os.path.isdir(root):
+ raise ValueError(f'invalid node {node}: no such directory')
+
+ storage = SoundRecordStorage(root)
+ files = storage.getfiles(as_objects=extended)
+ if extended:
+ files = list(map(lambda file: file.__dict__(), files))
+
+ return self.ok(files)
+
+
+# start of the program
+# --------------------
if __name__ == '__main__':
- app.run(host='0.0.0.0')
+ config.load('web_api')
+
+ loop = asyncio.get_event_loop()
+
+ server = WebAPIServer(parse_addr(config['server']['listen']))
+ server.run()
diff --git a/src/web_api_uwsgi.py b/src/web_api_uwsgi.py
deleted file mode 100755
index e46f518..0000000
--- a/src/web_api_uwsgi.py
+++ /dev/null
@@ -1,8 +0,0 @@
-#!/usr/bin/env python3
-from home.web_api import get_app
-
-app = get_app()
-
-
-if __name__ == '__main__':
- app.run()