#!/usr/bin/env python3
from __future__ import annotations
import logging
import locale
import queue
import time
import threading
import paho.mqtt.client as mqtt
from home.bot import Wrapper, Context, text_filter, handlermethod
from home.api.types import BotType
from home.mqtt import MQTTBase
from home.config import config
from polaris import (
Kettle,
PowerType,
DeviceListener,
IncomingMessageListener,
ConnectionStatusListener,
ConnectionStatus
)
import polaris.protocol as kettle_proto
from typing import Optional, Tuple, List
from collections import namedtuple
from functools import partial
from datetime import datetime
from abc import abstractmethod
from telegram.error import TelegramError
from telegram import (
ReplyKeyboardMarkup,
InlineKeyboardMarkup,
InlineKeyboardButton,
Message
)
from telegram.ext import (
CallbackQueryHandler,
MessageHandler,
CommandHandler
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Process-wide singletons: both stay None until the __main__ section at the
# bottom of the file assigns them.
kc: Optional[KettleController] = None
bot: Optional[Wrapper] = None
# Shape of a rendered message: (text, optional inline keyboard).
RenderedContent = Tuple[str, Optional[InlineKeyboardMarkup]]
# Acquired by run_tasks() before the first task of a queue and released once
# that queue has finished, so task queues run one at a time.
tasks_lock = threading.Lock()
class KettleInfoListener:
    """Callback interface for objects that want to be told when a KettleInfo field changes."""

    @abstractmethod
    def info_updated(self, field: str):
        """Receive the name of the field that has just been assigned; does nothing by default."""
        return None
class KettleInfo:
    """Holds the most recent kettle readings.

    Assigning ``temperature``, ``mode`` or ``target_temperature`` stores the
    value, refreshes ``update_time`` (unix seconds) and reports the field name
    to the listener passed to the constructor. All three start out as None.
    """

    update_time: int
    _mode: Optional[PowerType]
    _temperature: Optional[int]
    _target_temperature: Optional[int]
    _update_listener: KettleInfoListener

    def __init__(self, update_listener: KettleInfoListener):
        self._update_listener = update_listener
        self._target_temperature = None
        self._temperature = None
        self._mode = None
        self.update_time = 0

    def _update(self, field: str):
        """Stamp the change time and notify the listener, if there is one."""
        self.update_time = int(time.time())
        listener = self._update_listener
        if not listener:
            return
        listener.info_updated(field)

    # --- mode ---------------------------------------------------------------
    @property
    def mode(self) -> PowerType:
        return self._mode

    @mode.setter
    def mode(self, value: PowerType):
        self._mode = value
        self._update('mode')

    # --- current temperature ------------------------------------------------
    @property
    def temperature(self) -> int:
        return self._temperature

    @temperature.setter
    def temperature(self, value: int):
        self._temperature = value
        self._update('temperature')

    # --- target temperature -------------------------------------------------
    @property
    def target_temperature(self) -> int:
        return self._target_temperature

    @target_temperature.setter
    def target_temperature(self, value: int):
        self._target_temperature = value
        self._update('target_temperature')
class KettleController(threading.Thread,
                       MQTTBase,
                       DeviceListener,
                       IncomingMessageListener,
                       KettleInfoListener,
                       ConnectionStatusListener):
    """Owns the kettle connection and keeps the bot's status messages current.

    The object is at once:
      * a thread whose loop (see run()) pushes accumulated state changes to the
        registered MessageUpdatingTarget objects and then edits/deletes the
        corresponding Telegram messages through the global ``bot``;
      * the listener for kettle discovery, incoming kettle messages and
        connection status changes;
      * an MQTT client subscribed to the kettle's topic tree.

    Kettle state is stored in ``self.info``; every assignment to it comes back
    through info_updated().
    """

    kettle: Kettle
    info: KettleInfo
    _logger: logging.Logger
    _stopped: bool
    _restart_server_at: int
    _lock: threading.Lock
    _info_lock: threading.Lock
    _muts_lock: threading.Lock
    _accumulated_updates: dict
    _info_flushed_time: float
    _mqtt_root_topic: str
    _muts: List[MessageUpdatingTarget]

    def __init__(self):
        # basic setup
        MQTTBase.__init__(self, clean_session=False)
        threading.Thread.__init__(self)
        self._logger = logging.getLogger(self.__class__.__name__)

        # All state the listener callbacks touch is created BEFORE discovery or
        # MQTT is started: those may call back (device_updated, on_message,
        # incoming_message, ...) while this constructor is still running.

        # info
        self.info = KettleInfo(update_listener=self)
        self._accumulated_updates = {}
        self._info_flushed_time = 0
        self._info_lock = threading.Lock()

        # thread loop related
        self._stopped = False
        self._restart_server_at = 0

        # bot
        self._muts = []
        self._muts_lock = threading.Lock()

        # mqtt
        self._mqtt_root_topic = '/polaris/6/'+config['kettle']['token']+'/#'

        # start talking to the outside world
        self.kettle = Kettle(mac=config['kettle']['mac'],
                             device_token=config['kettle']['token'])
        self.kettle_reconnect()
        self.connect_and_loop(loop_forever=False)
        self.start()

    def kettle_reconnect(self):
        """Start (non-blocking) kettle discovery; the result arrives in device_updated()."""
        self.kettle.discover(wait=False, listener=self)

    def stop_all(self):
        """Stop the kettle machinery and ask the run() loop to exit."""
        self.kettle.stop_all()
        self._stopped = True

    def add_updating_message(self, mut: MessageUpdatingTarget):
        """Start tracking ``mut`` and mark the messages it supersedes for deletion.

        An already tracked message is superseded when it is a "turned on"
        message of the same user, or when it describes the opposite action
        (on vs. off) to the new one.
        """
        with self._muts_lock:
            for m in self._muts:
                # Must compare against the NEW target's user; comparing m with
                # itself is always true.
                same_user_turn_on = m.user_id == mut.user_id and m.user_did_turn_on()
                opposite_action = m.user_did_turn_on() != mut.user_did_turn_on()
                if same_user_turn_on or opposite_action:
                    m.delete()
            self._muts.append(mut)

    # ---------------------
    # threading.Thread impl

    def run(self):
        """Thread body: every 0.5 s, flush pending kettle changes to tracked messages.

        A flush happens at most once per second and only when there is at least
        one tracked message and at least one accumulated change.
        """
        while not self._stopped:
            updates = []
            deletions = []

            # Both locks are required: _muts is modified by add_updating_message()
            # and _accumulated_updates by info_updated(). They are listed with a
            # comma because "lock_a and lock_b" evaluates to lock_b alone, which
            # would leave _muts unprotected.
            with self._muts_lock, self._info_lock:
                flush_due = (self._info_flushed_time == 0
                             or time.time() - self._info_flushed_time >= 1)
                if self._muts and self._accumulated_updates and flush_due:
                    forget = []

                    for mut in self._muts:
                        upd = mut.update(
                            mode=self.info.mode,
                            current_temp=self.info.temperature,
                            target_temp=self.info.target_temperature)

                        if upd.finished or upd.delete:
                            forget.append(mut)

                        if upd.delete:
                            deletions.append(upd)
                        elif upd.changed:
                            updates.append(upd)

                    for mut in forget:
                        self._logger.debug(f'loop: removing mut {mut}')
                        self._muts.remove(mut)

                    self._info_flushed_time = time.time()
                    self._accumulated_updates = {}

            # Telegram calls are made after the locks have been released.
            for upd in updates:
                self._logger.debug(f'loop: got update: {upd}')
                try:
                    bot.edit_message_text(upd.user_id, upd.message_id,
                                          text=upd.html,
                                          reply_markup=upd.markup)
                except TelegramError as exc:
                    self._logger.error(f'loop: edit_message_text failed for update: {upd}')
                    self._logger.exception(exc)

            for upd in deletions:
                self._logger.debug(f'loop: got deletion: {upd}')
                try:
                    bot.delete_message(upd.user_id, upd.message_id)
                except TelegramError as exc:
                    self._logger.error(f'loop: delete_message failed for update: {upd}')
                    self._logger.exception(exc)

            time.sleep(0.5)

    # -------------------
    # DeviceListener impl

    def device_updated(self):
        """Discovery callback: make sure the kettle server is running with us as listener."""
        self._logger.info(f'device updated: {self.kettle.device.si}')
        self.kettle.start_server_if_needed(incoming_message_listener=self,
                                           connection_status_listener=self)

    # -----------------------
    # KettleInfoListener impl

    def info_updated(self, field: str):
        """Remember the new value of ``field`` until the next flush in run()."""
        with self._info_lock:
            newval = getattr(self.info, field)
            self._logger.debug(f'info_updated: updated {field}, new value is {newval}')
            self._accumulated_updates[field] = newval

    # ----------------------------
    # IncomingMessageListener impl

    def incoming_message(self, message: kettle_proto.Message) -> Optional[kettle_proto.Message]:
        """Copy mode/temperature reports into ``self.info`` and acknowledge every message."""
        self._logger.info(f'incoming message: {message}')

        if isinstance(message, kettle_proto.ModeMessage):
            self.info.mode = message.pt
        elif isinstance(message, kettle_proto.CurrentTemperatureMessage):
            self.info.temperature = message.current_temperature
        elif isinstance(message, kettle_proto.TargetTemperatureMessage):
            self.info.target_temperature = message.temperature

        return kettle_proto.AckMessage()

    # -----------------------------
    # ConnectionStatusListener impl

    def connection_status_updated(self, status: ConnectionStatus):
        """On disconnect, stop the kettle machinery and start discovery again."""
        self._logger.info(f'connection status updated: {status}')
        if status == ConnectionStatus.DISCONNECTED:
            self.kettle.stop_all()
            self.kettle_reconnect()

    # -------------
    # MQTTBase impl

    def on_connect(self, client: mqtt.Client, userdata, flags, rc):
        """Subscribe to the kettle's topic tree on every (re)connect."""
        super().on_connect(client, userdata, flags, rc)
        client.subscribe(self._mqtt_root_topic, qos=1)
        self._logger.info(f'subscribed to {self._mqtt_root_topic}')

    def on_message(self, client: mqtt.Client, userdata, msg):
        """Translate kettle state topics into ``self.info`` assignments.

        Any failure (bad payload, unknown enum value, ...) is logged and swallowed.
        """
        try:
            # _mqtt_root_topic is "<prefix>/#": dropping only the trailing '#'
            # keeps the slash, so the remainder is e.g. "state/mode" and not
            # "/state/mode" (which would never match the comparisons below).
            prefix_len = len(self._mqtt_root_topic) - 1
            topic = msg.topic[prefix_len:]
            pld = msg.payload.decode()

            self._logger.debug(f'mqtt: on message: topic={topic} pld={pld}')

            if topic == 'state/sensor/temperature':
                self.info.temperature = int(float(pld))
            elif topic == 'state/mode':
                self.info.mode = PowerType(int(pld))
            elif topic == 'state/temperature':
                self.info.target_temperature = int(float(pld))

        except Exception as e:
            self._logger.exception(str(e))
class Renderer:
    """Builds the text (and optional inline keyboard) of the bot's messages.

    Every user-visible string is obtained through ``ctx.lang(key, *args)``.
    Unless noted otherwise, methods return a ``(text, markup)`` pair.
    """

    @classmethod
    def index(cls, ctx: Context) -> RenderedContent:
        """Render the 'settings' string followed by the 'select_place' string."""
        html = f'{ctx.lang("settings")}\n\n'
        html += ctx.lang('select_place')
        return html, None

    @classmethod
    def status(cls, ctx: Context,
               connected: bool,
               mode: PowerType,
               current_temp: int,
               target_temp: int,
               update_time: int) -> RenderedContent:
        """Render the reply to /status: power state, current temperature, update time.

        When ``connected`` is false, the "not connected" message is returned instead.
        """
        if not connected:
            return cls.not_connected(ctx)

        # power status
        if mode != PowerType.OFF:
            html = ctx.lang('status_on', target_temp)
        else:
            html = ctx.lang('status_off')

        # current temperature
        html += '\n'
        html += ctx.lang('status_current_temp', current_temp)

        # updated on
        html += '\n'
        html += cls.updated(ctx, update_time)

        return html, None

    @classmethod
    def turned_on(cls, ctx: Context,
                  target_temp: int,
                  current_temp: int,
                  mode: PowerType,
                  update_time: Optional[int] = None,
                  reached=False,
                  no_keyboard=False) -> RenderedContent:
        """Render the progress message shown after the user switched the kettle on.

        While the kettle still reports OFF (and the target was not reached) the
        "enabling" text is shown; afterwards either the heating progress or,
        when ``reached`` is true, the final "done" text. The "please wait"
        keyboard is attached unless ``no_keyboard`` is true.
        """
        if mode == PowerType.OFF and not reached:
            html = ctx.lang('enabling')
        elif not reached:
            emoji = '♨️' if current_temp <= 90 else '🔥'
            html = ctx.lang('enabled', emoji, target_temp)

            # current temperature
            html += '\n'
            html += ctx.lang('status_current_temp', current_temp)
        else:
            html = ctx.lang('enabled_reached', current_temp)

        # updated on
        if not reached and update_time is not None:
            html += '\n'
            html += cls.updated(ctx, update_time)

        return html, None if no_keyboard else cls.wait_buttons(ctx)

    @classmethod
    def turned_off(cls, ctx: Context,
                   mode: PowerType,
                   update_time: Optional[int] = None,
                   reached=False,
                   no_keyboard=False) -> RenderedContent:
        """Render the progress message shown after the user switched the kettle off.

        Shows "disabling" until the kettle reports OFF, then "disabled". The
        "please wait" keyboard is attached unless ``no_keyboard`` is true.
        """
        if mode != PowerType.OFF:
            html = ctx.lang('disabling')
        else:
            html = ctx.lang('disabled')

        # updated on
        if not reached and update_time is not None:
            html += '\n'
            html += cls.updated(ctx, update_time)

        return html, None if no_keyboard else cls.wait_buttons(ctx)

    @classmethod
    def not_connected(cls, ctx: Context) -> RenderedContent:
        """Render the "no connection to the kettle" message."""
        return ctx.lang('status_not_connected'), None

    @classmethod
    def smth_went_wrong(cls, ctx: Context) -> RenderedContent:
        """Render the generic failure message."""
        html = ctx.lang('smth_went_wrong')
        return html, None

    @classmethod
    def updated(cls, ctx: Context, update_time: int) -> str:
        """Return the "updated on <date>" line for the unix timestamp ``update_time``.

        Month names are produced by strftime(), so LC_TIME is switched to the
        user's language for the duration of the call and restored afterwards,
        even if formatting raises. If the wanted locale is not installed, a
        warning is logged and the current locale is used instead of failing
        the whole reply.

        NOTE: setlocale() changes process-wide state, so concurrent calls from
        different threads can still observe each other's locale.
        """
        wanted = 'ru_RU.UTF-8' if ctx.user_lang == 'ru' else 'en_US.UTF-8'

        # Calling setlocale() without a value only queries; the returned string
        # is exactly what setlocale() accepts back when restoring.
        saved = locale.setlocale(locale.LC_TIME)
        try:
            try:
                locale.setlocale(locale.LC_TIME, wanted)
            except locale.Error:
                logging.getLogger(__name__).warning(
                    f'locale {wanted} is not available, keeping {saved}')
            dt = datetime.fromtimestamp(update_time)
            return ctx.lang('status_update_time', dt.strftime(ctx.lang('status_update_time_fmt')))
        finally:
            locale.setlocale(locale.LC_TIME, saved)

    @classmethod
    def wait_buttons(cls, ctx: Context):
        """Return the single-button "please wait" inline keyboard."""
        return InlineKeyboardMarkup([
            [
                InlineKeyboardButton(ctx.lang('please_wait'), callback_data='wait')
            ]
        ])
def run_tasks(tasks: queue.SimpleQueue, done: callable):
    """Run queued kettle commands one after another, then report the outcome.

    Each queue item is a sequence ``[method_name, *args]``; the method is
    looked up on ``kc.kettle`` and called with ``*args`` plus a completion
    callback. The next item is started from that callback. A ``False``
    response aborts the chain.

    :param tasks: queue of ``[method_name, *args]`` items
    :param done: called exactly once with True (all tasks ran) or False
                 (the kettle reported a failure)

    ``tasks_lock`` is held from the first task until ``done`` is about to be
    called, so only one chain runs at a time. If a kettle method raises, the
    lock is released and the exception propagates; ``done`` is not called.
    """
    released = False

    def release_lock():
        # Release tasks_lock at most once per chain.
        nonlocal released
        if not released:
            released = True
            tasks_lock.release()

    def next_task(r: Optional[kettle_proto.MessageResponse]):
        if released:
            # The chain has already ended (e.g. it was aborted by an exception);
            # ignore late callbacks instead of touching the lock again.
            return None

        # An explicit comparison rather than `assert`: assertions are stripped
        # under `python -O`, which would turn a failure into a success.
        if r is False:
            logger.error('server error')
            release_lock()
            return done(False)

        if tasks.empty():
            release_lock()
            return done(True)

        task = tasks.get()
        try:
            method = getattr(kc.kettle, task[0])
            method(*task[1:], next_task)
        except Exception:
            # Never leave the lock held: every later command would block forever.
            release_lock()
            raise

    tasks_lock.acquire()
    next_task(None)
# Result of one MessageUpdatingTarget.update() call:
#   message_id, user_id - which Telegram message the result is about
#   finished            - no further updates are expected for this message
#   changed             - the rendered content differs from what was last stored
#   delete              - the message was marked for deletion
#   html, markup        - freshly rendered text and inline keyboard
MUTUpdate = namedtuple('MUTUpdate', 'message_id, user_id, finished, changed, delete, html, markup')


class MessageUpdatingTarget:
    """A sent bot message that is re-rendered as the kettle state changes.

    One instance is created per "turn on" / "turn off" reply. update() is fed
    the latest kettle state and reports what should happen to the message.
    """

    ctx: Context
    message: Message
    user_target_temp: Optional[int]
    user_enabled_power_mode: PowerType
    initial_power_mode: PowerType
    need_to_delete: bool
    rendered_content: Optional[RenderedContent]
    # True while OFF readings must be ignored: the kettle was off when the user
    # asked to turn it on, and no non-OFF mode has been seen yet.
    ignore_pm: bool
    # current_temp passed to the previous update() call; None before the first one.
    last_reported_temp: Optional[int]

    def __init__(self,
                 ctx: Context,
                 message: Message,
                 user_enabled_power_mode: PowerType,
                 initial_power_mode: PowerType,
                 user_target_temp: Optional[int] = None):
        self.ctx = ctx
        self.message = message
        self.initial_power_mode = initial_power_mode
        self.user_enabled_power_mode = user_enabled_power_mode
        self.ignore_pm = initial_power_mode is PowerType.OFF and self.user_did_turn_on()
        self.user_target_temp = user_target_temp
        self.need_to_delete = False
        self.rendered_content = None
        self.last_reported_temp = None

    def set_rendered_content(self, content: RenderedContent):
        """Remember what is currently displayed in the message."""
        self.rendered_content = content

    def rendered_content_changed(self, content: RenderedContent) -> bool:
        """Tell whether ``content`` differs from what was last remembered."""
        return content != self.rendered_content

    def _heating_finished(self, current_temp: int) -> bool:
        """Decide whether a "turn on" message is done once the kettle reports OFF.

        When the target is 100 degrees, this kettle sometimes turns off at 91,
        sometimes at 95, sometimes at 98. It's totally unpredictable, so in this
        case the message keeps updating until the temperature is within 3
        degrees of the target, or until it starts dropping.
        """
        target = self.user_target_temp
        if target is None or target < 100:
            return True
        if current_temp >= target - 3:
            return True
        # There is nothing to compare with on the very first update
        # (comparing an int with None raises TypeError).
        previous = self.last_reported_temp
        return previous is not None and current_temp < previous

    def update(self,
               mode: PowerType,
               current_temp: int,
               target_temp: int) -> MUTUpdate:
        """Re-render the message for the given kettle state.

        :return: an MUTUpdate; the new content is remembered when it changed
        """
        # determine whether status updating is finished
        finished = False
        reached = False
        if self.ignore_pm:
            if mode != PowerType.OFF:
                self.ignore_pm = False
        elif mode == PowerType.OFF:
            reached = True
            finished = not self.user_did_turn_on() or self._heating_finished(current_temp)

        self.last_reported_temp = current_temp

        # render message
        if self.user_did_turn_on():
            rc = Renderer.turned_on(self.ctx,
                                    target_temp=target_temp,
                                    current_temp=current_temp,
                                    mode=mode,
                                    reached=reached,
                                    no_keyboard=finished)
        else:
            rc = Renderer.turned_off(self.ctx,
                                     mode=mode,
                                     reached=reached,
                                     no_keyboard=finished)

        changed = self.rendered_content_changed(rc)
        update = MUTUpdate(message_id=self.message.message_id,
                           user_id=self.ctx.user_id,
                           finished=finished,
                           changed=changed,
                           delete=self.need_to_delete,
                           html=rc[0],
                           markup=rc[1])
        if changed:
            self.set_rendered_content(rc)
        return update

    def user_did_turn_on(self) -> bool:
        """True when the user's action was switching the kettle on (ON or CUSTOM)."""
        return self.user_enabled_power_mode in (PowerType.ON, PowerType.CUSTOM)

    def delete(self):
        """Mark the message for deletion; reported by the next update()."""
        self.need_to_delete = True

    @property
    def user_id(self) -> int:
        return self.ctx.user_id
class KettleBot(Wrapper):
    """Telegram bot front-end: a reply keyboard with temperature presets and an off button.

    Registers the Russian and English strings, the /status command and the
    message handlers for the keyboard buttons. Kettle commands are sent through
    run_tasks(); the resulting reply is handed to the global controller ``kc``
    so that it keeps being updated.
    """

    def __init__(self):
        super().__init__()

        # NOTE(review): Renderer.smth_went_wrong() looks up the key
        # 'smth_went_wrong', which is not registered below - confirm that the
        # base Wrapper provides it.
        self.lang.ru(
            start_message="Выберите команду на клавиатуре",
            unknown_command="Неизвестная команда",
            unexpected_callback_data="Ошибка: неверные данные",
            enable_70="♨️ 70 °C",
            enable_80="♨️ 80 °C",
            enable_90="♨️ 90 °C",
            enable_100="🔥 100 °C",
            disable="❌ Выключить",
            server_error="Ошибка сервера",

            # /status
            status_not_connected="😟 Связь с чайником не установлена",
            status_on="✅ Чайник включён (до %d °C)",
            status_off="❌ Чайник выключен",
            status_current_temp="Сейчас: %d °C",
            status_update_time="Обновлено %s",
            status_update_time_fmt="%d %b в %H:%M:%S",

            # enable
            enabling="💤 Чайник включается...",
            disabling="💤 Чайник выключается...",
            enabled="%s Чайник включён.\nЦель: %d °C",
            enabled_reached="✅ Готово! Чайник вскипел, температура %d °C.",
            disabled="✅ Чайник выключен.",
            please_wait="⏳ Ожидайте..."
        )

        self.lang.en(
            start_message="Select command on the keyboard",
            unknown_command="Unknown command",
            unexpected_callback_data="Unexpected callback data",
            enable_70="♨️ 70 °C",
            enable_80="♨️ 80 °C",
            enable_90="♨️ 90 °C",
            enable_100="🔥 100 °C",
            disable="❌ Turn OFF",
            server_error="Server error",

            # /status
            # The key must be 'status_not_connected': that is what
            # Renderer.not_connected() looks up and what the Russian table uses.
            status_not_connected="😟 Connection has not been established",
            status_on="✅ Turned ON! Target: %d °C",
            status_off="❌ Turned OFF",
            status_current_temp="Now: %d °C",
            status_update_time="Updated on %s",
            status_update_time_fmt="%b %d, %Y at %H:%M:%S",

            # enable
            enabling="💤 Turning on...",
            disabling="💤 Turning off...",
            enabled="%s The kettle is turned ON.\nTarget: %d °C",
            enabled_reached="✅ It's done! The kettle has boiled, the temperature is %d °C.",
            disabled="✅ The kettle is turned OFF.",
            please_wait="⏳ Please wait..."
        )

        # commands
        self.add_handler(CommandHandler('status', self.status))

        # messages
        for temp in (70, 80, 90, 100):
            self.add_handler(MessageHandler(text_filter(self.lang.all(f'enable_{temp}')), self.wrap(partial(self.on, temp))))

        self.add_handler(MessageHandler(text_filter(self.lang.all('disable')), self.off))

    def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]:
        """Build the persistent reply keyboard: one row of presets, one row with "off"."""
        buttons = [
            [ctx.lang(f'enable_{x}') for x in (70, 80, 90, 100)],
            [ctx.lang('disable')]
        ]
        return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)

    def on(self, temp: int, ctx: Context) -> None:
        """Handle a preset button: heat the kettle to ``temp`` degrees.

        100 uses PowerType.ON; any other value sets the target temperature
        first and then uses PowerType.CUSTOM.
        """
        if not kc.kettle.is_connected():
            text, markup = Renderer.not_connected(ctx)
            ctx.reply(text, markup=markup)
            return

        tasks = queue.SimpleQueue()
        if temp == 100:
            power_mode = PowerType.ON
        else:
            power_mode = PowerType.CUSTOM
            tasks.put(['set_target_temperature', temp])
        tasks.put(['set_power', power_mode])

        def done(ok: bool):
            if not ok:
                # Only report the failure. The error reply must not be tracked,
                # otherwise it would later be overwritten by status updates.
                html, markup = Renderer.smth_went_wrong(ctx)
                ctx.reply(html, markup=markup)
                return

            html, markup = Renderer.turned_on(ctx,
                                              target_temp=temp,
                                              current_temp=kc.info.temperature,
                                              mode=kc.info.mode)
            message = ctx.reply(html, markup=markup)
            logger.info(f'ctx.reply returned message: {message}')

            mut = MessageUpdatingTarget(ctx, message,
                                        initial_power_mode=kc.info.mode,
                                        user_enabled_power_mode=power_mode,
                                        user_target_temp=temp)
            mut.set_rendered_content((html, markup))
            kc.add_updating_message(mut)

        run_tasks(tasks, done)

    @handlermethod
    def off(self, ctx: Context) -> None:
        """Handle the "off" button: switch the kettle off."""
        if not kc.kettle.is_connected():
            text, markup = Renderer.not_connected(ctx)
            ctx.reply(text, markup=markup)
            return

        def done(ok: bool):
            if not ok:
                # Only report the failure; see on() for why it is not tracked.
                html, markup = Renderer.smth_went_wrong(ctx)
                ctx.reply(html, markup=markup)
                return

            html, markup = Renderer.turned_off(ctx, mode=kc.info.mode)
            message = ctx.reply(html, markup=markup)
            logger.info(f'ctx.reply returned message: {message}')

            mut = MessageUpdatingTarget(ctx, message,
                                        initial_power_mode=kc.info.mode,
                                        user_enabled_power_mode=PowerType.OFF)
            mut.set_rendered_content((html, markup))
            kc.add_updating_message(mut)

        tasks = queue.SimpleQueue()
        tasks.put(['set_power', PowerType.OFF])
        run_tasks(tasks, done)

    @handlermethod
    def status(self, ctx: Context):
        """Handle /status: reply with the last known kettle state."""
        text, markup = Renderer.status(ctx,
                                       connected=kc.kettle.is_connected(),
                                       mode=kc.info.mode,
                                       current_temp=kc.info.temperature,
                                       target_temp=kc.info.target_temperature,
                                       update_time=kc.info.update_time)
        return ctx.reply(text, markup=markup)
if __name__ == '__main__':
    config.load('polaris_kettle_bot')

    # The controller starts its own thread in the constructor.
    kc = KettleController()
    try:
        bot = KettleBot()
        if 'api' in config:
            bot.enable_logging(BotType.POLARIS_KETTLE)
        bot.run()
    finally:
        # bot library handles signals, so when sigterm or something like that
        # happens, we should stop all other threads here. `finally` also covers
        # the bot failing to start or crashing, so the controller loop is
        # always asked to exit.
        kc.stop_all()