diff options
Diffstat (limited to 'include/py/homekit/database')
-rw-r--r-- | include/py/homekit/database/__init__.py | 29 | ||||
-rw-r--r-- | include/py/homekit/database/__init__.pyi | 11 | ||||
-rw-r--r-- | include/py/homekit/database/_base.py | 9 | ||||
-rw-r--r-- | include/py/homekit/database/bots.py | 96 | ||||
-rw-r--r-- | include/py/homekit/database/clickhouse.py | 39 | ||||
-rw-r--r-- | include/py/homekit/database/inverter.py | 212 | ||||
-rw-r--r-- | include/py/homekit/database/inverter_time_formats.py | 2 | ||||
-rw-r--r-- | include/py/homekit/database/mysql.py | 47 | ||||
-rw-r--r-- | include/py/homekit/database/sensors.py | 69 | ||||
-rw-r--r-- | include/py/homekit/database/simple_state.py | 48 | ||||
-rw-r--r-- | include/py/homekit/database/sqlite.py | 70 |
11 files changed, 632 insertions, 0 deletions
diff --git a/include/py/homekit/database/__init__.py b/include/py/homekit/database/__init__.py new file mode 100644 index 0000000..b50cbce --- /dev/null +++ b/include/py/homekit/database/__init__.py @@ -0,0 +1,29 @@ +import importlib + +__all__ = [ + 'get_mysql', + 'mysql_now', + 'get_clickhouse', + 'SimpleState', + + 'SensorsDatabase', + 'InverterDatabase', + 'BotsDatabase' +] + + +def __getattr__(name: str): + if name in __all__: + if name.endswith('Database'): + file = name[:-8].lower() + elif 'mysql' in name: + file = 'mysql' + elif 'clickhouse' in name: + file = 'clickhouse' + else: + file = 'simple_state' + + module = importlib.import_module(f'.{file}', __name__) + return getattr(module, name) + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/include/py/homekit/database/__init__.pyi b/include/py/homekit/database/__init__.pyi new file mode 100644 index 0000000..31aae5d --- /dev/null +++ b/include/py/homekit/database/__init__.pyi @@ -0,0 +1,11 @@ +from .mysql import ( + get_mysql as get_mysql, + mysql_now as mysql_now +) +from .clickhouse import get_clickhouse as get_clickhouse + +from simple_state import SimpleState as SimpleState + +from .sensors import SensorsDatabase as SensorsDatabase +from .inverter import InverterDatabase as InverterDatabase +from .bots import BotsDatabase as BotsDatabase diff --git a/include/py/homekit/database/_base.py b/include/py/homekit/database/_base.py new file mode 100644 index 0000000..dcec9da --- /dev/null +++ b/include/py/homekit/database/_base.py @@ -0,0 +1,9 @@ +import os + + +def get_data_root_directory() -> str: + return os.path.join( + os.environ['HOME'], + '.config', + 'homekit', + 'data')
\ No newline at end of file diff --git a/include/py/homekit/database/bots.py b/include/py/homekit/database/bots.py new file mode 100644 index 0000000..fb5f326 --- /dev/null +++ b/include/py/homekit/database/bots.py @@ -0,0 +1,96 @@ +import pytz + +from .mysql import mysql_now, MySQLDatabase, datetime_fmt +from ..api.types import ( + SoundSensorLocation +) +from typing import Optional, List, Tuple +from datetime import datetime +from html import escape + + +class OpenwrtLogRecord: + id: int + log_time: datetime + received_time: datetime + text: str + + def __init__(self, id, text, log_time, received_time): + self.id = id + self.text = text + self.log_time = log_time + self.received_time = received_time + + def __repr__(self): + return f"<b>{self.log_time.strftime('%H:%M:%S')}</b> {escape(self.text)}" + + +class BotsDatabase(MySQLDatabase): + def add_openwrt_logs(self, + lines: List[Tuple[datetime, str]], + access_point: int): + now = datetime.now() + with self.cursor() as cursor: + for line in lines: + time, text = line + cursor.execute("INSERT INTO openwrt (log_time, received_time, text, ap) VALUES (%s, %s, %s, %s)", + (time.strftime(datetime_fmt), now.strftime(datetime_fmt), text, access_point)) + self.commit() + + def add_sound_hits(self, + hits: List[Tuple[SoundSensorLocation, int]], + time: datetime): + with self.cursor() as cursor: + for loc, count in hits: + cursor.execute("INSERT INTO sound_hits (location, `time`, hits) VALUES (%s, %s, %s)", + (loc.name.lower(), time.strftime(datetime_fmt), count)) + self.commit() + + def get_sound_hits(self, + location: SoundSensorLocation, + after: Optional[datetime] = None, + last: Optional[int] = None) -> List[dict]: + with self.cursor(dictionary=True) as cursor: + sql = "SELECT `time`, hits FROM sound_hits WHERE location=%s" + args = [location.name.lower()] + + if after: + sql += ' AND `time` >= %s ORDER BY time DESC' + args.append(after) + elif last: + sql += ' ORDER BY time DESC LIMIT 0, %s' + args.append(last) + else: + raise ValueError('no `after`, no `last`, what do you expect?') + + cursor.execute(sql, tuple(args)) + data = [] + for row in cursor.fetchall(): + data.append({ + 'time': row['time'], + 'hits': row['hits'] + }) + return data + + def get_openwrt_logs(self, + filter_text: str, + min_id: int, + access_point: int, + limit: int = None) -> List[OpenwrtLogRecord]: + tz = pytz.timezone('Europe/Moscow') + with self.cursor(dictionary=True) as cursor: + sql = "SELECT * FROM openwrt WHERE ap=%s AND text LIKE %s AND id > %s" + if limit is not None: + sql += f" LIMIT {limit}" + + cursor.execute(sql, (access_point, f'%{filter_text}%', min_id)) + data = [] + for row in cursor.fetchall(): + data.append(OpenwrtLogRecord( + id=int(row['id']), + text=row['text'], + log_time=row['log_time'].astimezone(tz), + received_time=row['received_time'].astimezone(tz) + )) + + return data diff --git a/include/py/homekit/database/clickhouse.py b/include/py/homekit/database/clickhouse.py new file mode 100644 index 0000000..d0ec283 --- /dev/null +++ b/include/py/homekit/database/clickhouse.py @@ -0,0 +1,39 @@ +import logging + +from zoneinfo import ZoneInfo +from datetime import datetime +from clickhouse_driver import Client as ClickhouseClient +from ..config import is_development_mode + +_links = {} + + +def get_clickhouse(db: str) -> ClickhouseClient: + if db not in _links: + _links[db] = ClickhouseClient.from_url(f'clickhouse://localhost/{db}') + + return _links[db] + + +class ClickhouseDatabase: + def __init__(self, db: str): + self.db = get_clickhouse(db) + + self.server_timezone = self.db.execute('SELECT timezone()')[0][0] + self.logger = logging.getLogger(self.__class__.__name__) + + def query(self, *args, **kwargs): + settings = {'use_client_time_zone': True} + kwargs['settings'] = settings + + if 'no_tz_fix' not in kwargs and len(args) > 1 and isinstance(args[1], dict): + for k, v in args[1].items(): + if isinstance(v, datetime): + args[1][k] = v.astimezone(tz=ZoneInfo(self.server_timezone)) + + result = self.db.execute(*args, **kwargs) + + if is_development_mode(): + self.logger.debug(args[0] if len(args) == 1 else args[0] % args[1]) + + return result diff --git a/include/py/homekit/database/inverter.py b/include/py/homekit/database/inverter.py new file mode 100644 index 0000000..fc3f74f --- /dev/null +++ b/include/py/homekit/database/inverter.py @@ -0,0 +1,212 @@ +from time import time +from datetime import datetime, timedelta +from typing import Optional +from collections import namedtuple + +from .clickhouse import ClickhouseDatabase + + +IntervalList = list[list[Optional[datetime]]] + + +class InverterDatabase(ClickhouseDatabase): + def __init__(self): + super().__init__('solarmon') + + def add_generation(self, home_id: int, client_time: int, watts: int) -> None: + self.db.execute( + 'INSERT INTO generation (ClientTime, ReceivedTime, HomeID, Watts) VALUES', + [[client_time, round(time()), home_id, watts]] + ) + + def add_status(self, home_id: int, + client_time: int, + grid_voltage: int, + grid_freq: int, + ac_output_voltage: int, + ac_output_freq: int, + ac_output_apparent_power: int, + ac_output_active_power: int, + output_load_percent: int, + battery_voltage: int, + battery_voltage_scc: int, + battery_voltage_scc2: int, + battery_discharge_current: int, + battery_charge_current: int, + battery_capacity: int, + inverter_heat_sink_temp: int, + mppt1_charger_temp: int, + mppt2_charger_temp: int, + pv1_input_power: int, + pv2_input_power: int, + pv1_input_voltage: int, + pv2_input_voltage: int, + mppt1_charger_status: int, + mppt2_charger_status: int, + battery_power_direction: int, + dc_ac_power_direction: int, + line_power_direction: int, + load_connected: int) -> None: + self.db.execute("""INSERT INTO status ( + ClientTime, + ReceivedTime, + HomeID, + GridVoltage, + GridFrequency, + ACOutputVoltage, + ACOutputFrequency, + ACOutputApparentPower, + ACOutputActivePower, + OutputLoadPercent, + BatteryVoltage, + BatteryVoltageSCC, + BatteryVoltageSCC2, + BatteryDischargingCurrent, + BatteryChargingCurrent, + BatteryCapacity, + HeatSinkTemp, + MPPT1ChargerTemp, + MPPT2ChargerTemp, + PV1InputPower, + PV2InputPower, + PV1InputVoltage, + PV2InputVoltage, + MPPT1ChargerStatus, + MPPT2ChargerStatus, + BatteryPowerDirection, + DCACPowerDirection, + LinePowerDirection, + LoadConnected) VALUES""", [[ + client_time, + round(time()), + home_id, + grid_voltage, + grid_freq, + ac_output_voltage, + ac_output_freq, + ac_output_apparent_power, + ac_output_active_power, + output_load_percent, + battery_voltage, + battery_voltage_scc, + battery_voltage_scc2, + battery_discharge_current, + battery_charge_current, + battery_capacity, + inverter_heat_sink_temp, + mppt1_charger_temp, + mppt2_charger_temp, + pv1_input_power, + pv2_input_power, + pv1_input_voltage, + pv2_input_voltage, + mppt1_charger_status, + mppt2_charger_status, + battery_power_direction, + dc_ac_power_direction, + line_power_direction, + load_connected + ]]) + + def get_consumed_energy(self, dt_from: datetime, dt_to: datetime) -> float: + rows = self.query('SELECT ClientTime, ACOutputActivePower FROM status' + ' WHERE ClientTime >= %(from)s AND ClientTime <= %(to)s' + ' ORDER BY ClientTime', {'from': dt_from, 'to': dt_to}) + prev_time = None + prev_wh = 0 + + ws = 0 # watt-seconds + for t, wh in rows: + if prev_time is not None: + n = (t - prev_time).total_seconds() + ws += prev_wh * n + + prev_time = t + prev_wh = wh + + return ws / 3600 # convert to watt-hours + + def get_intervals_by_condition(self, + dt_from: datetime, + dt_to: datetime, + cond_start: str, + cond_end: str) -> IntervalList: + rows = None + ranges = [[None, None]] + + while rows is None or len(rows) > 0: + if ranges[len(ranges)-1][0] is None: + condition = cond_start + range_idx = 0 + else: + condition = cond_end + range_idx = 1 + + rows = self.query('SELECT ClientTime FROM status ' + f'WHERE ClientTime > %(from)s AND ClientTime <= %(to)s AND {condition}' + ' ORDER BY ClientTime LIMIT 1', + {'from': dt_from, 'to': dt_to}) + if not rows: + break + + row = rows[0] + + ranges[len(ranges) - 1][range_idx] = row[0] + if range_idx == 1: + ranges.append([None, None]) + + dt_from = row[0] + + if ranges[len(ranges)-1][0] is None: + ranges.pop() + elif ranges[len(ranges)-1][1] is None: + ranges[len(ranges)-1][1] = dt_to - timedelta(seconds=1) + + return ranges + + def get_grid_connected_intervals(self, dt_from: datetime, dt_to: datetime) -> IntervalList: + return self.get_intervals_by_condition(dt_from, dt_to, 'GridFrequency > 0', 'GridFrequency = 0') + + def get_grid_used_intervals(self, dt_from: datetime, dt_to: datetime) -> IntervalList: + return self.get_intervals_by_condition(dt_from, + dt_to, + "LinePowerDirection = 'Input'", + "LinePowerDirection != 'Input'") + + def get_grid_consumed_energy(self, dt_from: datetime, dt_to: datetime) -> float: + PrevData = namedtuple('PrevData', 'time, pd, bat_chg, bat_dis, wh') + + ws = 0 # watt-seconds + amps = 0 # amper-seconds + + intervals = self.get_grid_used_intervals(dt_from, dt_to) + for dt_start, dt_end in intervals: + fields = ', '.join([ + 'ClientTime', + 'DCACPowerDirection', + 'BatteryChargingCurrent', + 'BatteryDischargingCurrent', + 'ACOutputActivePower' + ]) + rows = self.query(f'SELECT {fields} FROM status' + ' WHERE ClientTime >= %(from)s AND ClientTime < %(to)s ORDER BY ClientTime', + {'from': dt_start, 'to': dt_end}) + + prev = PrevData(time=None, pd=None, bat_chg=None, bat_dis=None, wh=None) + for ct, pd, bat_chg, bat_dis, wh in rows: + if prev.time is not None: + n = (ct-prev.time).total_seconds() + ws += prev.wh * n + + if pd == 'DC/AC': + amps -= prev.bat_dis * n + elif pd == 'AC/DC': + amps += prev.bat_chg * n + + prev = PrevData(time=ct, pd=pd, bat_chg=bat_chg, bat_dis=bat_dis, wh=wh) + + amps /= 3600 + wh = ws / 3600 + wh += amps*48 + + return wh diff --git a/include/py/homekit/database/inverter_time_formats.py b/include/py/homekit/database/inverter_time_formats.py new file mode 100644 index 0000000..7c37d30 --- /dev/null +++ b/include/py/homekit/database/inverter_time_formats.py @@ -0,0 +1,2 @@ +FormatTime = '%Y-%m-%d %H:%M:%S' +FormatDate = '%Y-%m-%d' diff --git a/include/py/homekit/database/mysql.py b/include/py/homekit/database/mysql.py new file mode 100644 index 0000000..fe97cd4 --- /dev/null +++ b/include/py/homekit/database/mysql.py @@ -0,0 +1,47 @@ +import time +import logging + +from mysql.connector import connect, MySQLConnection, Error +from typing import Optional +from ..config import config + +link: Optional[MySQLConnection] = None +logger = logging.getLogger(__name__) + +datetime_fmt = '%Y-%m-%d %H:%M:%S' + + +def get_mysql() -> MySQLConnection: + global link + + if link is not None: + return link + + link = connect( + host=config['mysql']['host'], + user=config['mysql']['user'], + password=config['mysql']['password'], + database=config['mysql']['database'], + ) + link.time_zone = '+01:00' + return link + + +def mysql_now() -> str: + return time.strftime('%Y-%m-%d %H:%M:%S') + + +class MySQLDatabase: + def __init__(self): + self.db = get_mysql() + + def cursor(self, **kwargs): + try: + self.db.ping(reconnect=True, attempts=2) + except Error as e: + logger.exception(e) + self.db = get_mysql() + return self.db.cursor(**kwargs) + + def commit(self): + self.db.commit() diff --git a/include/py/homekit/database/sensors.py b/include/py/homekit/database/sensors.py new file mode 100644 index 0000000..8155108 --- /dev/null +++ b/include/py/homekit/database/sensors.py @@ -0,0 +1,69 @@ +from time import time +from datetime import datetime +from typing import Tuple, List +from .clickhouse import ClickhouseDatabase +from ..api.types import TemperatureSensorLocation + + +def get_temperature_table(sensor: TemperatureSensorLocation) -> str: + if sensor == TemperatureSensorLocation.DIANA: + return 'temp_diana' + + elif sensor == TemperatureSensorLocation.STREET: + return 'temp_street' + + elif sensor == TemperatureSensorLocation.BIG_HOUSE_1: + return 'temp' + + elif sensor == TemperatureSensorLocation.BIG_HOUSE_2: + return 'temp_roof' + + elif sensor == TemperatureSensorLocation.BIG_HOUSE_ROOM: + return 'temp_room' + + elif sensor == TemperatureSensorLocation.SPB1: + return 'temp_spb1' + + +class SensorsDatabase(ClickhouseDatabase): + def __init__(self): + super().__init__('home') + + def add_temperature(self, + home_id: int, + client_time: int, + sensor: TemperatureSensorLocation, + temp: int, + rh: int): + table = get_temperature_table(sensor) + sql = """INSERT INTO """ + table + """ ( + ClientTime, + ReceivedTime, + HomeID, + Temperature, + RelativeHumidity + ) VALUES""" + self.db.execute(sql, [[ + client_time, + int(time()), + home_id, + temp, + rh + ]]) + + def get_temperature_recordings(self, + sensor: TemperatureSensorLocation, + time_range: Tuple[datetime, datetime], + home_id=1) -> List[tuple]: + table = get_temperature_table(sensor) + sql = f"""SELECT ClientTime, Temperature, RelativeHumidity + FROM {table} + WHERE ClientTime >= %(from)s AND ClientTime <= %(to)s + ORDER BY ClientTime""" + dt_from, dt_to = time_range + + data = self.query(sql, { + 'from': dt_from, + 'to': dt_to + }) + return [(date, temp/100, humidity/100) for date, temp, humidity in data] diff --git a/include/py/homekit/database/simple_state.py b/include/py/homekit/database/simple_state.py new file mode 100644 index 0000000..2b8ebe7 --- /dev/null +++ b/include/py/homekit/database/simple_state.py @@ -0,0 +1,48 @@ +import os +import json +import atexit + +from ._base import get_data_root_directory + + +class SimpleState: + def __init__(self, + name: str, + default: dict = None): + if default is None: + default = {} + elif type(default) is not dict: + raise TypeError('default must be dictionary') + + path = os.path.join(get_data_root_directory(), name) + if not os.path.exists(path): + self._data = default + else: + with open(path, 'r') as f: + self._data = json.loads(f.read()) + + self._file = path + atexit.register(self.__cleanup) + + def __cleanup(self): + if hasattr(self, '_file'): + with open(self._file, 'w') as f: + f.write(json.dumps(self._data)) + atexit.unregister(self.__cleanup) + + def __del__(self): + if 'open' in __builtins__: + self.__cleanup() + + def __getitem__(self, key): + return self._data[key] + + def __setitem__(self, key, value): + self._data[key] = value + + def __contains__(self, key): + return key in self._data + + def __delitem__(self, key): + if key in self._data: + del self._data[key] diff --git a/include/py/homekit/database/sqlite.py b/include/py/homekit/database/sqlite.py new file mode 100644 index 0000000..1651a93 --- /dev/null +++ b/include/py/homekit/database/sqlite.py @@ -0,0 +1,70 @@ +import sqlite3 +import os.path +import logging + +from ._base import get_data_root_directory +from ..config import config, is_development_mode + + +def _get_database_path(name: str) -> str: + return os.path.join( + get_data_root_directory(), + f'{name}.db') + + +class SQLiteBase: + SCHEMA = 1 + + def __init__(self, name=None, path=None, check_same_thread=False): + if not path: + if not name: + name = config.app_name + database_path = _get_database_path(name) + else: + database_path = path + if not os.path.exists(os.path.dirname(database_path)): + os.makedirs(os.path.dirname(database_path)) + + self.logger = logging.getLogger(self.__class__.__name__) + self.sqlite = sqlite3.connect(database_path, check_same_thread=check_same_thread) + + if is_development_mode(): + self.sql_logger = logging.getLogger(self.__class__.__name__) + self.sql_logger.setLevel('TRACE') + self.sqlite.set_trace_callback(self.sql_logger.trace) + + sqlite_version = self._get_sqlite_version() + self.logger.debug(f'SQLite version: {sqlite_version}') + + schema_version = self.schema_get_version() + self.logger.debug(f'Schema version: {schema_version}') + + self.schema_init(schema_version) + self.schema_set_version(self.SCHEMA) + + def __del__(self): + if self.sqlite: + self.sqlite.commit() + self.sqlite.close() + + def _get_sqlite_version(self) -> str: + cursor = self.sqlite.cursor() + cursor.execute("SELECT sqlite_version()") + return cursor.fetchone()[0] + + def schema_get_version(self) -> int: + cursor = self.sqlite.execute('PRAGMA user_version') + return int(cursor.fetchone()[0]) + + def schema_set_version(self, v) -> None: + self.sqlite.execute('PRAGMA user_version={:d}'.format(v)) + self.logger.info(f'Schema set to {v}') + + def cursor(self) -> sqlite3.Cursor: + return self.sqlite.cursor() + + def commit(self) -> None: + return self.sqlite.commit() + + def schema_init(self, version: int) -> None: + raise ValueError(f'{self.__class__.__name__}: must override schema_init') |