#!/usr/bin/env python3 from __future__ import annotations import logging import locale import queue import time import threading import paho.mqtt.client as mqtt from home.bot import Wrapper, Context, text_filter, handlermethod from home.api.types import BotType from home.mqtt import MQTTBase from home.config import config from polaris import ( Kettle, PowerType, DeviceListener, IncomingMessageListener, ConnectionStatusListener, ConnectionStatus ) import polaris.protocol as kettle_proto from typing import Optional, Tuple, List from collections import namedtuple from functools import partial from datetime import datetime from abc import abstractmethod from telegram.error import TelegramError from telegram import ( ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton, Message ) from telegram.ext import ( CallbackQueryHandler, MessageHandler, CommandHandler ) logger = logging.getLogger(__name__) kc: Optional[KettleController] = None bot: Optional[Wrapper] = None RenderedContent = Tuple[str, Optional[InlineKeyboardMarkup]] tasks_lock = threading.Lock() class KettleInfoListener: @abstractmethod def info_updated(self, field: str): pass # class that holds data coming from the kettle over mqtt class KettleInfo: update_time: int _mode: Optional[PowerType] _temperature: Optional[int] _target_temperature: Optional[int] _update_listener: KettleInfoListener def __init__(self, update_listener: KettleInfoListener): self.update_time = 0 self._mode = None self._temperature = None self._target_temperature = None self._update_listener = update_listener def _update(self, field: str): self.update_time = int(time.time()) if self._update_listener: self._update_listener.info_updated(field) @property def temperature(self) -> int: return self._temperature @temperature.setter def temperature(self, value: int): self._temperature = value self._update('temperature') @property def mode(self) -> PowerType: return self._mode @mode.setter def mode(self, value: PowerType): self._mode = value self._update('mode') @property def target_temperature(self) -> int: return self._target_temperature @target_temperature.setter def target_temperature(self, value: int): self._target_temperature = value self._update('target_temperature') class KettleController(threading.Thread, MQTTBase, DeviceListener, IncomingMessageListener, KettleInfoListener, ConnectionStatusListener): kettle: Kettle info: KettleInfo _logger: logging.Logger _stopped: bool _restart_server_at: int _lock: threading.Lock _info_lock: threading.Lock _accumulated_updates: dict _info_flushed_time: float _mqtt_root_topic: str _muts: List[MessageUpdatingTarget] def __init__(self): # basic setup MQTTBase.__init__(self, clean_session=False) threading.Thread.__init__(self) self._logger = logging.getLogger(self.__class__.__name__) self.kettle = Kettle(mac=config['kettle']['mac'], device_token=config['kettle']['token']) self.kettle_reconnect() # info self.info = KettleInfo(update_listener=self) self._accumulated_updates = {} self._info_flushed_time = 0 # mqtt self._mqtt_root_topic = '/polaris/6/'+config['kettle']['token']+'/#' self.connect_and_loop(loop_forever=False) # thread loop related self._stopped = False # self._lock = threading.Lock() self._info_lock = threading.Lock() self._restart_server_at = 0 # bot self._muts = [] self._muts_lock = threading.Lock() self.start() def kettle_reconnect(self): self.kettle.discover(wait=False, listener=self) def stop_all(self): self.kettle.stop_all() self._stopped = True def add_updating_message(self, mut: MessageUpdatingTarget): with self._muts_lock: for m in self._muts: if m.user_id == m.user_id and m.user_did_turn_on() or m.user_did_turn_on() != mut.user_did_turn_on(): m.delete() self._muts.append(mut) # --------------------- # threading.Thread impl def run(self): while not self._stopped: # do_restart_srv = False # # with self._lock: # if self._restart_server_at != 0 and time.time() - self._restart_server_at: # self._restart_server_at = 0 # do_restart_srv = True # # if do_restart_srv: # self.kettle_connect() updates = [] deletions = [] with self._muts_lock and self._info_lock: # self._logger.debug('muts size: '+str(len(self._muts))) if self._muts and self._accumulated_updates and (self._info_flushed_time == 0 or time.time() - self._info_flushed_time >= 1): forget = [] deletions = [] for mut in self._muts: upd = mut.update( mode=self.info.mode, current_temp=self.info.temperature, target_temp=self.info.target_temperature) if upd.finished or upd.delete: forget.append(mut) if upd.delete: deletions.append(upd) elif upd.changed: updates.append(upd) if forget: for mut in forget: self._logger.debug(f'loop: removing mut {mut}') self._muts.remove(mut) self._info_flushed_time = time.time() self._accumulated_updates = {} for upd in updates: self._logger.debug(f'loop: got update: {upd}') try: bot.edit_message_text(upd.user_id, upd.message_id, text=upd.html, reply_markup=upd.markup) except TelegramError as exc: self._logger.error(f'loop: edit_message_text failed for update: {upd}') self._logger.exception(exc) for upd in deletions: self._logger.debug(f'loop: got deletion: {upd}') try: bot.delete_message(upd.user_id, upd.message_id) except TelegramError as exc: self._logger.error(f'loop: delete_message failed for update: {upd}') self._logger.exception(exc) time.sleep(0.5) # ------------------- # DeviceListener impl def device_updated(self): self._logger.info(f'device updated: {self.kettle.device.si}') self.kettle.start_server_if_needed(incoming_message_listener=self, connection_status_listener=self) # ----------------------- # KettleInfoListener impl def info_updated(self, field: str): with self._info_lock: newval = getattr(self.info, field) self._logger.debug(f'info_updated: updated {field}, new value is {newval}') self._accumulated_updates[field] = newval # ---------------------------- # IncomingMessageListener impl def incoming_message(self, message: kettle_proto.Message) -> Optional[kettle_proto.Message]: self._logger.info(f'incoming message: {message}') if isinstance(message, kettle_proto.ModeMessage): self.info.mode = message.pt elif isinstance(message, kettle_proto.CurrentTemperatureMessage): self.info.temperature = message.current_temperature elif isinstance(message, kettle_proto.TargetTemperatureMessage): self.info.target_temperature = message.temperature return kettle_proto.AckMessage() # ----------------------------- # ConnectionStatusListener impl def connection_status_updated(self, status: ConnectionStatus): self._logger.info(f'connection status updated: {status}') if status == ConnectionStatus.DISCONNECTED: self.kettle.stop_all() self.kettle_reconnect() # ------------- # MQTTBase impl def on_connect(self, client: mqtt.Client, userdata, flags, rc): super().on_connect(client, userdata, flags, rc) client.subscribe(self._mqtt_root_topic, qos=1) self._logger.info(f'subscribed to {self._mqtt_root_topic}') def on_message(self, client: mqtt.Client, userdata, msg): try: topic = msg.topic[len(self._mqtt_root_topic)-2:] pld = msg.payload.decode() self._logger.debug(f'mqtt: on message: topic={topic} pld={pld}') if topic == 'state/sensor/temperature': self.info.temperature = int(float(pld)) elif topic == 'state/mode': self.info.mode = PowerType(int(pld)) elif topic == 'state/temperature': self.info.target_temperature = int(float(pld)) except Exception as e: self._logger.exception(str(e)) class Renderer: @classmethod def index(cls, ctx: Context) -> RenderedContent: html = f'{ctx.lang("settings")}\n\n' html += ctx.lang('select_place') return html, None @classmethod def status(cls, ctx: Context, connected: bool, mode: PowerType, current_temp: int, target_temp: int, update_time: int) -> RenderedContent: if not connected: return cls.not_connected(ctx) else: # power status if mode != PowerType.OFF: html = ctx.lang('status_on', target_temp) else: html = ctx.lang('status_off') # current temperature html += '\n' html += ctx.lang('status_current_temp', current_temp) # updated on html += '\n' html += cls.updated(ctx, update_time) return html, None @classmethod def turned_on(cls, ctx: Context, target_temp: int, current_temp: int, mode: PowerType, update_time: Optional[int] = None, reached=False, no_keyboard=False) -> RenderedContent: if mode == PowerType.OFF and not reached: html = ctx.lang('enabling') else: if not reached: emoji = '♨️' if current_temp <= 90 else '🔥' html = ctx.lang('enabled', emoji, target_temp) # current temperature html += '\n' html += ctx.lang('status_current_temp', current_temp) else: html = ctx.lang('enabled_reached', current_temp) # updated on if not reached and update_time is not None: html += '\n' html += cls.updated(ctx, update_time) return html, None if no_keyboard else cls.wait_buttons(ctx) @classmethod def turned_off(cls, ctx: Context, mode: PowerType, update_time: Optional[int] = None, reached=False, no_keyboard=False) -> RenderedContent: if mode != PowerType.OFF: html = ctx.lang('disabling') else: html = ctx.lang('disabled') # updated on if not reached and update_time is not None: html += '\n' html += cls.updated(ctx, update_time) return html, None if no_keyboard else cls.wait_buttons(ctx) @classmethod def not_connected(cls, ctx: Context) -> RenderedContent: return ctx.lang('status_not_connected'), None @classmethod def smth_went_wrong(cls, ctx: Context) -> RenderedContent: html = ctx.lang('smth_went_wrong') return html, None @classmethod def updated(cls, ctx: Context, update_time: int): locale_bak = locale.getlocale(locale.LC_TIME) locale.setlocale(locale.LC_TIME, 'ru_RU.UTF-8' if ctx.user_lang == 'ru' else 'en_US.UTF-8') dt = datetime.fromtimestamp(update_time) html = ctx.lang('status_update_time', dt.strftime(ctx.lang('status_update_time_fmt'))) locale.setlocale(locale.LC_TIME, locale_bak) return html @classmethod def wait_buttons(cls, ctx: Context): return InlineKeyboardMarkup([ [ InlineKeyboardButton(ctx.lang('please_wait'), callback_data='wait') ] ]) def run_tasks(tasks: queue.SimpleQueue, done: callable): def next_task(r: Optional[kettle_proto.MessageResponse]): if r is not None: try: assert r is not False, 'server error' except AssertionError as exc: logger.exception(exc) tasks_lock.release() return done(False) if not tasks.empty(): task = tasks.get() args = task[1:] args.append(next_task) f = getattr(kc.kettle, task[0]) f(*args) else: tasks_lock.release() return done(True) tasks_lock.acquire() next_task(None) MUTUpdate = namedtuple('MUTUpdate', 'message_id, user_id, finished, changed, delete, html, markup') class MessageUpdatingTarget: ctx: Context message: Message user_target_temp: Optional[int] user_enabled_power_mode: PowerType initial_power_mode: PowerType need_to_delete: bool rendered_content: Optional[RenderedContent] def __init__(self, ctx: Context, message: Message, user_enabled_power_mode: PowerType, initial_power_mode: PowerType, user_target_temp: Optional[int] = None): self.ctx = ctx self.message = message self.initial_power_mode = initial_power_mode self.user_enabled_power_mode = user_enabled_power_mode self.ignore_pm = initial_power_mode is PowerType.OFF and self.user_did_turn_on() self.user_target_temp = user_target_temp self.need_to_delete = False self.rendered_content = None self.last_reported_temp = None def set_rendered_content(self, content: RenderedContent): self.rendered_content = content def rendered_content_changed(self, content: RenderedContent) -> bool: return content != self.rendered_content def update(self, mode: PowerType, current_temp: int, target_temp: int) -> MUTUpdate: # determine whether status updating is finished finished = False reached = False if self.ignore_pm: if mode != PowerType.OFF: self.ignore_pm = False elif mode == PowerType.OFF: reached = True if self.user_did_turn_on(): # when target is 100 degrees, this kettle sometimes turns off at 91, sometimes at 95, sometimes at 98. # it's totally unpredictable, so in this case, we keep updating the message until it reaches at least 97 # degrees, or if temperature started dropping. if self.user_target_temp < 100 \ or current_temp >= self.user_target_temp - 3 \ or current_temp < self.last_reported_temp: finished = True else: finished = True self.last_reported_temp = current_temp # render message if self.user_did_turn_on(): rc = Renderer.turned_on(self.ctx, target_temp=target_temp, current_temp=current_temp, mode=mode, reached=reached, no_keyboard=finished) else: rc = Renderer.turned_off(self.ctx, mode=mode, reached=reached, no_keyboard=finished) changed = self.rendered_content_changed(rc) update = MUTUpdate(message_id=self.message.message_id, user_id=self.ctx.user_id, finished=finished, changed=changed, delete=self.need_to_delete, html=rc[0], markup=rc[1]) if changed: self.set_rendered_content(rc) return update def user_did_turn_on(self) -> bool: return self.user_enabled_power_mode in (PowerType.ON, PowerType.CUSTOM) def delete(self): self.need_to_delete = True @property def user_id(self) -> int: return self.ctx.user_id class KettleBot(Wrapper): def __init__(self): super().__init__() self.lang.ru( start_message="Выберите команду на клавиатуре", unknown_command="Неизвестная команда", unexpected_callback_data="Ошибка: неверные данные", enable_70="♨️ 70 °C", enable_80="♨️ 80 °C", enable_90="♨️ 90 °C", enable_100="🔥 100 °C", disable="❌ Выключить", server_error="Ошибка сервера", # /status status_not_connected="😟 Связь с чайником не установлена", status_on="✅ Чайник включён (до %d °C)", status_off="❌ Чайник выключен", status_current_temp="Сейчас: %d °C", status_update_time="Обновлено %s", status_update_time_fmt="%d %b в %H:%M:%S", # enable enabling="💤 Чайник включается...", disabling="💤 Чайник выключается...", enabled="%s Чайник включён.\nЦель: %d °C", enabled_reached="✅ Готово! Чайник вскипел, температура %d °C.", disabled="✅ Чайник выключен.", please_wait="⏳ Ожидайте..." ) self.lang.en( start_message="Select command on the keyboard", unknown_command="Unknown command", unexpected_callback_data="Unexpected callback data", enable_70="♨️ 70 °C", enable_80="♨️ 80 °C", enable_90="♨️ 90 °C", enable_100="🔥 100 °C", disable="❌ Turn OFF", server_error="Server error", # /status not_connected="😟 Connection has not been established", status_on="✅ Turned ON! Target: %d °C", status_off="❌ Turned OFF", status_current_temp="Now: %d °C", status_update_time="Updated on %s", status_update_time_fmt="%b %d, %Y at %H:%M:%S", # enable enabling="💤 Turning on...", disabling="💤 Turning off...", enabled="%s The kettle is turned ON.\nTarget: %d °C", enabled_reached="✅ It's done! The kettle has boiled, the temperature is %d °C.", disabled="✅ The kettle is turned OFF.", please_wait="⏳ Please wait..." ) # commands self.add_handler(CommandHandler('status', self.status)) # messages for temp in (70, 80, 90, 100): self.add_handler(MessageHandler(text_filter(self.lang.all(f'enable_{temp}')), self.wrap(partial(self.on, temp)))) self.add_handler(MessageHandler(text_filter(self.lang.all('disable')), self.off)) def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: buttons = [ [ctx.lang(f'enable_{x}') for x in (70, 80, 90, 100)], [ctx.lang('disable')] ] return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) def on(self, temp: int, ctx: Context) -> None: if not kc.kettle.is_connected(): text, markup = Renderer.not_connected(ctx) ctx.reply(text, markup=markup) return tasks = queue.SimpleQueue() if temp == 100: power_mode = PowerType.ON else: power_mode = PowerType.CUSTOM tasks.put(['set_target_temperature', temp]) tasks.put(['set_power', power_mode]) def done(ok: bool): if not ok: html, markup = Renderer.smth_went_wrong(ctx) else: html, markup = Renderer.turned_on(ctx, target_temp=temp, current_temp=kc.info.temperature, mode=kc.info.mode) message = ctx.reply(html, markup=markup) logger.info(f'ctx.reply returned message: {message}') mut = MessageUpdatingTarget(ctx, message, initial_power_mode=kc.info.mode, user_enabled_power_mode=power_mode, user_target_temp=temp) mut.set_rendered_content((html, markup)) kc.add_updating_message(mut) run_tasks(tasks, done) @handlermethod def off(self, ctx: Context) -> None: if not kc.kettle.is_connected(): text, markup = Renderer.not_connected(ctx) ctx.reply(text, markup=markup) return def done(ok: bool): if not ok: html, markup = Renderer.smth_went_wrong(ctx) else: html, markup = Renderer.turned_off(ctx, mode=kc.info.mode) message = ctx.reply(html, markup=markup) logger.info(f'ctx.reply returned message: {message}') mut = MessageUpdatingTarget(ctx, message, initial_power_mode=kc.info.mode, user_enabled_power_mode=PowerType.OFF) mut.set_rendered_content((html, markup)) kc.add_updating_message(mut) tasks = queue.SimpleQueue() tasks.put(['set_power', PowerType.OFF]) run_tasks(tasks, done) @handlermethod def status(self, ctx: Context): text, markup = Renderer.status(ctx, connected=kc.kettle.is_connected(), mode=kc.info.mode, current_temp=kc.info.temperature, target_temp=kc.info.target_temperature, update_time=kc.info.update_time) return ctx.reply(text, markup=markup) if __name__ == '__main__': config.load('polaris_kettle_bot') kc = KettleController() bot = KettleBot() if 'api' in config: bot.enable_logging(BotType.POLARIS_KETTLE) bot.run() # bot library handles signals, so when sigterm or something like that happens, we should stop all other threads here kc.stop_all()