summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/home/api/types/types.py1
-rw-r--r--src/home/api/web_api_client.py6
-rw-r--r--src/home/mqtt/__init__.py1
-rw-r--r--src/home/mqtt/mqtt.py21
-rw-r--r--src/home/mqtt/payload/__init__.py1
-rw-r--r--src/home/mqtt/payload/relay.py45
-rw-r--r--src/home/mqtt/relay.py93
-rw-r--r--src/home/telegram/bot.py2
-rwxr-xr-xsrc/pump_mqtt_bot.py191
9 files changed, 350 insertions, 11 deletions
diff --git a/src/home/api/types/types.py b/src/home/api/types/types.py
index 1d5596f..3f77b19 100644
--- a/src/home/api/types/types.py
+++ b/src/home/api/types/types.py
@@ -8,6 +8,7 @@ class BotType(Enum):
ADMIN = auto()
SOUND = auto()
POLARIS_KETTLE = auto()
+ PUMP_MQTT = auto()
class TemperatureSensorLocation(Enum):
diff --git a/src/home/api/web_api_client.py b/src/home/api/web_api_client.py
index f74b5a1..ca9a9ee 100644
--- a/src/home/api/web_api_client.py
+++ b/src/home/api/web_api_client.py
@@ -183,9 +183,9 @@ class WebAPIClient:
data = json.loads(r.text)
if r.status_code != 200:
raise ApiResponseError(r.status_code,
- data['error']['type'],
- data['error']['message'],
- data['error']['stacktrace'] if 'stacktrace' in data['error'] else None)
+ data['error'],
+ data['message'],
+ data['stacktrace'] if 'stacktrace' in data['error'] else None)
return data['response'] if 'response' in data else True
finally:
diff --git a/src/home/mqtt/__init__.py b/src/home/mqtt/__init__.py
index c0ef9ba..a6f5f5e 100644
--- a/src/home/mqtt/__init__.py
+++ b/src/home/mqtt/__init__.py
@@ -1,2 +1,3 @@
from .mqtt import MQTTBase
from .util import poll_tick
+from .relay import MQTTRelay \ No newline at end of file
diff --git a/src/home/mqtt/mqtt.py b/src/home/mqtt/mqtt.py
index b3334b5..9dd973b 100644
--- a/src/home/mqtt/mqtt.py
+++ b/src/home/mqtt/mqtt.py
@@ -6,8 +6,6 @@ import logging
from typing import Tuple
from ..config import config
-logger = logging.getLogger(__name__)
-
def username_and_password() -> Tuple[str, str]:
username = config['mqtt']['username'] if 'username' in config['mqtt'] else None
@@ -23,11 +21,15 @@ class MQTTBase:
self._client.on_connect = self.on_connect
self._client.on_disconnect = self.on_disconnect
self._client.on_message = self.on_message
+ self._client.on_log = self.on_log
+ self._client.on_publish = self.on_publish
+ self._loop_started = False
self._logger = logging.getLogger(self.__class__.__name__)
username, password = username_and_password()
if username and password:
+ self._logger.debug(f'username={username} password={password}')
self._client.username_pw_set(username, password)
def configure_tls(self):
@@ -50,6 +52,12 @@ class MQTTBase:
self._client.loop_forever()
else:
self._client.loop_start()
+ self._loop_started = True
+
+ def disconnect(self):
+ self._client.disconnect()
+ self._client.loop_write()
+ self._client.loop_stop()
def on_connect(self, client: mqtt.Client, userdata, flags, rc):
self._logger.info("Connected with result code " + str(rc))
@@ -57,5 +65,12 @@ class MQTTBase:
def on_disconnect(self, client: mqtt.Client, userdata, rc):
self._logger.info("Disconnected with result code " + str(rc))
+ def on_log(self, client: mqtt.Client, userdata, level, buf):
+ level = mqtt.LOGGING_LEVEL[level] if level in mqtt.LOGGING_LEVEL else logging.INFO
+ self._logger.log(level, f'MQTT: {buf}')
+
def on_message(self, client: mqtt.Client, userdata, msg):
- self._logger.info(msg.topic + ": " + str(msg.payload))
+ self._logger.debug(msg.topic + ": " + str(msg.payload))
+
+ def on_publish(self, client: mqtt.Client, userdata, mid):
+ self._logger.debug(f'publish done, mid={mid}') \ No newline at end of file
diff --git a/src/home/mqtt/payload/__init__.py b/src/home/mqtt/payload/__init__.py
index e69de29..9fcaf3e 100644
--- a/src/home/mqtt/payload/__init__.py
+++ b/src/home/mqtt/payload/__init__.py
@@ -0,0 +1 @@
+from .base_payload import MQTTPayload \ No newline at end of file
diff --git a/src/home/mqtt/payload/relay.py b/src/home/mqtt/payload/relay.py
index 2a327ba..debc2c8 100644
--- a/src/home/mqtt/payload/relay.py
+++ b/src/home/mqtt/payload/relay.py
@@ -1,6 +1,10 @@
+import hashlib
+
from .base_payload import MQTTPayload, MQTTPayloadCustomField
+# _logger = logging.getLogger(__name__)
+
class StatFlags(MQTTPayloadCustomField):
state: bool
config_changed_value_present: bool
@@ -8,9 +12,11 @@ class StatFlags(MQTTPayloadCustomField):
@staticmethod
def unpack(flags: int):
+ # _logger.debug(f'StatFlags.unpack: flags={flags}')
state = flags & 0x1
ccvp = (flags >> 1) & 0x1
cc = (flags >> 2) & 0x1
+ # _logger.debug(f'StatFlags.unpack: state={state}')
return StatFlags(state=(state == 1),
config_changed_value_present=(ccvp == 1),
config_changed=(cc == 1))
@@ -24,7 +30,7 @@ class StatFlags(MQTTPayloadCustomField):
class InitialStatPayload(MQTTPayload):
- FORMAT = 'IBbIB'
+ FORMAT = '=IBbIB'
ip: int
fw_version: int
@@ -34,7 +40,7 @@ class InitialStatPayload(MQTTPayload):
class StatPayload(MQTTPayload):
- FORMAT = 'bIB'
+ FORMAT = '=bIB'
rssi: int
free_heap: int
@@ -42,13 +48,42 @@ class StatPayload(MQTTPayload):
class PowerPayload(MQTTPayload):
- FORMAT = '12sB'
+ FORMAT = '=12sB'
PACKER = {
- 'state': lambda n: int(n)
+ 'state': lambda n: int(n),
+ 'secret': lambda s: s.encode('utf-8')
}
UNPACKER = {
- 'state': lambda n: bool(n)
+ 'state': lambda n: bool(n),
+ 'secret': lambda s: s.decode('utf-8')
}
secret: str
state: bool
+
+
+class OTAPayload(MQTTPayload):
+ secret: str
+ filename: str
+
+ # structure of returned data:
+ #
+ # uint8_t[len(secret)] secret;
+ # uint8_t[16] md5;
+ # *uint8_t data
+
+ def pack(self):
+ buf = bytearray(self.secret.encode())
+ m = hashlib.md5()
+ with open(self.filename, 'rb') as fd:
+ content = fd.read()
+ m.update(content)
+ buf.extend(m.digest())
+ buf.extend(content)
+ return buf
+
+ def unpack(cls, buf: bytes):
+ raise RuntimeError(f'{cls.__class__.__name__}.unpack: not implemented')
+ # secret = buf[:12].decode()
+ # filename = buf[12:].decode()
+ # return OTAPayload(secret=secret, filename=filename)
diff --git a/src/home/mqtt/relay.py b/src/home/mqtt/relay.py
new file mode 100644
index 0000000..0f97b5b
--- /dev/null
+++ b/src/home/mqtt/relay.py
@@ -0,0 +1,93 @@
+import paho.mqtt.client as mqtt
+import re
+
+from .mqtt import MQTTBase
+from typing import Optional, Union
+from .payload.relay import (
+ InitialStatPayload,
+ StatPayload,
+ PowerPayload,
+ OTAPayload
+)
+
+
+class MQTTRelay(MQTTBase):
+ _home_id: Union[str, int]
+ _secret: str
+ _message_callback: Optional[callable]
+ _ota_publish_callback: Optional[callable]
+
+ def __init__(self,
+ home_id: Union[str, int],
+ secret: str,
+ subscribe_to_updates=True):
+ super().__init__(clean_session=True)
+ self._home_id = home_id
+ self._secret = secret
+ self._message_callback = None
+ self._ota_publish_callback = None
+ self._subscribe_to_updates = subscribe_to_updates
+ self._ota_mid = None
+
+ def on_connect(self, client: mqtt.Client, userdata, flags, rc):
+ super().on_connect(client, userdata, flags, rc)
+
+ if self._subscribe_to_updates:
+ topic = f'hk/{self._home_id}/relay/#'
+ self._logger.info(f"subscribing to {topic}")
+
+ client.subscribe(topic, qos=1)
+
+ def on_publish(self, client: mqtt.Client, userdata, mid):
+ if self._ota_mid is not None and mid == self._ota_mid and self._ota_publish_callback:
+ self._ota_publish_callback()
+
+ def set_message_callback(self, callback: callable):
+ self._message_callback = callback
+
+ def on_message(self, client: mqtt.Client, userdata, msg):
+ try:
+ match = re.match(r'^hk/(.*?)/relay/(stat|stat1|power|otares)$', msg.topic)
+ self._logger.debug(f'topic: {msg.topic}')
+ if not match:
+ return
+
+ name = match.group(1)
+ subtopic = match.group(2)
+
+ if name != self._home_id:
+ return
+
+ message = None
+ if subtopic == 'stat':
+ message = StatPayload.unpack(msg.payload)
+ elif subtopic == 'stat1':
+ message = InitialStatPayload.unpack(msg.payload)
+ elif subtopic == 'power':
+ message = PowerPayload.unpack(msg.payload)
+
+ if message and self._message_callback:
+ self._message_callback(message)
+
+ except Exception as e:
+ self._logger.exception(str(e))
+
+ def set_power(self, enable: bool):
+ payload = PowerPayload(secret=self._secret,
+ state=enable)
+ self._client.publish(f'hk/{self._home_id}/relay/power',
+ payload=payload.pack(),
+ qos=1)
+ self._client.loop_write()
+
+ def push_ota(self,
+ filename: str,
+ publish_callback: callable,
+ qos: int):
+ self._ota_publish_callback = publish_callback
+ payload = OTAPayload(secret=self._secret, filename=filename)
+ publish_result = self._client.publish(f'hk/{self._home_id}/relay/admin/ota',
+ payload=payload.pack(),
+ qos=qos)
+ self._ota_mid = publish_result.mid
+ self._client.loop_write()
diff --git a/src/home/telegram/bot.py b/src/home/telegram/bot.py
index 3c82587..bd09f42 100644
--- a/src/home/telegram/bot.py
+++ b/src/home/telegram/bot.py
@@ -110,6 +110,8 @@ def _handler_of_handler(*args, **kwargs):
ctx.reply_exc(e)
else:
notify_user(ctx.user_id, exc2text(e))
+ else:
+ _logger.exception(e)
def handler(**kwargs):
diff --git a/src/pump_mqtt_bot.py b/src/pump_mqtt_bot.py
new file mode 100755
index 0000000..d87234b
--- /dev/null
+++ b/src/pump_mqtt_bot.py
@@ -0,0 +1,191 @@
+#!/usr/bin/env python3
+import datetime
+
+from enum import Enum
+from typing import Optional
+from telegram import ReplyKeyboardMarkup, User
+
+from home.config import config
+from home.telegram import bot
+from home.telegram._botutil import user_any_name
+from home.api.types import BotType
+from home.mqtt import MQTTRelay
+from home.mqtt.payload import MQTTPayload
+from home.mqtt.payload.relay import InitialStatPayload, StatPayload
+
+
+config.load('pump_mqtt_bot')
+
+bot.initialize()
+bot.lang.ru(
+ start_message="Выберите команду на клавиатуре",
+ start_message_no_access="Доступ запрещён. Вы можете отправить заявку на получение доступа.",
+ unknown_command="Неизвестная команда",
+ send_access_request="Отправить заявку",
+ management="Админка",
+
+ enable="Включить",
+ enabled="Включен ✅",
+
+ disable="Выключить",
+ disabled="Выключен ❌",
+
+ status="Статус",
+ status_updated=' (обновлено %s)',
+
+ done="Готово 👌",
+ user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.',
+ user_action_on="включил",
+ user_action_off="выключил",
+ date_yday="вчера",
+ date_yyday="позавчера",
+ date_at="в"
+)
+bot.lang.en(
+ start_message="Select command on the keyboard",
+ start_message_no_access="You have no access.",
+ unknown_command="Unknown command",
+ send_access_request="Send request",
+ management="Admin options",
+
+ enable="Turn ON",
+ enable_silently="Turn ON silently",
+ enabled="Turned ON ✅",
+
+ disable="Turn OFF",
+ disable_silently="Turn OFF silently",
+ disabled="Turned OFF ❌",
+
+ status="Status",
+ status_updated=' (updated %s)',
+
+ done="Done 👌",
+ user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.',
+ user_action_on="ON",
+ user_action_off="OFF",
+
+ date_yday="yesterday",
+ date_yyday="the day before yesterday",
+ date_at="at"
+)
+
+
+class RelayState:
+ enabled: bool
+ update_time: datetime.datetime
+ rssi: int
+ fw_version: int
+ ever_updated: bool
+
+ def __init__(self):
+ self.ever_updated = False
+
+ def update(self,
+ enabled: bool,
+ rssi: int,
+ fw_version=None):
+ self.ever_updated = True
+ self.enabled = enabled
+ self.rssi = rssi
+ self.update_time = datetime.datetime.now()
+ if fw_version:
+ self.fw_version = fw_version
+
+
+mqtt_relay: Optional[MQTTRelay] = None
+relay_state = RelayState()
+
+
+class UserAction(Enum):
+ ON = 'on'
+ OFF = 'off'
+
+
+def on_mqtt_message(message: MQTTPayload):
+ if isinstance(message, InitialStatPayload) or isinstance(message, StatPayload):
+ kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
+ if isinstance(message, InitialStatPayload):
+ kwargs['fw_version'] = message.fw_version
+ relay_state.update(**kwargs)
+
+
+def notify(user: User, action: UserAction) -> None:
+ def text_getter(lang: str):
+ action_name = bot.lang.get(f'user_action_{action.value}', lang)
+ user_name = user_any_name(user)
+ return 'ℹ ' + bot.lang.get('user_action_notification', lang,
+ user.id, user_name, action_name)
+
+ bot.notify_all(text_getter, exclude=(user.id,))
+
+
+@bot.handler(message='enable')
+def enable_handler(ctx: bot.Context) -> None:
+ mqtt_relay.set_power(True)
+ ctx.reply(ctx.lang('done'))
+ notify(ctx.user, UserAction.ON)
+
+
+@bot.handler(message='disable')
+def disable_handler(ctx: bot.Context) -> None:
+ mqtt_relay.set_power(False)
+ ctx.reply(ctx.lang('done'))
+ notify(ctx.user, UserAction.OFF)
+
+
+@bot.handler(message='status')
+def status(ctx: bot.Context) -> None:
+ label = ctx.lang('enabled') if relay_state.enabled else ctx.lang('disabled')
+ if relay_state.ever_updated:
+ date_label = ''
+ today = datetime.date.today()
+ if today != relay_state.update_time.date():
+ yday = today - datetime.timedelta(days=1)
+ yyday = today - datetime.timedelta(days=2)
+ if yday == relay_state.update_time.date():
+ date_label = ctx.lang('date_yday')
+ elif yyday == relay_state.update_time.date():
+ date_label = ctx.lang('date_yyday')
+ else:
+ date_label = relay_state.update_time.strftime('%d.%m.%Y')
+ date_label += ' '
+ date_label += ctx.lang('date_at') + ' '
+ date_label += relay_state.update_time.strftime('%H:%M')
+ label += ctx.lang('status_updated', date_label)
+ ctx.reply(label)
+
+
+def start(ctx: bot.Context) -> None:
+ if ctx.user_id in config['bot']['users'] or ctx.user_id in config['bot']['admin_users']:
+ ctx.reply(ctx.lang('start_message'))
+ else:
+ buttons = [
+ [ctx.lang('send_access_request')]
+ ]
+ ctx.reply(ctx.lang('start_message_no_access'), markup=ReplyKeyboardMarkup(buttons, one_time_keyboard=False))
+
+
+@bot.exceptionhandler
+def exception_handler(e: Exception, ctx: bot.Context) -> bool:
+ return False
+
+
+@bot.defaultreplymarkup
+def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
+ buttons = [[ctx.lang('enable'), ctx.lang('disable')], [ctx.lang('status')]]
+ if ctx.user_id in config['bot']['admin_users']:
+ buttons.append([ctx.lang('management')])
+ return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
+
+
+if __name__ == '__main__':
+ mqtt_relay = MQTTRelay(home_id=config['mqtt']['home_id'],
+ secret=config['mqtt']['relay']['secret'])
+ mqtt_relay.set_message_callback(on_mqtt_message)
+ mqtt_relay.configure_tls()
+ mqtt_relay.connect_and_loop(loop_forever=False)
+
+ # bot.enable_logging(BotType.PUMP_MQTT)
+ bot.run(start_handler=start)
+
+ mqtt_relay.disconnect()