summaryrefslogtreecommitdiff
path: root/src/sensors_bot.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/sensors_bot.py')
-rwxr-xr-xsrc/sensors_bot.py185
1 files changed, 185 insertions, 0 deletions
diff --git a/src/sensors_bot.py b/src/sensors_bot.py
new file mode 100755
index 0000000..ea3dc9e
--- /dev/null
+++ b/src/sensors_bot.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+import json
+import socket
+import logging
+import re
+import gc
+
+from io import BytesIO
+from typing import Optional
+from functools import partial
+
+import matplotlib.pyplot as plt
+import matplotlib.dates as mdates
+import matplotlib.ticker as mticker
+
+from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton
+from telegram.ext import MessageHandler, CallbackQueryHandler
+
+from home.config import config
+from home.bot import Wrapper, Context, text_filter
+from home.util import chunks, MySimpleSocketClient
+from home.api import WebAPIClient
+from home.api.types import (
+ BotType,
+ TemperatureSensorLocation
+)
+
+bot: Optional[Wrapper] = None
+plt.rcParams['font.size'] = 7
+logger = logging.getLogger(__name__)
+plot_hours = [3, 6, 12, 24]
+
+
+def read_sensor(sensor: str, ctx: Context) -> None:
+ host = config['sensors'][sensor]['ip']
+ port = config['sensors'][sensor]['port']
+
+ try:
+ client = MySimpleSocketClient(host, port)
+ client.write('read')
+ data = json.loads(client.read())
+ except (socket.timeout, socket.error) as error:
+ return ctx.reply_exc(error)
+
+ temp = round(data['temp'], 2)
+ humidity = round(data['humidity'], 2)
+
+ text = ctx.lang('temperature') + f': <b>{temp} °C</b>\n'
+ text += ctx.lang('humidity') + f': <b>{humidity}%</b>'
+
+ buttons = list(map(
+ lambda h: InlineKeyboardButton(ctx.lang(f'plot_{h}h'), callback_data=f'plot/{sensor}/{h}'),
+ plot_hours
+ ))
+ ctx.reply(text, markup=InlineKeyboardMarkup(chunks(buttons, 2)))
+
+
+def callback_handler(ctx: Context) -> None:
+ query = ctx.callback_query
+
+ sensors_variants = '|'.join(config['sensors'].keys())
+ hour_variants = '|'.join(list(map(
+ lambda n: str(n),
+ plot_hours
+ )))
+
+ match = re.match(rf'plot/({sensors_variants})/({hour_variants})', query.data)
+ if not match:
+ query.answer(ctx.lang('unexpected_callback_data'))
+ return
+
+ query.answer(ctx.lang('loading'))
+
+ # retrieve data
+ sensor = TemperatureSensorLocation[match.group(1).upper()]
+ hours = int(match.group(2))
+
+ api = WebAPIClient()
+ data = api.get_sensors_data(sensor, hours)
+
+ title = ctx.lang(sensor.name.lower()) + ' (' + ctx.lang('n_hrs', hours) + ')'
+ plot = draw_plot(data, title,
+ ctx.lang('temperature'),
+ ctx.lang('humidity'))
+ bot.updater.bot.send_photo(ctx.user_id, plot)
+
+ gc.collect()
+
+
+def draw_plot(data,
+ title: str,
+ label_temp: str,
+ label_hum: str) -> BytesIO:
+ tempval = []
+ humval = []
+ dates = []
+ for date, temp, humidity in data:
+ dates.append(date)
+ tempval.append(temp)
+ humval.append(humidity)
+
+ fig, axs = plt.subplots(2, 1)
+ df = mdates.DateFormatter('%H:%M')
+
+ axs[0].set_title(label_temp)
+ axs[0].plot(dates, tempval)
+ axs[0].xaxis.set_major_formatter(df)
+ axs[0].yaxis.set_major_formatter(mticker.FormatStrFormatter('%2.2f °C'))
+
+ fig.suptitle(title, fontsize=10)
+
+ axs[1].set_title(label_hum)
+ axs[1].plot(dates, humval)
+ axs[1].xaxis.set_major_formatter(df)
+ axs[1].yaxis.set_major_formatter(mticker.FormatStrFormatter('%2.1f %%'))
+
+ fig.autofmt_xdate()
+
+ # should be called after all axes have been added
+ fig.tight_layout()
+
+ buf = BytesIO()
+ fig.savefig(buf, format='png', dpi=160)
+ buf.seek(0)
+
+ plt.clf()
+ plt.close('all')
+
+ return buf
+
+
+class SensorsBot(Wrapper):
+ def __init__(self):
+ super().__init__()
+
+ self.lang.ru(
+ start_message="Выберите датчик на клавиатуре",
+ unknown_command="Неизвестная команда",
+ temperature="Температура",
+ humidity="Влажность",
+ plot_3h="График за 3 часа",
+ plot_6h="График за 6 часов",
+ plot_12h="График за 12 часов",
+ plot_24h="График за 24 часа",
+ unexpected_callback_data="Ошибка: неверные данные",
+ loading="Загрузка...",
+ n_hrs="график за %d ч."
+ )
+
+ self.lang.en(
+ start_message="Select the sensor on the keyboard",
+ unknown_command="Unknown command",
+ temperature="Temperature",
+ humidity="Relative humidity",
+ plot_3h="Graph for 3 hours",
+ plot_6h="Graph for 6 hours",
+ plot_12h="Graph for 12 hours",
+ plot_24h="Graph for 24 hours",
+ unexpected_callback_data="Unexpected callback data",
+ loading="Loading...",
+ n_hrs="graph for %d hours"
+ )
+
+ for k, v in config['sensors'].items():
+ self.lang.set({k: v['label_ru']}, 'ru')
+ self.lang.set({k: v['label_en']}, 'en')
+ self.add_handler(MessageHandler(text_filter(self.lang.all(k)), self.wrap(partial(read_sensor, k))))
+
+ self.add_handler(CallbackQueryHandler(self.wrap(callback_handler)))
+
+ def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]:
+ buttons = []
+ for k in config['sensors'].keys():
+ buttons.append(ctx.lang(k))
+ buttons = chunks(buttons, 2)
+ return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
+
+
+if __name__ == '__main__':
+ config.load('sensors_bot')
+
+ bot = SensorsBot()
+ if 'api' in config:
+ bot.enable_logging(BotType.SENSORS)
+ bot.run()