summaryrefslogtreecommitdiff
path: root/src/sound_bot.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/sound_bot.py')
-rwxr-xr-xsrc/sound_bot.py184
1 files changed, 182 insertions, 2 deletions
diff --git a/src/sound_bot.py b/src/sound_bot.py
index ae54413..4cc08a3 100755
--- a/src/sound_bot.py
+++ b/src/sound_bot.py
@@ -1,6 +1,8 @@
#!/usr/bin/env python3
import logging
import os
+import time
+import tempfile
from enum import Enum
from datetime import datetime, timedelta
@@ -12,6 +14,7 @@ from home.api.types import BotType
from home.api.errors import ApiResponseError
from home.sound import SoundNodeClient, RecordClient, RecordFile
from home.soundsensor import SoundSensorServerGuardClient
+from home.camera import esp32
from home.util import parse_addr, chunks, filesize_fmt
from home.api import WebAPIClient
from home.api.types import SoundSensorLocation
@@ -28,6 +31,7 @@ RenderedContent = tuple[str, Optional[InlineKeyboardMarkup]]
record_client: Optional[RecordClient] = None
bot: Optional[Wrapper] = None
node_client_links: dict[str, SoundNodeClient] = {}
+cam_client_links: dict[str, esp32.WebClient] = {}
def node_client(node: str) -> SoundNodeClient:
@@ -36,10 +40,24 @@ def node_client(node: str) -> SoundNodeClient:
return node_client_links[node]
+def camera_client(cam: str) -> esp32.WebClient:
+ if cam not in node_client_links:
+ cam_client_links[cam] = esp32.WebClient(parse_addr(config['cameras'][cam]['addr']))
+ return cam_client_links[cam]
+
+
def node_exists(node: str) -> bool:
return node in config['nodes']
+def camera_exists(name: str) -> bool:
+ return name in config['cameras']
+
+
+def have_cameras() -> bool:
+ return 'cameras' in config and config['cameras']
+
+
def sound_sensor_exists(node: str) -> bool:
return node in config['sound_sensors']
@@ -299,6 +317,141 @@ class SoundSensorRenderer(Renderer):
return text.encode()
+class CamerasRenderer(Renderer):
+ @classmethod
+ def index(cls, ctx: Context) -> RenderedContent:
+ html = f'<b>{ctx.lang("cameras")}</b>\n\n'
+ html += ctx.lang('select_place')
+ return html, cls.places_markup(ctx, callback_prefix='c0')
+
+ @classmethod
+ def places_markup(cls, ctx: Context, callback_prefix: str) -> InlineKeyboardMarkup:
+ buttons = []
+ for sensor, sensor_label in config['cameras'].items():
+ buttons.append(
+ [InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')])
+ return InlineKeyboardMarkup(buttons)
+
+ @classmethod
+ def camera(cls, ctx: Context) -> RenderedContent:
+ node, = callback_unpack(ctx)
+
+ html = ctx.lang('select_interval')
+ buttons = [
+ [
+ InlineKeyboardButton(ctx.lang('w_flash'), callback_data=f'c1/{node}/1'),
+ InlineKeyboardButton(ctx.lang('wo_flash'), callback_data=f'c1/{node}/0'),
+ ]
+ ]
+ cls.back_button(ctx, buttons, callback_data=f'c0')
+
+ return html, InlineKeyboardMarkup(buttons)
+ #
+ # @classmethod
+ # def record_started(cls, ctx: Context, rid: int) -> RenderedContent:
+ # node, *rest = callback_unpack(ctx)
+ #
+ # place = config['nodes'][node]['label'][ctx.user_lang]
+ #
+ # html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})'
+ # return html, None
+ #
+ # @classmethod
+ # def record_done(cls, info: dict, node: str, uid: int) -> str:
+ # ulang = bot.store.get_user_lang(uid)
+ #
+ # def lang(key, *args):
+ # return bot.lang.get(key, ulang, *args)
+ #
+ # rid = info['id']
+ # fmt = '%d.%m.%y %H:%M:%S'
+ # start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt)
+ # stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt)
+ #
+ # place = config['nodes'][node]['label'][ulang]
+ #
+ # html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n'
+ # html += f'<b>{lang("beginning")}</b>: {start_time}\n'
+ # html += f'<b>{lang("end")}</b>: {stop_time}'
+ #
+ # return html
+ #
+ # @classmethod
+ # def record_error(cls, info: dict, node: str, uid: int) -> str:
+ # ulang = bot.store.get_user_lang(uid)
+ #
+ # def lang(key, *args):
+ # return bot.lang.get(key, ulang, *args)
+ #
+ # place = config['nodes'][node]['label'][ulang]
+ # rid = info['id']
+ #
+ # html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})'
+ # if 'error' in info:
+ # html += '\n'+str(info['error'])
+ #
+ # return html
+
+
+# cameras handlers
+# ----------------
+
+def cameras(ctx: Context):
+ text, markup = CamerasRenderer.index(ctx)
+ if not ctx.is_callback_context():
+ return ctx.reply(text, markup=markup)
+ else:
+ ctx.answer()
+ return ctx.edit(text, markup=markup)
+
+
+def camera_options(ctx: Context) -> None:
+ cam, = callback_unpack(ctx)
+ if not camera_exists(cam):
+ ctx.answer(ctx.lang('invalid_location'))
+ return
+
+ ctx.answer()
+
+ text, markup = CamerasRenderer.camera(ctx)
+ ctx.edit(text, markup)
+
+
+def camera_capture(ctx: Context) -> None:
+ cam, flash = callback_unpack(ctx)
+ flash = int(flash)
+ if not camera_exists(cam):
+ ctx.answer(ctx.lang('invalid_location'))
+ return
+
+ ctx.answer()
+
+ client = camera_client(cam)
+ client.setflash(True if flash else False)
+ time.sleep(0.2)
+
+ fd = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
+ fd.close()
+
+ client.capture(fd.name)
+ logger.debug(f'captured photo ({cam}), saved to {fd.name}')
+
+ # disable flash led
+ if flash:
+ client.setflash(False)
+
+ try:
+ with open(fd.name, 'rb') as f:
+ bot.send_photo(ctx.user_id, photo=f)
+ except TelegramError as exc:
+ logger.exception(exc)
+
+ try:
+ os.unlink(fd.name)
+ except OSError as exc:
+ logger.exception(exc)
+
+
# settings handlers
# -----------------
@@ -626,7 +779,12 @@ class SoundBot(Wrapper):
sound_sensors="Датчики звука",
sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.",
sound_sensors_no_24h_data="За последние 24 часа данных нет.",
- sound_sensors_show_anything="Показать, что есть"
+ sound_sensors_show_anything="Показать, что есть",
+
+ cameras="Камеры",
+ select_option="Выберите опцию",
+ w_flash="Со вспышкой",
+ wo_flash="Без вспышки",
)
self.lang.en(
@@ -672,7 +830,12 @@ class SoundBot(Wrapper):
sound_sensors="Sound sensors",
sound_sensors_info="Here you can get information about last sound sensors hits.",
sound_sensors_no_24h_data="No data for the last 24 hours.",
- sound_sensors_show_anything="Show me at least something"
+ sound_sensors_show_anything="Show me at least something",
+
+ cameras="Cameras",
+ select_option="Select option",
+ w_flash="With flash",
+ wo_flash="Without flash",
)
# ------
@@ -750,6 +913,21 @@ class SoundBot(Wrapper):
# list of specific node's files
# self.add_handler(CallbackQueryHandler(self.wrap(files_list), pattern=r'^g0/.*'))
+ # ------
+ # cameras
+ # ------------
+
+ # list of cameras
+ self.add_handler(MessageHandler(text_filter(self.lang.all('cameras')), self.wrap(cameras)))
+ self.add_handler(CallbackQueryHandler(self.wrap(cameras), pattern=r'^c0$'))
+
+ # list of options (with/without flash etc)
+ self.add_handler(CallbackQueryHandler(self.wrap(camera_options), pattern=r'^c0/.*'))
+
+ # cheese
+ self.add_handler(CallbackQueryHandler(self.wrap(camera_capture), pattern=r'^c1/.*'))
+
+
def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = [
[ctx.lang('record'), ctx.lang('settings')],
@@ -760,6 +938,8 @@ class SoundBot(Wrapper):
ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status')
])
buttons.append([ctx.lang('sound_sensors')])
+ if have_cameras():
+ buttons.append([ctx.lang('cameras')])
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)