diff options
Diffstat (limited to 'src/sound_bot.py')
-rwxr-xr-x | src/sound_bot.py | 184 |
1 files changed, 182 insertions, 2 deletions
diff --git a/src/sound_bot.py b/src/sound_bot.py index ae54413..4cc08a3 100755 --- a/src/sound_bot.py +++ b/src/sound_bot.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 import logging import os +import time +import tempfile from enum import Enum from datetime import datetime, timedelta @@ -12,6 +14,7 @@ from home.api.types import BotType from home.api.errors import ApiResponseError from home.sound import SoundNodeClient, RecordClient, RecordFile from home.soundsensor import SoundSensorServerGuardClient +from home.camera import esp32 from home.util import parse_addr, chunks, filesize_fmt from home.api import WebAPIClient from home.api.types import SoundSensorLocation @@ -28,6 +31,7 @@ RenderedContent = tuple[str, Optional[InlineKeyboardMarkup]] record_client: Optional[RecordClient] = None bot: Optional[Wrapper] = None node_client_links: dict[str, SoundNodeClient] = {} +cam_client_links: dict[str, esp32.WebClient] = {} def node_client(node: str) -> SoundNodeClient: @@ -36,10 +40,24 @@ def node_client(node: str) -> SoundNodeClient: return node_client_links[node] +def camera_client(cam: str) -> esp32.WebClient: + if cam not in node_client_links: + cam_client_links[cam] = esp32.WebClient(parse_addr(config['cameras'][cam]['addr'])) + return cam_client_links[cam] + + def node_exists(node: str) -> bool: return node in config['nodes'] +def camera_exists(name: str) -> bool: + return name in config['cameras'] + + +def have_cameras() -> bool: + return 'cameras' in config and config['cameras'] + + def sound_sensor_exists(node: str) -> bool: return node in config['sound_sensors'] @@ -299,6 +317,141 @@ class SoundSensorRenderer(Renderer): return text.encode() +class CamerasRenderer(Renderer): + @classmethod + def index(cls, ctx: Context) -> RenderedContent: + html = f'<b>{ctx.lang("cameras")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='c0') + + @classmethod + def places_markup(cls, ctx: Context, callback_prefix: str) -> InlineKeyboardMarkup: + buttons = [] + for sensor, sensor_label in config['cameras'].items(): + buttons.append( + [InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')]) + return InlineKeyboardMarkup(buttons) + + @classmethod + def camera(cls, ctx: Context) -> RenderedContent: + node, = callback_unpack(ctx) + + html = ctx.lang('select_interval') + buttons = [ + [ + InlineKeyboardButton(ctx.lang('w_flash'), callback_data=f'c1/{node}/1'), + InlineKeyboardButton(ctx.lang('wo_flash'), callback_data=f'c1/{node}/0'), + ] + ] + cls.back_button(ctx, buttons, callback_data=f'c0') + + return html, InlineKeyboardMarkup(buttons) + # + # @classmethod + # def record_started(cls, ctx: Context, rid: int) -> RenderedContent: + # node, *rest = callback_unpack(ctx) + # + # place = config['nodes'][node]['label'][ctx.user_lang] + # + # html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})' + # return html, None + # + # @classmethod + # def record_done(cls, info: dict, node: str, uid: int) -> str: + # ulang = bot.store.get_user_lang(uid) + # + # def lang(key, *args): + # return bot.lang.get(key, ulang, *args) + # + # rid = info['id'] + # fmt = '%d.%m.%y %H:%M:%S' + # start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt) + # stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt) + # + # place = config['nodes'][node]['label'][ulang] + # + # html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n' + # html += f'<b>{lang("beginning")}</b>: {start_time}\n' + # html += f'<b>{lang("end")}</b>: {stop_time}' + # + # return html + # + # @classmethod + # def record_error(cls, info: dict, node: str, uid: int) -> str: + # ulang = bot.store.get_user_lang(uid) + # + # def lang(key, *args): + # return bot.lang.get(key, ulang, *args) + # + # place = config['nodes'][node]['label'][ulang] + # rid = info['id'] + # + # html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})' + # if 'error' in info: + # html += '\n'+str(info['error']) + # + # return html + + +# cameras handlers +# ---------------- + +def cameras(ctx: Context): + text, markup = CamerasRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +def camera_options(ctx: Context) -> None: + cam, = callback_unpack(ctx) + if not camera_exists(cam): + ctx.answer(ctx.lang('invalid_location')) + return + + ctx.answer() + + text, markup = CamerasRenderer.camera(ctx) + ctx.edit(text, markup) + + +def camera_capture(ctx: Context) -> None: + cam, flash = callback_unpack(ctx) + flash = int(flash) + if not camera_exists(cam): + ctx.answer(ctx.lang('invalid_location')) + return + + ctx.answer() + + client = camera_client(cam) + client.setflash(True if flash else False) + time.sleep(0.2) + + fd = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') + fd.close() + + client.capture(fd.name) + logger.debug(f'captured photo ({cam}), saved to {fd.name}') + + # disable flash led + if flash: + client.setflash(False) + + try: + with open(fd.name, 'rb') as f: + bot.send_photo(ctx.user_id, photo=f) + except TelegramError as exc: + logger.exception(exc) + + try: + os.unlink(fd.name) + except OSError as exc: + logger.exception(exc) + + # settings handlers # ----------------- @@ -626,7 +779,12 @@ class SoundBot(Wrapper): sound_sensors="Датчики звука", sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.", sound_sensors_no_24h_data="За последние 24 часа данных нет.", - sound_sensors_show_anything="Показать, что есть" + sound_sensors_show_anything="Показать, что есть", + + cameras="Камеры", + select_option="Выберите опцию", + w_flash="Со вспышкой", + wo_flash="Без вспышки", ) self.lang.en( @@ -672,7 +830,12 @@ class SoundBot(Wrapper): sound_sensors="Sound sensors", sound_sensors_info="Here you can get information about last sound sensors hits.", sound_sensors_no_24h_data="No data for the last 24 hours.", - sound_sensors_show_anything="Show me at least something" + sound_sensors_show_anything="Show me at least something", + + cameras="Cameras", + select_option="Select option", + w_flash="With flash", + wo_flash="Without flash", ) # ------ @@ -750,6 +913,21 @@ class SoundBot(Wrapper): # list of specific node's files # self.add_handler(CallbackQueryHandler(self.wrap(files_list), pattern=r'^g0/.*')) + # ------ + # cameras + # ------------ + + # list of cameras + self.add_handler(MessageHandler(text_filter(self.lang.all('cameras')), self.wrap(cameras))) + self.add_handler(CallbackQueryHandler(self.wrap(cameras), pattern=r'^c0$')) + + # list of options (with/without flash etc) + self.add_handler(CallbackQueryHandler(self.wrap(camera_options), pattern=r'^c0/.*')) + + # cheese + self.add_handler(CallbackQueryHandler(self.wrap(camera_capture), pattern=r'^c1/.*')) + + def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: buttons = [ [ctx.lang('record'), ctx.lang('settings')], @@ -760,6 +938,8 @@ class SoundBot(Wrapper): ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status') ]) buttons.append([ctx.lang('sound_sensors')]) + if have_cameras(): + buttons.append([ctx.lang('cameras')]) return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) |