#!/usr/bin/env python3 from enum import Enum from typing import Optional from telegram import ReplyKeyboardMarkup, User from time import time from datetime import datetime from home.config import config, is_development_mode from home.telegram import bot from home.telegram._botutil import user_any_name from home.relay.sunxi_h3_client import RelayClient from home.api.types import BotType from home.mqtt import MqttNode, MqttWrapper, MqttPayload from home.mqtt.module.relay import MqttPowerStatusPayload, MqttRelayModule from home.mqtt.module.temphum import MqttTemphumDataPayload from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload config.load_app('pump_bot') mqtt: Optional[MqttWrapper] = None mqtt_node: Optional[MqttNode] = None mqtt_relay_module: Optional[MqttRelayModule] = None time_format = '%d.%m.%Y, %H:%M:%S' watering_mcu_status = { 'last_time': 0, 'last_boot_time': 0, 'relay_opened': False, 'ambient_temp': 0.0, 'ambient_rh': 0.0, } bot.initialize() bot.lang.ru( start_message="Выберите команду на клавиатуре", unknown_command="Неизвестная команда", enable="Включить", enable_silently="Включить тихо", enabled="Насос включен ✅", disable="Выключить", disable_silently="Выключить тихо", disabled="Насос выключен ❌", start_watering="Включить полив", stop_watering="Отключить полив", status="Статус насоса", watering_status="Статус полива", done="Готово 👌", sent="Команда отправлена", user_action_notification='Пользователь %s %s насос.', user_watering_notification='Пользователь %s %s полив.', user_action_on="включил", user_action_off="выключил", user_action_watering_on="включил", user_action_watering_off="выключил", ) bot.lang.en( start_message="Select command on the keyboard", unknown_command="Unknown command", enable="Turn ON", enable_silently="Turn ON silently", enabled="The pump is turned ON ✅", disable="Turn OFF", disable_silently="Turn OFF silently", disabled="The pump is turned OFF ❌", start_watering="Start watering", stop_watering="Stop watering", status="Pump status", watering_status="Watering status", done="Done 👌", sent="Request sent", user_action_notification='User %s turned the pump %s.', user_watering_notification='User %s %s the watering.', user_action_on="ON", user_action_off="OFF", user_action_watering_on="started", user_action_watering_off="stopped", ) class UserAction(Enum): ON = 'on' OFF = 'off' WATERING_ON = 'watering_on' WATERING_OFF = 'watering_off' def get_relay() -> RelayClient: relay = RelayClient(host=config['relay']['ip'], port=config['relay']['port']) relay.connect() return relay def on(ctx: bot.Context, silent=False) -> None: get_relay().on() ctx.reply(ctx.lang('done')) if not silent: notify(ctx.user, UserAction.ON) def off(ctx: bot.Context, silent=False) -> None: get_relay().off() ctx.reply(ctx.lang('done')) if not silent: notify(ctx.user, UserAction.OFF) def watering_on(ctx: bot.Context) -> None: mqtt_relay_module.switchpower(True, config.get('mqtt_water_relay.secret')) ctx.reply(ctx.lang('sent')) notify(ctx.user, UserAction.WATERING_ON) def watering_off(ctx: bot.Context) -> None: mqtt_relay_module.switchpower(False, config.get('mqtt_water_relay.secret')) ctx.reply(ctx.lang('sent')) notify(ctx.user, UserAction.WATERING_OFF) def notify(user: User, action: UserAction) -> None: notification_key = 'user_watering_notification' if action in (UserAction.WATERING_ON, UserAction.WATERING_OFF) else 'user_action_notification' def text_getter(lang: str): action_name = bot.lang.get(f'user_action_{action.value}', lang) user_name = user_any_name(user) return 'ℹ ' + bot.lang.get(notification_key, lang, user.id, user_name, action_name) bot.notify_all(text_getter, exclude=(user.id,)) @bot.handler(message='enable') def enable_handler(ctx: bot.Context) -> None: on(ctx) @bot.handler(message='enable_silently') def enable_s_handler(ctx: bot.Context) -> None: on(ctx, True) @bot.handler(message='disable') def disable_handler(ctx: bot.Context) -> None: off(ctx) @bot.handler(message='start_watering') def start_watering(ctx: bot.Context) -> None: watering_on(ctx) @bot.handler(message='stop_watering') def stop_watering(ctx: bot.Context) -> None: watering_off(ctx) @bot.handler(message='disable_silently') def disable_s_handler(ctx: bot.Context) -> None: off(ctx, True) @bot.handler(message='status') def status(ctx: bot.Context) -> None: ctx.reply( ctx.lang('enabled') if get_relay().status() == 'on' else ctx.lang('disabled') ) def _get_timestamp_as_string(timestamp: int) -> str: if timestamp != 0: return datetime.fromtimestamp(timestamp).strftime(time_format) else: return 'unknown' @bot.handler(message='watering_status') def watering_status(ctx: bot.Context) -> None: buf = '' if 0 < watering_mcu_status["last_time"] < time()-1800: buf += 'WARNING! long time no reports from mcu! maybe something\'s wrong\n' buf += f'last report time: {_get_timestamp_as_string(watering_mcu_status["last_time"])}\n' if watering_mcu_status["last_boot_time"] != 0: buf += f'boot time: {_get_timestamp_as_string(watering_mcu_status["last_boot_time"])}\n' buf += 'relay opened: ' + ('yes' if watering_mcu_status['relay_opened'] else 'no') + '\n' buf += f'ambient temp & humidity: {watering_mcu_status["ambient_temp"]} °C, {watering_mcu_status["ambient_rh"]}%' ctx.reply(buf) @bot.defaultreplymarkup def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: buttons = [] if ctx.user_id in config['bot']['silent_users']: buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')]) buttons.append([ctx.lang('enable'), ctx.lang('disable'), ctx.lang('status')],) buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering'), ctx.lang('watering_status')]) return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) def mqtt_payload_callback(mqtt_node: MqttNode, payload: MqttPayload): global watering_mcu_status types_the_node_can_send = ( InitialDiagnosticsPayload, DiagnosticsPayload, MqttTemphumDataPayload, MqttPowerStatusPayload ) for cl in types_the_node_can_send: if isinstance(payload, cl): watering_mcu_status['last_time'] = int(time()) break if isinstance(payload, InitialDiagnosticsPayload): watering_mcu_status['last_boot_time'] = int(time()) elif isinstance(payload, MqttTemphumDataPayload): watering_mcu_status['ambient_temp'] = payload.temp watering_mcu_status['ambient_rh'] = payload.rh elif isinstance(payload, MqttPowerStatusPayload): watering_mcu_status['relay_opened'] = payload.opened if __name__ == '__main__': mqtt = MqttWrapper() mqtt_node = MqttNode(node_id=config.get('mqtt_water_relay.node_id')) if is_development_mode(): mqtt_node.load_module('diagnostics') mqtt_node.load_module('temphum') mqtt_relay_module = mqtt_node.load_module('relay') mqtt_node.add_payload_callback(mqtt_payload_callback) mqtt.connect_and_loop(loop_forever=False) bot.enable_logging(BotType.PUMP) bot.run() try: mqtt.disconnect() except: pass