#!/usr/bin/env python3
import datetime
from enum import Enum
from typing import Optional
from telegram import ReplyKeyboardMarkup, User
from home.config import config
from home.telegram import bot
from home.telegram._botutil import user_any_name
from home.mqtt import MqttNode, MqttPayload
from home.mqtt.module.relay import MqttRelayState
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
config.load_app('pump_mqtt_bot')
bot.initialize()
bot.lang.ru(
start_message="Выберите команду на клавиатуре",
start_message_no_access="Доступ запрещён. Вы можете отправить заявку на получение доступа.",
unknown_command="Неизвестная команда",
send_access_request="Отправить заявку",
management="Админка",
enable="Включить",
enabled="Включен ✅",
disable="Выключить",
disabled="Выключен ❌",
status="Статус",
status_updated=' (обновлено %s)',
done="Готово 👌",
user_action_notification='Пользователь %s %s насос.',
user_action_on="включил",
user_action_off="выключил",
date_yday="вчера",
date_yyday="позавчера",
date_at="в"
)
bot.lang.en(
start_message="Select command on the keyboard",
start_message_no_access="You have no access.",
unknown_command="Unknown command",
send_access_request="Send request",
management="Admin options",
enable="Turn ON",
enable_silently="Turn ON silently",
enabled="Turned ON ✅",
disable="Turn OFF",
disable_silently="Turn OFF silently",
disabled="Turned OFF ❌",
status="Status",
status_updated=' (updated %s)',
done="Done 👌",
user_action_notification='User %s turned the pump %s.',
user_action_on="ON",
user_action_off="OFF",
date_yday="yesterday",
date_yyday="the day before yesterday",
date_at="at"
)
mqtt: Optional[MqttNode] = None
relay_state = MqttRelayState()
class UserAction(Enum):
ON = 'on'
OFF = 'off'
def on_mqtt_message(home_id, message: MqttPayload):
if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload):
kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
if isinstance(message, InitialDiagnosticsPayload):
kwargs['fw_version'] = message.fw_version
relay_state.update(**kwargs)
def notify(user: User, action: UserAction) -> None:
def text_getter(lang: str):
action_name = bot.lang.get(f'user_action_{action.value}', lang)
user_name = user_any_name(user)
return 'ℹ ' + bot.lang.get('user_action_notification', lang,
user.id, user_name, action_name)
bot.notify_all(text_getter, exclude=(user.id,))
@bot.handler(message='enable')
def enable_handler(ctx: bot.Context) -> None:
mqtt.set_power(config['mqtt']['home_id'], True)
ctx.reply(ctx.lang('done'))
notify(ctx.user, UserAction.ON)
@bot.handler(message='disable')
def disable_handler(ctx: bot.Context) -> None:
mqtt.set_power(config['mqtt']['home_id'], False)
ctx.reply(ctx.lang('done'))
notify(ctx.user, UserAction.OFF)
@bot.handler(message='status')
def status(ctx: bot.Context) -> None:
label = ctx.lang('enabled') if relay_state.enabled else ctx.lang('disabled')
if relay_state.ever_updated:
date_label = ''
today = datetime.date.today()
if today != relay_state.update_time.date():
yday = today - datetime.timedelta(days=1)
yyday = today - datetime.timedelta(days=2)
if yday == relay_state.update_time.date():
date_label = ctx.lang('date_yday')
elif yyday == relay_state.update_time.date():
date_label = ctx.lang('date_yyday')
else:
date_label = relay_state.update_time.strftime('%d.%m.%Y')
date_label += ' '
date_label += ctx.lang('date_at') + ' '
date_label += relay_state.update_time.strftime('%H:%M')
label += ctx.lang('status_updated', date_label)
ctx.reply(label)
def start(ctx: bot.Context) -> None:
if ctx.user_id in config['bot']['users'] or ctx.user_id in config['bot']['admin_users']:
ctx.reply(ctx.lang('start_message'))
else:
buttons = [
[ctx.lang('send_access_request')]
]
ctx.reply(ctx.lang('start_message_no_access'), markup=ReplyKeyboardMarkup(buttons, one_time_keyboard=False))
@bot.exceptionhandler
def exception_handler(e: Exception, ctx: bot.Context) -> bool:
return False
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = [[ctx.lang('enable'), ctx.lang('disable')], [ctx.lang('status')]]
if ctx.user_id in config['bot']['admin_users']:
buttons.append([ctx.lang('management')])
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
if __name__ == '__main__':
mqtt = MqttRelay(devices=MqttEspDevice(id=config['mqtt']['home_id'],
secret=config['mqtt']['home_secret']))
mqtt.set_message_callback(on_mqtt_message)
mqtt.connect_and_loop(loop_forever=False)
# bot.enable_logging(BotType.PUMP_MQTT)
bot.run(start_handler=start)
mqtt.disconnect()