summaryrefslogtreecommitdiff
path: root/include/py/homekit/database
diff options
context:
space:
mode:
Diffstat (limited to 'include/py/homekit/database')
-rw-r--r--include/py/homekit/database/__init__.py29
-rw-r--r--include/py/homekit/database/__init__.pyi11
-rw-r--r--include/py/homekit/database/_base.py9
-rw-r--r--include/py/homekit/database/bots.py96
-rw-r--r--include/py/homekit/database/clickhouse.py39
-rw-r--r--include/py/homekit/database/inverter.py212
-rw-r--r--include/py/homekit/database/inverter_time_formats.py2
-rw-r--r--include/py/homekit/database/mysql.py47
-rw-r--r--include/py/homekit/database/sensors.py69
-rw-r--r--include/py/homekit/database/simple_state.py48
-rw-r--r--include/py/homekit/database/sqlite.py70
11 files changed, 632 insertions, 0 deletions
diff --git a/include/py/homekit/database/__init__.py b/include/py/homekit/database/__init__.py
new file mode 100644
index 0000000..b50cbce
--- /dev/null
+++ b/include/py/homekit/database/__init__.py
@@ -0,0 +1,29 @@
+import importlib
+
+__all__ = [
+ 'get_mysql',
+ 'mysql_now',
+ 'get_clickhouse',
+ 'SimpleState',
+
+ 'SensorsDatabase',
+ 'InverterDatabase',
+ 'BotsDatabase'
+]
+
+
+def __getattr__(name: str):
+ if name in __all__:
+ if name.endswith('Database'):
+ file = name[:-8].lower()
+ elif 'mysql' in name:
+ file = 'mysql'
+ elif 'clickhouse' in name:
+ file = 'clickhouse'
+ else:
+ file = 'simple_state'
+
+ module = importlib.import_module(f'.{file}', __name__)
+ return getattr(module, name)
+
+ raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/include/py/homekit/database/__init__.pyi b/include/py/homekit/database/__init__.pyi
new file mode 100644
index 0000000..31aae5d
--- /dev/null
+++ b/include/py/homekit/database/__init__.pyi
@@ -0,0 +1,11 @@
+from .mysql import (
+ get_mysql as get_mysql,
+ mysql_now as mysql_now
+)
+from .clickhouse import get_clickhouse as get_clickhouse
+
+from simple_state import SimpleState as SimpleState
+
+from .sensors import SensorsDatabase as SensorsDatabase
+from .inverter import InverterDatabase as InverterDatabase
+from .bots import BotsDatabase as BotsDatabase
diff --git a/include/py/homekit/database/_base.py b/include/py/homekit/database/_base.py
new file mode 100644
index 0000000..dcec9da
--- /dev/null
+++ b/include/py/homekit/database/_base.py
@@ -0,0 +1,9 @@
+import os
+
+
+def get_data_root_directory() -> str:
+ return os.path.join(
+ os.environ['HOME'],
+ '.config',
+ 'homekit',
+ 'data') \ No newline at end of file
diff --git a/include/py/homekit/database/bots.py b/include/py/homekit/database/bots.py
new file mode 100644
index 0000000..fb5f326
--- /dev/null
+++ b/include/py/homekit/database/bots.py
@@ -0,0 +1,96 @@
+import pytz
+
+from .mysql import mysql_now, MySQLDatabase, datetime_fmt
+from ..api.types import (
+ SoundSensorLocation
+)
+from typing import Optional, List, Tuple
+from datetime import datetime
+from html import escape
+
+
+class OpenwrtLogRecord:
+ id: int
+ log_time: datetime
+ received_time: datetime
+ text: str
+
+ def __init__(self, id, text, log_time, received_time):
+ self.id = id
+ self.text = text
+ self.log_time = log_time
+ self.received_time = received_time
+
+ def __repr__(self):
+ return f"<b>{self.log_time.strftime('%H:%M:%S')}</b> {escape(self.text)}"
+
+
+class BotsDatabase(MySQLDatabase):
+ def add_openwrt_logs(self,
+ lines: List[Tuple[datetime, str]],
+ access_point: int):
+ now = datetime.now()
+ with self.cursor() as cursor:
+ for line in lines:
+ time, text = line
+ cursor.execute("INSERT INTO openwrt (log_time, received_time, text, ap) VALUES (%s, %s, %s, %s)",
+ (time.strftime(datetime_fmt), now.strftime(datetime_fmt), text, access_point))
+ self.commit()
+
+ def add_sound_hits(self,
+ hits: List[Tuple[SoundSensorLocation, int]],
+ time: datetime):
+ with self.cursor() as cursor:
+ for loc, count in hits:
+ cursor.execute("INSERT INTO sound_hits (location, `time`, hits) VALUES (%s, %s, %s)",
+ (loc.name.lower(), time.strftime(datetime_fmt), count))
+ self.commit()
+
+ def get_sound_hits(self,
+ location: SoundSensorLocation,
+ after: Optional[datetime] = None,
+ last: Optional[int] = None) -> List[dict]:
+ with self.cursor(dictionary=True) as cursor:
+ sql = "SELECT `time`, hits FROM sound_hits WHERE location=%s"
+ args = [location.name.lower()]
+
+ if after:
+ sql += ' AND `time` >= %s ORDER BY time DESC'
+ args.append(after)
+ elif last:
+ sql += ' ORDER BY time DESC LIMIT 0, %s'
+ args.append(last)
+ else:
+ raise ValueError('no `after`, no `last`, what do you expect?')
+
+ cursor.execute(sql, tuple(args))
+ data = []
+ for row in cursor.fetchall():
+ data.append({
+ 'time': row['time'],
+ 'hits': row['hits']
+ })
+ return data
+
+ def get_openwrt_logs(self,
+ filter_text: str,
+ min_id: int,
+ access_point: int,
+ limit: int = None) -> List[OpenwrtLogRecord]:
+ tz = pytz.timezone('Europe/Moscow')
+ with self.cursor(dictionary=True) as cursor:
+ sql = "SELECT * FROM openwrt WHERE ap=%s AND text LIKE %s AND id > %s"
+ if limit is not None:
+ sql += f" LIMIT {limit}"
+
+ cursor.execute(sql, (access_point, f'%{filter_text}%', min_id))
+ data = []
+ for row in cursor.fetchall():
+ data.append(OpenwrtLogRecord(
+ id=int(row['id']),
+ text=row['text'],
+ log_time=row['log_time'].astimezone(tz),
+ received_time=row['received_time'].astimezone(tz)
+ ))
+
+ return data
diff --git a/include/py/homekit/database/clickhouse.py b/include/py/homekit/database/clickhouse.py
new file mode 100644
index 0000000..d0ec283
--- /dev/null
+++ b/include/py/homekit/database/clickhouse.py
@@ -0,0 +1,39 @@
+import logging
+
+from zoneinfo import ZoneInfo
+from datetime import datetime
+from clickhouse_driver import Client as ClickhouseClient
+from ..config import is_development_mode
+
+_links = {}
+
+
+def get_clickhouse(db: str) -> ClickhouseClient:
+ if db not in _links:
+ _links[db] = ClickhouseClient.from_url(f'clickhouse://localhost/{db}')
+
+ return _links[db]
+
+
+class ClickhouseDatabase:
+ def __init__(self, db: str):
+ self.db = get_clickhouse(db)
+
+ self.server_timezone = self.db.execute('SELECT timezone()')[0][0]
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def query(self, *args, **kwargs):
+ settings = {'use_client_time_zone': True}
+ kwargs['settings'] = settings
+
+ if 'no_tz_fix' not in kwargs and len(args) > 1 and isinstance(args[1], dict):
+ for k, v in args[1].items():
+ if isinstance(v, datetime):
+ args[1][k] = v.astimezone(tz=ZoneInfo(self.server_timezone))
+
+ result = self.db.execute(*args, **kwargs)
+
+ if is_development_mode():
+ self.logger.debug(args[0] if len(args) == 1 else args[0] % args[1])
+
+ return result
diff --git a/include/py/homekit/database/inverter.py b/include/py/homekit/database/inverter.py
new file mode 100644
index 0000000..fc3f74f
--- /dev/null
+++ b/include/py/homekit/database/inverter.py
@@ -0,0 +1,212 @@
+from time import time
+from datetime import datetime, timedelta
+from typing import Optional
+from collections import namedtuple
+
+from .clickhouse import ClickhouseDatabase
+
+
+IntervalList = list[list[Optional[datetime]]]
+
+
+class InverterDatabase(ClickhouseDatabase):
+ def __init__(self):
+ super().__init__('solarmon')
+
+ def add_generation(self, home_id: int, client_time: int, watts: int) -> None:
+ self.db.execute(
+ 'INSERT INTO generation (ClientTime, ReceivedTime, HomeID, Watts) VALUES',
+ [[client_time, round(time()), home_id, watts]]
+ )
+
+ def add_status(self, home_id: int,
+ client_time: int,
+ grid_voltage: int,
+ grid_freq: int,
+ ac_output_voltage: int,
+ ac_output_freq: int,
+ ac_output_apparent_power: int,
+ ac_output_active_power: int,
+ output_load_percent: int,
+ battery_voltage: int,
+ battery_voltage_scc: int,
+ battery_voltage_scc2: int,
+ battery_discharge_current: int,
+ battery_charge_current: int,
+ battery_capacity: int,
+ inverter_heat_sink_temp: int,
+ mppt1_charger_temp: int,
+ mppt2_charger_temp: int,
+ pv1_input_power: int,
+ pv2_input_power: int,
+ pv1_input_voltage: int,
+ pv2_input_voltage: int,
+ mppt1_charger_status: int,
+ mppt2_charger_status: int,
+ battery_power_direction: int,
+ dc_ac_power_direction: int,
+ line_power_direction: int,
+ load_connected: int) -> None:
+ self.db.execute("""INSERT INTO status (
+ ClientTime,
+ ReceivedTime,
+ HomeID,
+ GridVoltage,
+ GridFrequency,
+ ACOutputVoltage,
+ ACOutputFrequency,
+ ACOutputApparentPower,
+ ACOutputActivePower,
+ OutputLoadPercent,
+ BatteryVoltage,
+ BatteryVoltageSCC,
+ BatteryVoltageSCC2,
+ BatteryDischargingCurrent,
+ BatteryChargingCurrent,
+ BatteryCapacity,
+ HeatSinkTemp,
+ MPPT1ChargerTemp,
+ MPPT2ChargerTemp,
+ PV1InputPower,
+ PV2InputPower,
+ PV1InputVoltage,
+ PV2InputVoltage,
+ MPPT1ChargerStatus,
+ MPPT2ChargerStatus,
+ BatteryPowerDirection,
+ DCACPowerDirection,
+ LinePowerDirection,
+ LoadConnected) VALUES""", [[
+ client_time,
+ round(time()),
+ home_id,
+ grid_voltage,
+ grid_freq,
+ ac_output_voltage,
+ ac_output_freq,
+ ac_output_apparent_power,
+ ac_output_active_power,
+ output_load_percent,
+ battery_voltage,
+ battery_voltage_scc,
+ battery_voltage_scc2,
+ battery_discharge_current,
+ battery_charge_current,
+ battery_capacity,
+ inverter_heat_sink_temp,
+ mppt1_charger_temp,
+ mppt2_charger_temp,
+ pv1_input_power,
+ pv2_input_power,
+ pv1_input_voltage,
+ pv2_input_voltage,
+ mppt1_charger_status,
+ mppt2_charger_status,
+ battery_power_direction,
+ dc_ac_power_direction,
+ line_power_direction,
+ load_connected
+ ]])
+
+ def get_consumed_energy(self, dt_from: datetime, dt_to: datetime) -> float:
+ rows = self.query('SELECT ClientTime, ACOutputActivePower FROM status'
+ ' WHERE ClientTime >= %(from)s AND ClientTime <= %(to)s'
+ ' ORDER BY ClientTime', {'from': dt_from, 'to': dt_to})
+ prev_time = None
+ prev_wh = 0
+
+ ws = 0 # watt-seconds
+ for t, wh in rows:
+ if prev_time is not None:
+ n = (t - prev_time).total_seconds()
+ ws += prev_wh * n
+
+ prev_time = t
+ prev_wh = wh
+
+ return ws / 3600 # convert to watt-hours
+
+ def get_intervals_by_condition(self,
+ dt_from: datetime,
+ dt_to: datetime,
+ cond_start: str,
+ cond_end: str) -> IntervalList:
+ rows = None
+ ranges = [[None, None]]
+
+ while rows is None or len(rows) > 0:
+ if ranges[len(ranges)-1][0] is None:
+ condition = cond_start
+ range_idx = 0
+ else:
+ condition = cond_end
+ range_idx = 1
+
+ rows = self.query('SELECT ClientTime FROM status '
+ f'WHERE ClientTime > %(from)s AND ClientTime <= %(to)s AND {condition}'
+ ' ORDER BY ClientTime LIMIT 1',
+ {'from': dt_from, 'to': dt_to})
+ if not rows:
+ break
+
+ row = rows[0]
+
+ ranges[len(ranges) - 1][range_idx] = row[0]
+ if range_idx == 1:
+ ranges.append([None, None])
+
+ dt_from = row[0]
+
+ if ranges[len(ranges)-1][0] is None:
+ ranges.pop()
+ elif ranges[len(ranges)-1][1] is None:
+ ranges[len(ranges)-1][1] = dt_to - timedelta(seconds=1)
+
+ return ranges
+
+ def get_grid_connected_intervals(self, dt_from: datetime, dt_to: datetime) -> IntervalList:
+ return self.get_intervals_by_condition(dt_from, dt_to, 'GridFrequency > 0', 'GridFrequency = 0')
+
+ def get_grid_used_intervals(self, dt_from: datetime, dt_to: datetime) -> IntervalList:
+ return self.get_intervals_by_condition(dt_from,
+ dt_to,
+ "LinePowerDirection = 'Input'",
+ "LinePowerDirection != 'Input'")
+
+ def get_grid_consumed_energy(self, dt_from: datetime, dt_to: datetime) -> float:
+ PrevData = namedtuple('PrevData', 'time, pd, bat_chg, bat_dis, wh')
+
+ ws = 0 # watt-seconds
+ amps = 0 # amper-seconds
+
+ intervals = self.get_grid_used_intervals(dt_from, dt_to)
+ for dt_start, dt_end in intervals:
+ fields = ', '.join([
+ 'ClientTime',
+ 'DCACPowerDirection',
+ 'BatteryChargingCurrent',
+ 'BatteryDischargingCurrent',
+ 'ACOutputActivePower'
+ ])
+ rows = self.query(f'SELECT {fields} FROM status'
+ ' WHERE ClientTime >= %(from)s AND ClientTime < %(to)s ORDER BY ClientTime',
+ {'from': dt_start, 'to': dt_end})
+
+ prev = PrevData(time=None, pd=None, bat_chg=None, bat_dis=None, wh=None)
+ for ct, pd, bat_chg, bat_dis, wh in rows:
+ if prev.time is not None:
+ n = (ct-prev.time).total_seconds()
+ ws += prev.wh * n
+
+ if pd == 'DC/AC':
+ amps -= prev.bat_dis * n
+ elif pd == 'AC/DC':
+ amps += prev.bat_chg * n
+
+ prev = PrevData(time=ct, pd=pd, bat_chg=bat_chg, bat_dis=bat_dis, wh=wh)
+
+ amps /= 3600
+ wh = ws / 3600
+ wh += amps*48
+
+ return wh
diff --git a/include/py/homekit/database/inverter_time_formats.py b/include/py/homekit/database/inverter_time_formats.py
new file mode 100644
index 0000000..7c37d30
--- /dev/null
+++ b/include/py/homekit/database/inverter_time_formats.py
@@ -0,0 +1,2 @@
+FormatTime = '%Y-%m-%d %H:%M:%S'
+FormatDate = '%Y-%m-%d'
diff --git a/include/py/homekit/database/mysql.py b/include/py/homekit/database/mysql.py
new file mode 100644
index 0000000..fe97cd4
--- /dev/null
+++ b/include/py/homekit/database/mysql.py
@@ -0,0 +1,47 @@
+import time
+import logging
+
+from mysql.connector import connect, MySQLConnection, Error
+from typing import Optional
+from ..config import config
+
+link: Optional[MySQLConnection] = None
+logger = logging.getLogger(__name__)
+
+datetime_fmt = '%Y-%m-%d %H:%M:%S'
+
+
+def get_mysql() -> MySQLConnection:
+ global link
+
+ if link is not None:
+ return link
+
+ link = connect(
+ host=config['mysql']['host'],
+ user=config['mysql']['user'],
+ password=config['mysql']['password'],
+ database=config['mysql']['database'],
+ )
+ link.time_zone = '+01:00'
+ return link
+
+
+def mysql_now() -> str:
+ return time.strftime('%Y-%m-%d %H:%M:%S')
+
+
+class MySQLDatabase:
+ def __init__(self):
+ self.db = get_mysql()
+
+ def cursor(self, **kwargs):
+ try:
+ self.db.ping(reconnect=True, attempts=2)
+ except Error as e:
+ logger.exception(e)
+ self.db = get_mysql()
+ return self.db.cursor(**kwargs)
+
+ def commit(self):
+ self.db.commit()
diff --git a/include/py/homekit/database/sensors.py b/include/py/homekit/database/sensors.py
new file mode 100644
index 0000000..8155108
--- /dev/null
+++ b/include/py/homekit/database/sensors.py
@@ -0,0 +1,69 @@
+from time import time
+from datetime import datetime
+from typing import Tuple, List
+from .clickhouse import ClickhouseDatabase
+from ..api.types import TemperatureSensorLocation
+
+
+def get_temperature_table(sensor: TemperatureSensorLocation) -> str:
+ if sensor == TemperatureSensorLocation.DIANA:
+ return 'temp_diana'
+
+ elif sensor == TemperatureSensorLocation.STREET:
+ return 'temp_street'
+
+ elif sensor == TemperatureSensorLocation.BIG_HOUSE_1:
+ return 'temp'
+
+ elif sensor == TemperatureSensorLocation.BIG_HOUSE_2:
+ return 'temp_roof'
+
+ elif sensor == TemperatureSensorLocation.BIG_HOUSE_ROOM:
+ return 'temp_room'
+
+ elif sensor == TemperatureSensorLocation.SPB1:
+ return 'temp_spb1'
+
+
+class SensorsDatabase(ClickhouseDatabase):
+ def __init__(self):
+ super().__init__('home')
+
+ def add_temperature(self,
+ home_id: int,
+ client_time: int,
+ sensor: TemperatureSensorLocation,
+ temp: int,
+ rh: int):
+ table = get_temperature_table(sensor)
+ sql = """INSERT INTO """ + table + """ (
+ ClientTime,
+ ReceivedTime,
+ HomeID,
+ Temperature,
+ RelativeHumidity
+ ) VALUES"""
+ self.db.execute(sql, [[
+ client_time,
+ int(time()),
+ home_id,
+ temp,
+ rh
+ ]])
+
+ def get_temperature_recordings(self,
+ sensor: TemperatureSensorLocation,
+ time_range: Tuple[datetime, datetime],
+ home_id=1) -> List[tuple]:
+ table = get_temperature_table(sensor)
+ sql = f"""SELECT ClientTime, Temperature, RelativeHumidity
+ FROM {table}
+ WHERE ClientTime >= %(from)s AND ClientTime <= %(to)s
+ ORDER BY ClientTime"""
+ dt_from, dt_to = time_range
+
+ data = self.query(sql, {
+ 'from': dt_from,
+ 'to': dt_to
+ })
+ return [(date, temp/100, humidity/100) for date, temp, humidity in data]
diff --git a/include/py/homekit/database/simple_state.py b/include/py/homekit/database/simple_state.py
new file mode 100644
index 0000000..2b8ebe7
--- /dev/null
+++ b/include/py/homekit/database/simple_state.py
@@ -0,0 +1,48 @@
+import os
+import json
+import atexit
+
+from ._base import get_data_root_directory
+
+
+class SimpleState:
+ def __init__(self,
+ name: str,
+ default: dict = None):
+ if default is None:
+ default = {}
+ elif type(default) is not dict:
+ raise TypeError('default must be dictionary')
+
+ path = os.path.join(get_data_root_directory(), name)
+ if not os.path.exists(path):
+ self._data = default
+ else:
+ with open(path, 'r') as f:
+ self._data = json.loads(f.read())
+
+ self._file = path
+ atexit.register(self.__cleanup)
+
+ def __cleanup(self):
+ if hasattr(self, '_file'):
+ with open(self._file, 'w') as f:
+ f.write(json.dumps(self._data))
+ atexit.unregister(self.__cleanup)
+
+ def __del__(self):
+ if 'open' in __builtins__:
+ self.__cleanup()
+
+ def __getitem__(self, key):
+ return self._data[key]
+
+ def __setitem__(self, key, value):
+ self._data[key] = value
+
+ def __contains__(self, key):
+ return key in self._data
+
+ def __delitem__(self, key):
+ if key in self._data:
+ del self._data[key]
diff --git a/include/py/homekit/database/sqlite.py b/include/py/homekit/database/sqlite.py
new file mode 100644
index 0000000..1651a93
--- /dev/null
+++ b/include/py/homekit/database/sqlite.py
@@ -0,0 +1,70 @@
+import sqlite3
+import os.path
+import logging
+
+from ._base import get_data_root_directory
+from ..config import config, is_development_mode
+
+
+def _get_database_path(name: str) -> str:
+ return os.path.join(
+ get_data_root_directory(),
+ f'{name}.db')
+
+
+class SQLiteBase:
+ SCHEMA = 1
+
+ def __init__(self, name=None, path=None, check_same_thread=False):
+ if not path:
+ if not name:
+ name = config.app_name
+ database_path = _get_database_path(name)
+ else:
+ database_path = path
+ if not os.path.exists(os.path.dirname(database_path)):
+ os.makedirs(os.path.dirname(database_path))
+
+ self.logger = logging.getLogger(self.__class__.__name__)
+ self.sqlite = sqlite3.connect(database_path, check_same_thread=check_same_thread)
+
+ if is_development_mode():
+ self.sql_logger = logging.getLogger(self.__class__.__name__)
+ self.sql_logger.setLevel('TRACE')
+ self.sqlite.set_trace_callback(self.sql_logger.trace)
+
+ sqlite_version = self._get_sqlite_version()
+ self.logger.debug(f'SQLite version: {sqlite_version}')
+
+ schema_version = self.schema_get_version()
+ self.logger.debug(f'Schema version: {schema_version}')
+
+ self.schema_init(schema_version)
+ self.schema_set_version(self.SCHEMA)
+
+ def __del__(self):
+ if self.sqlite:
+ self.sqlite.commit()
+ self.sqlite.close()
+
+ def _get_sqlite_version(self) -> str:
+ cursor = self.sqlite.cursor()
+ cursor.execute("SELECT sqlite_version()")
+ return cursor.fetchone()[0]
+
+ def schema_get_version(self) -> int:
+ cursor = self.sqlite.execute('PRAGMA user_version')
+ return int(cursor.fetchone()[0])
+
+ def schema_set_version(self, v) -> None:
+ self.sqlite.execute('PRAGMA user_version={:d}'.format(v))
+ self.logger.info(f'Schema set to {v}')
+
+ def cursor(self) -> sqlite3.Cursor:
+ return self.sqlite.cursor()
+
+ def commit(self) -> None:
+ return self.sqlite.commit()
+
+ def schema_init(self, version: int) -> None:
+ raise ValueError(f'{self.__class__.__name__}: must override schema_init')