diff options
117 files changed, 8012 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f95115a --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +/.idea +/venv +*.pyc +__pycache__ +.DS_Store +/src/test/test_inverter_monitor.log
\ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..bb24298 --- /dev/null +++ b/Makefile @@ -0,0 +1,46 @@ +INSTALL = /usr/bin/env install +GLOBAL_PREFIX = /usr/local + +ifeq ($(shell id -u), 0) + USER_PREFIX = /usr/local +else + USER_PREFIX = $(HOME)/.local +endif + +PROGRAMS = admin_bot inverter_bot pump_bot sensors_bot +PROGRAMS += inverter_mqtt_receiver inverter_mqtt_sender +PROGRAMS += sensors_mqtt_receiver sensors_mqtt_sender +PROGRAMS += si7021d +PROGRAMS += gpiorelayd +PROGRAMS += gpiosensord +#PROGRAMS += web_api + +all: + @echo "Supported commands:" + @echo + @echo " \033[1mmake install\033[0m symlink all programs to $(USER_PREFIX)" + @echo " \033[1mmake install-tools\033[0m copy admin scripts to /usr/local/bin" + @echo " \033[1mmake venv\033[0m create virtualenv and install dependencies" + @echo " \033[1mmake web-api-dev\033[0m launch web api development server" + @echo + +venv: + python3 -m venv venv + . ./venv/bin/activate && pip3 install -r requirements.txt + +web-api-dev: + . ./venv/bin/activate && FLASK_ENV=development python3 src/web_api.py + +install: check-root + for name in @(PROGRAMS); do ln -s src/${name}.py $(USER_PREFIX)/bin/$name; done + +install-tools: check-root + $(INSTALL) tools/clickhouse-backup.sh $(GLOBAL_PREFIX)/bin + chmod +x $(GLOBAL_PREFIX)/bin/clickhouse-backup.sh + +check-root: + ifneq ($(shell id -u), 0) + $(error "You must be root.") + endif + +.PHONY: all install install-local install-tools venv web-api-dev check-root
\ No newline at end of file diff --git a/assets/mqtt_ca.crt b/assets/mqtt_ca.crt new file mode 100644 index 0000000..045ae10 --- /dev/null +++ b/assets/mqtt_ca.crt @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIID4zCCAsugAwIBAgIUcW9D2Yym/nNf//Sfv1G8kwpEBCMwDQYJKoZIhvcNAQEL +BQAwgYAxCzAJBgNVBAYTAlJVMQ8wDQYDVQQIDAZNb3Njb3cxDzANBgNVBAcMBk1v +c2NvdzEUMBIGA1UECgwLU29sYXJNb24uUlUxFzAVBgNVBAMMDmNhLnNvbGFybW9u +LnJ1MSAwHgYJKoZIhvcNAQkBFhFhZG1pbkBzb2xhcm1vbi5ydTAeFw0yMTA1MTYx +NzI2MjRaFw0zMTA1MTQxNzI2MjRaMIGAMQswCQYDVQQGEwJSVTEPMA0GA1UECAwG +TW9zY293MQ8wDQYDVQQHDAZNb3Njb3cxFDASBgNVBAoMC1NvbGFyTW9uLlJVMRcw +FQYDVQQDDA5jYS5zb2xhcm1vbi5ydTEgMB4GCSqGSIb3DQEJARYRYWRtaW5Ac29s +YXJtb24ucnUwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDEEPOhEE74 +LDWVhtY3fFQu1HD3GYv2b8SgXXk1evFs2QiLtw7wtvVG9jM+JjLadY50gMZYlrKe +NqFxj7OutTx0RnkFLQ0Q3xkEsQOlWVvgFf4qwZ8pEgAnmVGHQjBeM4vmgY0Dxnqd +GLrjLVKwEMYM1PiV3pp1vMDJGouoxp3bOL7Iz++/07Atim9g8RZ+gyw080JJUKdB +7alR3ZfND2GMFXd03aosE5c7YqIwjGrT73K4sdqP8ydwEPtjBfn4b746uERllsT1 +EBc4Iv25RWdUy1p1YIaa8y9/34h7QPUSawjdnnL+Ktq9DCxv8WDKoSRK5E7bwswf +DKHFEmoI4IjHAgMBAAGjUzBRMB0GA1UdDgQWBBSqdoh/ZkUgfDWQoxjXU6CeIO4H +FDAfBgNVHSMEGDAWgBSqdoh/ZkUgfDWQoxjXU6CeIO4HFDAPBgNVHRMBAf8EBTAD +AQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCM6JdaY+pT3E/8Tfz+M0R4kgqasyc9fAQP +g7tf2HrMPCtuIZF8aJYMNi0pfcnuUtr9FXFgGjyG+PZxqD2lHS+F/U5I8XqtTNJM +FW5Ls9dulRjmiGs0u8JbEX3igFTuCh0EZbtJgOLt2rOwSLv9PwI+ng4n8LBtbXVl +icfzWxGbnx/Bzoa7/Rk6Gs10Jf5bAeklchx/DbytSmoYSs9TxGdsrYkllznRts76 +6DHptSctecdi0svL4cE9dXWl6OSgG674khWPTd0I9bcHgJCQ6T1gPLRpnFJJ1ZT6 +ORgl25mkt+AX5U+naLMuUXU9TBKr3foxBMWqrSu5uC5K494Lbrvv +-----END CERTIFICATE----- diff --git a/doc/arecord_opi_lite.md b/doc/arecord_opi_lite.md new file mode 100644 index 0000000..153f9a1 --- /dev/null +++ b/doc/arecord_opi_lite.md @@ -0,0 +1,13 @@ +In order to use microphone on **Orange Pi Lite**: +- enable audio codec in `armbian-config` +- put this to `/etc/rc.local` (and make it executable): + ``` + for v in unmute cap; do + /usr/bin/amixer set "Line In" $v + /usr/bin/amixer set "Mic1" $v + done + + for k in "Mic1 Boost" "Line In" "Mic1"; do + /usr/bin/amixer set "$k" "86%" + done + ```
\ No newline at end of file diff --git a/doc/autossh.md b/doc/autossh.md new file mode 100644 index 0000000..fdcde19 --- /dev/null +++ b/doc/autossh.md @@ -0,0 +1,19 @@ +`/etc/systemd/system/my-ssh-tunnel.service`: + +``` +[Unit] +Description=ssh tunnel for localhost:22 +After=network.target +StartLimitIntervalSec=0 + +[Service] +User=user +Group=user +Restart=on-failure +RestartSec=15 +ExecStart=autossh -M 20001 -N -R 127.0.0.1:44223:127.0.0.1:22 -o StrictHostKeyChecking=no -o ExitOnForwardFailure=yes solarmon-tunnel@solarmon.ru +WorkingDirectory=/home/user + +[Install] +WantedBy=multi-user.target +```
\ No newline at end of file diff --git a/doc/database.md b/doc/database.md new file mode 100644 index 0000000..ba5a3d2 --- /dev/null +++ b/doc/database.md @@ -0,0 +1,65 @@ +# Databases + +## Inverter database + +ClickHouse tables: +```sql +CREATE TABLE status ( + ClientTime DateTime, + ReceivedTime DateTime, + HomeID UInt16, + GridVoltage UInt16, + GridFrequency UInt16, + ACOutputVoltage UInt16, + ACOutputFrequency UInt16, + ACOutputApparentPower UInt16, + ACOutputActivePower UInt16, + OutputLoadPercent UInt8, + BatteryVoltage UInt16, + BatteryVoltageSCC UInt16, + BatteryVoltageSCC2 UInt16, + BatteryDischargingCurrent UInt16, + BatteryChargingCurrent UInt16, + BatteryCapacity UInt8, + HeatSinkTemp UInt16, + MPPT1ChargerTemp UInt16, + MPPT2ChargerTemp UInt16, + PV1InputPower UInt16, + PV2InputPower UInt16, + PV1InputVoltage UInt16, + PV2InputVoltage UInt16, + MPPT1ChargerStatus Enum8('Abnormal' = 0, 'NotCharging' = 1, 'Charging' = 2), + MPPT2ChargerStatus Enum8('Abnormal' = 0, 'NotCharging' = 1, 'Charging' = 2), + BatteryPowerDirection Enum8('DoNothing' = 0, 'Charge' = 1, 'Discharge' = 2), + DCACPowerDirection Enum8('DoNothing' = 0, 'AC/DC' = 1, 'DC/AC' = 2), + LinePowerDirection Enum8('DoNothing' = 0, 'Input' = 1, 'Output' = 2), + LoadConnected Enum8('Disconnected' = 0, 'Connected' = 1) +) ENGINE = MergeTree() +PARTITION BY toYYYYMMDD(ReceivedTime) +ORDER BY (HomeID, ReceivedTime); + +CREATE TABLE generation ( + ClientTime DateTime, + ReceivedTime DateTime, + HomeID UInt16, + Watts UInt16 +) ENGINE = MergeTree() +PARTITION BY toYYYYMMDD(ReceivedTime) +ORDER BY (HomeID, ReceivedTime); +``` + + +## Sensors database + +ClickHouse tables: +```sql +CREATE TABLE temp_table_name ( + ClientTime DateTime, + ReceivedTime DateTime, + HomeID UInt16, + Temperature Int16, + RelativeHumidity UInt16 +) ENGINE = MergeTree() +PARTITION BY toYYYYMMDD(ReceivedTime) +ORDER BY (HomeID, ReceivedTime); +```
\ No newline at end of file diff --git a/doc/gpio_h3.md b/doc/gpio_h3.md new file mode 100644 index 0000000..d3032ff --- /dev/null +++ b/doc/gpio_h3.md @@ -0,0 +1,7 @@ +As root: + +``` +git clone https://github.com/duxingkei33/orangepi_PC_gpio_pyH3 +cd orangepi_PC_gpio_pyH3 +python3 setup.pysdlfksdf install +```
\ No newline at end of file diff --git a/doc/inverter_bot.md b/doc/inverter_bot.md new file mode 100644 index 0000000..c9b299c --- /dev/null +++ b/doc/inverter_bot.md @@ -0,0 +1,76 @@ +# Inverter Bot + +### Bot configuration + +**`~/.config/inverter_bot/config.toml`**: + +```toml +[bot] +token = "..." +users = [ 1, 2, 3 ] +notify_users = [ 1, 2 ] + +[inverter] +host = "127.0.0.1" +port = 8305 + +[monitor] +vlow = 47 +vcrit = 45 + +gen_currents = [2, 10, 20, 30] +gen_raise_intervals = [ + 180, # 3 minutes for 2 A, then + 120, # 2 more minutes for 10 A, then + 120, # 3 more minutes for 20 A, then, finally, 30 A +] +gen_cur30_v_limit = 56.9 +gen_cur20_v_limit = 56.7 +gen_cur10_v_limit = 54 + +gen_floating_v = 54 +gen_floating_time_max = 7200 + +[logging] +verbose = false + +[api] +token = "..." +``` + +### systemd integration + +**`/etc/systemd/system/inverter_bot.service`**: + +```systemd +[Unit] +Description=inverter bot +After=inverterd.service + +[Service] +User=user +Group=user +Restart=on-failure +ExecStart=/home/user/home/bin/inverter_bot +WorkingDirectory=/home/user + +[Install] +WantedBy=multi-user.target +``` + + +### Commands +``` +lang - Set language +status - Show status +config - Show configuration +errors - Show errors +flags - Toggle flags +calcw - Calculate daily watts usage +calcwadv - Advanced watts usage calculator +setbatuv - Set battery under voltage +setgencc - Set AC charging current +setgenct - Set AC charging thresholds +monstatus - Monitor: dump state +monsetcur - Monitor: set charging currents +```
\ No newline at end of file diff --git a/doc/sensors_bot.md b/doc/sensors_bot.md new file mode 100644 index 0000000..9f1c008 --- /dev/null +++ b/doc/sensors_bot.md @@ -0,0 +1,33 @@ +# Sensors Bot + +Configuration is stored in **`~/.config/sensors_bot/config.toml`**. + +Example: + +```toml +[bot] +token = "..." +users = [ + 1, # user 1 + 2, # user 2 + 3, # user 3 +] + +[api] +token = ..." + +[sensors.name1] +ip = "192.168.0.2" +port = 8306 +label_ru = "Тут" +label_en = "Here" + +[sensors.name2] +ip = "192.168.0.3" +port = 8307 +label_ru = "Там" +label_en = "There" + +[logging] +verbose = false +```
\ No newline at end of file diff --git a/doc/sound_node.md b/doc/sound_node.md new file mode 100644 index 0000000..03e84e4 --- /dev/null +++ b/doc/sound_node.md @@ -0,0 +1,72 @@ +# Sound Node + +## Requirements + +``` +apt install -y python3-aiohttp python3-requests python3-toml +``` + +## Configuration + +Orange Pi Lite config (`/etc/sound_node.toml`): + +```toml +[node] +listen = "0.0.0.0:8313" +process_wait_timeout = 10 +name = "nodename" + +record_max_time = 1800 +storage = "/var/recordings" + +[arecord] +bin = "/usr/bin/arecord" + +[lame] +bin = "/usr/bin/lame" +bitrate = 192 + +[amixer] +bin = "/usr/bin/amixer" +controls = [ + { + name = "Line In", + caps = ["mute", "cap", "volume"] + }, + { + name = "Mic1", + caps = ["mute", "cap", "volume"] + }, + { + name = "Mic1 Boost", + caps = ["volume"] + } +] + +[logging] +verbose = false +default_fmt = true +``` + +## Audio recording + +Install `lame`. + +Command to record audio: `arecord -v -f S16 -r 44100 -t raw 2>/dev/null | lame -r -s 44.1 -b 192 -m m - output.mp3 >/dev/null 2>/dev/null` + +## Uploading audios to remote server + +- Generate ssh keys for root on each sound node: + ``` + cd /root/.ssh + ssh-keygen -t ed25519 + ``` +- Add public keys on the remote server +- Copy `tools/sync-recordings-to-remote.sh` script to `/usr/local/bin` on all sound nodes, don't forget to `chmod +x` it. +- Add following lines to the root crontab (on all sound nodes): + ``` + TG_TOKEN="your telegram bot token" + TG_CHAT_ID="your telegram chat id" + + 30 * * * * /usr/local/bin/sync-recordings-to-remote.sh + ```
\ No newline at end of file diff --git a/doc/test_api.md b/doc/test_api.md new file mode 100644 index 0000000..9c0483f --- /dev/null +++ b/doc/test_api.md @@ -0,0 +1,12 @@ +# test_api.py + +Config example: +```toml +[api] +host = "app-dev.domain.ru" +token = "" +basic_auth = "user:password" + +[logging] +verbose = true +```
\ No newline at end of file diff --git a/pyA20/__init__.pyi b/pyA20/__init__.pyi new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/pyA20/__init__.pyi diff --git a/pyA20/gpio/connector.pyi b/pyA20/gpio/connector.pyi new file mode 100644 index 0000000..12b2b6e --- /dev/null +++ b/pyA20/gpio/connector.pyi @@ -0,0 +1,2 @@ +gpio1 = 0 +LED = 0
\ No newline at end of file diff --git a/pyA20/gpio/gpio.pyi b/pyA20/gpio/gpio.pyi new file mode 100644 index 0000000..225fcbe --- /dev/null +++ b/pyA20/gpio/gpio.pyi @@ -0,0 +1,24 @@ +HIGH = 1 +LOW = 0 +INPUT = 0 +OUTPUT = 0 +PULLUP = 0 +PULLDOWN = 0 + +def init(): + pass + +def setcfg(gpio: int, cfg: int): + pass + +def getcfg(gpio: int): + pass + +def output(gpio: int, value: int): + pass + +def pullup(gpio: int, pull: int): + pass + +def input(gpio: int): + pass
\ No newline at end of file diff --git a/pyA20/gpio/port.pyi b/pyA20/gpio/port.pyi new file mode 100644 index 0000000..17f69fe --- /dev/null +++ b/pyA20/gpio/port.pyi @@ -0,0 +1,36 @@ +# these are not real values, just placeholders + +PA12 = 0 +PA11 = 0 +PA6 = 0 + +PA1 = 0 +PA0 = 0 + +PA3 = 0 +PC0 = 0 +PC1 = 0 +PC2 = 0 +PA19 = 0 +PA7 = 0 +PA8 = 0 +PA9 = 0 +PA10 = 0 +PA20 = 0 + +PA13 = 0 +PA14 = 0 +PD14 = 0 +PC4 = 0 +PC7 = 0 +PA2 = 0 +PC3 = 0 +PA21 = 0 +PA18 = 0 +PG8 = 0 +PG9 = 0 +PG6 = 0 +PG7 = 0 + +POWER_LED = 0 +STATUS_LED = 0
\ No newline at end of file diff --git a/pyA20/port.pyi b/pyA20/port.pyi new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/pyA20/port.pyi diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..90bdd44 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,17 @@ +paho-mqtt~=1.5.1 +inverterd~=1.0.3 +clickhouse-driver~=0.2.0 +toml~=0.10.2 +Flask~=2.0.2 +mysql-connector-python~=8.0.27 +Werkzeug~=2.0.2 +uwsgi~=2.0.20 +python-telegram-bot~=13.1 +inverterd~=1.0.2 +requests~=2.26.0 +aiohttp~=3.8.1 +pytz~=2021.3 + +# following can be installed from debian repositories +# matplotlib~=3.5.0 + diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/__init__.py diff --git a/src/admin_bot.py b/src/admin_bot.py new file mode 100755 index 0000000..88e71e8 --- /dev/null +++ b/src/admin_bot.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +from typing import Optional +from telegram import ReplyKeyboardMarkup +from telegram.ext import MessageHandler +from home.config import config +from home.bot import Wrapper, Context, text_filter + + +def get_latest_logs(ctx: Context): + u = ctx.user + ctx.reply(ctx.lang('blbla')) + + +class AdminBot(Wrapper): + def __init__(self): + super().__init__() + + self.lang.ru(get_latest_logs="Смотреть последние логи") + self.lang.en(get_latest_logs="Get latest logs") + + self.add_handler(MessageHandler(text_filter(self.lang('get_latest_logs')), self.wrap(get_latest_logs))) + + def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [ + [self.lang('get_latest_logs')] + ] + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +if __name__ == '__main__': + config.load('admin_bot') + + bot = AdminBot() + # bot.enable_logging(BotType.ADMIN) + bot.run() diff --git a/src/gpiorelayd.py b/src/gpiorelayd.py new file mode 100755 index 0000000..f39a86a --- /dev/null +++ b/src/gpiorelayd.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +import logging +import os +import sys + +from home.config import config +from home.util import parse_addr +from home.relay.server import RelayServer + +logger = logging.getLogger(__name__) + + +if __name__ == '__main__': + if not os.getegid() == 0: + sys.exit('Must be run as root.') + + config.load() + + try: + s = RelayServer(pinname=config['relayd']['pin'], + addr=parse_addr(config['relayd']['listen'])) + s.run() + except KeyboardInterrupt: + logger.info('Exiting...') diff --git a/src/home/__init__.py b/src/home/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/home/__init__.py diff --git a/src/home/api/__init__.py b/src/home/api/__init__.py new file mode 100644 index 0000000..782a61e --- /dev/null +++ b/src/home/api/__init__.py @@ -0,0 +1,11 @@ +import importlib + +__all__ = ['WebAPIClient', 'RequestParams'] + + +def __getattr__(name): + if name in __all__: + module = importlib.import_module(f'.web_api_client', __name__) + return getattr(module, name) + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/home/api/__init__.pyi b/src/home/api/__init__.pyi new file mode 100644 index 0000000..1b812d6 --- /dev/null +++ b/src/home/api/__init__.pyi @@ -0,0 +1,4 @@ +from .web_api_client import ( + RequestParams as RequestParams, + WebAPIClient as WebAPIClient +) diff --git a/src/home/api/errors/__init__.py b/src/home/api/errors/__init__.py new file mode 100644 index 0000000..efb06aa --- /dev/null +++ b/src/home/api/errors/__init__.py @@ -0,0 +1 @@ +from .api_response_error import ApiResponseError diff --git a/src/home/api/errors/api_response_error.py b/src/home/api/errors/api_response_error.py new file mode 100644 index 0000000..6910b2d --- /dev/null +++ b/src/home/api/errors/api_response_error.py @@ -0,0 +1,28 @@ +from typing import Optional + + +class ApiResponseError(Exception): + def __init__(self, + status_code: int, + error_type: str, + error_message: str, + error_stacktrace: Optional[list[str]] = None): + super().__init__() + self.status_code = status_code + self.error_message = error_message + self.error_type = error_type + self.error_stacktrace = error_stacktrace + + def __str__(self): + def st_formatter(line: str): + return f'Remote| {line}' + + s = f'{self.error_type}: {self.error_message} (HTTP {self.status_code})' + if self.error_stacktrace is not None: + st = [] + for st_line in self.error_stacktrace: + st.append('\n'.join(st_formatter(st_subline) for st_subline in st_line.split('\n'))) + s += '\nRemote stacktrace:\n' + s += '\n'.join(st) + + return s diff --git a/src/home/api/types/__init__.py b/src/home/api/types/__init__.py new file mode 100644 index 0000000..9f27ff6 --- /dev/null +++ b/src/home/api/types/__init__.py @@ -0,0 +1,6 @@ +from .types import ( + BotType, + TemperatureSensorDataType, + TemperatureSensorLocation, + SoundSensorLocation +) diff --git a/src/home/api/types/types.py b/src/home/api/types/types.py new file mode 100644 index 0000000..b6233e6 --- /dev/null +++ b/src/home/api/types/types.py @@ -0,0 +1,29 @@ +from enum import Enum, auto + + +class BotType(Enum): + INVERTER = auto() + PUMP = auto() + SENSORS = auto() + ADMIN = auto() + SOUND = auto() + + +class TemperatureSensorLocation(Enum): + BIG_HOUSE_1 = auto() + BIG_HOUSE_2 = auto() + STREET = auto() + DIANA = auto() + SPB1 = auto() + + +class TemperatureSensorDataType(Enum): + TEMPERATURE = auto() + RELATIVE_HUMIDITY = auto() + + +class SoundSensorLocation(Enum): + DIANA = auto() + BIG_HOUSE = auto() + SPB1 = auto() + diff --git a/src/home/api/web_api_client.py b/src/home/api/web_api_client.py new file mode 100644 index 0000000..e3b0988 --- /dev/null +++ b/src/home/api/web_api_client.py @@ -0,0 +1,210 @@ +import requests +import json +import threading +import logging + +from collections import namedtuple +from datetime import datetime +from enum import Enum, auto +from typing import Optional, Callable, Union +from requests.auth import HTTPBasicAuth + +from .errors import ApiResponseError +from .types import * +from ..config import config +from ..util import stringify +from ..sound import RecordFile, SoundNodeClient + +logger = logging.getLogger(__name__) + + +RequestParams = namedtuple('RequestParams', 'params, files, method') + + +class HTTPMethod(Enum): + GET = auto() + POST = auto() + + +class WebAPIClient: + token: str + timeout: Union[float, tuple[float, float]] + basic_auth: Optional[HTTPBasicAuth] + do_async: bool + async_error_handler: Optional[Callable] + async_success_handler: Optional[Callable] + + def __init__(self, timeout: Union[float, tuple[float, float]] = 5): + self.token = config['api']['token'] + self.timeout = timeout + self.basic_auth = None + self.do_async = False + self.async_error_handler = None + self.async_success_handler = None + + if 'basic_auth' in config['api']: + ba = config['api']['basic_auth'] + col = ba.index(':') + + user = ba[:col] + pw = ba[col+1:] + + logger.debug(f'enabling basic auth: {user}:{pw}') + self.basic_auth = HTTPBasicAuth(user, pw) + + # api methods + # ----------- + + def log_bot_request(self, + bot: BotType, + user_id: int, + message: str): + return self._post('logs/bot-request/', { + 'bot': bot.value, + 'user_id': str(user_id), + 'message': message + }) + + def log_openwrt(self, + lines: list[tuple[int, str]]): + return self._post('logs/openwrt', { + 'logs': stringify(lines) + }) + + def get_sensors_data(self, + sensor: TemperatureSensorLocation, + hours: int): + data = self._get('sensors/data/', { + 'sensor': sensor.value, + 'hours': hours + }) + return [(datetime.fromtimestamp(date), temp, hum) for date, temp, hum in data] + + def add_sound_sensor_hits(self, + hits: list[tuple[str, int]]): + return self._post('sound_sensors/hits/', { + 'hits': stringify(hits) + }) + + def get_sound_sensor_hits(self, + location: SoundSensorLocation, + after: datetime) -> list[dict]: + return self._process_sound_sensor_hits_data(self._get('sound_sensors/hits/', { + 'after': int(after.timestamp()), + 'location': location.value + })) + + def get_last_sound_sensor_hits(self, location: SoundSensorLocation, last: int): + return self._process_sound_sensor_hits_data(self._get('sound_sensors/hits/', { + 'last': last, + 'location': location.value + })) + + def recordings_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: + files = self._get('recordings/list/', {'extended': int(extended)})['data'] + if as_objects: + return SoundNodeClient.record_list_from_serialized(files) + return files + + def _process_sound_sensor_hits_data(self, data: list[dict]) -> list[dict]: + for item in data: + item['time'] = datetime.fromtimestamp(item['time']) + return data + + # internal methods + # ---------------- + + def _get(self, *args, **kwargs): + return self._call(method=HTTPMethod.GET, *args, **kwargs) + + def _post(self, *args, **kwargs): + return self._call(method=HTTPMethod.POST, *args, **kwargs) + + def _call(self, + name: str, + params: dict, + method: HTTPMethod, + files: Optional[dict[str, str]] = None): + if not self.do_async: + return self._make_request(name, params, method, files) + else: + t = threading.Thread(target=self._make_request_in_thread, args=(name, params, method, files)) + t.start() + return None + + def _make_request(self, + name: str, + params: dict, + method: HTTPMethod = HTTPMethod.GET, + files: Optional[dict[str, str]] = None) -> Optional[any]: + domain = config['api']['host'] + kwargs = {} + + if self.basic_auth is not None: + kwargs['auth'] = self.basic_auth + + if method == HTTPMethod.GET: + if files: + raise RuntimeError('can\'t upload files using GET, please use me properly') + kwargs['params'] = params + f = requests.get + else: + kwargs['data'] = params + f = requests.post + + fd = {} + if files: + for fname, fpath in files.items(): + fd[fname] = open(fpath, 'rb') + kwargs['files'] = fd + + try: + r = f(f'https://{domain}/api/{name}', + headers={'X-Token': self.token}, + timeout=self.timeout, + **kwargs) + + if r.headers['content-type'] != 'application/json': + raise ApiResponseError(r.status_code, 'TypeError', 'content-type is not application/json') + + data = json.loads(r.text) + if r.status_code != 200 or data['result'] == 'error': + raise ApiResponseError(r.status_code, + data['error']['type'], + data['error']['message'], + data['error']['stacktrace'] if 'stacktrace' in data['error'] else None) + + return data['data'] if 'data' in data else True + finally: + for fname, f in fd.items(): + # logger.debug(f'closing file {fname} (fd={f})') + try: + f.close() + except Exception as exc: + logger.exception(exc) + pass + + def _make_request_in_thread(self, name, params, method, files): + try: + result = self._make_request(name, params, method, files) + self._report_async_success(result, name, RequestParams(params=params, method=method, files=files)) + except Exception as e: + logger.exception(e) + self._report_async_error(e, name, RequestParams(params=params, method=method, files=files)) + + def enable_async(self, + success_handler: Optional[Callable] = None, + error_handler: Optional[Callable] = None): + self.do_async = True + if error_handler: + self.async_error_handler = error_handler + if success_handler: + self.async_success_handler = success_handler + + def _report_async_error(self, *args): + if self.async_error_handler: + self.async_error_handler(*args) + + def _report_async_success(self, *args): + if self.async_success_handler: + self.async_success_handler(*args)
\ No newline at end of file diff --git a/src/home/bot/__init__.py b/src/home/bot/__init__.py new file mode 100644 index 0000000..5e68af7 --- /dev/null +++ b/src/home/bot/__init__.py @@ -0,0 +1,6 @@ +from .reporting import ReportingHelper +from .lang import LangPack +from .wrapper import Wrapper, Context, text_filter +from .store import Store +from .errors import * +from .util import command_usage, user_any_name
\ No newline at end of file diff --git a/src/home/bot/errors.py b/src/home/bot/errors.py new file mode 100644 index 0000000..74eee6f --- /dev/null +++ b/src/home/bot/errors.py @@ -0,0 +1,2 @@ +class StoreNotEnabledError(Exception): + pass
\ No newline at end of file diff --git a/src/home/bot/lang.py b/src/home/bot/lang.py new file mode 100644 index 0000000..2f10358 --- /dev/null +++ b/src/home/bot/lang.py @@ -0,0 +1,76 @@ +import logging + +from typing import Union, Optional + +logger = logging.getLogger(__name__) + + +class LangStrings(dict): + _lang: Optional[str] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._lang = None + + def setlang(self, lang: str): + self._lang = lang + + def __missing__(self, key): + logger.warning(f'key {key} is missing in language {self._lang}') + return '{%s}' % key + + def __setitem__(self, key, value): + raise NotImplementedError(f'setting translation strings this way is prohibited (was trying to set {key}={value})') + + +class LangPack: + strings: dict[str, LangStrings[str, str]] + default_lang: str + + def __init__(self): + self.strings = {} + self.default_lang = 'en' + + def ru(self, **kwargs) -> None: + self.set(kwargs, 'ru') + + def en(self, **kwargs) -> None: + self.set(kwargs, 'en') + + def set(self, + strings: Union[LangStrings, dict], + lang: str) -> None: + + if isinstance(strings, dict) and not isinstance(strings, LangStrings): + strings = LangStrings(**strings) + strings.setlang(lang) + + if lang not in self.strings: + self.strings[lang] = strings + else: + self.strings[lang].update(strings) + + def all(self, key): + result = [] + for strings in self.strings.values(): + result.append(strings[key]) + return result + + @property + def languages(self) -> list[str]: + return list(self.strings.keys()) + + def get(self, key: str, lang: str, *args) -> str: + return self.strings[lang][key] % args + + def __call__(self, *args, **kwargs): + return self.strings[self.default_lang][args[0]] + + def __getitem__(self, key): + return self.strings[self.default_lang][key] + + def __setitem__(self, key, value): + raise NotImplementedError('setting translation strings this way is prohibited') + + def __contains__(self, key): + return key in self.strings[self.default_lang] diff --git a/src/home/bot/reporting.py b/src/home/bot/reporting.py new file mode 100644 index 0000000..df3da2a --- /dev/null +++ b/src/home/bot/reporting.py @@ -0,0 +1,22 @@ +import logging + +from telegram import Message +from ..api import WebAPIClient as APIClient +from ..api.errors import ApiResponseError +from ..api.types import BotType + +logger = logging.getLogger(__name__) + + +class ReportingHelper: + def __init__(self, client: APIClient, bot_type: BotType): + self.client = client + self.bot_type = bot_type + + def report(self, message, text: str = None) -> None: + if text is None: + text = message.text + try: + self.client.log_bot_request(self.bot_type, message.chat_id, text) + except ApiResponseError as error: + logger.exception(error) diff --git a/src/home/bot/store.py b/src/home/bot/store.py new file mode 100644 index 0000000..aeedc47 --- /dev/null +++ b/src/home/bot/store.py @@ -0,0 +1,80 @@ +import sqlite3 +import os.path +import logging + +from ..config import config + +logger = logging.getLogger(__name__) + + +def _get_database_path() -> str: + return os.path.join(os.environ['HOME'], '.config', config.app_name, 'bot.db') + + +class Store: + SCHEMA_VERSION = 1 + + def __init__(self): + self.sqlite = sqlite3.connect(_get_database_path(), check_same_thread=False) + + sqlite_version = self._get_sqlite_version() + logger.info(f'SQLite version: {sqlite_version}') + + schema_version = self._get_schema_version() + logger.info(f'Schema version: {schema_version}') + + if schema_version < 1: + self._database_init() + elif schema_version < Store.SCHEMA_VERSION: + self._database_upgrade(Store.SCHEMA_VERSION) + + def __del__(self): + if self.sqlite: + self.sqlite.commit() + self.sqlite.close() + + def _get_sqlite_version(self) -> str: + cursor = self.sqlite.cursor() + cursor.execute("SELECT sqlite_version()") + + return cursor.fetchone()[0] + + def _get_schema_version(self) -> int: + cursor = self.sqlite.execute('PRAGMA user_version') + return int(cursor.fetchone()[0]) + + def _set_schema_version(self, v) -> None: + self.sqlite.execute('PRAGMA user_version={:d}'.format(v)) + logger.info(f'Schema set to {v}') + + def _database_init(self) -> None: + cursor = self.sqlite.cursor() + cursor.execute("""CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY, + lang TEXT NOT NULL + )""") + self.sqlite.commit() + self._set_schema_version(1) + + def _database_upgrade(self, version: int) -> None: + # do the upgrade here + + # self.sqlite.commit() + self._set_schema_version(version) + + def get_user_lang(self, user_id: int, default: str = 'en') -> str: + cursor = self.sqlite.cursor() + cursor.execute('SELECT lang FROM users WHERE id=?', (user_id,)) + row = cursor.fetchone() + + if row is None: + cursor.execute('INSERT INTO users (id, lang) VALUES (?, ?)', (user_id, default)) + self.sqlite.commit() + return default + else: + return row[0] + + def set_user_lang(self, user_id: int, lang: str) -> None: + cursor = self.sqlite.cursor() + cursor.execute('UPDATE users SET lang=? WHERE id=?', (lang, user_id)) + self.sqlite.commit()
\ No newline at end of file diff --git a/src/home/bot/util.py b/src/home/bot/util.py new file mode 100644 index 0000000..4f80a67 --- /dev/null +++ b/src/home/bot/util.py @@ -0,0 +1,57 @@ +from telegram import User +from .lang import LangStrings + +_strings = { + 'en': LangStrings( + usage='Usage', + arguments='Arguments' + ), + 'ru': LangStrings( + usage='Использование', + arguments='Аргументы' + ) +} + + +def command_usage(command: str, arguments: dict, language='en') -> str: + if language not in _strings: + raise ValueError('unsupported language') + + blocks = [] + argument_names = [] + argument_lines = [] + for k, v in arguments.items(): + argument_names.append(k) + argument_lines.append( + f'<code>{k}</code>: {v}' + ) + + command = f'/{command}' + if argument_names: + command += ' ' + ' '.join(argument_names) + + blocks.append( + f'<b>{_strings[language]["usage"]}</b>\n' + f'<code>{command}</code>' + ) + + if argument_lines: + blocks.append( + f'<b>{_strings[language]["arguments"]}</b>\n' + '\n'.join(argument_lines) + ) + + return '\n\n'.join(blocks) + + +def user_any_name(user: User) -> str: + name = [user.first_name, user.last_name] + name = list(filter(lambda s: s is not None, name)) + name = ' '.join(name).strip() + + if not name: + name = user.username + + if not name: + name = str(user.id) + + return name diff --git a/src/home/bot/wrapper.py b/src/home/bot/wrapper.py new file mode 100644 index 0000000..8651e90 --- /dev/null +++ b/src/home/bot/wrapper.py @@ -0,0 +1,339 @@ +import logging +import traceback + +from html import escape +from telegram import ( + Update, + ParseMode, + ReplyKeyboardMarkup, + CallbackQuery, + User, +) +from telegram.ext import ( + Updater, + Filters, + BaseFilter, + Handler, + CommandHandler, + MessageHandler, + CallbackQueryHandler, + CallbackContext, + ConversationHandler +) +from telegram.error import TimedOut +from ..config import config +from typing import Optional, Union +from .store import Store +from .lang import LangPack +from ..api.types import BotType +from ..api import WebAPIClient +from .reporting import ReportingHelper + +logger = logging.getLogger(__name__) +languages = { + 'en': 'English', + 'ru': 'Русский' +} +LANG_STARTED = range(1) +user_filter: Optional[BaseFilter] = None + + +def default_langpack() -> LangPack: + lang = LangPack() + lang.en( + start_message="Select command on the keyboard.", + unknown_message="Unknown message", + cancel="Cancel", + select_language="Select language on the keyboard.", + invalid_language="Invalid language. Please try again.", + language_saved='Saved.', + ) + lang.ru( + start_message="Выберите команду на клавиатуре.", + unknown_message="Неизвестная команда", + cancel="Отмена", + select_language="Выберите язык на клавиатуре.", + invalid_language="Неверный язык. Пожалуйста, попробуйте снова", + language_saved="Настройки сохранены." + ) + return lang + + +def init_user_filter(): + global user_filter + if user_filter is None: + if 'users' in config['bot']: + logger.info('allowed users: ' + str(config['bot']['users'])) + user_filter = Filters.user(config['bot']['users']) + else: + user_filter = Filters.all # not sure if this is correct + + +def text_filter(*args): + init_user_filter() + return Filters.text(args[0] if isinstance(args[0], list) else [*args]) & user_filter + + +def exc2text(e: Exception) -> str: + tb = ''.join(traceback.format_tb(e.__traceback__)) + return f'{e.__class__.__name__}: ' + escape(str(e)) + "\n\n" + escape(tb) + + +class IgnoreMarkup: + pass + + +class Context: + _update: Optional[Update] + _callback_context: Optional[CallbackContext] + _markup_getter: callable + _lang: LangPack + _store: Optional[Store] + _user_lang: Optional[str] + + def __init__(self, + update: Optional[Update], + callback_context: Optional[CallbackContext], + markup_getter: callable, + lang: LangPack, + store: Optional[Store]): + self._update = update + self._callback_context = callback_context + self._markup_getter = markup_getter + self._lang = lang + self._store = store + self._user_lang = None + + def reply(self, text, markup=None): + if markup is None: + markup = self._markup_getter(self) + kwargs = dict(parse_mode=ParseMode.HTML) + if not isinstance(markup, IgnoreMarkup): + kwargs['reply_markup'] = markup + self._update.message.reply_text(text, **kwargs) + + def reply_exc(self, e: Exception) -> None: + self.reply(exc2text(e)) + + def answer(self, text: str = None): + self.callback_query.answer(text) + + def edit(self, text, markup=None): + kwargs = dict(parse_mode=ParseMode.HTML) + if not isinstance(markup, IgnoreMarkup): + kwargs['reply_markup'] = markup + self.callback_query.edit_message_text(text, **kwargs) + + @property + def text(self) -> str: + return self._update.message.text + + @property + def callback_query(self) -> CallbackQuery: + return self._update.callback_query + + @property + def args(self) -> Optional[list[str]]: + return self._callback_context.args + + @property + def user_id(self) -> int: + return self.user.id + + @property + def user(self) -> User: + return self._update.effective_user + + @property + def user_lang(self) -> str: + if self._user_lang is None: + self._user_lang = self._store.get_user_lang(self.user_id) + return self._user_lang + + def lang(self, key: str, *args) -> str: + return self._lang.get(key, self.user_lang, *args) + + def is_callback_context(self) -> bool: + return self._update.callback_query and self._update.callback_query.data and self._update.callback_query.data != '' + + +class Wrapper: + store: Optional[Store] + updater: Updater + lang: LangPack + reporting: Optional[ReportingHelper] + + def __init__(self): + self.updater = Updater(config['bot']['token'], + request_kwargs={'read_timeout': 6, 'connect_timeout': 7}) + self.lang = default_langpack() + self.store = Store() + self.reporting = None + + init_user_filter() + + dispatcher = self.updater.dispatcher + dispatcher.add_handler(CommandHandler('start', self.wrap(self.start), user_filter)) + + # transparently log all messages + self.add_handler(MessageHandler(Filters.all & user_filter, self.logging_message_handler), group=10) + self.add_handler(CallbackQueryHandler(self.logging_callback_handler), group=10) + + def run(self): + self._lang_setup() + self.updater.dispatcher.add_handler( + MessageHandler(Filters.all & user_filter, self.wrap(self.any)) + ) + + # start the bot + self.updater.start_polling() + + # run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT + self.updater.idle() + + def enable_logging(self, bot_type: BotType): + api = WebAPIClient(timeout=3) + api.enable_async() + + self.reporting = ReportingHelper(api, bot_type) + + def logging_message_handler(self, update: Update, context: CallbackContext): + if self.reporting is None: + return + + self.reporting.report(update.message) + + def logging_callback_handler(self, update: Update, context: CallbackContext): + if self.reporting is None: + return + + self.reporting.report(update.callback_query.message, text=update.callback_query.data) + + def wrap(self, f: callable): + def handler(update: Update, context: CallbackContext): + ctx = Context(update, + callback_context=context, + markup_getter=self.markup, + lang=self.lang, + store=self.store) + + try: + return f(ctx) + except Exception as e: + if not self.exception_handler(e, ctx) and not isinstance(e, TimedOut): + logger.exception(e) + if not ctx.is_callback_context(): + ctx.reply_exc(e) + else: + self.notify_user(ctx.user_id, exc2text(e)) + + return handler + + def add_handler(self, handler: Handler, group=0): + self.updater.dispatcher.add_handler(handler, group=group) + + def start(self, ctx: Context): + if 'start_message' not in self.lang: + ctx.reply('Please define start_message or override start()') + return + + ctx.reply(ctx.lang('start_message')) + + def any(self, ctx: Context): + if 'invalid_command' not in self.lang: + ctx.reply('Please define invalid_command or override any()') + return + + ctx.reply(ctx.lang('invalid_command')) + + def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: + return None + + def exception_handler(self, e: Exception, ctx: Context) -> Optional[bool]: + pass + + def notify_all(self, text_getter: callable, exclude: tuple[int] = ()) -> None: + if 'notify_users' not in config['bot']: + logger.error('notify_all() called but no notify_users directive found in the config') + return + + for user_id in config['bot']['notify_users']: + if user_id in exclude: + continue + + text = text_getter(self.store.get_user_lang(user_id)) + self.updater.bot.send_message(chat_id=user_id, + text=text, + parse_mode='HTML') + + def notify_user(self, user_id: int, text: Union[str, Exception]) -> None: + if isinstance(text, Exception): + text = exc2text(text) + self.updater.bot.send_message(chat_id=user_id, text=text, parse_mode='HTML') + + def send_audio(self, user_id, **kwargs): + self.updater.bot.send_audio(chat_id=user_id, **kwargs) + + def send_file(self, user_id, **kwargs): + self.updater.bot.send_document(chat_id=user_id, **kwargs) + + # + # Language Selection + # + + def _lang_setup(self): + supported = self.lang.languages + if len(supported) > 1: + cancel_filter = Filters.text(self.lang.all('cancel')) + + self.add_handler(ConversationHandler( + entry_points=[CommandHandler('lang', self.wrap(self._lang_command), user_filter)], + states={ + LANG_STARTED: [ + *list(map(lambda key: MessageHandler(text_filter(languages[key]), + self.wrap(self._lang_input)), supported)), + MessageHandler(user_filter & ~cancel_filter, self.wrap(self._lang_invalid_input)) + ] + }, + fallbacks=[MessageHandler(user_filter & cancel_filter, self.wrap(self._lang_cancel_input))] + )) + + def _lang_command(self, ctx: Context): + logger.debug(f'current language: {ctx.user_lang}') + + buttons = [] + for name in languages.values(): + buttons.append(name) + markup = ReplyKeyboardMarkup([buttons, [ctx.lang('cancel')]], one_time_keyboard=False) + + ctx.reply(ctx.lang('select_language'), markup=markup) + return LANG_STARTED + + def _lang_input(self, ctx: Context): + lang = None + for key, value in languages.items(): + if value == ctx.text: + lang = key + break + + if lang is None: + ValueError('could not find the language') + + self.store.set_user_lang(ctx.user_id, lang) + + ctx.reply(ctx.lang('language_saved'), markup=IgnoreMarkup()) + + self.start(ctx) + return ConversationHandler.END + + def _lang_invalid_input(self, ctx: Context): + ctx.reply(self.lang('invalid_language'), markup=IgnoreMarkup()) + return LANG_STARTED + + def _lang_cancel_input(self, ctx: Context): + self.start(ctx) + return ConversationHandler.END + + @property + def user_filter(self): + return user_filter diff --git a/src/home/config/__init__.py b/src/home/config/__init__.py new file mode 100644 index 0000000..d4b1c27 --- /dev/null +++ b/src/home/config/__init__.py @@ -0,0 +1 @@ +from .config import ConfigStore, config, is_development_mode diff --git a/src/home/config/config.py b/src/home/config/config.py new file mode 100644 index 0000000..75cfc3a --- /dev/null +++ b/src/home/config/config.py @@ -0,0 +1,110 @@ +import toml +import logging +import os + +from os.path import join, isdir, isfile +from typing import Optional, Any, MutableMapping +from argparse import ArgumentParser + + +def _get_config_path(name: str) -> str: + dirname = join(os.environ['HOME'], '.config', name) + filename = join(os.environ['HOME'], '.config', f'{name}.toml') + if isdir(dirname): + return join(dirname, 'config.toml') + elif isfile(filename): + return filename + else: + raise IOError(f'configuration file not found (tried {dirname}/config.toml and {filename})') + + +class ConfigStore: + data: MutableMapping[str, Any] + app_name: Optional[str] + + def __int__(self): + self.data = {} + self.app_name = None + + def load(self, name: Optional[str] = None, + use_cli=True, + parser: ArgumentParser = None): + self.app_name = name + + if (name is None) and (not use_cli): + raise RuntimeError('either config name must be none or use_cli must be True') + + log_default_fmt = False + log_file = None + log_verbose = False + + path = None + if use_cli: + if parser is None: + parser = ArgumentParser() + parser.add_argument('--config', type=str, required=name is None, + help='Path to the config in TOML format') + parser.add_argument('--verbose', action='store_true') + parser.add_argument('--log-file', type=str) + parser.add_argument('--log-default-fmt', action='store_true') + args = parser.parse_args() + + if args.config: + path = args.config + if args.verbose: + log_verbose = True + if args.log_file: + log_file = args.log_file + if args.log_default_fmt: + log_default_fmt = args.log_default_fmt + + if name and path is None: + path = _get_config_path(name) + + self.data = toml.load(path) + + if 'logging' in self: + if not log_file and 'file' in self['logging']: + log_file = self['logging']['file'] + if log_default_fmt and 'default_fmt' in self['logging']: + log_default_fmt = self['logging']['default_fmt'] + + setup_logging(log_verbose, log_file, log_default_fmt) + + if use_cli: + return args + + def __getitem__(self, key): + return self.data[key] + + def __setitem__(self, key, value): + raise NotImplementedError('overwriting config values is prohibited') + + def __contains__(self, key): + return key in self.data + + +config = ConfigStore() + + +def is_development_mode() -> bool: + if 'FLASK_ENV' in os.environ and os.environ['FLASK_ENV'] == 'development': + return True + + return ('logging' in config) and ('verbose' in config['logging']) and (config['logging']['verbose'] is True) + + +def setup_logging(verbose=False, log_file=None, default_fmt=False): + logging_level = logging.INFO + if is_development_mode() or verbose: + logging_level = logging.DEBUG + + log_config = {'level': logging_level} + if not default_fmt: + log_config['format'] = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + + if log_file is not None: + log_config['filename'] = log_file + log_config['encoding'] = 'utf-8' + + logging.basicConfig(**log_config) diff --git a/src/home/database/__init__.py b/src/home/database/__init__.py new file mode 100644 index 0000000..b50cbce --- /dev/null +++ b/src/home/database/__init__.py @@ -0,0 +1,29 @@ +import importlib + +__all__ = [ + 'get_mysql', + 'mysql_now', + 'get_clickhouse', + 'SimpleState', + + 'SensorsDatabase', + 'InverterDatabase', + 'BotsDatabase' +] + + +def __getattr__(name: str): + if name in __all__: + if name.endswith('Database'): + file = name[:-8].lower() + elif 'mysql' in name: + file = 'mysql' + elif 'clickhouse' in name: + file = 'clickhouse' + else: + file = 'simple_state' + + module = importlib.import_module(f'.{file}', __name__) + return getattr(module, name) + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/home/database/__init__.pyi b/src/home/database/__init__.pyi new file mode 100644 index 0000000..31aae5d --- /dev/null +++ b/src/home/database/__init__.pyi @@ -0,0 +1,11 @@ +from .mysql import ( + get_mysql as get_mysql, + mysql_now as mysql_now +) +from .clickhouse import get_clickhouse as get_clickhouse + +from simple_state import SimpleState as SimpleState + +from .sensors import SensorsDatabase as SensorsDatabase +from .inverter import InverterDatabase as InverterDatabase +from .bots import BotsDatabase as BotsDatabase diff --git a/src/home/database/bots.py b/src/home/database/bots.py new file mode 100644 index 0000000..bc490e1 --- /dev/null +++ b/src/home/database/bots.py @@ -0,0 +1,104 @@ +import pytz + +from .mysql import mysql_now, MySQLDatabase, datetime_fmt +from ..api.types import ( + BotType, + SoundSensorLocation +) +from typing import Optional +from datetime import datetime +from html import escape + + +class OpenwrtLogRecord: + id: int + log_time: datetime + received_time: datetime + text: str + + def __init__(self, id, text, log_time, received_time): + self.id = id + self.text = text + self.log_time = log_time + self.received_time = received_time + + def __repr__(self): + return f"<b>{self.log_time.strftime('%H:%M:%S')}</b> {escape(self.text)}" + + +class BotsDatabase(MySQLDatabase): + def add_request(self, + bot: BotType, + user_id: int, + message: str): + with self.cursor() as cursor: + cursor.execute("INSERT INTO requests_log (user_id, message, bot, time) VALUES (%s, %s, %s, %s)", + (user_id, message, bot.name.lower(), mysql_now())) + self.commit() + + def add_openwrt_logs(self, + lines: list[tuple[datetime, str]]): + now = datetime.now() + with self.cursor() as cursor: + for line in lines: + time, text = line + cursor.execute("INSERT INTO openwrt (log_time, received_time, text) VALUES (%s, %s, %s)", + (time.strftime(datetime_fmt), now.strftime(datetime_fmt), text)) + self.commit() + + def add_sound_hits(self, + hits: list[tuple[SoundSensorLocation, int]], + time: datetime): + with self.cursor() as cursor: + for loc, count in hits: + cursor.execute("INSERT INTO sound_hits (location, `time`, hits) VALUES (%s, %s, %s)", + (loc.name.lower(), time.strftime(datetime_fmt), count)) + self.commit() + + def get_sound_hits(self, + location: SoundSensorLocation, + after: Optional[datetime] = None, + last: Optional[int] = None) -> list[dict]: + with self.cursor(dictionary=True) as cursor: + sql = "SELECT `time`, hits FROM sound_hits WHERE location=%s" + args = [location.name.lower()] + + if after: + sql += ' AND `time` >= %s ORDER BY time DESC' + args.append(after) + elif last: + sql += ' ORDER BY time DESC LIMIT 0, %s' + args.append(last) + else: + raise ValueError('no `after`, no `last`, what do you expect?') + + cursor.execute(sql, tuple(args)) + data = [] + for row in cursor.fetchall(): + data.append({ + 'time': row['time'], + 'hits': row['hits'] + }) + return data + + def get_openwrt_logs(self, + filter_text: str, + min_id: int, + limit: int = None) -> list[OpenwrtLogRecord]: + tz = pytz.timezone('Europe/Moscow') + with self.cursor(dictionary=True) as cursor: + sql = "SELECT * FROM openwrt WHERE text LIKE %s AND id > %s" + if limit is not None: + sql += f" LIMIT {limit}" + + cursor.execute(sql, (f'%{filter_text}%', min_id)) + data = [] + for row in cursor.fetchall(): + data.append(OpenwrtLogRecord( + id=int(row['id']), + text=row['text'], + log_time=row['log_time'].astimezone(tz), + received_time=row['received_time'].astimezone(tz) + )) + + return data diff --git a/src/home/database/clickhouse.py b/src/home/database/clickhouse.py new file mode 100644 index 0000000..4a2a247 --- /dev/null +++ b/src/home/database/clickhouse.py @@ -0,0 +1,10 @@ +from clickhouse_driver import Client as ClickhouseClient + +_links = {} + + +def get_clickhouse(db: str) -> ClickhouseClient: + if db not in _links: + _links[db] = ClickhouseClient.from_url(f'clickhouse://localhost/{db}') + + return _links[db] diff --git a/src/home/database/inverter.py b/src/home/database/inverter.py new file mode 100644 index 0000000..8902f04 --- /dev/null +++ b/src/home/database/inverter.py @@ -0,0 +1,102 @@ +from .clickhouse import get_clickhouse +from time import time + + +class InverterDatabase: + def __init__(self): + self.db = get_clickhouse('solarmon') + + def add_generation(self, home_id: int, client_time: int, watts: int) -> None: + self.db.execute( + 'INSERT INTO generation (ClientTime, ReceivedTime, HomeID, Watts) VALUES', + [[client_time, round(time()), home_id, watts]] + ) + + def add_status(self, home_id: int, + client_time: int, + grid_voltage: int, + grid_freq: int, + ac_output_voltage: int, + ac_output_freq: int, + ac_output_apparent_power: int, + ac_output_active_power: int, + output_load_percent: int, + battery_voltage: int, + battery_voltage_scc: int, + battery_voltage_scc2: int, + battery_discharging_current: int, + battery_charging_current: int, + battery_capacity: int, + inverter_heat_sink_temp: int, + mppt1_charger_temp: int, + mppt2_charger_temp: int, + pv1_input_power: int, + pv2_input_power: int, + pv1_input_voltage: int, + pv2_input_voltage: int, + mppt1_charger_status: int, + mppt2_charger_status: int, + battery_power_direction: int, + dc_ac_power_direction: int, + line_power_direction: int, + load_connected: int) -> None: + self.db.execute("""INSERT INTO status ( + ClientTime, + ReceivedTime, + HomeID, + GridVoltage, + GridFrequency, + ACOutputVoltage, + ACOutputFrequency, + ACOutputApparentPower, + ACOutputActivePower, + OutputLoadPercent, + BatteryVoltage, + BatteryVoltageSCC, + BatteryVoltageSCC2, + BatteryDischargingCurrent, + BatteryChargingCurrent, + BatteryCapacity, + HeatSinkTemp, + MPPT1ChargerTemp, + MPPT2ChargerTemp, + PV1InputPower, + PV2InputPower, + PV1InputVoltage, + PV2InputVoltage, + MPPT1ChargerStatus, + MPPT2ChargerStatus, + BatteryPowerDirection, + DCACPowerDirection, + LinePowerDirection, + LoadConnected) VALUES""", [[ + client_time, + round(time()), + home_id, + grid_voltage, + grid_freq, + ac_output_voltage, + ac_output_freq, + ac_output_apparent_power, + ac_output_active_power, + output_load_percent, + battery_voltage, + battery_voltage_scc, + battery_voltage_scc2, + battery_discharging_current, + battery_charging_current, + battery_capacity, + inverter_heat_sink_temp, + mppt1_charger_temp, + mppt2_charger_temp, + pv1_input_power, + pv2_input_power, + pv1_input_voltage, + pv2_input_voltage, + mppt1_charger_status, + mppt2_charger_status, + battery_power_direction, + dc_ac_power_direction, + line_power_direction, + load_connected + ]]) diff --git a/src/home/database/mysql.py b/src/home/database/mysql.py new file mode 100644 index 0000000..fe97cd4 --- /dev/null +++ b/src/home/database/mysql.py @@ -0,0 +1,47 @@ +import time +import logging + +from mysql.connector import connect, MySQLConnection, Error +from typing import Optional +from ..config import config + +link: Optional[MySQLConnection] = None +logger = logging.getLogger(__name__) + +datetime_fmt = '%Y-%m-%d %H:%M:%S' + + +def get_mysql() -> MySQLConnection: + global link + + if link is not None: + return link + + link = connect( + host=config['mysql']['host'], + user=config['mysql']['user'], + password=config['mysql']['password'], + database=config['mysql']['database'], + ) + link.time_zone = '+01:00' + return link + + +def mysql_now() -> str: + return time.strftime('%Y-%m-%d %H:%M:%S') + + +class MySQLDatabase: + def __init__(self): + self.db = get_mysql() + + def cursor(self, **kwargs): + try: + self.db.ping(reconnect=True, attempts=2) + except Error as e: + logger.exception(e) + self.db = get_mysql() + return self.db.cursor(**kwargs) + + def commit(self): + self.db.commit() diff --git a/src/home/database/sensors.py b/src/home/database/sensors.py new file mode 100644 index 0000000..ca53dd0 --- /dev/null +++ b/src/home/database/sensors.py @@ -0,0 +1,66 @@ +from time import time +from datetime import datetime +from typing import Tuple, List +from .clickhouse import get_clickhouse +from ..api.types import TemperatureSensorLocation + + +def get_temperature_table(sensor: TemperatureSensorLocation) -> str: + if sensor == TemperatureSensorLocation.DIANA: + return 'temp_diana' + + elif sensor == TemperatureSensorLocation.STREET: + return 'temp_street' + + elif sensor == TemperatureSensorLocation.BIG_HOUSE_1: + return 'temp' + + elif sensor == TemperatureSensorLocation.BIG_HOUSE_2: + return 'temp_roof' + + elif sensor == TemperatureSensorLocation.SPB1: + return 'temp_spb1' + + +class SensorsDatabase: + def __init__(self): + self.db = get_clickhouse('home') + + def add_temperature(self, + home_id: int, + client_time: int, + sensor: TemperatureSensorLocation, + temp: int, + rh: int): + table = get_temperature_table(sensor) + sql = """INSERT INTO """ + table + """ ( + ClientTime, + ReceivedTime, + HomeID, + Temperature, + RelativeHumidity + ) VALUES""" + self.db.execute(sql, [[ + client_time, + int(time()), + home_id, + temp, + rh + ]]) + + def get_temperature_recordings(self, + sensor: TemperatureSensorLocation, + time_range: Tuple[datetime, datetime], + home_id=1) -> List[tuple]: + table = get_temperature_table(sensor) + sql = f"""SELECT ClientTime, Temperature, RelativeHumidity + FROM {table} + WHERE ClientTime >= %(from)s AND ClientTime <= %(to)s + ORDER BY ClientTime""" + dt_from, dt_to = time_range + + data = self.db.execute(sql, { + 'from': dt_from, + 'to': dt_to + }) + return [(date, temp/100, humidity/100) for date, temp, humidity in data] diff --git a/src/home/database/simple_state.py b/src/home/database/simple_state.py new file mode 100644 index 0000000..cada9c8 --- /dev/null +++ b/src/home/database/simple_state.py @@ -0,0 +1,46 @@ +import os +import json +import atexit + + +class SimpleState: + def __init__(self, + file: str, + default: dict = None, + **kwargs): + if default is None: + default = {} + elif type(default) is not dict: + raise TypeError('default must be dictionary') + + if not os.path.exists(file): + self._data = default + else: + with open(file, 'r') as f: + self._data = json.loads(f.read()) + + self._file = file + atexit.register(self.__cleanup) + + def __cleanup(self): + if hasattr(self, '_file'): + with open(self._file, 'w') as f: + f.write(json.dumps(self._data)) + atexit.unregister(self.__cleanup) + + def __del__(self): + if 'open' in __builtins__: + self.__cleanup() + + def __getitem__(self, key): + return self._data[key] + + def __setitem__(self, key, value): + self._data[key] = value + + def __contains__(self, key): + return key in self._data + + def __delitem__(self, key): + if key in self._data: + del self._data[key] diff --git a/src/home/inverter/__init__.py b/src/home/inverter/__init__.py new file mode 100644 index 0000000..b184580 --- /dev/null +++ b/src/home/inverter/__init__.py @@ -0,0 +1,8 @@ +from .monitor import ( + ChargingEvent, + InverterMonitor, + BatteryState, + BatteryPowerDirection +) +from .inverter_wrapper import wrapper_instance +from .util import beautify_table diff --git a/src/home/inverter/inverter_wrapper.py b/src/home/inverter/inverter_wrapper.py new file mode 100644 index 0000000..df2c2fc --- /dev/null +++ b/src/home/inverter/inverter_wrapper.py @@ -0,0 +1,48 @@ +import json + +from threading import Lock +from inverterd import ( + Format, + Client as InverterClient, + InverterError +) + +_lock = Lock() + + +class InverterClientWrapper: + def __init__(self): + self._inverter = None + self._host = None + self._port = None + + def init(self, host: str, port: int): + self._host = host + self._port = port + self.create() + + def create(self): + self._inverter = InverterClient(host=self._host, port=self._port) + self._inverter.connect() + + def exec(self, command: str, arguments: tuple = (), format=Format.JSON): + with _lock: + try: + self._inverter.format(format) + response = self._inverter.exec(command, arguments) + if format == Format.JSON: + response = json.loads(response) + return response + except InverterError as e: + raise e + except Exception as e: + # silently try to reconnect + try: + self.create() + except Exception: + pass + raise e + + +wrapper_instance = InverterClientWrapper() + diff --git a/src/home/inverter/monitor.py b/src/home/inverter/monitor.py new file mode 100644 index 0000000..02ae155 --- /dev/null +++ b/src/home/inverter/monitor.py @@ -0,0 +1,448 @@ +import logging +import time + +from enum import Enum, auto +from threading import Thread +from typing import Callable, Optional +from .inverter_wrapper import wrapper_instance as inverter +from inverterd import InverterError +from ..util import Stopwatch, StopwatchError +from ..config import config + +logger = logging.getLogger(__name__) + + +class BatteryPowerDirection(Enum): + DISCHARGING = auto() + CHARGING = auto() + DO_NOTHING = auto() + + +class ChargingEvent(Enum): + AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR = auto() + AC_NOT_CHARGING = auto() + AC_CHARGING_STARTED = auto() + AC_DISCONNECTED = auto() + AC_CURRENT_CHANGED = auto() + AC_MOSTLY_CHARGED = auto() + AC_CHARGING_FINISHED = auto() + + +class ChargingState(Enum): + NOT_CHARGING = auto() + AC_BUT_SOLAR = auto() + AC_WAITING = auto() + AC_OK = auto() + AC_DONE = auto() + + +class CurrentChangeDirection(Enum): + UP = auto() + DOWN = auto() + + +class BatteryState(Enum): + NORMAL = auto() + LOW = auto() + CRITICAL = auto() + + +def _pd_from_string(pd: str) -> BatteryPowerDirection: + if pd == 'Discharge': + return BatteryPowerDirection.DISCHARGING + elif pd == 'Charge': + return BatteryPowerDirection.CHARGING + elif pd == 'Do nothing': + return BatteryPowerDirection.DO_NOTHING + else: + raise ValueError(f'invalid power direction: {pd}') + + +class MonitorConfig: + def __getattr__(self, item): + return config['monitor'][item] + + +cfg = MonitorConfig() + + +class InverterMonitor(Thread): + charging_event_handler: Optional[Callable] + battery_event_handler: Optional[Callable] + error_handler: Optional[Callable] + + def __init__(self): + super().__init__() + self.setName('InverterMonitor') + + self.interrupted = False + self.min_allowed_current = 0 + + # Event handlers for the bot. + self.charging_event_handler = None + self.battery_event_handler = None + self.error_handler = None + + # Currents list, defined in the bot config. + self.currents = cfg.gen_currents + self.currents.sort() + + # We start charging at lowest possible current, then increase it once per minute (or so) to the maximum level. + # This is done so that the load on the generator increases smoothly, not abruptly. Generator will thank us. + self.current_change_direction = CurrentChangeDirection.UP + self.next_current_enter_time = 0 + self.active_current_idx = -1 + + self.battery_state = BatteryState.NORMAL + self.charging_state = ChargingState.NOT_CHARGING + + # 'Mostly-charged' means that we've already lowered the charging current to the level + # at which batteries are charging pretty slow. So instead of burning gasoline and shaking the air, + # we can just turn the generator off at this point. + self.mostly_charged = False + + # The stopwatch is used to measure how long does the battery voltage exceeds the float voltage level. + # We don't want to damage our batteries, right? + self.floating_stopwatch = Stopwatch() + + @property + def active_current(self) -> Optional[int]: + try: + if self.active_current_idx < 0: + return None + return self.currents[self.active_current_idx] + except IndexError: + return None + + def run(self): + # Check allowed currents and validate the config. + allowed_currents = list(inverter.exec('get-allowed-ac-charging-currents')['data']) + allowed_currents.sort() + + for a in self.currents: + if a not in allowed_currents: + raise ValueError(f'invalid value {a} in gen_currents list') + + self.min_allowed_current = min(allowed_currents) + + # Read data and run implemented programs every 2 seconds. + while not self.interrupted: + try: + response = inverter.exec('get-status') + if response['result'] != 'ok': + logger.error('get-status failed:', response) + else: + gs = response['data'] + + ac = gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0 + solar = gs['pv1_input_power']['value'] > 0 + v = float(gs['battery_voltage']['value']) + load_watts = int(gs['ac_output_active_power']['value']) + pd = _pd_from_string(gs['battery_power_direction']) + + logger.debug(f'got status: ac={ac}, solar={solar}, v={v}, pd={pd}') + + self.gen_charging_program(ac, solar, v, pd) + + if not ac or pd != BatteryPowerDirection.CHARGING: + # if AC is disconnected or not charging, run the low voltage checking program + self.low_voltage_program(v, load_watts) + + elif self.battery_state != BatteryState.NORMAL: + # AC is connected and the battery is charging, assume battery level is normal + self.battery_state = BatteryState.NORMAL + + except InverterError as e: + logger.exception(e) + + time.sleep(2) + + def gen_charging_program(self, + ac: bool, # whether AC is connected + solar: bool, # whether MPPT is active + v: float, # current battery voltage + pd: BatteryPowerDirection # current power direction + ): + if self.charging_state == ChargingState.NOT_CHARGING: + if ac and solar: + # Not charging because MPPT is active (solar line is connected). + # Notify users about it and change the current state. + self.charging_state = ChargingState.AC_BUT_SOLAR + self.charging_event_handler(ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR) + logger.info('entering AC_BUT_SOLAR state') + elif ac: + # Not charging, but AC is connected and ready to use. + # Start the charging program. + self.gen_start(pd) + + elif self.charging_state == ChargingState.AC_BUT_SOLAR: + if not ac: + # AC charger has been disconnected. Since the state is AC_BUT_SOLAR, + # charging probably never even started. Stop the charging program. + self.gen_stop(ChargingState.NOT_CHARGING) + elif not solar: + # MPPT has been disconnected, and, since AC is still connected, we can + # try to start the charging program. + self.gen_start(pd) + + elif self.charging_state in (ChargingState.AC_OK, ChargingState.AC_WAITING): + if not ac: + # Charging was in progress, but AC has been suddenly disconnected. + # Sad, but what can we do? Stop the charging program and return. + self.gen_stop(ChargingState.NOT_CHARGING) + return + + if solar: + # Charging was in progress, but MPPT has been detected. Inverter doesn't charge + # batteries from AC when MPPT is active, so we have to pause our program. + self.charging_state = ChargingState.AC_BUT_SOLAR + self.charging_event_handler(ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR) + try: + self.floating_stopwatch.pause() + except StopwatchError: + msg = 'gen_charging_program: floating_stopwatch.pause() failed at (1)' + logger.warning(msg) + self.error_handler(msg) + logger.info('solar power connected during charging, entering AC_BUT_SOLAR state') + + # No surprises at this point, just check the values and make decisions based on them. + # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + # We've reached the 'mostly-charged' point, the voltage level is not float, + # but inverter decided to stop charging (or somebody used a kettle, lol). + # Anyway, assume that charging is complete, stop the program, notify users and return. + if self.mostly_charged and v > (cfg.gen_floating_v - 1) and pd != BatteryPowerDirection.CHARGING: + self.gen_stop(ChargingState.AC_DONE) + return + + # Monitor inverter power direction and notify users when it changes. + state = ChargingState.AC_OK if pd == BatteryPowerDirection.CHARGING else ChargingState.AC_WAITING + if state != self.charging_state: + self.charging_state = state + + evt = ChargingEvent.AC_CHARGING_STARTED if state == ChargingState.AC_OK else ChargingEvent.AC_NOT_CHARGING + self.charging_event_handler(evt) + + if self.floating_stopwatch.get_elapsed_time() >= cfg.gen_floating_time_max: + # We've been at a bulk voltage level too long, so we have to stop charging. + # Set the minimum current possible. + + if self.current_change_direction == CurrentChangeDirection.UP: + # This shouldn't happen, obviously an error. + msg = 'gen_charging_program:' + msg += ' been at bulk voltage level too long, but current change direction is still \'up\'!' + msg += ' This is obviously an error, please fix it' + logger.warning(msg) + self.error_handler(msg) + + self.gen_next_current(current=self.min_allowed_current) + + elif self.active_current is not None: + # If voltage is greater than float voltage, keep the stopwatch ticking + if v > cfg.gen_floating_v and self.floating_stopwatch.is_paused(): + try: + self.floating_stopwatch.go() + except StopwatchError: + msg = 'gen_charging_program: floating_stopwatch.go() failed at (2)' + logger.warning(msg) + self.error_handler(msg) + # Otherwise, pause it + elif v <= cfg.gen_floating_v and not self.floating_stopwatch.is_paused(): + try: + self.floating_stopwatch.pause() + except StopwatchError: + msg = 'gen_charging_program: floating_stopwatch.pause() failed at (3)' + logger.warning(msg) + self.error_handler(msg) + + # Charging current monitoring + if self.current_change_direction == CurrentChangeDirection.UP: + # Generator is warming up in this code path + + if self.next_current_enter_time != 0 and pd != BatteryPowerDirection.CHARGING: + # Generator was warming up and charging, but stopped (pd has changed). + # Resetting to the minimum possible pd + logger.info(f'gen_charging_program (warming path): was charging but power direction suddeny changed. resetting to minimum current') + self.next_current_enter_time = 0 + self.gen_next_current(current=self.min_allowed_current) + + elif self.next_current_enter_time == 0 and pd == BatteryPowerDirection.CHARGING: + self.next_current_enter_time = time.time() + cfg.gen_raise_intervals[self.active_current_idx] + logger.info(f'gen_charging_program (warming path): set next_current_enter_time to {self.next_current_enter_time}') + + elif self.next_current_enter_time != 0 and time.time() >= self.next_current_enter_time: + logger.info('gen_charging_program (warming path): hit next_current_enter_time, calling gen_next_current()') + self.gen_next_current() + else: + # Gradually lower the current level, based on how close + # battery voltage has come to the bulk level. + if self.active_current >= 30: + upper_bound = cfg.gen_cur30_v_limit + elif self.active_current == 20: + upper_bound = cfg.gen_cur20_v_limit + else: + upper_bound = cfg.gen_cur10_v_limit + + # Voltage is high enough already and it's close to bulk level; we hit the upper bound, + # so let's lower the current + if v >= upper_bound: + self.gen_next_current() + + elif self.charging_state == ChargingState.AC_DONE: + # We've already finished charging, but AC was connected. Not that it's disconnected, + # set the appropriate state and notify users. + if not ac: + self.gen_stop(ChargingState.NOT_CHARGING) + + def gen_start(self, pd: BatteryPowerDirection): + if pd == BatteryPowerDirection.CHARGING: + self.charging_state = ChargingState.AC_OK + self.charging_event_handler(ChargingEvent.AC_CHARGING_STARTED) + logger.info('AC line connected and charging, entering AC_OK state') + + # Continue the stopwatch, if needed + try: + self.floating_stopwatch.go() + except StopwatchError: + msg = 'floating_stopwatch.go() failed at ac_charging_start(), AC_OK path' + logger.warning(msg) + self.error_handler(msg) + else: + self.charging_state = ChargingState.AC_WAITING + self.charging_event_handler(ChargingEvent.AC_NOT_CHARGING) + logger.info('AC line connected but not charging yet, entering AC_WAITING state') + + # Pause the stopwatch, if needed + try: + if not self.floating_stopwatch.is_paused(): + self.floating_stopwatch.pause() + except StopwatchError: + msg = 'floating_stopwatch.pause() failed at ac_charging_start(), AC_WAITING path' + logger.warning(msg) + self.error_handler(msg) + + # idx == -1 means haven't started our program yet. + if self.active_current_idx == -1: + self.gen_next_current() + # self.set_hw_charging_current(self.min_allowed_current) + + def gen_stop(self, reason: ChargingState): + self.charging_state = reason + + if reason == ChargingState.AC_DONE: + event = ChargingEvent.AC_CHARGING_FINISHED + elif reason == ChargingState.NOT_CHARGING: + event = ChargingEvent.AC_DISCONNECTED + else: + raise ValueError(f'ac_charging_stop: unexpected reason {reason}') + + logger.info(f'charging is finished, entering {reason} state') + self.charging_event_handler(event) + + # Let Mr. Proper do his job + if self.active_current_idx != -1: + self.next_current_enter_time = 0 + self.mostly_charged = False + self.active_current_idx = -1 + self.floating_stopwatch.reset() + + def gen_next_current(self, current=None): + if current is None: + try: + current = self._next_current() + logger.debug(f'gen_next_current: ready to change charging current to {current} A') + except IndexError: + logger.debug('gen_next_current: was going to change charging current, but no currents left; finishing charging program') + self.gen_stop(ChargingState.AC_DONE) + return + + else: + try: + idx = self.currents.index(current) + except ValueError: + msg = f'gen_next_current: got current={current} but it\'s not in the currents list' + logger.error(msg) + self.error_handler(msg) + return + self.active_current_idx = idx + + if self.current_change_direction == CurrentChangeDirection.DOWN: + if current == self.currents[0]: + self.mostly_charged = True + self.gen_stop(ChargingState.AC_DONE) + + elif current == self.currents[1] and not self.mostly_charged: + self.mostly_charged = True + self.charging_event_handler(ChargingEvent.AC_MOSTLY_CHARGED) + + self.set_hw_charging_current(current) + + def set_hw_charging_current(self, current: int): + try: + response = inverter.exec('set-max-ac-charging-current', (0, current)) + if response['result'] != 'ok': + logger.error(f'failed to change AC charging current to {current} A') + raise InverterError('set-max-ac-charging-current: inverterd reported error') + else: + self.charging_event_handler(ChargingEvent.AC_CURRENT_CHANGED, current=current) + logger.info(f'changed AC charging current to {current} A') + except InverterError as e: + self.error_handler(f'failed to set charging current to {current} A (caught InverterError)') + logger.exception(e) + + def _next_current(self): + if self.current_change_direction == CurrentChangeDirection.UP: + self.active_current_idx += 1 + if self.active_current_idx == len(self.currents)-1: + logger.info('_next_current: charging current power direction to DOWN') + self.current_change_direction = CurrentChangeDirection.DOWN + self.next_current_enter_time = 0 + else: + if self.active_current_idx == 0: + raise IndexError('can\'t go lower') + self.active_current_idx -= 1 + + logger.info(f'_next_current: active_current_idx set to {self.active_current_idx}, returning current of {self.currents[self.active_current_idx]} A') + return self.currents[self.active_current_idx] + + def low_voltage_program(self, v: float, load_watts: int): + crit_level = cfg.vcrit + low_level = cfg.vlow + + if v <= crit_level: + state = BatteryState.CRITICAL + elif v <= low_level: + state = BatteryState.LOW + else: + state = BatteryState.NORMAL + + if state != self.battery_state: + self.battery_state = state + self.battery_event_handler(state, v, load_watts) + + def set_charging_event_handler(self, handler: Callable): + self.charging_event_handler = handler + + def set_battery_event_handler(self, handler: Callable): + self.battery_event_handler = handler + + def set_error_handler(self, handler: Callable): + self.error_handler = handler + + def stop(self): + self.interrupted = True + + def dump_status(self) -> dict: + return { + 'interrupted': self.interrupted, + 'currents': self.currents, + 'active_current': self.active_current, + 'current_change_direction': self.current_change_direction.name, + 'battery_state': self.battery_state.name, + 'charging_state': self.charging_state.name, + 'mostly_charged': self.mostly_charged, + 'floating_stopwatch_paused': self.floating_stopwatch.is_paused(), + 'floating_stopwatch_elapsed': self.floating_stopwatch.get_elapsed_time(), + 'time_now': time.time(), + 'next_current_enter_time': self.next_current_enter_time, + } diff --git a/src/home/inverter/util.py b/src/home/inverter/util.py new file mode 100644 index 0000000..a577e6a --- /dev/null +++ b/src/home/inverter/util.py @@ -0,0 +1,8 @@ +import re + + +def beautify_table(s): + lines = s.split('\n') + lines = list(map(lambda line: re.sub(r'\s+', ' ', line), lines)) + lines = list(map(lambda line: re.sub(r'(.*?): (.*)', r'<b>\1:</b> \2', line), lines)) + return '\n'.join(lines) diff --git a/src/home/mqtt/__init__.py b/src/home/mqtt/__init__.py new file mode 100644 index 0000000..c0ef9ba --- /dev/null +++ b/src/home/mqtt/__init__.py @@ -0,0 +1,2 @@ +from .mqtt import MQTTBase +from .util import poll_tick diff --git a/src/home/mqtt/message/__init__.py b/src/home/mqtt/message/__init__.py new file mode 100644 index 0000000..2a2221b --- /dev/null +++ b/src/home/mqtt/message/__init__.py @@ -0,0 +1,2 @@ +from .inverter import Status, Generation +from .sensors import Temperature diff --git a/src/home/mqtt/message/inverter.py b/src/home/mqtt/message/inverter.py new file mode 100644 index 0000000..2df17e5 --- /dev/null +++ b/src/home/mqtt/message/inverter.py @@ -0,0 +1,86 @@ +import struct + +from typing import Tuple + + +class Status: + # 46 bytes + format = 'IHHHHHHBHHHHHBHHHHHHHH' + + def pack(self, time: int, data: dict) -> bytes: + bits = 0 + bits |= (data['mppt1_charger_status'] & 0x3) + bits |= (data['mppt2_charger_status'] & 0x3) << 2 + bits |= (data['battery_power_direction'] & 0x3) << 4 + bits |= (data['dc_ac_power_direction'] & 0x3) << 6 + bits |= (data['line_power_direction'] & 0x3) << 8 + bits |= (data['load_connected'] & 0x1) << 10 + + return struct.pack( + self.format, + time, + int(data['grid_voltage'] * 10), + int(data['grid_freq'] * 10), + int(data['ac_output_voltage'] * 10), + int(data['ac_output_freq'] * 10), + data['ac_output_apparent_power'], + data['ac_output_active_power'], + data['output_load_percent'], + int(data['battery_voltage'] * 10), + int(data['battery_voltage_scc'] * 10), + int(data['battery_voltage_scc2'] * 10), + data['battery_discharging_current'], + data['battery_charging_current'], + data['battery_capacity'], + data['inverter_heat_sink_temp'], + data['mppt1_charger_temp'], + data['mppt2_charger_temp'], + data['pv1_input_power'], + data['pv2_input_power'], + int(data['pv1_input_voltage'] * 10), + int(data['pv2_input_voltage'] * 10), + bits + ) + + def unpack(self, buf: bytes) -> Tuple[int, dict]: + data = struct.unpack(self.format, buf) + return data[0], { + 'grid_voltage': data[1] / 10, + 'grid_freq': data[2] / 10, + 'ac_output_voltage': data[3] / 10, + 'ac_output_freq': data[4] / 10, + 'ac_output_apparent_power': data[5], + 'ac_output_active_power': data[6], + 'output_load_percent': data[7], + 'battery_voltage': data[8] / 10, + 'battery_voltage_scc': data[9] / 10, + 'battery_voltage_scc2': data[10] / 10, + 'battery_discharging_current': data[11], + 'battery_charging_current': data[12], + 'battery_capacity': data[13], + 'inverter_heat_sink_temp': data[14], + 'mppt1_charger_temp': data[15], + 'mppt2_charger_temp': data[16], + 'pv1_input_power': data[17], + 'pv2_input_power': data[18], + 'pv1_input_voltage': data[19] / 10, + 'pv2_input_voltage': data[20] / 10, + 'mppt1_charger_status': data[21] & 0x03, + 'mppt2_charger_status': (data[21] >> 2) & 0x03, + 'battery_power_direction': (data[21] >> 4) & 0x03, + 'dc_ac_power_direction': (data[21] >> 6) & 0x03, + 'line_power_direction': (data[21] >> 8) & 0x03, + 'load_connected': (data[21] >> 10) & 0x01, + } + + +class Generation: + # 8 bytes + format = 'II' + + def pack(self, time: int, wh: int) -> bytes: + return struct.pack(self.format, int(time), wh) + + def unpack(self, buf: bytes) -> tuple: + data = struct.unpack(self.format, buf) + return tuple(data) diff --git a/src/home/mqtt/message/sensors.py b/src/home/mqtt/message/sensors.py new file mode 100644 index 0000000..ee522f0 --- /dev/null +++ b/src/home/mqtt/message/sensors.py @@ -0,0 +1,19 @@ +import struct + +from typing import Tuple + + +class Temperature: + format = 'IhH' + + def pack(self, time: int, temp: float, rh: float) -> bytes: + return struct.pack( + self.format, + time, + int(temp*100), + int(rh*100) + ) + + def unpack(self, buf: bytes) -> Tuple[int, float, float]: + data = struct.unpack(self.format, buf) + return data[0], data[1]/100, data[2]/100 diff --git a/src/home/mqtt/mqtt.py b/src/home/mqtt/mqtt.py new file mode 100644 index 0000000..b360d22 --- /dev/null +++ b/src/home/mqtt/mqtt.py @@ -0,0 +1,61 @@ +import os.path +import paho.mqtt.client as mqtt +import ssl +import logging + +from typing import Tuple +from ..config import config + +logger = logging.getLogger(__name__) + + +def username_and_password() -> Tuple[str, str]: + username = config['mqtt']['username'] if 'username' in config['mqtt'] else None + password = config['mqtt']['password'] if 'password' in config['mqtt'] else None + return username, password + + +class MQTTBase: + def __init__(self, clean_session=True): + self.client = mqtt.Client(client_id=config['mqtt']['client_id'], + protocol=mqtt.MQTTv311, + clean_session=clean_session) + self.client.on_connect = self.on_connect + self.client.on_disconnect = self.on_disconnect + self.client.on_message = self.on_message + + self.home_id = 1 + + username, password = username_and_password() + if username and password: + self.client.username_pw_set(username, password) + + def configure_tls(self): + ca_certs = os.path.realpath(os.path.join( + os.path.dirname(os.path.realpath(__file__)), + '..', + '..', + '..', + 'assets', + 'mqtt_ca.crt' + )) + self.client.tls_set(ca_certs=ca_certs, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2) + + def connect_and_loop(self, loop_forever=True): + host = config['mqtt']['host'] + port = config['mqtt']['port'] + + self.client.connect(host, port, 60) + if loop_forever: + self.client.loop_forever() + else: + self.client.loop_start() + + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + logger.info("Connected with result code " + str(rc)) + + def on_disconnect(self, client: mqtt.Client, userdata, rc): + logger.info("Disconnected with result code " + str(rc)) + + def on_message(self, client: mqtt.Client, userdata, msg): + logger.info(msg.topic + ": " + str(msg.payload)) diff --git a/src/home/mqtt/util.py b/src/home/mqtt/util.py new file mode 100644 index 0000000..f71ffd8 --- /dev/null +++ b/src/home/mqtt/util.py @@ -0,0 +1,8 @@ +import time + + +def poll_tick(freq): + t = time.time() + while True: + t += freq + yield max(t - time.time(), 0) diff --git a/src/home/relay/__init__.py b/src/home/relay/__init__.py new file mode 100644 index 0000000..f1568be --- /dev/null +++ b/src/home/relay/__init__.py @@ -0,0 +1,16 @@ +import importlib + +__all__ = ['RelayClient', 'RelayServer'] + + +def __getattr__(name): + _map = { + 'RelayClient': '.client', + 'RelayServer': '.server' + } + + if name in __all__: + module = importlib.import_module(_map[name], __name__) + return getattr(module, name) + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/home/relay/__init__.pyi b/src/home/relay/__init__.pyi new file mode 100644 index 0000000..94341f6 --- /dev/null +++ b/src/home/relay/__init__.pyi @@ -0,0 +1,2 @@ +from .client import RelayClient as RelayClient +from .server import RelayServer as RelayServer diff --git a/src/home/relay/client.py b/src/home/relay/client.py new file mode 100644 index 0000000..8c8d6c4 --- /dev/null +++ b/src/home/relay/client.py @@ -0,0 +1,39 @@ +import socket + + +class RelayClient: + def __init__(self, port=8307, host='127.0.0.1'): + self._host = host + self._port = port + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def __del__(self): + self.sock.close() + + def connect(self): + self.sock.connect((self._host, self._port)) + + def _write(self, line): + self.sock.sendall((line+'\r\n').encode()) + + def _read(self): + buf = bytearray() + while True: + buf.extend(self.sock.recv(256)) + if b'\r\n' in buf: + break + + response = buf.decode().strip() + return response + + def on(self): + self._write('on') + return self._read() + + def off(self): + self._write('off') + return self._read() + + def status(self): + self._write('get') + return self._read() diff --git a/src/home/relay/server.py b/src/home/relay/server.py new file mode 100644 index 0000000..1f33969 --- /dev/null +++ b/src/home/relay/server.py @@ -0,0 +1,82 @@ +import asyncio +import logging + +from pyA20.gpio import gpio +from pyA20.gpio import port as gpioport +from ..util import Addr + +logger = logging.getLogger(__name__) + + +class RelayServer: + OFF = 1 + ON = 0 + + def __init__(self, + pinname: str, + addr: Addr): + if not hasattr(gpioport, pinname): + raise ValueError(f'invalid pin {pinname}') + + self.pin = getattr(gpioport, pinname) + self.addr = addr + + gpio.init() + gpio.setcfg(self.pin, gpio.OUTPUT) + + self.lock = asyncio.Lock() + + def run(self): + asyncio.run(self.run_server()) + + async def relay_set(self, value): + async with self.lock: + gpio.output(self.pin, value) + + async def relay_get(self): + async with self.lock: + return int(gpio.input(self.pin)) == RelayServer.ON + + async def handle_client(self, reader, writer): + request = None + while request != 'quit': + try: + request = await reader.read(255) + if request == b'\x04': + break + request = request.decode('utf-8').strip() + except Exception: + break + + data = 'unknown' + if request == 'on': + await self.relay_set(RelayServer.ON) + logger.debug('set on') + data = 'ok' + + elif request == 'off': + await self.relay_set(RelayServer.OFF) + logger.debug('set off') + data = 'ok' + + elif request == 'get': + status = await self.relay_get() + data = 'on' if status is True else 'off' + + writer.write((data + '\r\n').encode('utf-8')) + try: + await writer.drain() + except ConnectionError: + break + + try: + writer.close() + except ConnectionError: + pass + + async def run_server(self): + host, port = self.addr + server = await asyncio.start_server(self.handle_client, host, port) + async with server: + logger.info('Server started.') + await server.serve_forever() diff --git a/src/home/sound/__init__.py b/src/home/sound/__init__.py new file mode 100644 index 0000000..43ddaff --- /dev/null +++ b/src/home/sound/__init__.py @@ -0,0 +1,8 @@ +from .node_client import SoundNodeClient +from .record import ( + RecordStatus, + RecordingNotFoundError, + Recorder, +) +from .storage import RecordStorage, RecordFile +from .record_client import RecordClient diff --git a/src/home/sound/amixer.py b/src/home/sound/amixer.py new file mode 100644 index 0000000..0ab2c64 --- /dev/null +++ b/src/home/sound/amixer.py @@ -0,0 +1,91 @@ +import subprocess + +from ..config import config +from threading import Lock +from typing import Union + + +_lock = Lock() +_default_step = 5 + + +def has_control(s: str) -> bool: + for control in config['amixer']['controls']: + if control['name'] == s: + return True + return False + + +def get_caps(s: str) -> list[str]: + for control in config['amixer']['controls']: + if control['name'] == s: + return control['caps'] + raise KeyError(f'control {s} not found') + + +def get_all() -> list: + controls = [] + for control in config['amixer']['controls']: + controls.append({ + 'name': control['name'], + 'info': get(control['name']), + 'caps': control['caps'] + }) + return controls + + +def get(control: str): + return call('get', control) + + +def mute(control): + return call('set', control, 'mute') + + +def unmute(control): + return call('set', control, 'unmute') + + +def cap(control): + return call('set', control, 'cap') + + +def nocap(control): + return call('set', control, 'nocap') + + +def _get_default_step() -> int: + if 'step' in config['amixer']: + return int(config['amixer']['step']) + + return _default_step + + +def incr(control, step=None): + if step is None: + step = _get_default_step() + return call('set', control, f'{step}%+') + + +def decr(control, step=None): + if step is None: + step = _get_default_step() + return call('set', control, f'{step}%-') + + +def call(*args, return_code=False) -> Union[int, str]: + with _lock: + result = subprocess.run([config['amixer']['bin'], *args], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if return_code: + return result.returncode + + if result.returncode != 0: + raise AmixerError(result.stderr.decode().strip()) + + return result.stdout.decode().strip() + + +class AmixerError(OSError): + pass diff --git a/src/home/sound/node_client.py b/src/home/sound/node_client.py new file mode 100644 index 0000000..7341208 --- /dev/null +++ b/src/home/sound/node_client.py @@ -0,0 +1,109 @@ +import requests +import logging +import shutil + +from ..util import Addr +from ..api.errors import ApiResponseError +from typing import Optional, Union +from .record import RecordFile + + +class SoundNodeClient: + def __init__(self, addr: Addr): + self.endpoint = f'http://{addr[0]}:{addr[1]}' + self.logger = logging.getLogger(self.__class__.__name__) + + def amixer_get_all(self): + return self._call('amixer/get-all/') + + def amixer_get(self, control: str): + return self._call(f'amixer/get/{control}/') + + def amixer_incr(self, control: str, step: Optional[int] = None): + params = {'step': step} if step is not None else None + return self._call(f'amixer/incr/{control}/', params=params) + + def amixer_decr(self, control: str, step: Optional[int] = None): + params = {'step': step} if step is not None else None + return self._call(f'amixer/decr/{control}/', params=params) + + def amixer_mute(self, control: str): + return self._call(f'amixer/mute/{control}/') + + def amixer_unmute(self, control: str): + return self._call(f'amixer/unmute/{control}/') + + def amixer_cap(self, control: str): + return self._call(f'amixer/cap/{control}/') + + def amixer_nocap(self, control: str): + return self._call(f'amixer/nocap/{control}/') + + def record(self, duration: int): + return self._call('record/', params={"duration": duration}) + + def record_info(self, record_id: int): + return self._call(f'record/info/{record_id}/') + + def record_forget(self, record_id: int): + return self._call(f'record/forget/{record_id}/') + + def record_download(self, record_id: int, output: str): + return self._call(f'record/download/{record_id}/', save_to=output) + + def storage_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: + r = self._call('storage/list/', params={'extended': int(extended)}) + files = r['files'] + if as_objects: + return self.record_list_from_serialized(files) + return files + + @staticmethod + def record_list_from_serialized(files: Union[list[str], list[dict]]): + new_files = [] + for f in files: + kwargs = {'remote': True} + if isinstance(f, dict): + name = f['filename'] + kwargs['remote_filesize'] = f['filesize'] + else: + name = f + item = RecordFile(name, **kwargs) + new_files.append(item) + return new_files + + def storage_delete(self, file_id: str): + return self._call('storage/delete/', params={'file_id': file_id}) + + def storage_download(self, file_id: str, output: str): + return self._call('storage/download/', params={'file_id': file_id}, save_to=output) + + def _call(self, + method: str, + params: dict = None, + save_to: Optional[str] = None): + + kwargs = {} + if isinstance(params, dict): + kwargs['params'] = params + if save_to: + kwargs['stream'] = True + + url = f'{self.endpoint}/{method}' + self.logger.debug(f'calling {url}, kwargs: {kwargs}') + + r = requests.get(url, **kwargs) + if r.status_code != 200: + response = r.json() + raise ApiResponseError(status_code=r.status_code, + error_type=response['error'], + error_message=response['message'] or None, + error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None) + + if save_to: + r.raise_for_status() + with open(save_to, 'wb') as f: + shutil.copyfileobj(r.raw, f) + return True + + return r.json()['response'] diff --git a/src/home/sound/record.py b/src/home/sound/record.py new file mode 100644 index 0000000..1ad8827 --- /dev/null +++ b/src/home/sound/record.py @@ -0,0 +1,400 @@ +import threading +import time +import subprocess +import signal +import os +import logging + +from enum import Enum, auto +from typing import Optional +from ..config import config +from ..util import find_child_processes +from .storage import RecordFile, RecordStorage + + +_history_item_timeout = 7200 +_history_cleanup_freq = 3600 + + +class RecordStatus(Enum): + WAITING = auto() + RECORDING = auto() + FINISHED = auto() + ERROR = auto() + + +class RecordHistoryItem: + id: int + request_time: float + start_time: float + stop_time: float + relations: list[int] + status: RecordStatus + error: Optional[Exception] + file: Optional[RecordFile] + creation_time: float + + def __init__(self, id): + self.id = id + self.request_time = 0 + self.start_time = 0 + self.stop_time = 0 + self.relations = [] + self.status = RecordStatus.WAITING + self.file = None + self.error = None + self.creation_time = time.time() + + def add_relation(self, related_id: int): + self.relations.append(related_id) + + def mark_started(self, start_time: float): + self.start_time = start_time + self.status = RecordStatus.RECORDING + + def mark_finished(self, end_time: float, file: RecordFile): + self.stop_time = end_time + self.file = file + self.status = RecordStatus.FINISHED + + def mark_failed(self, error: Exception): + self.status = RecordStatus.ERROR + self.error = error + + def as_dict(self) -> dict: + data = { + 'id': self.id, + 'request_time': self.request_time, + 'status': self.status.value, + 'relations': self.relations, + 'start_time': self.start_time, + 'stop_time': self.stop_time, + } + if self.error: + data['error'] = str(self.error) + if self.file: + data['file'] = self.file.__dict__() + return data + + +class RecordingNotFoundError(Exception): + pass + + +class RecordHistory: + history: dict[int, RecordHistoryItem] + + def __init__(self): + self.history = {} + self.logger = logging.getLogger(self.__class__.__name__) + + def add(self, record_id: int): + self.logger.debug(f'add: record_id={record_id}') + + r = RecordHistoryItem(record_id) + r.request_time = time.time() + + self.history[record_id] = r + + def delete(self, record_id: int): + self.logger.debug(f'delete: record_id={record_id}') + del self.history[record_id] + + def cleanup(self): + del_ids = [] + for rid, item in self.history.items(): + if item.creation_time < time.time()-_history_item_timeout: + del_ids.append(rid) + for rid in del_ids: + self.delete(rid) + + def __getitem__(self, key): + if key not in self.history: + raise RecordingNotFoundError() + + return self.history[key] + + def __setitem__(self, key, value): + raise NotImplementedError('setting history item this way is prohibited') + + def __contains__(self, key): + return key in self.history + + +class Recording: + start_time: float + stop_time: float + duration: int + record_id: int + arecord_pid: Optional[int] + process: Optional[subprocess.Popen] + + g_record_id = 1 + + def __init__(self): + self.start_time = 0 + self.stop_time = 0 + self.duration = 0 + self.process = None + self.arecord_pid = None + self.record_id = Recording.next_id() + self.logger = logging.getLogger(self.__class__.__name__) + + def is_started(self) -> bool: + return self.start_time > 0 and self.stop_time > 0 + + def is_waiting(self): + return self.duration > 0 + + def ask_for(self, duration) -> int: + overtime = 0 + orig_duration = duration + + if self.is_started(): + already_passed = time.time() - self.start_time + max_duration = Recorder.get_max_record_time() - already_passed + self.logger.debug(f'ask_for({orig_duration}): recording is in progress, already passed {already_passed}s, max_duration set to {max_duration}') + else: + max_duration = Recorder.get_max_record_time() + + if duration > max_duration: + overtime = duration - max_duration + duration = max_duration + + self.logger.debug(f'ask_for({orig_duration}): requested duration ({orig_duration}) is greater than max ({max_duration}), overtime is {overtime}') + + self.duration += duration + if self.is_started(): + til_end = self.stop_time - time.time() + if til_end < 0: + til_end = 0 + + _prev_stop_time = self.stop_time + _to_add = duration - til_end + if _to_add < 0: + _to_add = 0 + + self.stop_time += _to_add + self.logger.debug(f'ask_for({orig_duration}): adding {_to_add} to stop_time (before: {_prev_stop_time}, after: {self.stop_time})') + + return overtime + + def start(self, output: str): + assert self.start_time == 0 and self.stop_time == 0, "already started?!" + assert self.process is None, "self.process is not None, what the hell?" + + cur = time.time() + self.start_time = cur + self.stop_time = cur + self.duration + + arecord = config['arecord']['bin'] + lame = config['lame']['bin'] + b = config['lame']['bitrate'] + + cmd = f'{arecord} -f S16 -r 44100 -t raw 2>/dev/null | {lame} -r -s 44.1 -b {b} -m m - {output} >/dev/null 2>/dev/null' + self.logger.debug(f'start: running `{cmd}`') + self.process = subprocess.Popen(cmd, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True) + + sh_pid = self.process.pid + self.logger.debug(f'start: started, pid of shell is {sh_pid}') + + arecord_pid = self.find_arecord_pid(sh_pid) + if arecord_pid is not None: + self.arecord_pid = arecord_pid + self.logger.debug(f'start: pid of arecord is {arecord_pid}') + + def stop(self): + if self.process: + if self.arecord_pid is None: + self.arecord_pid = self.find_arecord_pid(self.process.pid) + + if self.arecord_pid is not None: + os.kill(self.arecord_pid, signal.SIGINT) + timeout = config['node']['process_wait_timeout'] + + self.logger.debug(f'stop: sent SIGINT to {self.arecord_pid}. now waiting up to {timeout} seconds...') + try: + self.process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + self.logger.warning(f'stop: wait({timeout}): timeout expired, calling terminate()') + self.process.terminate() + else: + self.logger.warning('stop: pid of arecord is unknown, calling terminate()') + self.process.terminate() + + rc = self.process.returncode + self.logger.debug(f'stop: rc={rc}') + + self.process = None + self.arecord_pid = 0 + + self.duration = 0 + self.start_time = 0 + self.stop_time = 0 + + def find_arecord_pid(self, sh_pid: int): + try: + children = find_child_processes(sh_pid) + except OSError as exc: + self.logger.warning(f'failed to find child process of {sh_pid}: ' + str(exc)) + return None + + for child in children: + if 'arecord' in child.cmd: + return child.pid + + return None + + @staticmethod + def next_id() -> int: + cur_id = Recording.g_record_id + Recording.g_record_id += 1 + return cur_id + + def increment_id(self): + self.record_id = Recording.next_id() + + +class Recorder: + interrupted: bool + lock: threading.Lock + history_lock: threading.Lock + recording: Optional[Recording] + overtime: int + history: RecordHistory + next_history_cleanup_time: float + storage: RecordStorage + + def __init__(self, storage: RecordStorage): + self.storage = storage + self.recording = Recording() + self.interrupted = False + self.lock = threading.Lock() + self.history_lock = threading.Lock() + self.overtime = 0 + self.history = RecordHistory() + self.next_history_cleanup_time = 0 + self.logger = logging.getLogger(self.__class__.__name__) + + def start_thread(self): + t = threading.Thread(target=self.loop) + t.daemon = True + t.start() + + def loop(self) -> None: + tempname = os.path.join(self.storage.root, 'temp.mp3') + + while not self.interrupted: + cur = time.time() + stopped = False + cur_record_id = None + + if self.next_history_cleanup_time == 0: + self.next_history_cleanup_time = time.time() + _history_cleanup_freq + elif self.next_history_cleanup_time <= time.time(): + self.logger.debug('loop: calling history.cleanup()') + try: + self.history.cleanup() + except Exception as e: + self.logger.error('loop: error while history.cleanup(): ' + str(e)) + self.next_history_cleanup_time = time.time() + _history_cleanup_freq + + with self.lock: + cur_record_id = self.recording.record_id + # self.logger.debug(f'cur_record_id={cur_record_id}') + + if not self.recording.is_started(): + if self.recording.is_waiting(): + try: + if os.path.exists(tempname): + self.logger.warning(f'loop: going to start new recording, but {tempname} still exists, unlinking..') + try: + os.unlink(tempname) + except OSError as e: + self.logger.exception(e) + self.recording.start(tempname) + with self.history_lock: + self.history[cur_record_id].mark_started(self.recording.start_time) + except Exception as exc: + self.logger.exception(exc) + + # there should not be any errors, but still.. + try: + self.recording.stop() + except Exception as exc: + self.logger.exception(exc) + + with self.history_lock: + self.history[cur_record_id].mark_failed(exc) + + self.logger.debug(f'loop: start exc path: calling increment_id()') + self.recording.increment_id() + else: + if cur >= self.recording.stop_time: + try: + start_time = self.recording.start_time + stop_time = self.recording.stop_time + self.recording.stop() + + saved_name = self.storage.save(tempname, + record_id=cur_record_id, + start_time=int(start_time), + stop_time=int(stop_time)) + + with self.history_lock: + self.history[cur_record_id].mark_finished(stop_time, saved_name) + except Exception as exc: + self.logger.exception(exc) + with self.history_lock: + self.history[cur_record_id].mark_failed(exc) + finally: + self.logger.debug(f'loop: stop exc final path: calling increment_id()') + self.recording.increment_id() + + stopped = True + + if stopped and self.overtime > 0: + self.logger.info(f'recording {cur_record_id} is stopped, but we\'ve got overtime ({self.overtime})') + _overtime = self.overtime + self.overtime = 0 + + related_id = self.record(_overtime) + self.logger.info(f'enqueued another record with id {related_id}') + + if cur_record_id is not None: + with self.history_lock: + self.history[cur_record_id].add_relation(related_id) + + time.sleep(0.2) + + def record(self, duration: int) -> int: + self.logger.debug(f'record: duration={duration}') + with self.lock: + overtime = self.recording.ask_for(duration) + self.logger.debug(f'overtime={overtime}') + + if overtime > self.overtime: + self.overtime = overtime + + if not self.recording.is_started(): + with self.history_lock: + self.history.add(self.recording.record_id) + + return self.recording.record_id + + def stop(self): + self.interrupted = True + + def get_info(self, record_id: int) -> RecordHistoryItem: + with self.history_lock: + return self.history[record_id] + + def forget(self, record_id: int): + with self.history_lock: + self.logger.info(f'forget: removing record {record_id} from history') + self.history.delete(record_id) + + @staticmethod + def get_max_record_time() -> int: + return config['node']['record_max_time'] + diff --git a/src/home/sound/record_client.py b/src/home/sound/record_client.py new file mode 100644 index 0000000..2744a8c --- /dev/null +++ b/src/home/sound/record_client.py @@ -0,0 +1,142 @@ +import time +import logging +import threading +import os.path + +from tempfile import gettempdir +from .record import RecordStatus +from .node_client import SoundNodeClient +from ..util import Addr +from typing import Optional, Callable + + +class RecordClient: + interrupted: bool + logger: logging.Logger + clients: dict[str, SoundNodeClient] + awaiting: dict[str, dict[int, Optional[dict]]] + error_handler: Optional[Callable] + finished_handler: Optional[Callable] + download_on_finish: bool + + def __init__(self, + nodes: dict[str, Addr], + error_handler: Optional[Callable] = None, + finished_handler: Optional[Callable] = None, + download_on_finish=False): + self.interrupted = False + self.logger = logging.getLogger(self.__class__.__name__) + self.clients = {} + self.awaiting = {} + self.download_on_finish = download_on_finish + + self.error_handler = error_handler + self.finished_handler = finished_handler + + self.awaiting_lock = threading.Lock() + + for node, addr in nodes.items(): + self.clients[node] = SoundNodeClient(addr) + self.awaiting[node] = {} + + try: + t = threading.Thread(target=self.loop) + t.daemon = True + t.start() + except (KeyboardInterrupt, SystemExit) as exc: + self.stop() + self.logger.exception(exc) + + def stop(self): + self.interrupted = True + + def loop(self): + while not self.interrupted: + # self.logger.debug('loop: tick') + + for node in self.awaiting.keys(): + with self.awaiting_lock: + record_ids = list(self.awaiting[node].keys()) + if not record_ids: + continue + + self.logger.debug(f'loop: node `{node}` awaiting list: {record_ids}') + + cl = self.getclient(node) + del_ids = [] + for rid in record_ids: + info = cl.record_info(rid) + + if info['relations']: + for relid in info['relations']: + self.wait_for_record(node, relid, self.awaiting[node][rid], is_relative=True) + + status = RecordStatus(info['status']) + if status in (RecordStatus.FINISHED, RecordStatus.ERROR): + if status == RecordStatus.FINISHED: + if self.download_on_finish: + local_fn = self.download(node, rid, info['file']['fileid']) + else: + local_fn = None + self._report_finished(info, local_fn, self.awaiting[node][rid]) + else: + self._report_error(info, self.awaiting[node][rid]) + del_ids.append(rid) + self.logger.debug(f'record {rid}: status {status}') + + if del_ids: + self.logger.debug(f'deleting {del_ids} from {node}\'s awaiting list') + with self.awaiting_lock: + for del_id in del_ids: + del self.awaiting[node][del_id] + + time.sleep(5) + + self.logger.info('loop ended') + + def getclient(self, node: str): + return self.clients[node] + + def record(self, + node: str, + duration: int, + userdata: Optional[dict] = None) -> int: + self.logger.debug(f'record: node={node}, duration={duration}, userdata={userdata}') + + cl = self.getclient(node) + record_id = cl.record(duration)['id'] + self.logger.debug(f'record: request sent, record_id={record_id}') + + self.wait_for_record(node, record_id, userdata) + return record_id + + def wait_for_record(self, + node: str, + record_id: int, + userdata: Optional[dict] = None, + is_relative=False): + with self.awaiting_lock: + if record_id not in self.awaiting[node]: + msg = f'wait_for_record: adding {record_id} to {node}' + if is_relative: + msg += ' (by relation)' + self.logger.debug(msg) + + self.awaiting[node][record_id] = userdata + + def download(self, node: str, record_id: int, fileid: str): + dst = os.path.join(gettempdir(), f'{node}_{fileid}.mp3') + cl = self.getclient(node) + cl.record_download(record_id, dst) + return dst + + def forget(self, node: str, rid: int): + self.getclient(node).record_forget(rid) + + def _report_finished(self, *args): + if self.finished_handler: + self.finished_handler(*args) + + def _report_error(self, *args): + if self.error_handler: + self.error_handler(*args) diff --git a/src/home/sound/storage.py b/src/home/sound/storage.py new file mode 100644 index 0000000..c61f6f6 --- /dev/null +++ b/src/home/sound/storage.py @@ -0,0 +1,155 @@ +import os +import re +import shutil +import logging + +from typing import Optional, Union +from datetime import datetime +from ..util import strgen + +logger = logging.getLogger(__name__) + + +class RecordFile: + start_time: Optional[datetime] + stop_time: Optional[datetime] + record_id: Optional[int] + name: str + file_id: Optional[str] + remote: bool + remote_filesize: int + storage_root: str + + human_date_dmt = '%d.%m.%y' + human_time_fmt = '%H:%M:%S' + + def __init__(self, filename: str, remote=False, remote_filesize=None, storage_root='/'): + self.name = filename + self.storage_root = storage_root + + self.remote = remote + self.remote_filesize = remote_filesize + + m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.mp3$', filename) + if m: + self.start_time = datetime.strptime(m.group(1), RecordStorage.time_fmt) + self.stop_time = datetime.strptime(m.group(2), RecordStorage.time_fmt) + self.record_id = int(m.group(3)) + self.file_id = (m.group(1) + '_' + m.group(2)).replace('-', '_') + else: + logger.warning(f'unexpected filename: {filename}') + self.start_time = None + self.stop_time = None + self.record_id = None + self.file_id = None + + @property + def path(self): + if self.remote: + return RuntimeError('remote recording, can\'t get real path') + + return os.path.realpath(os.path.join( + self.storage_root, self.name + )) + + @property + def start_humantime(self) -> str: + if self.start_time is None: + return '?' + fmt = f'{RecordFile.human_date_dmt} {RecordFile.human_time_fmt}' + return self.start_time.strftime(fmt) + + @property + def stop_humantime(self) -> str: + if self.stop_time is None: + return '?' + fmt = RecordFile.human_time_fmt + if self.start_time.date() != self.stop_time.date(): + fmt = f'{RecordFile.human_date_dmt} {fmt}' + return self.stop_time.strftime(fmt) + + @property + def start_unixtime(self) -> int: + if self.start_time is None: + return 0 + return int(self.start_time.timestamp()) + + @property + def stop_unixtime(self) -> int: + if self.stop_time is None: + return 0 + return int(self.stop_time.timestamp()) + + @property + def filesize(self): + if self.remote: + if self.remote_filesize is None: + raise RuntimeError('file is remote and remote_filesize is not set') + return self.remote_filesize + return os.path.getsize(self.path) + + def __dict__(self) -> dict: + return { + 'start_unixtime': self.start_unixtime, + 'stop_unixtime': self.stop_unixtime, + 'filename': self.name, + 'filesize': self.filesize, + 'fileid': self.file_id, + 'record_id': self.record_id or 0, + } + + +class RecordStorage: + time_fmt = '%d%m%y-%H%M%S' + + def __init__(self, root: str): + self.root = root + + def getfiles(self, as_objects=False) -> Union[list[str], list[RecordFile]]: + files = [] + for name in os.listdir(self.root): + path = os.path.join(self.root, name) + if os.path.isfile(path) and name.endswith('.mp3'): + files.append(name if not as_objects else RecordFile(name, storage_root=self.root)) + return files + + def find(self, file_id: str) -> Optional[RecordFile]: + for name in os.listdir(self.root): + if os.path.isfile(os.path.join(self.root, name)) and name.endswith('.mp3'): + item = RecordFile(name, storage_root=self.root) + if item.file_id == file_id: + return item + return None + + def purge(self): + files = self.getfiles() + if files: + logger = logging.getLogger(self.__name__) + for f in files: + try: + path = os.path.join(self.root, f) + logger.debug(f'purge: deleting {path}') + os.unlink(path) + except OSError as exc: + logger.exception(exc) + + def delete(self, file: RecordFile): + os.unlink(file.path) + + def save(self, + fn: str, + record_id: int, + start_time: int, + stop_time: int) -> RecordFile: + + start_time_s = datetime.fromtimestamp(start_time).strftime(self.time_fmt) + stop_time_s = datetime.fromtimestamp(stop_time).strftime(self.time_fmt) + + dst_fn = f'{start_time_s}_{stop_time_s}_id{record_id}' + if os.path.exists(os.path.join(self.root, dst_fn)): + dst_fn += strgen(4) + dst_fn += '.mp3' + dst_path = os.path.join(self.root, dst_fn) + + shutil.move(fn, dst_path) + return RecordFile(dst_fn, storage_root=self.root) diff --git a/src/home/soundsensor/__init__.py b/src/home/soundsensor/__init__.py new file mode 100644 index 0000000..30052f8 --- /dev/null +++ b/src/home/soundsensor/__init__.py @@ -0,0 +1,22 @@ +import importlib + +__all__ = [ + 'SoundSensorNode', + 'SoundSensorHitHandler', + 'SoundSensorServer', + 'SoundSensorServerGuardClient' +] + + +def __getattr__(name): + if name in __all__: + if name == 'SoundSensorNode': + file = 'node' + elif name == 'SoundSensorServerGuardClient': + file = 'server_client' + else: + file = 'server' + module = importlib.import_module(f'.{file}', __name__) + return getattr(module, name) + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/home/soundsensor/__init__.pyi b/src/home/soundsensor/__init__.pyi new file mode 100644 index 0000000..cb34972 --- /dev/null +++ b/src/home/soundsensor/__init__.pyi @@ -0,0 +1,8 @@ +from .server import ( + SoundSensorHitHandler as SoundSensorHitHandler, + SoundSensorServer as SoundSensorServer, +) +from .server_client import ( + SoundSensorServerGuardClient as SoundSensorServerGuardClient +) +from .node import SoundSensorNode as SoundSensorNode diff --git a/src/home/soundsensor/node.py b/src/home/soundsensor/node.py new file mode 100644 index 0000000..b4b8fbc --- /dev/null +++ b/src/home/soundsensor/node.py @@ -0,0 +1,73 @@ +import logging +import threading + +from typing import Optional +from time import sleep +from ..util import stringify, send_datagram, Addr + +from pyA20.gpio import gpio +from pyA20.gpio import port as gpioport + +logger = logging.getLogger(__name__) + + +class SoundSensorNode: + def __init__(self, + name: str, + pinname: str, + server_addr: Optional[Addr], + delay=0.005): + + if not hasattr(gpioport, pinname): + raise ValueError(f'invalid pin {pinname}') + + self.pin = getattr(gpioport, pinname) + self.name = name + self.delay = delay + + self.server_addr = server_addr + + self.hits = 0 + self.hitlock = threading.Lock() + + self.interrupted = False + + def run(self): + try: + t = threading.Thread(target=self.sensor_reader) + t.daemon = True + t.start() + + while True: + with self.hitlock: + hits = self.hits + self.hits = 0 + + if hits > 0: + try: + if self.server_addr is not None: + send_datagram(stringify([self.name, hits]), self.server_addr) + else: + logger.debug(f'server reporting disabled, skipping reporting {hits} hits') + except OSError as exc: + logger.exception(exc) + + sleep(1) + + except (KeyboardInterrupt, SystemExit) as e: + self.interrupted = True + logger.info(str(e)) + + def sensor_reader(self): + gpio.init() + gpio.setcfg(self.pin, gpio.INPUT) + gpio.pullup(self.pin, gpio.PULLUP) + + while not self.interrupted: + state = gpio.input(self.pin) + sleep(self.delay) + + if not state: + with self.hitlock: + logger.debug('got a hit') + self.hits += 1 diff --git a/src/home/soundsensor/server.py b/src/home/soundsensor/server.py new file mode 100644 index 0000000..490fc36 --- /dev/null +++ b/src/home/soundsensor/server.py @@ -0,0 +1,125 @@ +import asyncio +import json +import logging +import threading + +from ..config import config +from aiohttp import web +from aiohttp.web_exceptions import ( + HTTPNotFound +) + +from typing import Type +from ..util import Addr, stringify, format_tb + +logger = logging.getLogger(__name__) + + +class SoundSensorHitHandler(asyncio.DatagramProtocol): + def datagram_received(self, data, addr): + try: + data = json.loads(data) + except json.JSONDecodeError as e: + logger.error('failed to parse json datagram') + logger.exception(e) + return + + try: + name, hits = data + except (ValueError, IndexError) as e: + logger.error('failed to unpack data') + logger.exception(e) + return + + self.handler(name, hits) + + def handler(self, name: str, hits: int): + pass + + +class SoundSensorServer: + def __init__(self, + addr: Addr, + handler_impl: Type[SoundSensorHitHandler]): + self.addr = addr + self.impl = handler_impl + + self._recording_lock = threading.Lock() + self._recording_enabled = True + + if self.guard_control_enabled(): + if 'guard_recording_default' in config['server']: + self._recording_enabled = config['server']['guard_recording_default'] + + def guard_control_enabled(self) -> bool: + return 'guard_control' in config['server'] and config['server']['guard_control'] is True + + def set_recording(self, enabled: bool): + with self._recording_lock: + self._recording_enabled = enabled + + def is_recording_enabled(self) -> bool: + with self._recording_lock: + return self._recording_enabled + + def run(self): + if self.guard_control_enabled(): + t = threading.Thread(target=self.run_guard_server) + t.daemon = True + t.start() + + loop = asyncio.get_event_loop() + t = loop.create_datagram_endpoint(self.impl, local_addr=self.addr) + loop.run_until_complete(t) + loop.run_forever() + + def run_guard_server(self): + routes = web.RouteTableDef() + + def ok(data=None): + if data is None: + data = 1 + response = {'response': data} + return web.json_response(response, dumps=stringify) + + @web.middleware + async def errors_handler_middleware(request, handler): + try: + response = await handler(request) + return response + except HTTPNotFound: + return web.json_response({'error': 'not found'}, status=404) + except Exception as exc: + data = { + 'error': exc.__class__.__name__, + 'message': exc.message if hasattr(exc, 'message') else str(exc) + } + tb = format_tb(exc) + if tb: + data['stacktrace'] = tb + + return web.json_response(data, status=500) + + @routes.post('/guard/enable') + async def guard_enable(request): + self.set_recording(True) + return ok() + + @routes.post('/guard/disable') + async def guard_disable(request): + self.set_recording(False) + return ok() + + @routes.get('/guard/status') + async def guard_status(request): + return ok({'enabled': self.is_recording_enabled()}) + + asyncio.set_event_loop(asyncio.new_event_loop()) # need to create new event loop in new thread + app = web.Application() + app.add_routes(routes) + app.middlewares.append(errors_handler_middleware) + + web.run_app(app, + host=self.addr[0], + port=self.addr[1], + handle_signals=False) # handle_signals=True doesn't work in separate thread diff --git a/src/home/soundsensor/server_client.py b/src/home/soundsensor/server_client.py new file mode 100644 index 0000000..7eef996 --- /dev/null +++ b/src/home/soundsensor/server_client.py @@ -0,0 +1,38 @@ +import requests +import logging + +from ..util import Addr +from ..api.errors import ApiResponseError + + +class SoundSensorServerGuardClient: + def __init__(self, addr: Addr): + self.endpoint = f'http://{addr[0]}:{addr[1]}' + self.logger = logging.getLogger(self.__class__.__name__) + + def guard_enable(self): + return self._call('guard/enable', is_post=True) + + def guard_disable(self): + return self._call('guard/disable', is_post=True) + + def guard_status(self): + return self._call('guard/status') + + def _call(self, + method: str, + is_post=False): + + url = f'{self.endpoint}/{method}' + self.logger.debug(f'calling {url}') + + r = requests.get(url) if not is_post else requests.post(url) + + if r.status_code != 200: + response = r.json() + raise ApiResponseError(status_code=r.status_code, + error_type=response['error'], + error_message=response['message'] or None, + error_stacktrace=response['stacktrace'] if 'stacktrace' in response else None) + + return r.json()['response'] diff --git a/src/home/util.py b/src/home/util.py new file mode 100644 index 0000000..2c43cb0 --- /dev/null +++ b/src/home/util.py @@ -0,0 +1,213 @@ +import json +import socket +import time +import requests +import subprocess +import traceback +import logging +import string +import random + +from .config import config +from datetime import datetime +from typing import Tuple, Optional + +Addr = Tuple[str, int] # network address type (host, port) + +logger = logging.getLogger(__name__) + + +# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks +def chunks(lst, n): + """Yield successive n-sized chunks from lst.""" + for i in range(0, len(lst), n): + yield lst[i:i + n] + + +def json_serial(obj): + """JSON serializer for datetime objects""" + if isinstance(obj, datetime): + return obj.timestamp() + raise TypeError("Type %s not serializable" % type(obj)) + + +def stringify(v) -> str: + return json.dumps(v, separators=(',', ':'), default=json_serial) + + +def ipv4_valid(ip: str) -> bool: + try: + socket.inet_aton(ip) + return True + except socket.error: + return False + + +def parse_addr(addr: str) -> Addr: + if addr.count(':') != 1: + raise ValueError('invalid host:port format') + + host, port = addr.split(':') + if not ipv4_valid(host): + raise ValueError('invalid ipv4 address') + + port = int(port) + if not 0 <= port <= 65535: + raise ValueError('invalid port') + + return host, port + + +def strgen(n: int): + return ''.join(random.choices(string.ascii_letters + string.digits, k=n)) + + +class MySimpleSocketClient: + host: str + port: int + + def __init__(self, host: str, port: int): + self.host = host + self.port = port + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect((self.host, self.port)) + self.sock.settimeout(5) + + def __del__(self): + self.sock.close() + + def write(self, line: str) -> None: + self.sock.sendall((line + '\r\n').encode()) + + def read(self) -> str: + buf = bytearray() + while True: + buf.extend(self.sock.recv(256)) + if b'\r\n' in buf: + break + + response = buf.decode().strip() + return response + + +def send_datagram(message: str, addr: Addr) -> None: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.sendto(message.encode(), addr) + + +def send_telegram(text: str, + parse_mode: str = None, + disable_web_page_preview: bool = False, + ): + data = { + 'chat_id': config['telegram']['chat_id'], + 'text': text + } + + if parse_mode is not None: + data['parse_mode'] = parse_mode + elif 'parse_mode' in config['telegram']: + data['parse_mode'] = config['telegram']['parse_mode'] + + if disable_web_page_preview or 'disable_web_page_preview' in config['telegram']: + data['disable_web_page_preview'] = 1 + + r = requests.post('https://api.telegram.org/bot%s/sendMessage' % config['telegram']['token'], data=data) + + if r.status_code != 200: + logger.error(r.text) + raise RuntimeError("telegram returned %d" % r.status_code) + + +def format_tb(exc) -> Optional[list[str]]: + tb = traceback.format_tb(exc.__traceback__) + if not tb: + return None + + tb = list(map(lambda s: s.strip(), tb)) + tb.reverse() + if tb[0][-1:] == ':': + tb[0] = tb[0][:-1] + + return tb + + +class ChildProcessInfo: + pid: int + cmd: str + + def __init__(self, + pid: int, + cmd: str): + self.pid = pid + self.cmd = cmd + + +def find_child_processes(ppid: int) -> list[ChildProcessInfo]: + p = subprocess.run(['pgrep', '-P', str(ppid), '--list-full'], capture_output=True) + if p.returncode != 0: + raise OSError(f'pgrep returned {p.returncode}') + + children = [] + + lines = p.stdout.decode().strip().split('\n') + for line in lines: + try: + space_idx = line.index(' ') + except ValueError as exc: + logger.exception(exc) + continue + + pid = int(line[0:space_idx]) + cmd = line[space_idx+1:] + + children.append(ChildProcessInfo(pid, cmd)) + + return children + + +class Stopwatch: + elapsed: float + time_started: Optional[float] + + def __init__(self): + self.elapsed = 0 + self.time_started = None + + def go(self): + if self.time_started is not None: + raise StopwatchError('stopwatch was already started') + + self.time_started = time.time() + + def pause(self): + if self.time_started is None: + raise StopwatchError('stopwatch was paused') + + self.elapsed += time.time() - self.time_started + self.time_started = None + + def get_elapsed_time(self): + elapsed = self.elapsed + if self.time_started is not None: + elapsed += time.time() - self.time_started + return elapsed + + def reset(self): + self.time_started = None + self.elapsed = 0 + + def is_paused(self): + return self.time_started is None + + +class StopwatchError(RuntimeError): + pass + + +def filesize_fmt(num, suffix="B") -> str: + for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: + if abs(num) < 1024.0: + return f"{num:3.1f} {unit}{suffix}" + num /= 1024.0 + return f"{num:.1f} Yi{suffix}"
\ No newline at end of file diff --git a/src/home/web_api/__init__.py b/src/home/web_api/__init__.py new file mode 100644 index 0000000..20655da --- /dev/null +++ b/src/home/web_api/__init__.py @@ -0,0 +1 @@ +from .web_api import get_app
\ No newline at end of file diff --git a/src/home/web_api/web_api.py b/src/home/web_api/web_api.py new file mode 100644 index 0000000..c75c031 --- /dev/null +++ b/src/home/web_api/web_api.py @@ -0,0 +1,213 @@ +import logging +import json +import os.path + +from datetime import datetime, timedelta +from typing import Optional + +from werkzeug.exceptions import HTTPException +from flask import Flask, request, Response + +from ..config import config, is_development_mode +from ..database import BotsDatabase, SensorsDatabase +from ..util import stringify, format_tb +from ..api.types import BotType, TemperatureSensorLocation, SoundSensorLocation +from ..sound import RecordStorage + +db: Optional[BotsDatabase] = None +sensors_db: Optional[SensorsDatabase] = None +app = Flask(__name__) +logger = logging.getLogger(__name__) + + +class AuthError(Exception): + def __init__(self, message: str): + super().__init__() + self.message = message + + +# api methods +# ----------- + +@app.route("/") +def hello(): + message = "nothing here, keep lurking" + if is_development_mode(): + message += ' (dev mode)' + return message + + +@app.route('/api/sensors/data/', methods=['GET']) +def sensors_data(): + hours = request.args.get('hours', type=int, default=1) + sensor = TemperatureSensorLocation(request.args.get('sensor', type=int)) + + if hours < 1 or hours > 24: + raise ValueError('invalid hours value') + + dt_to = datetime.now() + dt_from = dt_to - timedelta(hours=hours) + + data = sensors_db.get_temperature_recordings(sensor, (dt_from, dt_to)) + return ok(data) + + +@app.route('/api/sound_sensors/hits/', methods=['GET']) +def get_sound_sensors_hits(): + location = SoundSensorLocation(request.args.get('location', type=int)) + + after = request.args.get('after', type=int) + kwargs = {} + if after is None: + last = request.args.get('last', type=int) + if last is None: + raise ValueError('you must pass `after` or `last` params') + else: + if not 0 < last < 100: + raise ValueError('invalid last value: must be between 0 and 100') + kwargs['last'] = last + else: + kwargs['after'] = datetime.fromtimestamp(after) + + data = db.get_sound_hits(location, **kwargs) + return ok(data) + + +@app.route('/api/sound_sensors/hits/', methods=['POST']) +def post_sound_sensors_hits(): + hits = [] + for hit, count in json.loads(request.form.get('hits', type=str)): + if not hasattr(SoundSensorLocation, hit.upper()): + raise ValueError('invalid sensor location') + if count < 1: + raise ValueError(f'invalid count: {count}') + hits.append((SoundSensorLocation[hit.upper()], count)) + + db.add_sound_hits(hits, datetime.now()) + return ok() + + +@app.route('/api/logs/bot-request/', methods=['POST']) +def log_bot_request(): + user_id = request.form.get('user_id', type=int, default=0) + message = request.form.get('message', type=str, default='') + bot = BotType(request.form.get('bot', type=int)) + + # validate message + if message.strip() == '': + raise ValueError('message can\'t be empty') + + # add record to the database + db.add_request(bot, user_id, message) + + return ok() + + +@app.route('/api/logs/openwrt/', methods=['POST']) +def log_openwrt(): + logs = request.form.get('logs', type=str, default='') + + # validate it + logs = json.loads(logs) + assert type(logs) is list, "invalid json data (list expected)" + + lines = [] + for line in logs: + assert type(line) is list, "invalid line type (list expected)" + assert len(line) == 2, f"expected 2 items in line, got {len(line)}" + assert type(line[0]) is int, "invalid line[0] type (int expected)" + assert type(line[1]) is str, "invalid line[1] type (str expected)" + + lines.append(( + datetime.fromtimestamp(line[0]), + line[1] + )) + + db.add_openwrt_logs(lines) + return ok() + + +@app.route('/api/recordings/list/', methods=['GET']) +def recordings_list(): + extended = request.args.get('extended', type=bool, default=False) + node = request.args.get('node', type=str) + + root = os.path.join(config['recordings']['directory'], node) + if not os.path.isdir(root): + raise ValueError(f'invalid node {node}: no such directory') + + storage = RecordStorage(root) + files = storage.getfiles(as_objects=extended) + if extended: + files = list(map(lambda file: file.__dict__(), files)) + + return ok(files) + + +# internal functions +# ------------------ + +def ok(data=None) -> Response: + response = {'result': 'ok'} + if data is not None: + response['data'] = data + return Response(stringify(response), + mimetype='application/json') + + +def err(e) -> Response: + error = { + 'type': e.__class__.__name__, + 'message': e.message if hasattr(e, 'message') else str(e) + } + if is_development_mode(): + tb = format_tb(e) + if tb: + error['stacktrace'] = tb + data = { + 'result': 'error', + 'error': error + } + return Response(stringify(data), mimetype='application/json') + + +def get_token() -> Optional[str]: + name = 'X-Token' + if name in request.headers: + return request.headers[name] + + token = request.args.get('token', default='', type=str) + if token != '': + return token + + return None + + +@app.errorhandler(Exception) +def handle_exception(e): + if isinstance(e, HTTPException): + return e + return err(e), 500 + + +@app.before_request +def validate_token() -> None: + if request.path.startswith('/api/') and not is_development_mode(): + token = get_token() + if not token: + raise AuthError(f'token is missing') + + if token != config['api']['token']: + raise AuthError('invalid token') + + +def get_app(): + global db, sensors_db + + config.load('web_api') + app.config.from_mapping(**config['flask']) + + db = BotsDatabase() + sensors_db = SensorsDatabase() + + return app diff --git a/src/inverter_bot.py b/src/inverter_bot.py new file mode 100755 index 0000000..5ad5e33 --- /dev/null +++ b/src/inverter_bot.py @@ -0,0 +1,467 @@ +#!/usr/bin/env python3 +import logging +import re +import datetime +import json + +from inverterd import Format, InverterError +from html import escape +from typing import Optional, Tuple +from home.config import config +from home.bot import Wrapper, Context, text_filter, command_usage +from home.inverter import ( + wrapper_instance as inverter, + beautify_table, + + InverterMonitor, + ChargingEvent, + BatteryState, +) +from home.api.types import BotType +from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton +from telegram.ext import MessageHandler, CommandHandler, CallbackQueryHandler + +monitor: Optional[InverterMonitor] = None +bot: Optional[Wrapper] = None +LT = escape('<=') +flags_map = { + 'buzzer': 'BUZZ', + 'overload_bypass': 'OLBP', + 'escape_to_default_screen_after_1min_timeout': 'LCDE', + 'overload_restart': 'OLRS', + 'over_temp_restart': 'OTRS', + 'backlight_on': 'BLON', + 'alarm_on_on_primary_source_interrupt': 'ALRM', + 'fault_code_record': 'FTCR', +} +logger = logging.getLogger(__name__) + + +def monitor_charging(event: ChargingEvent, **kwargs) -> None: + args = [] + if event == ChargingEvent.AC_CHARGING_STARTED: + key = 'started' + elif event == ChargingEvent.AC_CHARGING_FINISHED: + key = 'finished' + elif event == ChargingEvent.AC_DISCONNECTED: + key = 'disconnected' + elif event == ChargingEvent.AC_NOT_CHARGING: + key = 'not_charging' + elif event == ChargingEvent.AC_CURRENT_CHANGED: + key = 'current_changed' + args.append(kwargs['current']) + elif event == ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR: + key = 'na_solar' + elif event == ChargingEvent.AC_MOSTLY_CHARGED: + key = 'mostly_charged' + else: + logger.error('unknown charging event:', event) + return + + bot.notify_all( + lambda lang: bot.lang.get(f'chrg_evt_{key}', lang, *args) + ) + + +def monitor_battery(state: BatteryState, v: float, load_watts: int) -> None: + if state == BatteryState.NORMAL: + emoji = '✅' + elif state == BatteryState.LOW: + emoji = '⚠️' + elif state == BatteryState.CRITICAL: + emoji = '‼️' + else: + logger.error('unknown battery state:', state) + return + + bot.notify_all( + lambda lang: bot.lang.get('battery_level_changed', lang, + emoji, bot.lang.get(f'bat_state_{state.name.lower()}', lang), v, load_watts) + ) + + +def monitor_error(error: str) -> None: + bot.notify_all( + lambda lang: bot.lang.get('error_message', lang, error) + ) + + +def full_status(ctx: Context) -> None: + status = inverter.exec('get-status', format=Format.TABLE) + ctx.reply(beautify_table(status)) + + +def full_rated(ctx: Context) -> None: + rated = inverter.exec('get-rated', format=Format.TABLE) + ctx.reply(beautify_table(rated)) + + +def full_errors(ctx: Context) -> None: + errors = inverter.exec('get-errors', format=Format.TABLE) + ctx.reply(beautify_table(errors)) + + +def flags(ctx: Context) -> None: + flags = inverter.exec('get-flags')['data'] + text, markup = build_flags_keyboard(flags, ctx) + ctx.reply(text, markup=markup) + + +def build_flags_keyboard(flags: dict, ctx: Context) -> Tuple[str, InlineKeyboardMarkup]: + keyboard = [] + for k, v in flags.items(): + label = ('✅' if v else '❌') + ' ' + ctx.lang(f'flag_{k}') + proto_flag = flags_map[k] + keyboard.append([InlineKeyboardButton(label, callback_data=f'flag_{proto_flag}')]) + + return ctx.lang('flags_press_button'), InlineKeyboardMarkup(keyboard) + + +def status(ctx: Context) -> None: + gs = inverter.exec('get-status')['data'] + + # render response + power_direction = gs['battery_power_direction'].lower() + power_direction = re.sub(r'ge$', 'ging', power_direction) + + charging_rate = '' + chrg_at = ctx.lang('charging_at') + + if power_direction == 'charging': + charging_rate = f'{chrg_at}%s %s' % ( + gs['battery_charging_current']['value'], gs['battery_charging_current']['unit']) + pd_label = ctx.lang('pd_charging') + elif power_direction == 'discharging': + charging_rate = f'{chrg_at}%s %s' % ( + gs['battery_discharging_current']['value'], gs['battery_discharging_current']['unit']) + pd_label = ctx.lang('pd_discharging') + else: + pd_label = ctx.lang('pd_nothing') + + html = f'<b>{ctx.lang("battery")}:</b> %s %s' % (gs['battery_voltage']['value'], gs['battery_voltage']['unit']) + html += ' (%s%s)' % (pd_label, charging_rate) + + html += f'\n<b>{ctx.lang("load")}:</b> %s %s' % (gs['ac_output_active_power']['value'], gs['ac_output_active_power']['unit']) + html += ' (%s%%)' % (gs['output_load_percent']['value']) + + if gs['pv1_input_power']['value'] > 0: + html += f'\n<b>{ctx.lang("gen_input_power")}:</b> %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit']) + + if gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0: + html += f'\n<b>{ctx.lang("generator")}:</b> %s %s' % (gs['grid_voltage']['unit'], gs['grid_voltage']['value']) + html += ', %s %s' % (gs['grid_freq']['value'], gs['grid_freq']['unit']) + + # send response + ctx.reply(html) + + +def generation(ctx: Context) -> None: + today = datetime.date.today() + yday = today - datetime.timedelta(days=1) + yday2 = today - datetime.timedelta(days=2) + + gs = inverter.exec('get-status')['data'] + + gen_today = inverter.exec('get-day-generated', (today.year, today.month, today.day))['data'] + gen_yday = None + gen_yday2 = None + + if yday.month == today.month: + gen_yday = inverter.exec('get-day-generated', (yday.year, yday.month, yday.day))['data'] + + if yday2.month == today.month: + gen_yday2 = inverter.exec('get-day-generated', (yday2.year, yday2.month, yday2.day))['data'] + + # render response + html = f'<b>{ctx.lang("gen_input_power")}:</b> %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit']) + html += ' (%s %s)' % (gs['pv1_input_voltage']['value'], gs['pv1_input_voltage']['unit']) + + html += f'\n<b>{ctx.lang("gen_today")}:</b> %s Wh' % (gen_today['wh']) + + if gen_yday is not None: + html += f'\n<b>{ctx.lang("gen_yday1")}:</b> %s Wh' % (gen_yday['wh']) + + if gen_yday2 is not None: + html += f'\n<b>{ctx.lang("gen_yday2")}:</b> %s Wh' % (gen_yday2['wh']) + + # send response + ctx.reply(html) + + +def setgencc(ctx: Context) -> None: + allowed_values = inverter.exec('get-allowed-ac-charging-currents')['data'] + + try: + current = int(ctx.args[0]) + if current not in allowed_values: + raise ValueError(f'invalid value {current}') + + response = inverter.exec('set-max-ac-charging-current', (0, current)) + ctx.reply('OK' if response['result'] == 'ok' else 'ERROR') + + # TODO notify monitor + + except (IndexError, ValueError): + ctx.reply(command_usage('setgencc', { + 'A': ctx.lang('setgencc_a', ', '.join(map(lambda x: str(x), allowed_values))) + }, language=ctx.user_lang)) + + +def setgenct(ctx: Context) -> None: + try: + cv = float(ctx.args[0]) + dv = float(ctx.args[1]) + + if 44 <= cv <= 51 and 48 <= dv <= 58: + response = inverter.exec('set-charging-thresholds', (cv, dv)) + ctx.reply('OK' if response['result'] == 'ok' else 'ERROR') + else: + raise ValueError('invalid values') + + except (IndexError, ValueError): + ctx.reply(command_usage('setgenct', { + 'CV': ctx.lang('setgenct_cv'), + 'DV': ctx.lang('setgenct_dv') + }, language=ctx.user_lang)) + + +def setbatuv(ctx: Context) -> None: + try: + v = float(ctx.args[0]) + + if 40.0 <= v <= 48.0: + response = inverter.exec('set-battery-cut-off-voltage', (v,)) + ctx.reply('OK' if response['result'] == 'ok' else 'ERROR') + else: + raise ValueError('invalid voltage') + + except (IndexError, ValueError): + ctx.reply(command_usage('setbatuv', { + 'V': ctx.lang('setbatuv_v') + }, language=ctx.user_lang)) + + +def monstatus(ctx: Context) -> None: + msg = '' + st = monitor.dump_status() + for k, v in st.items(): + msg += k + ': ' + str(v) + '\n' + ctx.reply(msg) + + +def monsetcur(ctx: Context) -> None: + ctx.reply('not implemented yet') + + +def calcw(ctx: Context) -> None: + ctx.reply('not implemented yet') + + +def calcwadv(ctx: Context) -> None: + ctx.reply('not implemented yet') + + +def button_callback(ctx: Context) -> None: + query = ctx.callback_query + + if query.data.startswith('flag_'): + flag = query.data[5:] + found = False + json_key = None + for k, v in flags_map.items(): + if v == flag: + found = True + json_key = k + break + if not found: + query.answer(ctx.lang('flags_invalid')) + return + + flags = inverter.exec('get-flags')['data'] + cur_flag_value = flags[json_key] + target_flag_value = '0' if cur_flag_value else '1' + + # set flag + response = inverter.exec('set-flag', (flag, target_flag_value)) + + # notify user + query.answer(ctx.lang('done') if response['result'] == 'ok' else ctx.lang('flags_fail')) + + # edit message + flags[json_key] = not cur_flag_value + text, markup = build_flags_keyboard(flags, ctx) + query.edit_message_text(text, reply_markup=markup) + + else: + query.answer(ctx.lang('unexpected_callback_data')) + + +class InverterBot(Wrapper): + def __init__(self): + super().__init__() + + self.lang.ru( + status='Статус', + generation='Генерация', + battery="АКБ", + load="Нагрузка", + generator="Генератор", + done="Готово", + unexpected_callback_data="Ошибка: неверные данные", + + flags_press_button='Нажмите кнопку для переключения настройки', + flags_fail='Не удалось установить настройку', + flags_invalid='Неизвестная настройка', + + # generation + gen_today='Сегодня', + gen_yday1='Вчера', + gen_yday2='Позавчера', + gen_input_power='Зарядная мощность', + + # status + charging_at=', ', + pd_charging='заряжается', + pd_discharging='разряжается', + pd_nothing='не используется', + + # flags + flag_buzzer='Звуковой сигнал', + flag_overload_bypass='Разрешить перегрузку', + flag_escape_to_default_screen_after_1min_timeout='Возврат на главный экран через 1 минуту', + flag_overload_restart='Перезапуск при перегрузке', + flag_over_temp_restart='Перезапуск при перегреве', + flag_backlight_on='Подсветка экрана', + flag_alarm_on_on_primary_source_interrupt='Сигнал при разрыве основного источника питания', + flag_fault_code_record='Запись кодов ошибок', + + # commands + setbatuv_v=f'напряжение, 40.0 {LT} V {LT} 48.0', + setgenct_cv=f'напряжение включения заряда, 44 {LT} CV {LT} 51', + setgenct_dv=f'напряжение отключения заряда, 48 {LT} DV {LT} 58', + setgencc_a='максимальный ток заряда, допустимые значения: %s', + + # monitor + chrg_evt_started='✅ Начали заряжать от генератора.', + chrg_evt_finished='✅ Зарядили. Генератор пора выключать.', + chrg_evt_disconnected='ℹ️ Генератор отключен.', + chrg_evt_current_changed='ℹ️ Ток заряда от генератора установлен в %d A.', + chrg_evt_not_charging='ℹ️ Генератор подключен, но не заряжает.', + chrg_evt_na_solar='⛔️ Генератор подключен, но аккумуляторы не заряжаются из-за подключенных панелей.', + chrg_evt_mostly_charged='✅ Аккумуляторы более-менее заряжены, генератор пора выключать.', + battery_level_changed='Уровень заряда АКБ: <b>%s %s</b> (<b>%0.1f V</b> при нагрузке <b>%d W</b>)', + error_message='<b>Ошибка:</b> %s.', + + bat_state_normal='Нормальный', + bat_state_low='Низкий', + bat_state_critical='Критический', + ) + + self.lang.en( + status='Status', + generation='Generation', + battery="Battery", + load="Load", + generator="Generator", + done="Done", + unexpected_callback_data="Unexpected callback data", + + flags_press_button='Press a button to toggle a flag.', + flags_fail='Failed to toggle flag', + flags_invalid='Invalid flag', + + # generation + gen_today='Today', + gen_yday1='Yesterday', + gen_yday2='The day before yesterday', + gen_input_power='Input power', + + # status + charging_at=' @ ', + pd_charging='charging', + pd_discharging='discharging', + pd_nothing='not used', + + # flags + flag_buzzer='Buzzer', + flag_overload_bypass='Overload bypass', + flag_escape_to_default_screen_after_1min_timeout='Reset to default LCD page after 1min timeout', + flag_overload_restart='Restart on overload', + flag_over_temp_restart='Restart on overtemp', + flag_backlight_on='LCD backlight', + flag_alarm_on_on_primary_source_interrupt='Beep on primary source interruption', + flag_fault_code_record='Fault code recording', + + # commands + setbatuv_v=f'floating point number, 40.0 {LT} V {LT} 48.0', + setgenct_cv=f'charging voltage, 44 {LT} CV {LT} 51', + setgenct_dv=f'discharging voltage, 48 {LT} DV {LT} 58', + setgencc_a='max charging current, allowed values: %s', + + # monitor + chrg_evt_started='✅ Started charging from AC.', + chrg_evt_finished='✅ Finished charging, it\'s time to stop the generator.', + chrg_evt_disconnected='ℹ️ AC disconnected.', + chrg_evt_current_changed='ℹ️ AC charging current set to %d A.', + chrg_evt_not_charging='ℹ️ AC connected but not charging.', + chrg_evt_na_solar='⛔️ AC connected, but battery won\'t be charged due to active solar power line.', + chrg_evt_mostly_charged='✅ The battery is mostly charged now. The generator can be turned off.', + battery_level_changed='Battery level: <b>%s</b> (<b>%0.1f V</b> under <b>%d W</b> load)', + error_message='<b>Error:</b> %s.', + + bat_state_normal='Normal', + bat_state_low='Low', + bat_state_critical='Critical', + ) + + self.add_handler(MessageHandler(text_filter(self.lang.all('status')), self.wrap(status))) + self.add_handler(MessageHandler(text_filter(self.lang.all('generation')), self.wrap(generation))) + + self.add_handler(CommandHandler('setgencc', self.wrap(setgencc))) + self.add_handler(CommandHandler('setgenct', self.wrap(setgenct))) + self.add_handler(CommandHandler('setbatuv', self.wrap(setbatuv))) + self.add_handler(CommandHandler('monstatus', self.wrap(monstatus))) + self.add_handler(CommandHandler('monsetcur', self.wrap(monsetcur))) + self.add_handler(CommandHandler('calcw', self.wrap(calcw))) + self.add_handler(CommandHandler('calcwadv', self.wrap(calcwadv))) + + self.add_handler(CommandHandler('flags', self.wrap(flags))) + self.add_handler(CommandHandler('status', self.wrap(full_status))) + self.add_handler(CommandHandler('config', self.wrap(full_rated))) + self.add_handler(CommandHandler('errors', self.wrap(full_errors))) + + self.add_handler(CallbackQueryHandler(self.wrap(button_callback))) + + def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: + button = [ + [ctx.lang('status'), ctx.lang('generation')] + ] + return ReplyKeyboardMarkup(button, one_time_keyboard=False) + + def exception_handler(self, e: Exception, ctx: Context) -> Optional[bool]: + if isinstance(e, InverterError): + try: + err = json.loads(str(e))['message'] + except json.decoder.JSONDecodeError: + err = str(e) + err = re.sub(r'((?:.*)?error:) (.*)', r'<b>\1</b> \2', err) + ctx.reply(err) + return True + + +if __name__ == '__main__': + config.load('inverter_bot') + + inverter.init(host=config['inverter']['ip'], port=config['inverter']['port']) + + monitor = InverterMonitor() + monitor.set_charging_event_handler(monitor_charging) + monitor.set_battery_event_handler(monitor_battery) + monitor.set_error_handler(monitor_error) + monitor.start() + + bot = InverterBot() + bot.enable_logging(BotType.INVERTER) + bot.run() + + monitor.stop() diff --git a/src/inverter_mqtt_receiver.py b/src/inverter_mqtt_receiver.py new file mode 100755 index 0000000..dd61d9a --- /dev/null +++ b/src/inverter_mqtt_receiver.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +import paho.mqtt.client as mqtt +import re +import logging + +from home.mqtt import MQTTBase +from home.mqtt.message import Status, Generation +from home.database import InverterDatabase +from home.config import config + +logger = logging.getLogger(__name__) + + +class MQTTReceiver(MQTTBase): + def __init__(self): + super().__init__(clean_session=False) + self.database = InverterDatabase() + + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + logger.info("subscribing to home/#") + client.subscribe('home/#', qos=1) + + def on_message(self, client: mqtt.Client, userdata, msg): + try: + match = re.match(r'home/(\d+)/(status|gen)', msg.topic) + if not match: + return + + home_id, what = int(match.group(1)), match.group(2) + if what == 'gen': + packer = Generation() + client_time, watts = packer.unpack(msg.payload) + self.database.add_generation(home_id, client_time, watts) + + elif what == 'status': + packer = Status() + client_time, data = packer.unpack(msg.payload) + self.database.add_status(home_id, + client_time, + grid_voltage=int(data['grid_voltage']*10), + grid_freq=int(data['grid_freq'] * 10), + ac_output_voltage=int(data['ac_output_voltage'] * 10), + ac_output_freq=int(data['ac_output_freq'] * 10), + ac_output_apparent_power=data['ac_output_apparent_power'], + ac_output_active_power=data['ac_output_active_power'], + output_load_percent=data['output_load_percent'], + battery_voltage=int(data['battery_voltage'] * 10), + battery_voltage_scc=int(data['battery_voltage_scc'] * 10), + battery_voltage_scc2=int(data['battery_voltage_scc2'] * 10), + battery_discharging_current=data['battery_discharging_current'], + battery_charging_current=data['battery_charging_current'], + battery_capacity=data['battery_capacity'], + inverter_heat_sink_temp=data['inverter_heat_sink_temp'], + mppt1_charger_temp=data['mppt1_charger_temp'], + mppt2_charger_temp=data['mppt2_charger_temp'], + pv1_input_power=data['pv1_input_power'], + pv2_input_power=data['pv2_input_power'], + pv1_input_voltage=int(data['pv1_input_voltage'] * 10), + pv2_input_voltage=int(data['pv2_input_voltage'] * 10), + mppt1_charger_status=data['mppt1_charger_status'], + mppt2_charger_status=data['mppt2_charger_status'], + battery_power_direction=data['battery_power_direction'], + dc_ac_power_direction=data['dc_ac_power_direction'], + line_power_direction=data['line_power_direction'], + load_connected=data['load_connected']) + + except Exception as e: + logger.exception(str(e)) + + +if __name__ == '__main__': + config.load('inverter_mqtt_receiver') + + server = MQTTReceiver() + server.connect_and_loop() + diff --git a/src/inverter_mqtt_sender.py b/src/inverter_mqtt_sender.py new file mode 100755 index 0000000..4e06436 --- /dev/null +++ b/src/inverter_mqtt_sender.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +import paho.mqtt.client as mqtt +import logging +import time +import datetime +import json +import inverterd + +from home.config import config +from home.mqtt import MQTTBase, poll_tick +from home.mqtt.message import Status, Generation + +logger = logging.getLogger(__name__) + + +class MQTTClient(MQTTBase): + def __init__(self): + super().__init__() + + self.inverter = inverterd.Client() + self.inverter.connect() + self.inverter.format(inverterd.Format.SIMPLE_JSON) + + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + + def poll_inverter(self): + freq = int(config['mqtt']['inverter']['poll_freq']) + gen_freq = int(config['mqtt']['inverter']['generation_poll_freq']) + + g = poll_tick(freq) + gen_prev = 0 + while True: + time.sleep(next(g)) + + # read status + now = time.time() + try: + raw = self.inverter.exec('get-status') + except inverterd.InverterError as e: + logger.error(f'inverter error: {str(e)}') + # TODO send to server + continue + + data = json.loads(raw)['data'] + + packer = Status() + self.client.publish(f'home/{self.home_id}/status', + payload=packer.pack(round(now), data), + qos=1) + + # read today's generation stat + now = time.time() + if gen_prev == 0 or now - gen_prev >= gen_freq: + gen_prev = now + today = datetime.date.today() + try: + raw = self.inverter.exec('get-day-generated', (today.year, today.month, today.day)) + except inverterd.InverterError as e: + logger.error(f'inverter error: {str(e)}') + # TODO send to server + continue + + # print('raw:', raw, type(raw)) + data = json.loads(raw)['data'] + packer = Generation() + self.client.publish(f'home/{self.home_id}/gen', + payload=packer.pack(round(now), data['wh']), + qos=1) + + +if __name__ == '__main__': + config.load('inverter_mqtt_sender') + + client = MQTTClient() + client.configure_tls() + client.connect_and_loop(loop_forever=False) + client.poll_inverter()
\ No newline at end of file diff --git a/src/openwrt_log_analyzer.py b/src/openwrt_log_analyzer.py new file mode 100644 index 0000000..f6d6413 --- /dev/null +++ b/src/openwrt_log_analyzer.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +from home.config import config +from home.database import BotsDatabase, SimpleState +from home.util import send_telegram + +""" +config.toml example: + +[simple_state] +file = "/home/user/.config/openwrt_log_analyzer/state.txt" + +[mysql] +host = "localhost" +database = ".." +user = ".." +password = ".." + +[devices] +Device1 = "00:00:00:00:00:00" +Device2 = "01:01:01:01:01:01" + +[telegram] +chat_id = ".." +token = ".." +parse_mode = "HTML" + +[openwrt_log_analyzer] +limit = 10 +""" + + +def main(mac: str, title: str) -> int: + db = BotsDatabase() + + data = db.get_openwrt_logs(filter_text=mac, + min_id=state['last_id'], + limit=config['openwrt_log_analyzer']['limit']) + if not data: + return 0 + + max_id = 0 + for log in data: + if log.id > max_id: + max_id = log.id + + text = '\n'.join(map(lambda s: str(s), data)) + send_telegram(f'<b>{title}</b>\n\n' + text) + + return max_id + + +if __name__ == '__main__': + config.load('openwrt_log_analyzer') + + state = SimpleState(file=config['simple_state']['file'], + default={'last_id': 0}) + + max_last_id = 0 + for name, mac in config['devices'].items(): + last_id = main(mac, title=name) + if last_id > max_last_id: + max_last_id = last_id + + if max_last_id: + state['last_id'] = max_last_id diff --git a/src/openwrt_logger.py b/src/openwrt_logger.py new file mode 100755 index 0000000..4d3b310 --- /dev/null +++ b/src/openwrt_logger.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +import os + +from datetime import datetime +from home.config import config +from home.database import SimpleState +from home.api import WebAPIClient + +log_file = '/var/log/openwrt.log' + +f""" +This script is supposed to be run by cron every 5 minutes or so. +It looks for new lines in {log_file} and sends them to remote server. + +OpenWRT must have remote logging enabled (UDP; IP of host this script is launched on; port 514) + +/etc/rsyslog.conf contains following (assuming 192.168.1.1 is the router IP): + +$ModLoad imudp +$UDPServerRun 514 +:fromhost-ip, isequal, "192.168.1.1" /var/log/openwrt.log +& ~ + +""" + + +def parse_line(line: str) -> tuple[int, str]: + space_pos = line.index(' ') + + date = line[:space_pos] + rest = line[space_pos+1:] + + return ( + int(datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z").timestamp()), + rest + ) + + +if __name__ == '__main__': + config.load('openwrt_logger') + + state = SimpleState(file=config['simple_state']['file'], + default={'seek': 0, 'size': 0}) + + fsize = os.path.getsize(log_file) + if fsize < state['size']: + state['seek'] = 0 + + with open(log_file, 'r') as f: + if state['seek']: + # jump to the latest read position + f.seek(state['seek']) + + # read till the end of the file + content = f.read() + + # save new position + state['seek'] = f.tell() + state['size'] = fsize + + lines: list[tuple[int, str]] = [] + + if content != '': + for line in content.strip().split('\n'): + if not line: + continue + + try: + lines.append(parse_line(line)) + except ValueError: + lines.append((0, line)) + + api = WebAPIClient() + api.log_openwrt(lines) diff --git a/src/pump_bot.py b/src/pump_bot.py new file mode 100755 index 0000000..ae36e27 --- /dev/null +++ b/src/pump_bot.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +from typing import Optional +from home.config import config +from home.bot import Wrapper, Context, text_filter, user_any_name +from home.relay import RelayClient +from home.api.types import BotType +from telegram import ReplyKeyboardMarkup, User +from telegram.ext import MessageHandler +from enum import Enum +from functools import partial + +bot: Optional[Wrapper] = None + + +class UserAction(Enum): + ON = 'on' + OFF = 'off' + + +def get_relay() -> RelayClient: + relay = RelayClient(host=config['relay']['ip'], port=config['relay']['port']) + relay.connect() + return relay + + +def on(silent: bool, ctx: Context) -> None: + get_relay().on() + ctx.reply(ctx.lang('done')) + if not silent: + notify(ctx.user, UserAction.ON) + + +def off(silent: bool, ctx: Context) -> None: + get_relay().off() + ctx.reply(ctx.lang('done')) + if not silent: + notify(ctx.user, UserAction.OFF) + + +def status(ctx: Context) -> None: + ctx.reply( + ctx.lang('enabled') if get_relay().status() == 'on' else ctx.lang('disabled') + ) + + +def notify(user: User, action: UserAction) -> None: + def text_getter(lang: str): + action_name = bot.lang.get(f'user_action_{action.value}', lang) + user_name = user_any_name(user) + return 'ℹ ' + bot.lang.get('user_action_notification', lang, + user.id, user_name, action_name) + + bot.notify_all(text_getter, exclude=(user.id,)) + + +class PumpBot(Wrapper): + def __init__(self): + super().__init__() + + self.lang.ru( + start_message="Выберите команду на клавиатуре", + unknown_command="Неизвестная команда", + + enable="Включить", + enable_silently="Включить тихо", + enabled="Включен ✅", + + disable="Выключить", + disable_silently="Выключить тихо", + disabled="Выключен ❌", + + status="Статус", + done="Готово 👌", + user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.', + user_action_on="включил", + user_action_off="выключил", + ) + + self.lang.en( + start_message="Select command on the keyboard", + unknown_command="Unknown command", + + enable="Turn ON", + enable_silently="Turn ON silently", + enabled="Turned ON ✅", + + disable="Turn OFF", + disable_silently="Turn OFF silently", + disabled="Turned OFF ❌", + + status="Status", + done="Done 👌", + user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.', + user_action_on="ON", + user_action_off="OFF", + ) + + self.add_handler(MessageHandler(text_filter(self.lang.all('enable')), self.wrap(partial(on, False)))) + self.add_handler(MessageHandler(text_filter(self.lang.all('disable')), self.wrap(partial(off, False)))) + + self.add_handler(MessageHandler(text_filter(self.lang.all('enable_silently')), self.wrap(partial(on, True)))) + self.add_handler(MessageHandler(text_filter(self.lang.all('disable_silently')), self.wrap(partial(off, True)))) + + self.add_handler(MessageHandler(text_filter(self.lang.all('status')), self.wrap(status))) + + def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [ + [ctx.lang('enable'), ctx.lang('disable')], + ] + + if ctx.user_id in config['bot']['silent_users']: + buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')]) + + buttons.append([ctx.lang('status')]) + + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +if __name__ == '__main__': + config.load('pump_bot') + + bot = PumpBot() + bot.enable_logging(BotType.PUMP) + bot.run() diff --git a/src/sensors_bot.py b/src/sensors_bot.py new file mode 100755 index 0000000..ea3dc9e --- /dev/null +++ b/src/sensors_bot.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +import json +import socket +import logging +import re +import gc + +from io import BytesIO +from typing import Optional +from functools import partial + +import matplotlib.pyplot as plt +import matplotlib.dates as mdates +import matplotlib.ticker as mticker + +from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton +from telegram.ext import MessageHandler, CallbackQueryHandler + +from home.config import config +from home.bot import Wrapper, Context, text_filter +from home.util import chunks, MySimpleSocketClient +from home.api import WebAPIClient +from home.api.types import ( + BotType, + TemperatureSensorLocation +) + +bot: Optional[Wrapper] = None +plt.rcParams['font.size'] = 7 +logger = logging.getLogger(__name__) +plot_hours = [3, 6, 12, 24] + + +def read_sensor(sensor: str, ctx: Context) -> None: + host = config['sensors'][sensor]['ip'] + port = config['sensors'][sensor]['port'] + + try: + client = MySimpleSocketClient(host, port) + client.write('read') + data = json.loads(client.read()) + except (socket.timeout, socket.error) as error: + return ctx.reply_exc(error) + + temp = round(data['temp'], 2) + humidity = round(data['humidity'], 2) + + text = ctx.lang('temperature') + f': <b>{temp} °C</b>\n' + text += ctx.lang('humidity') + f': <b>{humidity}%</b>' + + buttons = list(map( + lambda h: InlineKeyboardButton(ctx.lang(f'plot_{h}h'), callback_data=f'plot/{sensor}/{h}'), + plot_hours + )) + ctx.reply(text, markup=InlineKeyboardMarkup(chunks(buttons, 2))) + + +def callback_handler(ctx: Context) -> None: + query = ctx.callback_query + + sensors_variants = '|'.join(config['sensors'].keys()) + hour_variants = '|'.join(list(map( + lambda n: str(n), + plot_hours + ))) + + match = re.match(rf'plot/({sensors_variants})/({hour_variants})', query.data) + if not match: + query.answer(ctx.lang('unexpected_callback_data')) + return + + query.answer(ctx.lang('loading')) + + # retrieve data + sensor = TemperatureSensorLocation[match.group(1).upper()] + hours = int(match.group(2)) + + api = WebAPIClient() + data = api.get_sensors_data(sensor, hours) + + title = ctx.lang(sensor.name.lower()) + ' (' + ctx.lang('n_hrs', hours) + ')' + plot = draw_plot(data, title, + ctx.lang('temperature'), + ctx.lang('humidity')) + bot.updater.bot.send_photo(ctx.user_id, plot) + + gc.collect() + + +def draw_plot(data, + title: str, + label_temp: str, + label_hum: str) -> BytesIO: + tempval = [] + humval = [] + dates = [] + for date, temp, humidity in data: + dates.append(date) + tempval.append(temp) + humval.append(humidity) + + fig, axs = plt.subplots(2, 1) + df = mdates.DateFormatter('%H:%M') + + axs[0].set_title(label_temp) + axs[0].plot(dates, tempval) + axs[0].xaxis.set_major_formatter(df) + axs[0].yaxis.set_major_formatter(mticker.FormatStrFormatter('%2.2f °C')) + + fig.suptitle(title, fontsize=10) + + axs[1].set_title(label_hum) + axs[1].plot(dates, humval) + axs[1].xaxis.set_major_formatter(df) + axs[1].yaxis.set_major_formatter(mticker.FormatStrFormatter('%2.1f %%')) + + fig.autofmt_xdate() + + # should be called after all axes have been added + fig.tight_layout() + + buf = BytesIO() + fig.savefig(buf, format='png', dpi=160) + buf.seek(0) + + plt.clf() + plt.close('all') + + return buf + + +class SensorsBot(Wrapper): + def __init__(self): + super().__init__() + + self.lang.ru( + start_message="Выберите датчик на клавиатуре", + unknown_command="Неизвестная команда", + temperature="Температура", + humidity="Влажность", + plot_3h="График за 3 часа", + plot_6h="График за 6 часов", + plot_12h="График за 12 часов", + plot_24h="График за 24 часа", + unexpected_callback_data="Ошибка: неверные данные", + loading="Загрузка...", + n_hrs="график за %d ч." + ) + + self.lang.en( + start_message="Select the sensor on the keyboard", + unknown_command="Unknown command", + temperature="Temperature", + humidity="Relative humidity", + plot_3h="Graph for 3 hours", + plot_6h="Graph for 6 hours", + plot_12h="Graph for 12 hours", + plot_24h="Graph for 24 hours", + unexpected_callback_data="Unexpected callback data", + loading="Loading...", + n_hrs="graph for %d hours" + ) + + for k, v in config['sensors'].items(): + self.lang.set({k: v['label_ru']}, 'ru') + self.lang.set({k: v['label_en']}, 'en') + self.add_handler(MessageHandler(text_filter(self.lang.all(k)), self.wrap(partial(read_sensor, k)))) + + self.add_handler(CallbackQueryHandler(self.wrap(callback_handler))) + + def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [] + for k in config['sensors'].keys(): + buttons.append(ctx.lang(k)) + buttons = chunks(buttons, 2) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +if __name__ == '__main__': + config.load('sensors_bot') + + bot = SensorsBot() + if 'api' in config: + bot.enable_logging(BotType.SENSORS) + bot.run() diff --git a/src/sensors_mqtt_receiver.py b/src/sensors_mqtt_receiver.py new file mode 100755 index 0000000..011ee44 --- /dev/null +++ b/src/sensors_mqtt_receiver.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +import paho.mqtt.client as mqtt +import logging +import re + +from home.mqtt import MQTTBase +from home.config import config +from home.mqtt.message import Temperature +from home.api.types import TemperatureSensorLocation +from home.database import SensorsDatabase + +logger = logging.getLogger(__name__) + + +def get_sensor_type(sensor: str) -> TemperatureSensorLocation: + for item in TemperatureSensorLocation: + if sensor == item.name.lower(): + return item + raise ValueError(f'unexpected sensor value: {sensor}') + + +class MQTTServer(MQTTBase): + def __init__(self): + super().__init__(clean_session=False) + self.database = SensorsDatabase() + + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + logger.info("subscribing to home/#") + client.subscribe('home/#', qos=1) + + def on_message(self, client: mqtt.Client, userdata, msg): + try: + variants = '|'.join([s.name.lower() for s in TemperatureSensorLocation]) + match = re.match(rf'home/(\d+)/si7021/({variants})', msg.topic) + if not match: + return + + home_id = int(match.group(1)) + sensor = get_sensor_type(match.group(2)) + + packer = Temperature() + client_time, temp, rh = packer.unpack(msg.payload) + + self.database.add_temperature(home_id, client_time, sensor, + temp=int(temp*100), + rh=int(rh*100)) + except Exception as e: + logger.exception(str(e)) + + +if __name__ == '__main__': + config.load('sensors_mqtt_receiver') + + server = MQTTServer() + server.connect_and_loop() diff --git a/src/sensors_mqtt_sender.py b/src/sensors_mqtt_sender.py new file mode 100755 index 0000000..f4f8ec9 --- /dev/null +++ b/src/sensors_mqtt_sender.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +import paho.mqtt.client as mqtt +import logging +import time +import json + +from home.util import parse_addr, MySimpleSocketClient +from home.mqtt import MQTTBase, poll_tick +from home.mqtt.message import Temperature +from home.config import config + +logger = logging.getLogger(__name__) + + +class MQTTClient(MQTTBase): + def on_connect(self, client: mqtt.Client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + + def poll(self): + freq = int(config['mqtt']['sensors']['poll_freq']) + logger.debug(f'freq={freq}') + + g = poll_tick(freq) + while True: + time.sleep(next(g)) + for k, v in config['mqtt']['sensors']['si7021'].items(): + host, port = parse_addr(v['addr']) + self.publish_si7021(host, port, k) + + def publish_si7021(self, host: str, port: int, name: str): + logging.debug(f"publish_si7021/{name}: {host}:{port}") + + try: + now = time.time() + socket = MySimpleSocketClient(host, port) + + socket.write('read') + response = json.loads(socket.read().strip()) + + temp = response['temp'] + humidity = response['humidity'] + + logging.debug(f'publish_si7021/{name}: temp={temp} humidity={humidity}') + + packer = Temperature() + self.client.publish(f'home/{self.home_id}/si7021/{name}', + payload=packer.pack(round(now), temp, humidity), + qos=1) + except Exception as e: + logger.exception(e) + + +if __name__ == '__main__': + config.load('sensors_mqtt_sender') + + client = MQTTClient() + client.configure_tls() + client.connect_and_loop(loop_forever=False) + client.poll() diff --git a/src/si7021d.py b/src/si7021d.py new file mode 100755 index 0000000..fe11787 --- /dev/null +++ b/src/si7021d.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +import smbus +import argparse +import asyncio +import json +import logging + +from home.config import config +from home.util import parse_addr + +logger = logging.getLogger(__name__) +bus = None +lock = asyncio.Lock() +delay = 0.01 + + +async def si7021_read(): + async with lock: + await asyncio.sleep(delay) + + # these are still blocking... meh + raw = bus.read_i2c_block_data(0x40, 0xE3, 2) + temp = 175.72 * (raw[0] << 8 | raw[1]) / 65536.0 - 46.85 + + raw = bus.read_i2c_block_data(0x40, 0xE5, 2) + rh = 125.0 * (raw[0] << 8 | raw[1]) / 65536.0 - 6.0 + + return rh, temp + + +async def handle_client(reader, writer): + request = None + while request != 'quit': + try: + request = await reader.read(255) + if request == b'\x04': + break + request = request.decode('utf-8').strip() + except Exception: + break + + if request == 'read': + try: + rh, temp = await asyncio.wait_for(si7021_read(), timeout=3) + data = dict(humidity=rh, temp=temp) + except asyncio.TimeoutError as e: + logger.exception(e) + data = dict(error='i2c call timed out') + else: + data = dict(error='invalid request') + + writer.write((json.dumps(data) + '\r\n').encode('utf-8')) + try: + await writer.drain() + except ConnectionResetError: + pass + + writer.close() + + +async def run_server(host, port): + server = await asyncio.start_server(handle_client, host, port) + async with server: + logger.info('Server started.') + await server.serve_forever() + + +if __name__ == '__main__': + config.load() + + host, port = parse_addr(config['server']['listen']) + + delay = float(config['smbus']['delay']) + bus = smbus.SMBus(int(config['smbus']['bus'])) + + try: + asyncio.run(run_server(host, port)) + except KeyboardInterrupt: + logging.info('Exiting...') diff --git a/src/sound_bot.py b/src/sound_bot.py new file mode 100755 index 0000000..ae54413 --- /dev/null +++ b/src/sound_bot.py @@ -0,0 +1,783 @@ +#!/usr/bin/env python3 +import logging +import os + +from enum import Enum +from datetime import datetime, timedelta +from html import escape +from typing import Optional +from home.config import config +from home.bot import Wrapper, Context, text_filter, user_any_name +from home.api.types import BotType +from home.api.errors import ApiResponseError +from home.sound import SoundNodeClient, RecordClient, RecordFile +from home.soundsensor import SoundSensorServerGuardClient +from home.util import parse_addr, chunks, filesize_fmt +from home.api import WebAPIClient +from home.api.types import SoundSensorLocation + +from telegram.error import TelegramError +from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton, User +from telegram.ext import ( + CallbackQueryHandler, + MessageHandler +) + +logger = logging.getLogger(__name__) +RenderedContent = tuple[str, Optional[InlineKeyboardMarkup]] +record_client: Optional[RecordClient] = None +bot: Optional[Wrapper] = None +node_client_links: dict[str, SoundNodeClient] = {} + + +def node_client(node: str) -> SoundNodeClient: + if node not in node_client_links: + node_client_links[node] = SoundNodeClient(parse_addr(config['nodes'][node]['addr'])) + return node_client_links[node] + + +def node_exists(node: str) -> bool: + return node in config['nodes'] + + +def sound_sensor_exists(node: str) -> bool: + return node in config['sound_sensors'] + + +def interval_defined(interval: int) -> bool: + return interval in config['bot']['record_intervals'] + + +def callback_unpack(ctx: Context) -> list[str]: + return ctx.callback_query.data[3:].split('/') + + +def manual_recording_allowed(user_id: int) -> bool: + return 'manual_record_allowlist' not in config['bot'] or user_id in config['bot']['manual_record_allowlist'] + + +def guard_client() -> SoundSensorServerGuardClient: + return SoundSensorServerGuardClient(parse_addr(config['bot']['guard_server'])) + + +# message renderers +# ----------------- + +class Renderer: + @classmethod + def places_markup(cls, ctx: Context, callback_prefix: str) -> InlineKeyboardMarkup: + buttons = [] + for node, nodeconfig in config['nodes'].items(): + buttons.append([InlineKeyboardButton(nodeconfig['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{node}')]) + return InlineKeyboardMarkup(buttons) + + @classmethod + def back_button(cls, + ctx: Context, + buttons: list, + callback_data: str): + buttons.append([ + InlineKeyboardButton(ctx.lang('back'), callback_data=callback_data) + ]) + + +class SettingsRenderer(Renderer): + @classmethod + def index(cls, ctx: Context) -> RenderedContent: + html = f'<b>{ctx.lang("settings")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='s0') + + @classmethod + def node(cls, ctx: Context, + controls: list[dict]) -> RenderedContent: + node, = callback_unpack(ctx) + + html = [] + buttons = [] + for control in controls: + html.append(f'<b>{control["name"]}</b>\n{escape(control["info"])}') + buttons.append([ + InlineKeyboardButton(control['name'], callback_data=f's1/{node}/{control["name"]}') + ]) + + html = "\n\n".join(html) + cls.back_button(ctx, buttons, callback_data='s0') + + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def control(cls, ctx: Context, data) -> RenderedContent: + node, control, *rest = callback_unpack(ctx) + + html = '<b>' + ctx.lang('control_state', control) + '</b>\n\n' + html += escape(data['info']) + buttons = [] + callback_prefix = f's2/{node}/{control}' + for cap in data['caps']: + if cap == 'mute': + muted = 'dB] [off]' in data['info'] + act = 'unmute' if muted else 'mute' + buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')]) + + elif cap == 'cap': + cap_dis = 'Capture [off]' in data['info'] + act = 'cap' if cap_dis else 'nocap' + buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')]) + + elif cap == 'volume': + buttons.append( + list(map(lambda s: InlineKeyboardButton(ctx.lang(s), callback_data=f'{callback_prefix}/{s}'), + ['decr', 'incr'])) + ) + + cls.back_button(ctx, buttons, callback_data=f's0/{node}') + + return html, InlineKeyboardMarkup(buttons) + + +class RecordRenderer(Renderer): + @classmethod + def index(cls, ctx: Context) -> RenderedContent: + html = f'<b>{ctx.lang("record")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='r0') + + @classmethod + def node(cls, ctx: Context, durations: list[int]) -> RenderedContent: + node, = callback_unpack(ctx) + + html = ctx.lang('select_interval') + + buttons = [] + for s in durations: + if s >= 60: + m = int(s / 60) + label = ctx.lang('n_min', m) + else: + label = ctx.lang('n_sec', s) + buttons.append(InlineKeyboardButton(label, callback_data=f'r1/{node}/{s}')) + buttons = list(chunks(buttons, 3)) + cls.back_button(ctx, buttons, callback_data=f'r0') + + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def record_started(cls, ctx: Context, rid: int) -> RenderedContent: + node, *rest = callback_unpack(ctx) + + place = config['nodes'][node]['label'][ctx.user_lang] + + html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})' + return html, None + + @classmethod + def record_done(cls, info: dict, node: str, uid: int) -> str: + ulang = bot.store.get_user_lang(uid) + + def lang(key, *args): + return bot.lang.get(key, ulang, *args) + + rid = info['id'] + fmt = '%d.%m.%y %H:%M:%S' + start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt) + stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt) + + place = config['nodes'][node]['label'][ulang] + + html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n' + html += f'<b>{lang("beginning")}</b>: {start_time}\n' + html += f'<b>{lang("end")}</b>: {stop_time}' + + return html + + @classmethod + def record_error(cls, info: dict, node: str, uid: int) -> str: + ulang = bot.store.get_user_lang(uid) + + def lang(key, *args): + return bot.lang.get(key, ulang, *args) + + place = config['nodes'][node]['label'][ulang] + rid = info['id'] + + html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})' + if 'error' in info: + html += '\n'+str(info['error']) + + return html + + +class FilesRenderer(Renderer): + @classmethod + def index(cls, ctx: Context) -> RenderedContent: + html = f'<b>{ctx.lang("files")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='f0') + + @classmethod + def filelist(cls, ctx: Context, files: list[RecordFile]) -> RenderedContent: + node, = callback_unpack(ctx) + + html_files = map(lambda file: cls.file(ctx, file, node), files) + html = '\n\n'.join(html_files) + + buttons = [] + cls.back_button(ctx, buttons, callback_data='f0') + + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def file(cls, ctx: Context, file: RecordFile, node: str) -> str: + html = ctx.lang('file_line', file.start_humantime, file.stop_humantime, filesize_fmt(file.filesize)) + if file.file_id is not None: + html += f'/audio_{node}_{file.file_id}' + return html + + +class RemoteFilesRenderer(FilesRenderer): + @classmethod + def index(cls, ctx: Context) -> RenderedContent: + html = f'<b>{ctx.lang("remote_files")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='g0') + + +class SoundSensorRenderer(Renderer): + @classmethod + def places_markup(cls, ctx: Context, callback_prefix: str) -> InlineKeyboardMarkup: + buttons = [] + for sensor, sensor_label in config['sound_sensors'].items(): + buttons.append( + [InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')]) + return InlineKeyboardMarkup(buttons) + + @classmethod + def index(cls, ctx: Context) -> RenderedContent: + html = f'{ctx.lang("sound_sensors_info")}\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='S0') + + @classmethod + def hits(cls, ctx: Context, data, is_last=False) -> RenderedContent: + node, = callback_unpack(ctx) + buttons = [] + + if not data: + html = ctx.lang('sound_sensors_no_24h_data') + if not is_last: + buttons.append([InlineKeyboardButton(ctx.lang('sound_sensors_show_anything'), callback_data=f'S1/{node}')]) + else: + html = '' + prev_date = None + for item in data: + item_date = item['time'].strftime('%d.%m.%y') + if prev_date is None or prev_date != item_date: + if html != '': + html += '\n\n' + html += f'<b>{item_date}</b>' + prev_date = item_date + html += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})' + cls.back_button(ctx, buttons, callback_data='S0') + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def hits_plain(cls, ctx: Context, data, is_last=False) -> bytes: + node, = callback_unpack(ctx) + + text = '' + prev_date = None + for item in data: + item_date = item['time'].strftime('%d.%m.%y') + if prev_date is None or prev_date != item_date: + if text != '': + text += '\n\n' + text += item_date + prev_date = item_date + text += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})' + + return text.encode() + + +# settings handlers +# ----------------- + +def settings(ctx: Context): + text, markup = SettingsRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +def settings_place(ctx: Context) -> None: + node, = callback_unpack(ctx) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + + cl = node_client(node) + controls = cl.amixer_get_all() + + ctx.answer() + + text, markup = SettingsRenderer.node(ctx, controls) + ctx.edit(text, markup) + + +def settings_place_control(ctx: Context) -> None: + node, control = callback_unpack(ctx) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + + cl = node_client(node) + control_data = cl.amixer_get(control) + + ctx.answer() + + text, markup = SettingsRenderer.control(ctx, control_data) + ctx.edit(text, markup) + + +def settings_place_control_action(ctx: Context) -> None: + node, control, action = callback_unpack(ctx) + if not node_exists(node): + return + + cl = node_client(node) + if not hasattr(cl, f'amixer_{action}'): + ctx.answer(ctx.lang('invalid_action')) + return + + func = getattr(cl, f'amixer_{action}') + control_data = func(control) + + ctx.answer() + + text, markup = SettingsRenderer.control(ctx, control_data) + ctx.edit(text, markup) + + +# recording handlers +# ------------------ + +def record(ctx: Context): + if not manual_recording_allowed(ctx.user_id): + return ctx.reply(ctx.lang('access_denied')) + + text, markup = RecordRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +def record_place(ctx: Context) -> None: + node, = callback_unpack(ctx) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + + ctx.answer() + + text, markup = RecordRenderer.node(ctx, config['bot']['record_intervals']) + ctx.edit(text, markup) + + +def record_place_interval(ctx: Context) -> None: + node, interval = callback_unpack(ctx) + interval = int(interval) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + if not interval_defined(interval): + ctx.answer(ctx.lang('invalid_interval')) + return + + try: + record_id = record_client.record(node, interval, {'user_id': ctx.user_id, 'node': node}) + except ApiResponseError as e: + ctx.answer(e.error_message) + logger.error(e) + return + + ctx.answer() + + html, markup = RecordRenderer.record_started(ctx, record_id) + ctx.edit(html, markup) + + +# files handlers +# -------------- + +# def files(ctx: Context, remote=False): +# renderer = RemoteFilesRenderer if remote else FilesRenderer +# text, markup = renderer.index(ctx) +# if not ctx.is_callback_context(): +# return ctx.reply(text, markup=markup) +# else: +# ctx.answer() +# return ctx.edit(text, markup=markup) +# +# +# def files_list(ctx: Context): +# node, = callback_unpack(ctx) +# if not node_exists(node): +# ctx.answer(ctx.lang('invalid_location')) +# return +# +# ctx.answer() +# +# cl = node_client(node) +# files = cl.storage_list(extended=True, as_objects=True) +# +# text, markup = FilesRenderer.filelist(ctx, files) +# ctx.edit(text, markup) + + +# sound sensor handlers +# --------------------- + +def sound_sensors(ctx: Context): + text, markup = SoundSensorRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +def sound_sensors_last_24h(ctx: Context): + node, = callback_unpack(ctx) + if not sound_sensor_exists(node): + ctx.answer(ctx.lang('invalid location')) + return + + ctx.answer() + + cl = WebAPIClient() + data = cl.get_sound_sensor_hits(location=SoundSensorLocation[node.upper()], + after=datetime.now() - timedelta(hours=24)) + + text, markup = SoundSensorRenderer.hits(ctx, data) + if len(text) > 4096: + plain = SoundSensorRenderer.hits_plain(ctx, data) + bot.send_file(ctx.user_id, document=plain, filename='data.txt') + else: + ctx.edit(text, markup=markup) + + +def sound_sensors_last_anything(ctx: Context): + node, = callback_unpack(ctx) + if not sound_sensor_exists(node): + ctx.answer(ctx.lang('invalid location')) + return + + ctx.answer() + + cl = WebAPIClient() + data = cl.get_last_sound_sensor_hits(location=SoundSensorLocation[node.upper()], + last=20) + + text, markup = SoundSensorRenderer.hits(ctx, data, is_last=True) + if len(text) > 4096: + plain = SoundSensorRenderer.hits_plain(ctx, data) + bot.send_file(ctx.user_id, document=plain, filename='data.txt') + else: + ctx.edit(text, markup=markup) + + +# guard enable/disable handlers +# ----------------------------- + +class GuardUserAction(Enum): + ENABLE = 'enable' + DISABLE = 'disable' + + +def guard_status(ctx: Context): + guard = guard_client() + resp = guard.guard_status() + + key = 'enabled' if resp['enabled'] is True else 'disabled' + ctx.reply(ctx.lang(f'guard_status_{key}')) + + +def guard_enable(ctx: Context): + guard = guard_client() + guard.guard_enable() + ctx.reply(ctx.lang('done')) + + _guard_notify(ctx.user, GuardUserAction.ENABLE) + + +def guard_disable(ctx: Context): + guard = guard_client() + guard.guard_disable() + ctx.reply(ctx.lang('done')) + + _guard_notify(ctx.user, GuardUserAction.DISABLE) + + +def _guard_notify(user: User, action: GuardUserAction): + def text_getter(lang: str): + action_name = bot.lang.get(f'guard_user_action_{action.value}', lang) + user_name = user_any_name(user) + return 'ℹ ' + bot.lang.get('guard_user_action_notification', lang, + user.id, user_name, action_name) + + bot.notify_all(text_getter, exclude=(user.id,)) + + +# record client callbacks +# ----------------------- + +def record_onerror(info: dict, userdata: dict): + uid = userdata['user_id'] + node = userdata['node'] + + html = RecordRenderer.record_error(info, node, uid) + try: + bot.notify_user(userdata['user_id'], html) + except TelegramError as exc: + logger.exception(exc) + finally: + record_client.forget(node, info['id']) + + +def record_onfinished(info: dict, fn: str, userdata: dict): + logger.info('record finished: ' + str(info)) + + uid = userdata['user_id'] + node = userdata['node'] + + html = RecordRenderer.record_done(info, node, uid) + bot.notify_user(uid, html) + + try: + # sending audiofile to telegram + with open(fn, 'rb') as f: + bot.send_audio(uid, audio=f, filename='audio.mp3') + + # deleting temp file + try: + os.unlink(fn) + except OSError as exc: + logger.exception(exc) + bot.notify_user(uid, exc) + + # remove the recording from sound_node's history + record_client.forget(node, info['id']) + + # remove file from storage + # node_client(node).storage_delete(info['file']['fileid']) + except Exception as e: + logger.exception(e) + + +class SoundBot(Wrapper): + def __init__(self): + super().__init__() + + self.lang.ru( + start_message="Выберите команду на клавиатуре", + unknown_command="Неизвестная команда", + unexpected_callback_data="Ошибка: неверные данные", + settings="Настройки микшера", + record="Запись", + loading="Загрузка...", + select_place="Выберите место:", + invalid_location="Неверное место", + invalid_interval="Неверная длительность", + unsupported_action="Неподдерживаемое действие", + # select_control="Выберите контрол для изменения настроек:", + control_state="Состояние контрола %s", + incr="громкость +", + decr="громкость -", + back="◀️ Назад", + n_min="%d мин.", + n_sec="%d сек.", + select_interval="Выберите длительность:", + place="Место", + beginning="Начало", + end="Конец", + record_result="Результат записи", + record_started='Запись запущена!', + record_error="Ошибка записи", + files="Локальные файлы", + remote_files="Файлы на сервере", + file_line="— Запись с <b>%s</b> до <b>%s</b> <i>(%s)</i>", + access_denied="Доступ запрещён", + + guard_disable="Снять с охраны", + guard_enable="Поставить на охрану", + guard_status="Статус охраны", + guard_user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> %s.', + guard_user_action_enable="включил охрану ✅", + guard_user_action_disable="выключил охрану ❌", + guard_status_enabled="Включена ✅", + guard_status_disabled="Выключена ❌", + + done="Готово 👌", + + sound_sensors="Датчики звука", + sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.", + sound_sensors_no_24h_data="За последние 24 часа данных нет.", + sound_sensors_show_anything="Показать, что есть" + ) + + self.lang.en( + start_message="Select command on the keyboard", + unknown_command="Unknown command", + settings="Mixer settings", + record="Record", + unexpected_callback_data="Unexpected callback data", + loading="Loading...", + select_place="Select place:", + invalid_location="Invalid place", + invalid_interval="Invalid duration", + unsupported_action="Unsupported action", + # select_control="Select control to adjust its parameters:", + control_state="%s control state", + incr="vol +", + decr="vol -", + back="◀️ Back", + n_min="%d min.", + n_sec="%d s.", + select_interval="Select duration:", + place="Place", + beginning="Started", + end="Ended", + record_result="Result", + record_started='Recording started!', + record_error="Recording error", + files="Local files", + remote_files="Remote files", + file_line="— From <b>%s</b> to <b>%s</b> <i>(%s)</i>", + access_denied="Access denied", + + guard_disable="Disable guard", + guard_enable="Enable guard", + guard_status="Guard status", + guard_user_action_notification='User <a href="tg://user?id=%d">%s</a> %s.', + guard_user_action_enable="turned the guard ON ✅", + guard_user_action_disable="turn the guard OFF ❌", + guard_status_enabled="Active ✅", + guard_status_disabled="Disabled ❌", + done="Done 👌", + + sound_sensors="Sound sensors", + sound_sensors_info="Here you can get information about last sound sensors hits.", + sound_sensors_no_24h_data="No data for the last 24 hours.", + sound_sensors_show_anything="Show me at least something" + ) + + # ------ + # settings + # ------------- + + # list of nodes + self.add_handler(MessageHandler(text_filter(self.lang.all('settings')), self.wrap(settings))) + self.add_handler(CallbackQueryHandler(self.wrap(settings), pattern=r'^s0$')) + + # list of controls + self.add_handler(CallbackQueryHandler(self.wrap(settings_place), pattern=r'^s0/.*')) + + # list of available tunes for control + self.add_handler(CallbackQueryHandler(self.wrap(settings_place_control), pattern=r'^s1/.*')) + + # tuning + self.add_handler(CallbackQueryHandler(self.wrap(settings_place_control_action), pattern=r'^s2/.*')) + + # ------ + # recording + # -------------- + + # list of nodes + self.add_handler(MessageHandler(text_filter(self.lang.all('record')), self.wrap(record))) + self.add_handler(CallbackQueryHandler(self.wrap(record), pattern=r'^r0$')) + + # list of available intervals + self.add_handler(CallbackQueryHandler(self.wrap(record_place), pattern=r'^r0/.*')) + + # do record! + self.add_handler(CallbackQueryHandler(self.wrap(record_place_interval), pattern=r'^r1/.*')) + + # --------- + # sound sensors + # ------------------ + + # list of places + self.add_handler(MessageHandler(text_filter(self.lang.all('sound_sensors')), self.wrap(sound_sensors))) + self.add_handler(CallbackQueryHandler(self.wrap(sound_sensors), pattern=r'^S0$')) + + # last 24h log + self.add_handler(CallbackQueryHandler(self.wrap(sound_sensors_last_24h), pattern=r'^S0/.*')) + + # last _something_ + self.add_handler(CallbackQueryHandler(self.wrap(sound_sensors_last_anything), pattern=r'^S1/.*')) + + # ------------- + # guard enable/disable + # ------------------------- + if 'guard_server' in config['bot']: + self.add_handler(MessageHandler(text_filter(self.lang.all('guard_enable')), self.wrap(guard_enable))) + self.add_handler(MessageHandler(text_filter(self.lang.all('guard_disable')), self.wrap(guard_disable))) + self.add_handler(MessageHandler(text_filter(self.lang.all('guard_status')), self.wrap(guard_status))) + + # -------- + # local files + # ---------------- + + # list of nodes + # self.add_handler(MessageHandler(text_filter(self.lang.all('files')), self.wrap(partial(files, remote=False)))) + # self.add_handler(CallbackQueryHandler(self.wrap(partial(files, remote=False)), pattern=r'^f0$')) + + # list of specific node's files + # self.add_handler(CallbackQueryHandler(self.wrap(files_list), pattern=r'^f0/.*')) + + # -------- + # remote files + # ----------------- + + # list of nodes + # self.add_handler(MessageHandler(text_filter(self.lang.all('remote_files')), self.wrap(partial(files, remote=True)))) + # self.add_handler(CallbackQueryHandler(self.wrap(partial(files, remote=True)), pattern=r'^g0$')) + + # list of specific node's files + # self.add_handler(CallbackQueryHandler(self.wrap(files_list), pattern=r'^g0/.*')) + + def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [ + [ctx.lang('record'), ctx.lang('settings')], + # [ctx.lang('files'), ctx.lang('remote_files')], + ] + if 'guard_server' in config['bot']: + buttons.append([ + ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status') + ]) + buttons.append([ctx.lang('sound_sensors')]) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +if __name__ == '__main__': + config.load('sound_bot') + + nodes = {} + for nodename, nodecfg in config['nodes'].items(): + nodes[nodename] = parse_addr(nodecfg['addr']) + + record_client = RecordClient(nodes, + error_handler=record_onerror, + finished_handler=record_onfinished, + download_on_finish=True) + + bot = SoundBot() + if 'api' in config: + bot.enable_logging(BotType.SOUND) + bot.run() + + record_client.stop() diff --git a/src/sound_node.py b/src/sound_node.py new file mode 100755 index 0000000..8ba1b50 --- /dev/null +++ b/src/sound_node.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 +import os + +from typing import Optional +from aiohttp import web +from aiohttp.web_exceptions import ( + HTTPNotFound +) +from home.config import config +from home.util import parse_addr, stringify, format_tb +from home.sound import ( + amixer, + Recorder, + RecordStatus, + RecordStorage +) + + +""" +This script must be run as root as it runs arecord. + +This script implements HTTP API for amixer and arecord. +""" + + +# some global variables +# --------------------- + +recorder: Optional[Recorder] +routes = web.RouteTableDef() +storage: Optional[RecordStorage] + + +# common http funcs & helpers +# --------------------------- + +@web.middleware +async def errors_handler_middleware(request, handler): + try: + response = await handler(request) + return response + + except HTTPNotFound: + return web.json_response({'error': 'not found'}, status=404) + + except Exception as exc: + data = { + 'error': exc.__class__.__name__, + 'message': exc.message if hasattr(exc, 'message') else str(exc) + } + tb = format_tb(exc) + if tb: + data['stacktrace'] = tb + + return web.json_response(data, status=500) + + +def ok(data=None): + if data is None: + data = 1 + response = {'response': data} + return web.json_response(response, dumps=stringify) + + +# recording methods +# ----------------- + +@routes.get('/record/') +async def do_record(request): + duration = int(request.query['duration']) + max = Recorder.get_max_record_time()*15 + if not 0 < duration <= max: + raise ValueError(f'invalid duration: max duration is {max}') + + record_id = recorder.record(duration) + return ok({'id': record_id}) + + +@routes.get('/record/info/{id}/') +async def record_info(request): + record_id = int(request.match_info['id']) + info = recorder.get_info(record_id) + return ok(info.as_dict()) + + +@routes.get('/record/forget/{id}/') +async def record_forget(request): + record_id = int(request.match_info['id']) + + info = recorder.get_info(record_id) + assert info.status in (RecordStatus.FINISHED, RecordStatus.ERROR), f"can't forget: record status is {info.status}" + + recorder.forget(record_id) + return ok() + + +@routes.get('/record/download/{id}/') +async def record_download(request): + record_id = int(request.match_info['id']) + + info = recorder.get_info(record_id) + assert info.status == RecordStatus.FINISHED, f"record status is {info.status}" + + return web.FileResponse(info.file.path) + + +@routes.get('/storage/list/') +async def storage_list(request): + extended = 'extended' in request.query and int(request.query['extended']) == 1 + + files = storage.getfiles(as_objects=extended) + if extended: + files = list(map(lambda file: file.__dict__(), files)) + + return ok({ + 'files': files + }) + + +@routes.get('/storage/delete/') +async def storage_delete(request): + file_id = request.query['file_id'] + file = storage.find(file_id) + if not file: + raise ValueError(f'file {file} not found') + + storage.delete(file) + return ok() + + +@routes.get('/storage/download/') +async def storage_download(request): + file_id = request.query['file_id'] + file = storage.find(file_id) + if not file: + raise ValueError(f'file {file} not found') + + return web.FileResponse(file.path) + + +# ALSA mixer methods +# ------------------ + +def _amixer_control_response(control): + info = amixer.get(control) + caps = amixer.get_caps(control) + return ok({ + 'caps': caps, + 'info': info + }) + + +@routes.get('/amixer/get-all/') +async def amixer_get_all(request): + controls_info = amixer.get_all() + return ok(controls_info) + + +@routes.get('/amixer/get/{control}/') +async def amixer_get(request): + control = request.match_info['control'] + if not amixer.has_control(control): + raise ValueError(f'invalid control: {control}') + + return _amixer_control_response(control) + + +@routes.get('/amixer/{op:mute|unmute|cap|nocap}/{control}/') +async def amixer_set(request): + op = request.match_info['op'] + control = request.match_info['control'] + if not amixer.has_control(control): + raise ValueError(f'invalid control: {control}') + + f = getattr(amixer, op) + f(control) + + return _amixer_control_response(control) + + +@routes.get('/amixer/{op:incr|decr}/{control}/') +async def amixer_volume(request): + op = request.match_info['op'] + control = request.match_info['control'] + if not amixer.has_control(control): + raise ValueError(f'invalid control: {control}') + + def get_step() -> Optional[int]: + if 'step' in request.query: + step = int(request.query['step']) + if not 1 <= step <= 50: + raise ValueError('invalid step value') + return step + return None + + f = getattr(amixer, op) + f(control, step=get_step()) + + return _amixer_control_response(control) + + +# entry point +# ----------- + +if __name__ == '__main__': + if not os.getegid() == 0: + raise RuntimeError("Must be run as root.") + + config.load('sound_node') + + storage = RecordStorage(config['node']['storage']) + + recorder = Recorder(storage=storage) + recorder.start_thread() + + # start http server + host, port = parse_addr(config['node']['listen']) + app = web.Application() + app.add_routes(routes) + app.middlewares.append(errors_handler_middleware) + + web.run_app(app, + host=host, + port=port, + handle_signals=True) diff --git a/src/sound_sensor_node.py b/src/sound_sensor_node.py new file mode 100755 index 0000000..c5f21a3 --- /dev/null +++ b/src/sound_sensor_node.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 +import logging +import os +import sys + +from home.config import config +from home.util import parse_addr +from home.soundsensor import SoundSensorNode + +logger = logging.getLogger(__name__) + + +if __name__ == '__main__': + if not os.getegid() == 0: + sys.exit('Must be run as root.') + + config.load('sound_sensor_node') + + kwargs = {} + if 'delay' in config['node']: + kwargs['delay'] = config['node']['delay'] + + if 'server_addr' in config['node']: + server_addr = parse_addr(config['node']['server_addr']) + else: + server_addr = None + + node = SoundSensorNode(name=config['node']['name'], + pinname=config['node']['pin'], + server_addr=server_addr, + **kwargs) + node.run() diff --git a/src/sound_sensor_server.py b/src/sound_sensor_server.py new file mode 100755 index 0000000..b888429 --- /dev/null +++ b/src/sound_sensor_server.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +import logging +import threading +import os + +from time import sleep +from typing import Optional +from home.config import config +from home.util import parse_addr +from home.api import WebAPIClient, RequestParams +from home.api.types import SoundSensorLocation +from home.soundsensor import SoundSensorServer, SoundSensorHitHandler +from home.sound import RecordClient + +interrupted = False +logger = logging.getLogger(__name__) +server: SoundSensorServer + + +def get_related_sound_nodes(sensor_name: str) -> list[str]: + if sensor_name not in config['sensor_to_sound_nodes_relations']: + raise ValueError(f'unexpected sensor name {sensor_name}') + return config['sensor_to_sound_nodes_relations'][sensor_name] + + +def get_sound_node_config(name: str) -> Optional[dict]: + if name in config['sound_nodes']: + return config['sound_nodes'][name] + else: + return None + + +class HitCounter: + def __init__(self): + self.sensors = {} + self.lock = threading.Lock() + self._reset_sensors() + + def _reset_sensors(self): + for loc in SoundSensorLocation: + self.sensors[loc.name.lower()] = 0 + + def add(self, name: str, hits: int): + if name not in self.sensors: + raise ValueError(f'sensor {name} not found') + + with self.lock: + self.sensors[name] += hits + + def get_all(self) -> list[tuple[str, int]]: + vals = [] + with self.lock: + for name, hits in self.sensors.items(): + if hits > 0: + vals.append((name, hits)) + self._reset_sensors() + return vals + + +class HitHandler(SoundSensorHitHandler): + def handler(self, name: str, hits: int): + if not hasattr(SoundSensorLocation, name.upper()): + logger.error(f'invalid sensor name: {name}') + return + + node_config = get_sound_node_config(name) + if node_config is None: + logger.error(f'config for node {name} not found') + return + + min_hits = node_config['min_hits'] if 'min_hits' in node_config else 1 + if hits < min_hits: + return + + hc.add(name, hits) + + if server.is_recording_enabled(): + try: + nodes = get_related_sound_nodes(name) + for node in nodes: + durations = config['sound_nodes'][node]['durations'] + dur = durations[1] if hits > min_hits else durations[0] + record.record(node, dur*60, {'node': node}) + except ValueError as exc: + logger.exception(exc) + + +def hits_sender(): + while not interrupted: + all_hits = hc.get_all() + if all_hits: + api.add_sound_sensor_hits(all_hits) + sleep(5) + + +api: Optional[WebAPIClient] = None +hc: Optional[HitCounter] = None +record: Optional[RecordClient] = None + + +# record callbacks + +# ---------------- + +def record_error(info: dict, userdata: dict): + node = userdata['node'] + logger.error('recording ' + str(dict) + ' from node ' + node + ' failed') + + record.forget(node, info['id']) + + +def record_finished(info: dict, fn: str, userdata: dict): + logger.debug('record finished: ' + str(info)) + + # audio could have been requested by other user (telegram bot, for example) + # so we shouldn't 'forget' it here + + # node = userdata['node'] + # record.forget(node, info['id']) + + +# api client callbacks +# -------------------- + +def api_error_handler(exc, name, req: RequestParams): + if name == 'upload_recording': + logger.error('failed to upload recording, exception below') + logger.exception(exc) + + else: + logger.error(f'api call ({name}, params={req.params}) failed, exception below') + logger.exception(exc) + + +def api_success_handler(response, name, req: RequestParams): + if name == 'upload_recording': + node = req.params['node'] + rid = req.params['record_id'] + + logger.debug(f'successfully uploaded recording (node={node}, record_id={rid}), api response:' + str(response)) + + # deleting temp file + try: + os.unlink(req.files['file']) + except OSError as exc: + logger.error(f'error while deleting temp file:') + logger.exception(exc) + + record.forget(node, rid) + + +if __name__ == '__main__': + config.load('sound_sensor_server') + + hc = HitCounter() + api = WebAPIClient(timeout=(10, 60)) + api.enable_async(error_handler=api_error_handler, + success_handler=api_success_handler) + + t = threading.Thread(target=hits_sender) + t.daemon = True + t.start() + + nodes = {} + for nodename, nodecfg in config['sound_nodes'].items(): + nodes[nodename] = parse_addr(nodecfg['addr']) + + record = RecordClient(nodes, + error_handler=record_error, + finished_handler=record_finished) + + try: + server = SoundSensorServer(parse_addr(config['server']['listen']), HitHandler) + server.run() + except KeyboardInterrupt: + interrupted = True + record.stop() + logging.info('keyboard interrupt, exiting...') diff --git a/src/test/__init__.py b/src/test/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/test/__init__.py diff --git a/src/test/test.py b/src/test/test.py new file mode 100755 index 0000000..7ea37e6 --- /dev/null +++ b/src/test/test.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +from home.relay import RelayClient + + +if __name__ == '__main__': + c = RelayClient() + print(c, c._host)
\ No newline at end of file diff --git a/src/test/test_amixer.py b/src/test/test_amixer.py new file mode 100755 index 0000000..ac96881 --- /dev/null +++ b/src/test/test_amixer.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +import sys, os.path +sys.path.extend([ + os.path.realpath(os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..')), +]) + +from argparse import ArgumentParser +from src.home.config import config +from src.home.sound import amixer + + +def validate_control(input: str): + for control in config['amixer']['controls']: + if control['name'] == input: + return + raise ValueError(f'invalid control name: {input}') + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--get-all', action='store_true') + parser.add_argument('--mute', type=str) + parser.add_argument('--unmute', type=str) + parser.add_argument('--cap', type=str) + parser.add_argument('--nocap', type=str) + parser.add_argument('--get', type=str) + parser.add_argument('--incr', type=str) + parser.add_argument('--decr', type=str) + # parser.add_argument('--dump-config', action='store_true') + + args = config.load('test_amixer', parser=parser) + + # if args.dump_config: + # print(config.data) + # sys.exit() + + if args.get_all: + for control in amixer.get_all(): + print(f'control = {control["name"]}') + for line in control['info'].split('\n'): + print(f' {line}') + print() + sys.exit() + + if args.get: + info = amixer.get(args.get) + print(info) + sys.exit() + + for action in ['incr', 'decr']: + if hasattr(args, action): + control = getattr(args, action) + if control is None: + continue + + print(f'attempting to {action} {control}') + validate_control(control) + func = getattr(amixer, action) + try: + func(control, step=5) + except amixer.AmixerError as e: + print('error: ' + str(e)) + sys.exit() + + for action in ['mute', 'unmute', 'cap', 'nocap']: + if hasattr(args, action): + control = getattr(args, action) + if control is None: + continue + + print(f"attempting to {action} {control}") + + validate_control(control) + func = getattr(amixer, action) + try: + func(control) + except amixer.AmixerError as e: + print('error: ' + str(e)) + sys.exit() diff --git a/src/test/test_api.py b/src/test/test_api.py new file mode 100755 index 0000000..959b2b3 --- /dev/null +++ b/src/test/test_api.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +from home.api import WebAPIClient +from home.api.types import BotType +from home.config import config + + +if __name__ == '__main__': + config.load('test_api') + + api = WebAPIClient() + print(api.log_bot_request(BotType.ADMIN, 1, "test_api.py")) diff --git a/src/test/test_inverter_monitor.py b/src/test/test_inverter_monitor.py new file mode 100755 index 0000000..d9b63d3 --- /dev/null +++ b/src/test/test_inverter_monitor.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +import cmd +import time +import logging +import socket +import sys +import threading +import os.path +sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..') + ) +]) + +from enum import Enum, auto +from typing import Optional +from src.home.util import stringify +from src.home.config import config +from src.home.inverter import ( + wrapper_instance as inverter, + + InverterMonitor, + ChargingEvent, + BatteryState, + BatteryPowerDirection, +) + + +def monitor_charging(event: ChargingEvent, **kwargs) -> None: + msg = 'event: ' + event.name + if event == ChargingEvent.AC_CURRENT_CHANGED: + msg += f' (current={kwargs["current"]})' + evt_logger.info(msg) + + +def monitor_battery(state: BatteryState, v: float, load_watts: int) -> None: + evt_logger.info(f'bat: {state.name}, v: {v}, load_watts: {load_watts}') + + +def monitor_error(error: str) -> None: + evt_logger.warning('error: ' + error) + + +class InverterTestShell(cmd.Cmd): + intro = 'Welcome to the test shell. Type help or ? to list commands.\n' + prompt = '(test) ' + file = None + + def do_connect_ac(self, arg): + server.connect_ac() + + def do_disconnect_ac(self, arg): + server.disconnect_ac() + + def do_pd_charge(self, arg): + server.set_pd(BatteryPowerDirection.CHARGING) + + def do_pd_nothing(self, arg): + server.set_pd(BatteryPowerDirection.DO_NOTHING) + + def do_pd_discharge(self, arg): + server.set_pd(BatteryPowerDirection.DISCHARGING) + + +class ChargerMode(Enum): + NONE = auto() + CHARGING = auto() + + +class ChargerEmulator(threading.Thread): + def __init__(self): + super().__init__() + self.setName('ChargerEmulator') + + self.logger = logging.getLogger('charger') + self.interrupted = False + self.mode = ChargerMode.NONE + + self.pd = None + self.ac_connected = False + self.mppt_connected = False + + def run(self): + while not self.interrupted: + if self.pd == BatteryPowerDirection.CHARGING\ + and self.ac_connected\ + and not self.mppt_connected: + + v = server._get_voltage() + 0.02 + self.logger.info('incrementing voltage') + server.set_voltage(v) + + time.sleep(2) + + def stop(self): + self.interrupted = True + + def setmode(self, mode: ChargerMode): + self.mode = mode + + def ac_changed(self, connected: bool): + self.ac_connected = connected + + def mppt_changed(self, connected: bool): + self.mppt_connected = connected + + def current_changed(self, amps): + # FIXME + # this method is not being called and voltage is not changing] + # when current changes + v = None + if amps == 2: + v = 49 + elif amps == 10: + v = 51 + elif amps == 20: + v = 52.5 + elif amps == 30: + v = 53.5 + elif amps == 40: + v = 54.5 + if v is not None: + self.logger.info(f'setting voltage {v}') + server.set_voltage(v) + + def pd_changed(self, pd: BatteryPowerDirection): + self.pd = pd + + +class InverterEmulator(threading.Thread): + def __init__(self, host: str, port: int): + super().__init__() + self.setName('InverterEmulatorServer') + self.lock = threading.Lock() + + self.status = {"grid_voltage": {"unit": "V", "value": 0.0}, + "grid_freq": {"unit": "Hz", "value": 0.0}, + "ac_output_voltage": {"unit": "V", "value": 230.0}, + "ac_output_freq": {"unit": "Hz", "value": 50.0}, + "ac_output_apparent_power": {"unit": "VA", "value": 92}, + "ac_output_active_power": {"unit": "Wh", "value": 30}, + "output_load_percent": {"unit": "%", "value": 1}, + "battery_voltage": {"unit": "V", "value": 48.4}, + "battery_voltage_scc": {"unit": "V", "value": 0.0}, + "battery_voltage_scc2": {"unit": "V", "value": 0.0}, + "battery_discharging_current": {"unit": "A", "value": 0}, + "battery_charging_current": {"unit": "A", "value": 0}, + "battery_capacity": {"unit": "%", "value": 62}, + "inverter_heat_sink_temp": {"unit": "°C", "value": 8}, + "mppt1_charger_temp": {"unit": "°C", "value": 0}, + "mppt2_charger_temp": {"unit": "°C", "value": 0}, + "pv1_input_power": {"unit": "Wh", "value": 0}, + "pv2_input_power": {"unit": "Wh", "value": 0}, + "pv1_input_voltage": {"unit": "V", "value": 0.0}, + "pv2_input_voltage": {"unit": "V", "value": 0.0}, + "configuration_status": "Default", + "mppt1_charger_status": "Abnormal", + "mppt2_charger_status": "Abnormal", + "load_connected": "Connected", + "battery_power_direction": "Discharge", + "dc_ac_power_direction": "DC/AC", + "line_power_direction": "Do nothing", + "local_parallel_id": 0} + self.rated = {"ac_input_rating_voltage": {"unit": "V", "value": 230.0}, + "ac_input_rating_current": {"unit": "A", "value": 21.7}, + "ac_output_rating_voltage": {"unit": "V", "value": 230.0}, + "ac_output_rating_freq": {"unit": "Hz", "value": 50.0}, + "ac_output_rating_current": {"unit": "A", "value": 21.7}, + "ac_output_rating_apparent_power": {"unit": "VA", "value": 5000}, + "ac_output_rating_active_power": {"unit": "Wh", "value": 5000}, + "battery_rating_voltage": {"unit": "V", "value": 48.0}, + "battery_recharge_voltage": {"unit": "V", "value": 51.0}, + "battery_redischarge_voltage": {"unit": "V", "value": 58.0}, + "battery_under_voltage": {"unit": "V", "value": 42.0}, + "battery_bulk_voltage": {"unit": "V", "value": 57.6}, + "battery_float_voltage": {"unit": "V", "value": 54.0}, + "battery_type": "User", + "max_charging_current": {"unit": "A", "value": 60}, + "max_ac_charging_current": {"unit": "A", "value": 10}, + "input_voltage_range": "Appliance", + "output_source_priority": "Parallel output", + "charge_source_priority": "Solar-and-Utility", + "parallel_max_num": 6, + "machine_type": "Off-Grid-Tie", + "topology": "Transformer-less", + "output_model_setting": "Single module", + "solar_power_priority": "Load-Battery-Utility", + "mppt": "2"} + + self.host = host + self.port = port + self.interrupted = False + self.logger = logging.getLogger('srv') + + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind((self.host, self.port)) + + def run(self): + self.sock.listen(5) + + while not self.interrupted: + conn, address = self.sock.accept() + + alive = True + while alive: + try: + buf = conn.recv(2048) + message = buf.decode().strip() + except OSError as exc: + self.logger.error('failed to recv()') + self.logger.exception(exc) + + alive = False + + try: + conn.close() + except: + pass + + continue # exit the loop + + self.logger.log(0, f'< {message}') + + if message.strip() == '': + continue + + if message == 'format json': + # self.logger.info(f'got {message}') + self.reply_ok(conn) + + elif message.startswith('exec '): + command = message[5:].split() + args = command[1:] + command = command[0] + + if command == 'get-allowed-ac-charging-currents': + self.reply_ok(conn, [2, 10, 20, 30, 40, 50, 60]) + elif command == 'get-status': + self.reply_ok(conn, self._get_status()) + elif command == 'get-rated': + self.reply_ok(conn, self._get_rated()) + elif command == 'set-max-ac-charging-current': + self.set_ac_current(args[1]) + self.reply_ok(conn, 1) + else: + raise ValueError('unsupported command: ' + command) + else: + raise ValueError('unexpected request: ' + message) + + def reply_ok(self, connection, data=None): + buf = 'ok' + '\r\n' + if data: + if not isinstance(data, str): + data = stringify({'result': 'ok', 'data': data}) + buf += data + '\r\n' + buf += '\r\n' + self.logger.log(0, f'> {buf.strip()}') + connection.sendall(buf.encode()) + + def _get_status(self) -> dict: + with self.lock: + return self.status + + def _get_rated(self) -> dict: + with self.lock: + return self.rated + + def _get_voltage(self) -> float: + with self.lock: + return self.status['battery_voltage']['value'] + + def stop(self): + self.interrupted = True + self.sock.close() + + def connect_ac(self): + with self.lock: + self.status['grid_voltage']['value'] = 230 + self.status['grid_freq']['value'] = 50 + charger.ac_changed(True) + + def disconnect_ac(self): + with self.lock: + self.status['grid_voltage']['value'] = 0 + self.status['grid_freq']['value'] = 0 + #self.status['battery_voltage']['value'] = 48.4 # revert to initial value + charger.ac_changed(False) + + def connect_mppt(self): + with self.lock: + self.status['pv1_input_power']['value'] = 1 + self.status['pv1_input_voltage']['value'] = 50 + self.status['mppt1_charger_status'] = 'Charging' + charger.mppt_changed(True) + + def disconnect_mppt(self): + with self.lock: + self.status['pv1_input_power']['value'] = 0 + self.status['pv1_input_voltage']['value'] = 0 + self.status['mppt1_charger_status'] = 'Abnormal' + charger.mppt_changed(False) + + def set_voltage(self, v: float): + with self.lock: + self.status['battery_voltage']['value'] = v + + def set_ac_current(self, amps): + with self.lock: + self.rated['max_ac_charging_current']['value'] = amps + charger.current_changed(amps) + + def set_pd(self, pd: BatteryPowerDirection): + if pd == BatteryPowerDirection.CHARGING: + val = 'Charge' + elif pd == BatteryPowerDirection.DISCHARGING: + val = 'Discharge' + else: + val = 'Do nothing' + with self.lock: + self.status['battery_power_direction'] = val + charger.pd_changed(pd) + + +logger = logging.getLogger(__name__) +evt_logger = logging.getLogger('evt') +server: Optional[InverterEmulator] = None +charger: Optional[ChargerEmulator] = None + + +def main(): + global server, charger + + # start fake inverterd server + try: + server = InverterEmulator(host=config['inverter']['host'], + port=config['inverter']['port']) + server.start() + except OSError as e: + logger.error('failed to start server') + logger.exception(e) + return + logger.info('server started') + + # start charger thread + charger = ChargerEmulator() + charger.start() + + # init inverterd wrapper + inverter.init(host=config['inverter']['host'], + port=config['inverter']['port']) + + # start monitor + mon = InverterMonitor() + mon.set_charging_event_handler(monitor_charging) + mon.set_battery_event_handler(monitor_battery) + mon.set_error_handler(monitor_error) + mon.start() + logger.info('monitor started') + + try: + InverterTestShell().cmdloop() + + server.join() + mon.join() + charger.join() + + except KeyboardInterrupt: + server.stop() + mon.stop() + charger.stop() + + +if __name__ == '__main__': + config.load('test_inverter_monitor') + main() diff --git a/src/test/test_record_upload.py b/src/test/test_record_upload.py new file mode 100755 index 0000000..54ff06f --- /dev/null +++ b/src/test/test_record_upload.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +import logging +import sys +import os.path +sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..') + ) +]) + +import time + +from src.home.api import WebAPIClient, RequestParams +from src.home.config import config +from src.home.sound import RecordClient +from src.home.util import parse_addr + +logger = logging.getLogger(__name__) + + +# record callbacks +# ---------------- + +def record_error(info: dict, userdata: dict): + node = userdata['node'] + # TODO + + +def record_finished(info: dict, fn: str, userdata: dict): + logger.info('record finished: ' + str(info)) + + node = userdata['node'] + api.upload_recording(fn, node, info['id'], int(info['start_time']), int(info['stop_time'])) + + +# api client callbacks +# -------------------- + +def api_error_handler(exc, name, req: RequestParams): + if name == 'upload_recording': + logger.error('failed to upload recording, exception below') + logger.exception(exc) + + else: + logger.error(f'api call ({name}, params={req.params}) failed, exception below') + logger.exception(exc) + + +def api_success_handler(response, name, req: RequestParams): + if name == 'upload_recording': + node = req.params['node'] + rid = req.params['record_id'] + + logger.debug(f'successfully uploaded recording (node={node}, record_id={rid}), api response:' + str(response)) + + # deleting temp file + try: + os.unlink(req.files['file']) + except OSError as exc: + logger.error(f'error while deleting temp file:') + logger.exception(exc) + + record.forget(node, rid) + + +if __name__ == '__main__': + config.load('test_record_upload') + + nodes = {} + for name, addr in config['nodes'].items(): + nodes[name] = parse_addr(addr) + record = RecordClient(nodes, + error_handler=record_error, + finished_handler=record_finished, + download_on_finish=True) + + api = WebAPIClient() + api.enable_async(error_handler=api_error_handler, + success_handler=api_success_handler) + + record_id = record.record('localhost', 3, {'node': 'localhost'}) + print(f'record_id: {record_id}') + + while True: + try: + time.sleep(0.1) + except (KeyboardInterrupt, SystemExit): + break
\ No newline at end of file diff --git a/src/test/test_send_fake_sound_hit.py b/src/test/test_send_fake_sound_hit.py new file mode 100755 index 0000000..af6b7eb --- /dev/null +++ b/src/test/test_send_fake_sound_hit.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +import sys +import os.path +sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..') + ) +]) + +from argparse import ArgumentParser +from src.home.util import send_datagram, stringify, parse_addr + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--name', type=str, required=True, + help='node name, like `diana`') + parser.add_argument('--hits', type=int, required=True, + help='hits count') + parser.add_argument('--server', type=str, required=True, + help='center server addr in host:port format') + + args = parser.parse_args() + + send_datagram(stringify([args.name, args.hits]), parse_addr(args.server)) diff --git a/src/test/test_sensors_plot.py b/src/test/test_sensors_plot.py new file mode 100755 index 0000000..e69de29 --- /dev/null +++ b/src/test/test_sensors_plot.py diff --git a/src/test/test_sound_node_client.py b/src/test/test_sound_node_client.py new file mode 100755 index 0000000..795165a --- /dev/null +++ b/src/test/test_sound_node_client.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +import sys, os.path +sys.path.extend([ + os.path.realpath(os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..')), +]) + +from src.home.api.errors import ApiResponseError +from src.home.sound import SoundNodeClient + + +if __name__ == '__main__': + client = SoundNodeClient(('127.0.0.1', 8313)) + print(client.amixer_get_all()) + + try: + client.amixer_get('invalidname') + except ApiResponseError as exc: + print(exc) + diff --git a/src/test/test_sound_server_api.py b/src/test/test_sound_server_api.py new file mode 100755 index 0000000..568ea7e --- /dev/null +++ b/src/test/test_sound_server_api.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +import sys +import os.path +sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..', '..') + ) +]) +import threading + +from time import sleep +from src.home.config import config +from src.home.api import WebAPIClient +from src.home.api.types import SoundSensorLocation + +interrupted = False + + +class HitCounter: + def __init__(self): + self.sensors = {} + self.lock = threading.Lock() + self._reset_sensors() + + def _reset_sensors(self): + for loc in SoundSensorLocation: + self.sensors[loc.name.lower()] = 0 + + def add(self, name: str, hits: int): + if name not in self.sensors: + raise ValueError(f'sensor {name} not found') + + with self.lock: + self.sensors[name] += hits + + def get_all(self) -> list[tuple[str, int]]: + vals = [] + with self.lock: + for name, hits in self.sensors.items(): + if hits > 0: + vals.append((name, hits)) + self._reset_sensors() + return vals + + +def hits_sender(): + while True: + try: + all_hits = hc.get_all() + if all_hits: + api.add_sound_sensor_hits(all_hits) + sleep(5) + except (KeyboardInterrupt, SystemExit): + return + + +if __name__ == '__main__': + config.load('test_api') + + hc = HitCounter() + api = WebAPIClient() + + hc.add('spb1', 1) + # hc.add('big_house', 123) + + hits_sender() diff --git a/src/test/test_stopwatch.py b/src/test/test_stopwatch.py new file mode 100755 index 0000000..6ff2c0e --- /dev/null +++ b/src/test/test_stopwatch.py @@ -0,0 +1,16 @@ +from home.util import Stopwatch, StopwatchError +from time import sleep + + +if __name__ == '__main__': + s = Stopwatch() + s.go() + sleep(2) + s.pause() + s.go() + sleep(1) + print(s.get_elapsed_time()) + sleep(1) + print(s.get_elapsed_time()) + s.pause() + print(s.get_elapsed_time()) diff --git a/src/web_api.py b/src/web_api.py new file mode 100755 index 0000000..beaab57 --- /dev/null +++ b/src/web_api.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 +from home.web_api import get_app +from typing import Optional +from flask import Flask + +app: Optional[Flask] = None + + +if __name__ in ('__main__', 'app'): + app = get_app() + +if __name__ == '__main__': + app.run(host='0.0.0.0') diff --git a/src/web_api_uwsgi.py b/src/web_api_uwsgi.py new file mode 100755 index 0000000..e46f518 --- /dev/null +++ b/src/web_api_uwsgi.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 +from home.web_api import get_app + +app = get_app() + + +if __name__ == '__main__': + app.run() diff --git a/systemd/gpiorelayd.service b/systemd/gpiorelayd.service new file mode 100644 index 0000000..d8ea8bf --- /dev/null +++ b/systemd/gpiorelayd.service @@ -0,0 +1,13 @@ +[Unit] +Description=GPIO Relay Daemon +After=network-online.target + +[Service] +User=root +Group=root +Restart=on-failure +ExecStart=/home/user/homekit/src/gpiorelayd.py -с /etc/gpiorelayd_pump.toml +WorkingDirectory=/root + +[Install] +WantedBy=multi-user.target
\ No newline at end of file diff --git a/systemd/inverter_bot.service b/systemd/inverter_bot.service new file mode 100644 index 0000000..96612ae --- /dev/null +++ b/systemd/inverter_bot.service @@ -0,0 +1,13 @@ +[Unit] +Description=InfiniSolar V 5KW Inverter Telegram Bot +After=inverterd.service + +[Service] +User=user +Group=user +Restart=on-failure +ExecStart=/home/user/homekit/src/inverter_bot.py +WorkingDirectory=/home/user + +[Install] +WantedBy=multi-user.target diff --git a/systemd/inverter_mqtt_sender.service b/systemd/inverter_mqtt_sender.service new file mode 100644 index 0000000..e3925f6 --- /dev/null +++ b/systemd/inverter_mqtt_sender.service @@ -0,0 +1,13 @@ +[Unit] +Description=Inverter MQTT sender +After=inverterd.service + +[Service] +User=user +Group=user +Restart=on-failure +ExecStart=/home/user/homekit/src/inverter_mqtt_sender.py +WorkingDirectory=/home/user + +[Install] +WantedBy=multi-user.target
\ No newline at end of file diff --git a/systemd/pump_bot.service b/systemd/pump_bot.service new file mode 100644 index 0000000..dd8a46b --- /dev/null +++ b/systemd/pump_bot.service @@ -0,0 +1,13 @@ +[Unit] +Description=Water Pump Telegram Bot +After=gpiorelayd.service + +[Service] +User=user +Group=user +Restart=on-failure +ExecStart=/home/user/homekit/src/pump_bot.py +WorkingDirectory=/home/user + +[Install] +WantedBy=multi-user.target
\ No newline at end of file diff --git a/systemd/sensors_bot.service b/systemd/sensors_bot.service new file mode 100644 index 0000000..50128b3 --- /dev/null +++ b/systemd/sensors_bot.service @@ -0,0 +1,12 @@ +[Unit] +Description=Sensors Telegram Bot +After=network-online.target + +[Service] +Restart=on-failure +User=user +WorkingDirectory=/home/user +ExecStart=/home/user/homekit/src/sensors_bot.py + +[Install] +WantedBy=multi-user.target
\ No newline at end of file diff --git a/systemd/sensors_mqtt_receiver.service b/systemd/sensors_mqtt_receiver.service new file mode 100644 index 0000000..e67c112 --- /dev/null +++ b/systemd/sensors_mqtt_receiver.service @@ -0,0 +1,13 @@ +[Unit] +Description=sensors mqtt receiver +After=network.target + +[Service] +User=user +Group=user +Restart=on-failure +ExecStart=python3 /home/user/home/src/sensors_mqtt_receiver.py +WorkingDirectory=/home/user + +[Install] +WantedBy=multi-user.target diff --git a/systemd/sensors_mqtt_sender.service b/systemd/sensors_mqtt_sender.service new file mode 100644 index 0000000..54da7f6 --- /dev/null +++ b/systemd/sensors_mqtt_sender.service @@ -0,0 +1,13 @@ +[Unit] +Description=Sensors MQTT sender +After=si7021d.service + +[Service] +User=user +Group=user +Restart=on-failure +ExecStart=/home/user/homekit/src/sensors_mqtt_sender.py +WorkingDirectory=/home/user + +[Install] +WantedBy=multi-user.target
\ No newline at end of file diff --git a/systemd/si7021d.service b/systemd/si7021d.service new file mode 100644 index 0000000..646746b --- /dev/null +++ b/systemd/si7021d.service @@ -0,0 +1,10 @@ +[Unit] +Description=si7021 daemon +After=network-online.target + +[Service] +Restart=on-failure +ExecStart=/home/user/homekit/src/si7021d.py --config /etc/si7021d.toml + +[Install] +WantedBy=multi-user.target diff --git a/systemd/sound_bot.service b/systemd/sound_bot.service new file mode 100644 index 0000000..63bde89 --- /dev/null +++ b/systemd/sound_bot.service @@ -0,0 +1,12 @@ +[Unit] +Description=MyHomeKit's Sound Bot for Telegram +After=network-online.target + +[Service] +Restart=on-failure +User=user +WorkingDirectory=/home/user +ExecStart=/home/user/homekit/src/sound_bot.py + +[Install] +WantedBy=multi-user.target
\ No newline at end of file diff --git a/systemd/sound_node.service b/systemd/sound_node.service new file mode 100644 index 0000000..9d47a4f --- /dev/null +++ b/systemd/sound_node.service @@ -0,0 +1,13 @@ +[Unit] +Description=MyHomeKit Sound Node (ALSA HTTP Frontend) +After=network-online.target + +[Service] +User=root +Group=root +Restart=on-failure +ExecStart=/home/user/homekit/src/sound_node.py --config /etc/sound_node.toml +WorkingDirectory=/root + +[Install] +WantedBy=multi-user.target
\ No newline at end of file diff --git a/systemd/sound_sensor_node.service b/systemd/sound_sensor_node.service new file mode 100644 index 0000000..595e050 --- /dev/null +++ b/systemd/sound_sensor_node.service @@ -0,0 +1,13 @@ +[Unit] +Description=MyHomeKit Sound Sensor Node +After=network-online.target + +[Service] +User=root +Group=root +Restart=on-failure +ExecStart=/home/user/homekit/src/sound_sensor_node.py --config /etc/sound_sensor_node.toml +WorkingDirectory=/root + +[Install] +WantedBy=multi-user.target
\ No newline at end of file diff --git a/systemd/sound_sensor_server.service b/systemd/sound_sensor_server.service new file mode 100644 index 0000000..f45abed --- /dev/null +++ b/systemd/sound_sensor_server.service @@ -0,0 +1,13 @@ +[Unit] +Description=MyHomeKit Sound Sensor Central Server +After=network-online.target + +[Service] +User=user +Group=user +Restart=on-failure +ExecStart=/home/user/homekit/src/sound_sensor_server.py +WorkingDirectory=/home/user + +[Install] +WantedBy=multi-user.target
\ No newline at end of file diff --git a/tools/clickhouse-backup.sh b/tools/clickhouse-backup.sh new file mode 100644 index 0000000..6e938e4 --- /dev/null +++ b/tools/clickhouse-backup.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +DIR=/var/lib/clickhouse/backup +MAX_COUNT=3 +NAME=backup_$(date -u +%Y-%m-%d) + +create() { + local name="$1" + clickhouse-backup create "$name" +} + +del() { + local name="$1" + clickhouse-backup delete local "$name" +} + +# create a backup +create "$NAME" + +# compress backup +cd "$DIR" +tar czvf $NAME.tar.gz $NAME + +# delete uncompressed files +del "$NAME" + +# delete old backups +for file in $(ls -t "${DIR}" | tail -n +$(( MAX_COUNT+1 ))); do + echo "removing $file..." + rm "$file" +done
\ No newline at end of file diff --git a/tools/merge-recordings.py b/tools/merge-recordings.py new file mode 100755 index 0000000..637858e --- /dev/null +++ b/tools/merge-recordings.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +import os +import re +import subprocess +import tempfile +import sys + +from datetime import datetime, timedelta +from argparse import ArgumentParser + + +fmt = '%d%m%y-%H%M%S' + +File = dict +FileList = list[File] + + +def get_files(source_directory: str) -> FileList: + files = [] + for f in os.listdir(source_directory): + m = re.match(r'^(\d{6}-\d{6})_(\d{6}-\d{6})_id(\d+)(_\w+)?\.mp3$', f) + if not m: + continue + + files.append({ + 'filename': os.path.join(source_directory, f), + 'start': datetime.strptime(m.group(1), fmt), + 'stop': datetime.strptime(m.group(2), fmt) + }) + files.sort(key=lambda f: f['start'].timestamp()) + return files + + +def group_files(files: FileList) -> list[FileList]: + groups = [] + group_idx = None + + for info in files: + # if group_idx is not None: + # print(info['start'], groups[group_idx][-1]['stop']) + # print(' ', info['start'] - groups[group_idx][-1]['stop']) + # print() + + if group_idx is None or \ + not groups[group_idx] or \ + info['start'] - groups[group_idx][-1]['stop'] <= timedelta(seconds=1): + if group_idx is None: + groups.append([]) + group_idx = 0 + else: + group_idx += 1 + groups.append([]) + groups[group_idx].append(info) + + return groups + + +def merge(groups: list[FileList], + output_directory: str, + delete_source_files=False, + vbr=False) -> None: + for g in groups: + success = False + + fd = tempfile.NamedTemporaryFile(delete=False) + try: + for file in g: + line = f'file \'{file["filename"]}\'\n' + # print(line.strip()) + fd.write(line.encode()) + fd.close() + + start = g[0]['start'].strftime(fmt) + stop = g[-1]['stop'].strftime(fmt) + fn = f'{start}_{stop}_merged.mp3' + output = os.path.join(output_directory, fn) + + cmd = ['ffmpeg', '-y', + '-f', 'concat', + '-safe', '0', + '-i', fd.name, + '-map_metadata', '-1', + '-codec:a', 'libmp3lame'] + if vbr: + cmd.extend(['-codec:a', 'libmp3lame', '-q:a', '4']) + else: + cmd.extend(['-codec:a', 'copy']) + cmd.append(output) + + p = subprocess.run(cmd, capture_output=False) + if p.returncode != 0: + print(f'error: ffmpeg returned {p.returncode}') + else: + success = True + finally: + os.unlink(fd.name) + + if success and delete_source_files: + for file in g: + os.unlink(file['filename']) + + +def main(): + default_dir = os.getcwd() + + parser = ArgumentParser() + parser.add_argument('--input-directory', '-i', type=str, default=default_dir, + help='Directory with files') + parser.add_argument('--output-directory', '-o', type=str, default=default_dir, + help='Output directory') + parser.add_argument('-D', '--delete-source-files', action='store_true') + parser.add_argument('--vbr', action='store_true', + help='Re-encode using VBR (-q:a 4)') + args = parser.parse_args() + + files = get_files(os.path.realpath(args.input_directory)) + if not len(files): + print(f"No mp3 files found in {args.input_directory}.") + sys.exit() + + groups = group_files(files) + + merge(groups, + os.path.realpath(args.output_directory), + delete_source_files=args.delete_source_files, + vbr=args.vbr) + + +if __name__ == '__main__': + main() diff --git a/tools/remove-old-recordings.sh b/tools/remove-old-recordings.sh new file mode 100644 index 0000000..d376572 --- /dev/null +++ b/tools/remove-old-recordings.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +# to be launched by cron on remote server + +find /var/recordings -type f -mtime +14 -delete diff --git a/tools/sync-recordings-to-remote.sh b/tools/sync-recordings-to-remote.sh new file mode 100755 index 0000000..cf979d1 --- /dev/null +++ b/tools/sync-recordings-to-remote.sh @@ -0,0 +1,72 @@ +#!/bin/bash + +PROGNAME="$0" +NODE_CONFIG="/etc/sound_node.toml" +REMOTE_USER=user +REMOTE_SERVER=solarmon.ru +REMOTE_DIRECTORY=/var/recordings + +set -e + +echoerr() { + >&2 echo "error: $@" +} + +echowarn() { + >&2 echo "warning: $@" +} + +telegram_alert() { + if [ -z "$TG_TOKEN" ] || [ -z "$TG_CHAT_ID" ]; then return; fi + curl -X POST \ + -F "chat_id=${TG_CHAT_ID}" \ + -F "text=$1" \ + "https://api.telegram.org/bot${TG_TOKEN}/sendMessage" +} + +fatal() { + echoerr "$@" + telegram_alert "$PROGNAME: $@" + exit 1 +} + +get_config_var() { + local varname="$1" + cat "$NODE_CONFIG" | grep "^$varname = \"" | awk '{print $3}' | tr -d '"' +} + +get_mp3_count() { + find "$LOCAL_DIR" -mindepth 1 -type f -name "*.mp3" -printf x | wc -c +} + +[ -z "$TG_TOKEN" ] && echowarn "TG_TOKEN is not set" +[ -z "$TG_CHAT_ID" ] && echowarn "TG_CHAT_ID is not set" + +NODE_NAME=$(get_config_var name) +LOCAL_DIR=$(get_config_var storage) + +[ -z "$NODE_NAME" ] && fatal "failed to parse NODE_NAME" +[ -z "$LOCAL_DIR" ] && fatal "failed to parse LOCAL_DIR" + +[ -d "$LOCAL_DIR" ] || fatal "$LOCAL_DIR is not a directory" + +COUNT=$(get_mp3_count) +(( $COUNT < 1 )) && { + echo "seems there's nothing to sync" + exit +} + +cd "$LOCAL_DIR" || fatal "failed to change to $LOCAL_DIR" + +rsync -azPv -e "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o LogLevel=ERROR" \ + *.mp3 \ + ${REMOTE_USER}@${REMOTE_SERVER}:"${REMOTE_DIRECTORY}/${NODE_NAME}/" \ + --exclude temp.mp3 + +RC=$? + +if [ $RC -eq 0 ]; then + find "$LOCAL_DIR" -name "*.mp3" -type f -mmin +1440 -delete || fatal "find failed to delete old files" +else + fatal "failed to rsync: code $RC" +fi diff --git a/tools/vkos.sh b/tools/vkos.sh new file mode 100755 index 0000000..ebe0d66 --- /dev/null +++ b/tools/vkos.sh @@ -0,0 +1,99 @@ +#!/bin/bash + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +PROGNAME="$0" + +die() { + >&2 echo "error: $@" + exit 1 +} + +usage() { + cat <<EOF +usage: $PROGNAME [OPTIONS] COMMAND + +Options: + -b use backup server + -d don't delete files after merge + +Supported commands: + list NODE + fetch NODE PREFIX + merge +EOF + exit +} + +[ -z "$1" ] && usage + +COMMAND= +NODE= +PREFIX= +FROM_BACKUP=0 +DONT_DELETE=0 +while [[ $# -gt 0 ]]; do + case "$1" in + list) + COMMAND="$1" + NODE="$2" + shift + ;; + + fetch) + COMMAND="$1" + NODE="$2" + PREFIX="$3" + shift; shift + ;; + + merge) + COMMAND="$1" + ;; + + -b) + FROM_BACKUP=1 + ;; + + -d) + DONT_DELETE=1 + ;; + + *) + die "unrecognized argument: $1" + ;; + esac + shift +done + +[ -z "$COMMAND" ] && usage + +if [ "$FROM_BACKUP" = "0" ]; then + SRV_HOST=solarmon.ru + SRV_PORT=60681 + SRV_USER=user + SRV_DIR=/var/recordings +else + SRV_HOST=srv_nas4 + SRV_PORT=22 + SRV_USER=root + SRV_DIR=/var/storage1/solarmon/recordings +fi + +case "$COMMAND" in + list) + [ -z "$NODE" ] && usage + ssh -p${SRV_PORT} ${SRV_USER}@${SRV_HOST} "ls -rt --time creation \"${SRV_DIR}/${NODE}\"" + ;; + + fetch) + [ -z "$NODE" ] && usage + [ -z "$PREFIX" ] && usage + rsync -azPv -e "ssh -p${SRV_PORT}" ${SRV_USER}@${SRV_HOST}:"${SRV_DIR}/${NODE}/${PREFIX}*" . + ;; + + merge) + args= + if [ "$DONT_DELETE" = "0" ]; then args="-D"; fi + $DIR/merge-recordings.py $args + ;; +esac |