diff options
Diffstat (limited to 'src/polaris_kettle_bot.py')
-rwxr-xr-x | src/polaris_kettle_bot.py | 746 |
1 files changed, 0 insertions, 746 deletions
diff --git a/src/polaris_kettle_bot.py b/src/polaris_kettle_bot.py deleted file mode 100755 index 088707d..0000000 --- a/src/polaris_kettle_bot.py +++ /dev/null @@ -1,746 +0,0 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import logging -import locale -import queue -import time -import threading -import paho.mqtt.client as mqtt - -from home.telegram import bot -from home.api.types import BotType -from home.mqtt import MqttBase -from home.config import config -from home.util import chunks -from syncleo import ( - Kettle, - PowerType, - DeviceListener, - IncomingMessageListener, - ConnectionStatusListener, - ConnectionStatus -) -import syncleo.protocol as kettle_proto -from typing import Optional, Tuple, List, Union -from collections import namedtuple -from functools import partial -from datetime import datetime -from abc import abstractmethod -from telegram.error import TelegramError -from telegram import ( - ReplyKeyboardMarkup, - InlineKeyboardMarkup, - InlineKeyboardButton, - Message -) -from telegram.ext import ( - CallbackQueryHandler, - MessageHandler, - CommandHandler -) - -logger = logging.getLogger(__name__) -config.load('polaris_kettle_bot') - -primary_choices = (70, 80, 90, 100) -all_choices = range( - config['kettle']['temp_min'], - config['kettle']['temp_max']+1, - config['kettle']['temp_step']) - -bot.initialize() -bot.lang.ru( - start_message="Выберите команду на клавиатуре:", - invalid_command="Неизвестная команда", - unexpected_callback_data="Ошибка: неверные данные", - disable="❌ Выключить", - server_error="Ошибка сервера", - back="🔙 Назад", - smth_went_wrong="😱 Что-то пошло не так", - - # /status - status_not_connected="😟 Связь с чайником не установлена", - status_on="🟢 Чайник <b>включён</b> (до <b>%d °C</b>)", - status_off="🔴 Чайник <b>выключен</b>", - status_current_temp="Сейчас: <b>%d °C</b>", - status_update_time="<i>Обновлено %s</i>", - status_update_time_fmt="%d %b в %H:%M:%S", - - # /temp - select_temperature="Выберите температуру:", - - # enable/disable - enabling="💤 Чайник включается...", - disabling="💤 Чайник выключается...", - enabled="🟢 Чайник <b>включён</b>.", - enabled_target="%s Цель: <b>%d °C</b>", - enabled_reached="✅ <b>Готово!</b> Чайник вскипел, температура <b>%d °C</b>.", - disabled="✅ Чайник <b>выключен</b>.", - please_wait="⏳ Ожидайте..." -) -bot.lang.en( - start_message="Select command on the keyboard:", - invalid_command="Unknown command", - unexpected_callback_data="Unexpected callback data", - disable="❌ Turn OFF", - server_error="Server error", - back="🔙 Back", - smth_went_wrong="😱 Something went wrong", - - # /status - status_not_connected="😟 No connection", - status_on="🟢 Turned <b>ON</b>! Target: <b>%d °C</b>", - status_off="🔴 Turned <b>OFF</b>", - status_current_temp="Now: <b>%d °C</b>", - status_update_time="<i>Updated on %s</i>", - status_update_time_fmt="%b %d, %Y at %H:%M:%S", - - # /temp - select_temperature="Select a temperature:", - - # enable/disable - enabling="💤 Turning on...", - disabling="💤 Turning off...", - enabled="🟢 The kettle is <b>turned ON</b>.", - enabled_target="%s Target: <b>%d °C</b>", - enabled_reached="✅ <b>Done</b>! The kettle has boiled, the temperature is <b>%d °C</b>.", - disabled="✅ The kettle is <b>turned OFF</b>.", - please_wait="⏳ Please wait..." -) - -kc: Optional[KettleController] = None -RenderedContent = Tuple[str, Optional[Union[InlineKeyboardMarkup, ReplyKeyboardMarkup]]] -tasks_lock = threading.Lock() - - -def run_tasks(tasks: queue.SimpleQueue, done: callable): - def next_task(r: Optional[kettle_proto.MessageResponse]): - if r is not None: - try: - assert r is not False, 'server error' - except AssertionError as exc: - logger.exception(exc) - tasks_lock.release() - return done(False) - - if not tasks.empty(): - task = tasks.get() - args = task[1:] - args.append(next_task) - f = getattr(kc.kettle, task[0]) - f(*args) - else: - tasks_lock.release() - return done(True) - - tasks_lock.acquire() - next_task(None) - - -def temperature_emoji(temp: int) -> str: - if temp > 90: - return '🔥' - elif temp >= 40: - return '♨️' - elif temp >= 35: - return '🌡' - else: - return '❄️' - - -class KettleInfoListener: - @abstractmethod - def info_updated(self, field: str): - pass - - -# class that holds data coming from the kettle over mqtt -class KettleInfo: - update_time: int - _mode: Optional[PowerType] - _temperature: Optional[int] - _target_temperature: Optional[int] - _update_listener: KettleInfoListener - - def __init__(self, update_listener: KettleInfoListener): - self.update_time = 0 - self._mode = None - self._temperature = None - self._target_temperature = None - self._update_listener = update_listener - - def _update(self, field: str): - self.update_time = int(time.time()) - if self._update_listener: - self._update_listener.info_updated(field) - - @property - def temperature(self) -> int: - return self._temperature - - @temperature.setter - def temperature(self, value: int): - self._temperature = value - self._update('temperature') - - @property - def mode(self) -> PowerType: - return self._mode - - @mode.setter - def mode(self, value: PowerType): - self._mode = value - self._update('mode') - - @property - def target_temperature(self) -> int: - return self._target_temperature - - @target_temperature.setter - def target_temperature(self, value: int): - self._target_temperature = value - self._update('target_temperature') - - -class KettleController(threading.Thread, - MqttBase, - DeviceListener, - IncomingMessageListener, - KettleInfoListener, - ConnectionStatusListener): - kettle: Kettle - info: KettleInfo - - _logger: logging.Logger - _stopped: bool - _restart_server_at: int - _lock: threading.Lock - _info_lock: threading.Lock - _accumulated_updates: dict - _info_flushed_time: float - _mqtt_root_topic: str - _muts: List[MessageUpdatingTarget] - - def __init__(self): - # basic setup - MqttBase.__init__(self, clean_session=False) - threading.Thread.__init__(self) - - self._logger = logging.getLogger(self.__class__.__name__) - - self.kettle = Kettle(mac=config['kettle']['mac'], - device_token=config['kettle']['token'], - read_timeout=config['kettle']['read_timeout']) - self.kettle_reconnect() - - # info - self.info = KettleInfo(update_listener=self) - self._accumulated_updates = {} - self._info_flushed_time = 0 - - # mqtt - self._mqtt_root_topic = '/polaris/6/'+config['kettle']['token']+'/#' - self.connect_and_loop(loop_forever=False) - - # thread loop related - self._stopped = False - # self._lock = threading.Lock() - self._info_lock = threading.Lock() - self._restart_server_at = 0 - - # bot - self._muts = [] - self._muts_lock = threading.Lock() - - self.start() - - def kettle_reconnect(self): - self.kettle.discover(wait=False, listener=self) - - def stop_all(self): - self.kettle.stop_all() - self._stopped = True - - def add_updating_message(self, mut: MessageUpdatingTarget): - with self._muts_lock: - for m in self._muts: - if m.user_id == m.user_id and m.user_did_turn_on() or m.user_did_turn_on() != mut.user_did_turn_on(): - m.delete() - self._muts.append(mut) - - # --------------------- - # threading.Thread impl - - def run(self): - while not self._stopped: - updates = [] - deletions = [] - forget = [] - - with self._muts_lock and self._info_lock: - if self._muts and self._accumulated_updates and (self._info_flushed_time == 0 or time.time() - self._info_flushed_time >= 1): - deletions = [] - - for mut in self._muts: - upd = mut.update( - mode=self.info.mode, - current_temp=self.info.temperature, - target_temp=self.info.target_temperature) - - if upd.finished or upd.delete: - forget.append(mut) - - if upd.delete: - deletions.append((mut, upd)) - - elif upd.changed: - updates.append((mut, upd)) - - self._info_flushed_time = time.time() - self._accumulated_updates = {} - - # edit messages - for mut, upd in updates: - self._logger.debug(f'loop: got update: {upd}') - try: - do_edit = True - if upd.finished: - # try to delete the old message and send a new one, to notify user more effectively - try: - bot.delete_message(upd.user_id, upd.message_id) - do_edit = False - except TelegramError as exc: - self._logger.error(f'loop: failed to delete old message (in order to send a new one)') - self._logger.exception(exc) - - if do_edit: - bot.edit_message_text(upd.user_id, upd.message_id, - text=upd.html, - reply_markup=upd.markup) - else: - bot.notify_user(upd.user_id, upd.html, reply_markup=upd.markup) - except TelegramError as exc: - if "Message can't be edited" in exc.message: - self._logger.warning("message can't be edited, adding it to forget list") - forget.append(upd) - - self._logger.error(f'loop: edit_message_text failed for update: {upd}') - self._logger.exception(exc) - - # delete messages - for mut, upd in deletions: - self._logger.debug(f'loop: got deletion: {upd}') - try: - bot.delete_message(upd.user_id, upd.message_id) - except TelegramError as exc: - self._logger.error(f'loop: delete_message failed for update: {upd}') - self._logger.exception(exc) - - # delete muts, if needed - if forget: - with self._muts_lock: - for mut in forget: - self._logger.debug(f'loop: removing mut {mut}') - self._muts.remove(mut) - - time.sleep(0.5) - - # ------------------- - # DeviceListener impl - - def device_updated(self): - self._logger.info(f'device updated: {self.kettle.device.si}') - self.kettle.start_server_if_needed(incoming_message_listener=self, - connection_status_listener=self) - - # ----------------------- - # KettleInfoListener impl - - def info_updated(self, field: str): - with self._info_lock: - newval = getattr(self.info, field) - self._logger.debug(f'info_updated: updated {field}, new value is {newval}') - self._accumulated_updates[field] = newval - - # ---------------------------- - # IncomingMessageListener impl - - def incoming_message(self, message: kettle_proto.Message) -> Optional[kettle_proto.Message]: - self._logger.info(f'incoming message: {message}') - - if isinstance(message, kettle_proto.ModeMessage): - self.info.mode = message.pt - elif isinstance(message, kettle_proto.CurrentTemperatureMessage): - self.info.temperature = message.current_temperature - elif isinstance(message, kettle_proto.TargetTemperatureMessage): - self.info.target_temperature = message.temperature - - return kettle_proto.AckMessage() - - # ----------------------------- - # ConnectionStatusListener impl - - def connection_status_updated(self, status: ConnectionStatus): - self._logger.info(f'connection status updated: {status}') - if status == ConnectionStatus.DISCONNECTED: - self.kettle.stop_all() - self.kettle_reconnect() - - # ------------- - # MQTTBase impl - - def on_connect(self, client: mqtt.Client, userdata, flags, rc): - super().on_connect(client, userdata, flags, rc) - client.subscribe(self._mqtt_root_topic, qos=1) - self._logger.info(f'subscribed to {self._mqtt_root_topic}') - - def on_message(self, client: mqtt.Client, userdata, msg): - try: - topic = msg.topic[len(self._mqtt_root_topic)-2:] - pld = msg.payload.decode() - - self._logger.debug(f'mqtt: on message: topic={topic} pld={pld}') - - if topic == 'state/sensor/temperature': - self.info.temperature = int(float(pld)) - elif topic == 'state/mode': - self.info.mode = PowerType(int(pld)) - elif topic == 'state/temperature': - self.info.target_temperature = int(float(pld)) - - except Exception as e: - self._logger.exception(str(e)) - - -class Renderer: - @classmethod - def index(cls, ctx: bot.Context) -> RenderedContent: - html = f'<b>{ctx.lang("settings")}</b>\n\n' - html += ctx.lang('select_place') - return html, None - - @classmethod - def status(cls, ctx: bot.Context, - connected: bool, - mode: PowerType, - current_temp: int, - target_temp: int, - update_time: int) -> RenderedContent: - if not connected: - return cls.not_connected(ctx) - else: - # power status - if mode != PowerType.OFF: - html = ctx.lang('status_on', target_temp) - else: - html = ctx.lang('status_off') - - # current temperature - html += '\n' - html += ctx.lang('status_current_temp', current_temp) - - # updated on - html += '\n' - html += cls.updated(ctx, update_time) - - return html, None - - @classmethod - def temp(cls, ctx: bot.Context, choices) -> RenderedContent: - buttons = [] - for chunk in chunks(choices, 5): - buttons.append([f'{temperature_emoji(n)} {n}' for n in chunk]) - buttons.append([ctx.lang('back')]) - return ctx.lang('select_temperature'), ReplyKeyboardMarkup(buttons) - - @classmethod - def turned_on(cls, ctx: bot.Context, - target_temp: int, - current_temp: int, - mode: PowerType, - update_time: Optional[int] = None, - reached=False, - no_keyboard=False) -> RenderedContent: - if mode == PowerType.OFF and not reached: - html = ctx.lang('enabling') - else: - if not reached: - html = ctx.lang('enabled') - - # target temperature - html += '\n' - html += ctx.lang('enabled_target', temperature_emoji(target_temp), target_temp) - - # current temperature - html += '\n' - html += temperature_emoji(current_temp) + ' ' - html += ctx.lang('status_current_temp', current_temp) - else: - html = ctx.lang('enabled_reached', current_temp) - - # updated on - if not reached and update_time is not None: - html += '\n' - html += cls.updated(ctx, update_time) - - return html, None if no_keyboard else cls.wait_buttons(ctx) - - @classmethod - def turned_off(cls, ctx: bot.Context, - mode: PowerType, - update_time: Optional[int] = None, - reached=False, - no_keyboard=False) -> RenderedContent: - if mode != PowerType.OFF: - html = ctx.lang('disabling') - else: - html = ctx.lang('disabled') - - # updated on - if not reached and update_time is not None: - html += '\n' - html += cls.updated(ctx, update_time) - - return html, None if no_keyboard else cls.wait_buttons(ctx) - - @classmethod - def not_connected(cls, ctx: bot.Context) -> RenderedContent: - return ctx.lang('status_not_connected'), None - - @classmethod - def smth_went_wrong(cls, ctx: bot.Context) -> RenderedContent: - html = ctx.lang('smth_went_wrong') - return html, None - - @classmethod - def updated(cls, ctx: bot.Context, update_time: int): - locale_bak = locale.getlocale(locale.LC_TIME) - locale.setlocale(locale.LC_TIME, 'ru_RU.UTF-8' if ctx.user_lang == 'ru' else 'en_US.UTF-8') - dt = datetime.fromtimestamp(update_time) - html = ctx.lang('status_update_time', dt.strftime(ctx.lang('status_update_time_fmt'))) - locale.setlocale(locale.LC_TIME, locale_bak) - return html - - @classmethod - def wait_buttons(cls, ctx: bot.Context): - return InlineKeyboardMarkup([ - [ - InlineKeyboardButton(ctx.lang('please_wait'), callback_data='wait') - ] - ]) - - -MUTUpdate = namedtuple('MUTUpdate', 'message_id, user_id, finished, changed, delete, html, markup') - - -class MessageUpdatingTarget: - ctx: bot.Context - message: Message - user_target_temp: Optional[int] - user_enabled_power_mode: PowerType - initial_power_mode: PowerType - need_to_delete: bool - rendered_content: Optional[RenderedContent] - - def __init__(self, - ctx: bot.Context, - message: Message, - user_enabled_power_mode: PowerType, - initial_power_mode: PowerType, - user_target_temp: Optional[int] = None): - self.ctx = ctx - self.message = message - self.initial_power_mode = initial_power_mode - self.user_enabled_power_mode = user_enabled_power_mode - self.ignore_pm = initial_power_mode is PowerType.OFF and self.user_did_turn_on() - self.user_target_temp = user_target_temp - self.need_to_delete = False - self.rendered_content = None - self.last_reported_temp = None - - def set_rendered_content(self, content: RenderedContent): - self.rendered_content = content - - def rendered_content_changed(self, content: RenderedContent) -> bool: - return content != self.rendered_content - - def update(self, - mode: PowerType, - current_temp: int, - target_temp: int) -> MUTUpdate: - - # determine whether status updating is finished - finished = False - reached = False - if self.ignore_pm: - if mode != PowerType.OFF: - self.ignore_pm = False - elif mode == PowerType.OFF: - reached = True - if self.user_did_turn_on(): - # when target is 100 degrees, this kettle sometimes turns off at 91, sometimes at 95, sometimes at 98. - # it's totally unpredictable, so in this case, we keep updating the message until it reaches at least 97 - # degrees, or if temperature started dropping. - if self.user_target_temp < 100 \ - or current_temp >= self.user_target_temp - 3 \ - or current_temp < self.last_reported_temp: - finished = True - else: - finished = True - - self.last_reported_temp = current_temp - - # render message - if self.user_did_turn_on(): - rc = Renderer.turned_on(self.ctx, - target_temp=target_temp, - current_temp=current_temp, - mode=mode, - reached=reached, - no_keyboard=finished) - else: - rc = Renderer.turned_off(self.ctx, - mode=mode, - reached=reached, - no_keyboard=finished) - - changed = self.rendered_content_changed(rc) - update = MUTUpdate(message_id=self.message.message_id, - user_id=self.ctx.user_id, - finished=finished, - changed=changed, - delete=self.need_to_delete, - html=rc[0], - markup=rc[1]) - if changed: - self.set_rendered_content(rc) - return update - - def user_did_turn_on(self) -> bool: - return self.user_enabled_power_mode in (PowerType.ON, PowerType.CUSTOM) - - def delete(self): - self.need_to_delete = True - - @property - def user_id(self) -> int: - return self.ctx.user_id - - -@bot.handler(command='status') -def status(ctx: bot.Context) -> None: - text, markup = Renderer.status(ctx, - connected=kc.kettle.is_connected(), - mode=kc.info.mode, - current_temp=kc.info.temperature, - target_temp=kc.info.target_temperature, - update_time=kc.info.update_time) - ctx.reply(text, markup=markup) - - -@bot.handler(command='temp') -def temp(ctx: bot.Context) -> None: - text, markup = Renderer.temp( - ctx, choices=all_choices) - ctx.reply(text, markup=markup) - - -def enable(temp: int, ctx: bot.Context) -> None: - if not kc.kettle.is_connected(): - text, markup = Renderer.not_connected(ctx) - ctx.reply(text, markup=markup) - return - - tasks = queue.SimpleQueue() - if temp == 100: - power_mode = PowerType.ON - else: - power_mode = PowerType.CUSTOM - tasks.put(['set_target_temperature', temp]) - tasks.put(['set_power', power_mode]) - - def done(ok: bool): - if not ok: - html, markup = Renderer.smth_went_wrong(ctx) - else: - html, markup = Renderer.turned_on(ctx, - target_temp=temp, - current_temp=kc.info.temperature, - mode=kc.info.mode) - message = ctx.reply(html, markup=markup) - logger.debug(f'ctx.reply returned message: {message}') - - if ok: - mut = MessageUpdatingTarget(ctx, message, - initial_power_mode=kc.info.mode, - user_enabled_power_mode=power_mode, - user_target_temp=temp) - mut.set_rendered_content((html, markup)) - kc.add_updating_message(mut) - - run_tasks(tasks, done) - - -@bot.handler(message='disable') -def disable(ctx: bot.Context): - if not kc.kettle.is_connected(): - text, markup = Renderer.not_connected(ctx) - ctx.reply(text, markup=markup) - return - - def done(ok: bool): - mode = kc.info.mode - if not ok: - html, markup = Renderer.smth_went_wrong(ctx) - else: - kw = {} - if mode == PowerType.OFF: - kw['reached'] = True - kw['no_keyboard'] = True - html, markup = Renderer.turned_off(ctx, mode=mode, **kw) - message = ctx.reply(html, markup=markup) - logger.debug(f'ctx.reply returned message: {message}') - - if ok and mode != PowerType.OFF: - mut = MessageUpdatingTarget(ctx, message, - initial_power_mode=mode, - user_enabled_power_mode=PowerType.OFF) - mut.set_rendered_content((html, markup)) - kc.add_updating_message(mut) - - tasks = queue.SimpleQueue() - tasks.put(['set_power', PowerType.OFF]) - run_tasks(tasks, done) - - -@bot.handler(message='back') -def back(ctx: bot.Context): - bot.start(ctx) - - -@bot.defaultreplymarkup -def defaultmarkup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: - buttons = [ - [f'{temperature_emoji(n)} {n}' for n in primary_choices], - [ctx.lang('disable')] - ] - return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) - - -if __name__ == '__main__': - for temp in primary_choices: - bot.handler(text=f'{temperature_emoji(temp)} {temp}')(partial(enable, temp)) - - for temp in all_choices: - bot.handler(text=f'{temperature_emoji(temp)} {temp}')(partial(enable, temp)) - - kc = KettleController() - - if 'api' in config: - bot.enable_logging(BotType.POLARIS_KETTLE) - - bot.run() - - # bot library handles signals, so when sigterm or something like that happens, we should stop all other threads here - kc.stop_all() |