aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2023-09-17 04:38:26 +0300
committerEvgeny Zinoviev <me@ch1p.io>2023-09-17 04:38:26 +0300
commit9b78ccca3546f93955571f4a20a44a1739e718b8 (patch)
tree8a4f50e7a56163dfcda06ab34deb81490448794c
parenta32e4a1629a20026c364059c7bbaec1dbd64353b (diff)
bin: add lugobaya_pump_mqtt_bot test app
-rwxr-xr-xbin/lugovaya_pump_mqtt_bot.py201
1 files changed, 201 insertions, 0 deletions
diff --git a/bin/lugovaya_pump_mqtt_bot.py b/bin/lugovaya_pump_mqtt_bot.py
new file mode 100755
index 0000000..72a2e87
--- /dev/null
+++ b/bin/lugovaya_pump_mqtt_bot.py
@@ -0,0 +1,201 @@
+#!/usr/bin/env python3
+import datetime
+import __py_include
+
+from enum import Enum
+from typing import Optional
+from telegram import ReplyKeyboardMarkup, User
+
+from homekit.config import config, AppConfigUnit
+from homekit.telegram import bot
+from homekit.telegram.config import TelegramBotConfig
+from homekit.telegram._botutil import user_any_name
+from homekit.mqtt import MqttNode, MqttPayload, MqttNodesConfig, MqttWrapper
+from homekit.mqtt.module.relay import MqttRelayState, MqttRelayModule
+from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
+
+
+class LugovayaPumpMqttBotConfig(TelegramBotConfig, AppConfigUnit):
+ NAME = 'lugovaya_pump_mqtt_bot'
+
+ @classmethod
+ def schema(cls) -> Optional[dict]:
+ return {
+ **TelegramBotConfig.schema(),
+ 'relay_node_id': {
+ 'type': 'string',
+ 'required': True
+ },
+ }
+
+ @staticmethod
+ def custom_validator(data):
+ relay_node_names = MqttNodesConfig().get_nodes(filters=('relay',), only_names=True)
+ if data['relay_node_id'] not in relay_node_names:
+ raise ValueError('unknown relay node "%s"' % (data['relay_node_id'],))
+
+
+config.load_app(LugovayaPumpMqttBotConfig)
+
+bot.initialize()
+bot.lang.ru(
+ start_message="Выберите команду на клавиатуре",
+ start_message_no_access="Доступ запрещён. Вы можете отправить заявку на получение доступа.",
+ unknown_command="Неизвестная команда",
+ send_access_request="Отправить заявку",
+ management="Админка",
+
+ enable="Включить",
+ enabled="Включен ✅",
+
+ disable="Выключить",
+ disabled="Выключен ❌",
+
+ status="Статус",
+ status_updated=' (обновлено %s)',
+
+ done="Готово 👌",
+ user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.',
+ user_action_on="включил",
+ user_action_off="выключил",
+ date_yday="вчера",
+ date_yyday="позавчера",
+ date_at="в"
+)
+bot.lang.en(
+ start_message="Select command on the keyboard",
+ start_message_no_access="You have no access.",
+ unknown_command="Unknown command",
+ send_access_request="Send request",
+ management="Admin options",
+
+ enable="Turn ON",
+ enable_silently="Turn ON silently",
+ enabled="Turned ON ✅",
+
+ disable="Turn OFF",
+ disable_silently="Turn OFF silently",
+ disabled="Turned OFF ❌",
+
+ status="Status",
+ status_updated=' (updated %s)',
+
+ done="Done 👌",
+ user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.',
+ user_action_on="ON",
+ user_action_off="OFF",
+
+ date_yday="yesterday",
+ date_yyday="the day before yesterday",
+ date_at="at"
+)
+
+
+mqtt: MqttWrapper
+relay_state = MqttRelayState()
+relay_module: MqttRelayModule
+
+
+class UserAction(Enum):
+ ON = 'on'
+ OFF = 'off'
+
+
+# def on_mqtt_message(home_id, message: MqttPayload):
+# if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload):
+# kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
+# if isinstance(message, InitialDiagnosticsPayload):
+# kwargs['fw_version'] = message.fw_version
+# relay_state.update(**kwargs)
+
+
+async def notify(user: User, action: UserAction) -> None:
+ def text_getter(lang: str):
+ action_name = bot.lang.get(f'user_action_{action.value}', lang)
+ user_name = user_any_name(user)
+ return 'ℹ ' + bot.lang.get('user_action_notification', lang,
+ user.id, user_name, action_name)
+
+ await bot.notify_all(text_getter, exclude=(user.id,))
+
+
+@bot.handler(message='enable')
+async def enable_handler(ctx: bot.Context) -> None:
+ relay_module.switchpower(True)
+ await ctx.reply(ctx.lang('done'))
+ await notify(ctx.user, UserAction.ON)
+
+
+@bot.handler(message='disable')
+async def disable_handler(ctx: bot.Context) -> None:
+ relay_module.switchpower(False)
+ await ctx.reply(ctx.lang('done'))
+ await notify(ctx.user, UserAction.OFF)
+
+
+@bot.handler(message='status')
+async def status(ctx: bot.Context) -> None:
+ label = ctx.lang('enabled') if relay_state.enabled else ctx.lang('disabled')
+ if relay_state.ever_updated:
+ date_label = ''
+ today = datetime.date.today()
+ if today != relay_state.update_time.date():
+ yday = today - datetime.timedelta(days=1)
+ yyday = today - datetime.timedelta(days=2)
+ if yday == relay_state.update_time.date():
+ date_label = ctx.lang('date_yday')
+ elif yyday == relay_state.update_time.date():
+ date_label = ctx.lang('date_yyday')
+ else:
+ date_label = relay_state.update_time.strftime('%d.%m.%Y')
+ date_label += ' '
+ date_label += ctx.lang('date_at') + ' '
+ date_label += relay_state.update_time.strftime('%H:%M')
+ label += ctx.lang('status_updated', date_label)
+ await ctx.reply(label)
+
+
+async def start(ctx: bot.Context) -> None:
+ if ctx.user_id in config['bot']['users']:
+ await ctx.reply(ctx.lang('start_message'))
+ else:
+ buttons = [
+ [ctx.lang('send_access_request')]
+ ]
+ await ctx.reply(ctx.lang('start_message_no_access'),
+ markup=ReplyKeyboardMarkup(buttons, one_time_keyboard=False))
+
+
+@bot.exceptionhandler
+def exception_handler(e: Exception, ctx: bot.Context) -> bool:
+ return False
+
+
+@bot.defaultreplymarkup
+def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
+ buttons = [[ctx.lang('enable'), ctx.lang('disable')], [ctx.lang('status')]]
+ # if ctx.user_id in config['bot']['admin_users']:
+ # buttons.append([ctx.lang('management')])
+ return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
+
+
+node_data = MqttNodesConfig().get_node(config.app_config['relay_node_id'])
+
+mqtt = MqttWrapper(client_id='lugovaya_pump_mqtt_bot')
+mqtt_node = MqttNode(node_id=config.app_config['relay_node_id'],
+ node_secret=node_data['password'])
+module_kwargs = {}
+try:
+ if node_data['relay']['legacy_topics']:
+ module_kwargs['legacy_topics'] = True
+except KeyError:
+ pass
+relay_module = mqtt_node.load_module('relay', **module_kwargs)
+mqtt_node.add_payload_callback(on_mqtt_message)
+mqtt.add_node(mqtt_node)
+
+mqtt.connect_and_loop(loop_forever=False)
+
+bot.run(start_handler=start)
+
+mqtt.disconnect()