#!/usr/bin/env python3
from enum import Enum
from typing import Optional
from telegram import ReplyKeyboardMarkup, User
from time import time
from datetime import datetime
from home.config import config, is_development_mode
from home.telegram import bot
from home.telegram._botutil import user_any_name
from home.relay.sunxi_h3_client import RelayClient
from home.api.types import BotType
from home.mqtt import MqttNode, MqttWrapper, MqttPayload
from home.mqtt.module.relay import MqttPowerStatusPayload, MqttRelayModule
from home.mqtt.module.temphum import MqttTemphumDataPayload
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
config.load_app('pump_bot')
mqtt: Optional[MqttWrapper] = None
mqtt_node: Optional[MqttNode] = None
mqtt_relay_module: Optional[MqttRelayModule] = None
time_format = '%d.%m.%Y, %H:%M:%S'
watering_mcu_status = {
'last_time': 0,
'last_boot_time': 0,
'relay_opened': False,
'ambient_temp': 0.0,
'ambient_rh': 0.0,
}
bot.initialize()
bot.lang.ru(
start_message="Выберите команду на клавиатуре",
unknown_command="Неизвестная команда",
enable="Включить",
enable_silently="Включить тихо",
enabled="Насос включен ✅",
disable="Выключить",
disable_silently="Выключить тихо",
disabled="Насос выключен ❌",
start_watering="Включить полив",
stop_watering="Отключить полив",
status="Статус насоса",
watering_status="Статус полива",
done="Готово 👌",
sent="Команда отправлена",
user_action_notification='Пользователь %s %s насос.',
user_watering_notification='Пользователь %s %s полив.',
user_action_on="включил",
user_action_off="выключил",
user_action_watering_on="включил",
user_action_watering_off="выключил",
)
bot.lang.en(
start_message="Select command on the keyboard",
unknown_command="Unknown command",
enable="Turn ON",
enable_silently="Turn ON silently",
enabled="The pump is turned ON ✅",
disable="Turn OFF",
disable_silently="Turn OFF silently",
disabled="The pump is turned OFF ❌",
start_watering="Start watering",
stop_watering="Stop watering",
status="Pump status",
watering_status="Watering status",
done="Done 👌",
sent="Request sent",
user_action_notification='User %s turned the pump %s.',
user_watering_notification='User %s %s the watering.',
user_action_on="ON",
user_action_off="OFF",
user_action_watering_on="started",
user_action_watering_off="stopped",
)
class UserAction(Enum):
ON = 'on'
OFF = 'off'
WATERING_ON = 'watering_on'
WATERING_OFF = 'watering_off'
def get_relay() -> RelayClient:
relay = RelayClient(host=config['relay']['ip'], port=config['relay']['port'])
relay.connect()
return relay
def on(ctx: bot.Context, silent=False) -> None:
get_relay().on()
ctx.reply(ctx.lang('done'))
if not silent:
notify(ctx.user, UserAction.ON)
def off(ctx: bot.Context, silent=False) -> None:
get_relay().off()
ctx.reply(ctx.lang('done'))
if not silent:
notify(ctx.user, UserAction.OFF)
def watering_on(ctx: bot.Context) -> None:
mqtt_relay_module.switchpower(True, config.get('mqtt_water_relay.secret'))
ctx.reply(ctx.lang('sent'))
notify(ctx.user, UserAction.WATERING_ON)
def watering_off(ctx: bot.Context) -> None:
mqtt_relay_module.switchpower(False, config.get('mqtt_water_relay.secret'))
ctx.reply(ctx.lang('sent'))
notify(ctx.user, UserAction.WATERING_OFF)
def notify(user: User, action: UserAction) -> None:
notification_key = 'user_watering_notification' if action in (UserAction.WATERING_ON, UserAction.WATERING_OFF) else 'user_action_notification'
def text_getter(lang: str):
action_name = bot.lang.get(f'user_action_{action.value}', lang)
user_name = user_any_name(user)
return 'ℹ ' + bot.lang.get(notification_key, lang,
user.id, user_name, action_name)
bot.notify_all(text_getter, exclude=(user.id,))
@bot.handler(message='enable')
def enable_handler(ctx: bot.Context) -> None:
on(ctx)
@bot.handler(message='enable_silently')
def enable_s_handler(ctx: bot.Context) -> None:
on(ctx, True)
@bot.handler(message='disable')
def disable_handler(ctx: bot.Context) -> None:
off(ctx)
@bot.handler(message='start_watering')
def start_watering(ctx: bot.Context) -> None:
watering_on(ctx)
@bot.handler(message='stop_watering')
def stop_watering(ctx: bot.Context) -> None:
watering_off(ctx)
@bot.handler(message='disable_silently')
def disable_s_handler(ctx: bot.Context) -> None:
off(ctx, True)
@bot.handler(message='status')
def status(ctx: bot.Context) -> None:
ctx.reply(
ctx.lang('enabled') if get_relay().status() == 'on' else ctx.lang('disabled')
)
def _get_timestamp_as_string(timestamp: int) -> str:
if timestamp != 0:
return datetime.fromtimestamp(timestamp).strftime(time_format)
else:
return 'unknown'
@bot.handler(message='watering_status')
def watering_status(ctx: bot.Context) -> None:
buf = ''
if 0 < watering_mcu_status["last_time"] < time()-1800:
buf += 'WARNING! long time no reports from mcu! maybe something\'s wrong\n'
buf += f'last report time: {_get_timestamp_as_string(watering_mcu_status["last_time"])}\n'
if watering_mcu_status["last_boot_time"] != 0:
buf += f'boot time: {_get_timestamp_as_string(watering_mcu_status["last_boot_time"])}\n'
buf += 'relay opened: ' + ('yes' if watering_mcu_status['relay_opened'] else 'no') + '\n'
buf += f'ambient temp & humidity: {watering_mcu_status["ambient_temp"]} °C, {watering_mcu_status["ambient_rh"]}%'
ctx.reply(buf)
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = []
if ctx.user_id in config['bot']['silent_users']:
buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')])
buttons.append([ctx.lang('enable'), ctx.lang('disable'), ctx.lang('status')],)
buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering'), ctx.lang('watering_status')])
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
def mqtt_payload_callback(mqtt_node: MqttNode, payload: MqttPayload):
global watering_mcu_status
types_the_node_can_send = (
InitialDiagnosticsPayload,
DiagnosticsPayload,
MqttTemphumDataPayload,
MqttPowerStatusPayload
)
for cl in types_the_node_can_send:
if isinstance(payload, cl):
watering_mcu_status['last_time'] = int(time())
break
if isinstance(payload, InitialDiagnosticsPayload):
watering_mcu_status['last_boot_time'] = int(time())
elif isinstance(payload, MqttTemphumDataPayload):
watering_mcu_status['ambient_temp'] = payload.temp
watering_mcu_status['ambient_rh'] = payload.rh
elif isinstance(payload, MqttPowerStatusPayload):
watering_mcu_status['relay_opened'] = payload.opened
if __name__ == '__main__':
mqtt = MqttWrapper()
mqtt_node = MqttNode(node_id=config.get('mqtt_water_relay.node_id'))
if is_development_mode():
mqtt_node.load_module('diagnostics')
mqtt_node.load_module('temphum')
mqtt_relay_module = mqtt_node.load_module('relay')
mqtt_node.add_payload_callback(mqtt_payload_callback)
mqtt.configure_tls()
mqtt.connect_and_loop(loop_forever=False)
bot.enable_logging(BotType.PUMP)
bot.run()
try:
mqtt.disconnect()
except:
pass