diff options
Diffstat (limited to 'src/sensors_bot.py')
-rwxr-xr-x | src/sensors_bot.py | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/src/sensors_bot.py b/src/sensors_bot.py new file mode 100755 index 0000000..ea3dc9e --- /dev/null +++ b/src/sensors_bot.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +import json +import socket +import logging +import re +import gc + +from io import BytesIO +from typing import Optional +from functools import partial + +import matplotlib.pyplot as plt +import matplotlib.dates as mdates +import matplotlib.ticker as mticker + +from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton +from telegram.ext import MessageHandler, CallbackQueryHandler + +from home.config import config +from home.bot import Wrapper, Context, text_filter +from home.util import chunks, MySimpleSocketClient +from home.api import WebAPIClient +from home.api.types import ( + BotType, + TemperatureSensorLocation +) + +bot: Optional[Wrapper] = None +plt.rcParams['font.size'] = 7 +logger = logging.getLogger(__name__) +plot_hours = [3, 6, 12, 24] + + +def read_sensor(sensor: str, ctx: Context) -> None: + host = config['sensors'][sensor]['ip'] + port = config['sensors'][sensor]['port'] + + try: + client = MySimpleSocketClient(host, port) + client.write('read') + data = json.loads(client.read()) + except (socket.timeout, socket.error) as error: + return ctx.reply_exc(error) + + temp = round(data['temp'], 2) + humidity = round(data['humidity'], 2) + + text = ctx.lang('temperature') + f': <b>{temp} °C</b>\n' + text += ctx.lang('humidity') + f': <b>{humidity}%</b>' + + buttons = list(map( + lambda h: InlineKeyboardButton(ctx.lang(f'plot_{h}h'), callback_data=f'plot/{sensor}/{h}'), + plot_hours + )) + ctx.reply(text, markup=InlineKeyboardMarkup(chunks(buttons, 2))) + + +def callback_handler(ctx: Context) -> None: + query = ctx.callback_query + + sensors_variants = '|'.join(config['sensors'].keys()) + hour_variants = '|'.join(list(map( + lambda n: str(n), + plot_hours + ))) + + match = re.match(rf'plot/({sensors_variants})/({hour_variants})', query.data) + if not match: + query.answer(ctx.lang('unexpected_callback_data')) + return + + query.answer(ctx.lang('loading')) + + # retrieve data + sensor = TemperatureSensorLocation[match.group(1).upper()] + hours = int(match.group(2)) + + api = WebAPIClient() + data = api.get_sensors_data(sensor, hours) + + title = ctx.lang(sensor.name.lower()) + ' (' + ctx.lang('n_hrs', hours) + ')' + plot = draw_plot(data, title, + ctx.lang('temperature'), + ctx.lang('humidity')) + bot.updater.bot.send_photo(ctx.user_id, plot) + + gc.collect() + + +def draw_plot(data, + title: str, + label_temp: str, + label_hum: str) -> BytesIO: + tempval = [] + humval = [] + dates = [] + for date, temp, humidity in data: + dates.append(date) + tempval.append(temp) + humval.append(humidity) + + fig, axs = plt.subplots(2, 1) + df = mdates.DateFormatter('%H:%M') + + axs[0].set_title(label_temp) + axs[0].plot(dates, tempval) + axs[0].xaxis.set_major_formatter(df) + axs[0].yaxis.set_major_formatter(mticker.FormatStrFormatter('%2.2f °C')) + + fig.suptitle(title, fontsize=10) + + axs[1].set_title(label_hum) + axs[1].plot(dates, humval) + axs[1].xaxis.set_major_formatter(df) + axs[1].yaxis.set_major_formatter(mticker.FormatStrFormatter('%2.1f %%')) + + fig.autofmt_xdate() + + # should be called after all axes have been added + fig.tight_layout() + + buf = BytesIO() + fig.savefig(buf, format='png', dpi=160) + buf.seek(0) + + plt.clf() + plt.close('all') + + return buf + + +class SensorsBot(Wrapper): + def __init__(self): + super().__init__() + + self.lang.ru( + start_message="Выберите датчик на клавиатуре", + unknown_command="Неизвестная команда", + temperature="Температура", + humidity="Влажность", + plot_3h="График за 3 часа", + plot_6h="График за 6 часов", + plot_12h="График за 12 часов", + plot_24h="График за 24 часа", + unexpected_callback_data="Ошибка: неверные данные", + loading="Загрузка...", + n_hrs="график за %d ч." + ) + + self.lang.en( + start_message="Select the sensor on the keyboard", + unknown_command="Unknown command", + temperature="Temperature", + humidity="Relative humidity", + plot_3h="Graph for 3 hours", + plot_6h="Graph for 6 hours", + plot_12h="Graph for 12 hours", + plot_24h="Graph for 24 hours", + unexpected_callback_data="Unexpected callback data", + loading="Loading...", + n_hrs="graph for %d hours" + ) + + for k, v in config['sensors'].items(): + self.lang.set({k: v['label_ru']}, 'ru') + self.lang.set({k: v['label_en']}, 'en') + self.add_handler(MessageHandler(text_filter(self.lang.all(k)), self.wrap(partial(read_sensor, k)))) + + self.add_handler(CallbackQueryHandler(self.wrap(callback_handler))) + + def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [] + for k in config['sensors'].keys(): + buttons.append(ctx.lang(k)) + buttons = chunks(buttons, 2) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +if __name__ == '__main__': + config.load('sensors_bot') + + bot = SensorsBot() + if 'api' in config: + bot.enable_logging(BotType.SENSORS) + bot.run() |