diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2023-09-27 00:54:57 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2023-09-27 00:54:57 +0300 |
commit | d3a295872c49defb55fc8e4e43e55550991e0927 (patch) | |
tree | b9dca15454f9027d5a9dad0d4443a20de04dbc5d /bin/sound_bot.py | |
parent | b7cbc2571c1870b4582ead45277d0aa7f961bec8 (diff) | |
parent | bdbb296697f55f4c3a07af43c9aaf7a9ea86f3d0 (diff) |
Merge branch 'master' of ch1p.io:homekit
Diffstat (limited to 'bin/sound_bot.py')
-rwxr-xr-x | bin/sound_bot.py | 888 |
1 files changed, 888 insertions, 0 deletions
diff --git a/bin/sound_bot.py b/bin/sound_bot.py new file mode 100755 index 0000000..fa22ba7 --- /dev/null +++ b/bin/sound_bot.py @@ -0,0 +1,888 @@ +#!/usr/bin/env python3 +import logging +import os +import tempfile +import __py_include + +from enum import Enum +from datetime import datetime, timedelta +from html import escape +from typing import Optional, List, Dict, Tuple + +from homekit.config import config +from homekit.api import WebApiClient +from homekit.api.types import SoundSensorLocation +from homekit.api.errors import ApiResponseError +from homekit.media import SoundNodeClient, SoundRecordClient, SoundRecordFile, CameraNodeClient +from homekit.soundsensor import SoundSensorServerGuardClient +from homekit.util import Addr, chunks, filesize_fmt + +from homekit.telegram import bot + +from telegram.error import TelegramError +from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton, User + +from PIL import Image + +config.load_app('sound_bot') + +nodes = {} +for nodename, nodecfg in config['nodes'].items(): + nodes[nodename] = Addr.fromstring(nodecfg['addr']) + +bot.initialize() +bot.lang.ru( + start_message="Выберите команду на клавиатуре", + unknown_command="Неизвестная команда", + unexpected_callback_data="Ошибка: неверные данные", + settings="Настройки микшера", + record="Запись", + loading="Загрузка...", + select_place="Выберите место:", + invalid_location="Неверное место", + invalid_interval="Неверная длительность", + unsupported_action="Неподдерживаемое действие", + # select_control="Выберите контрол для изменения настроек:", + control_state="Состояние контрола %s", + incr="громкость +", + decr="громкость -", + back="◀️ Назад", + n_min="%d мин.", + n_sec="%d сек.", + select_interval="Выберите длительность:", + place="Место", + beginning="Начало", + end="Конец", + record_result="Результат записи", + record_started='Запись запущена!', + record_error="Ошибка записи", + files="Локальные файлы", + remote_files="Файлы на сервере", + file_line="— Запись с <b>%s</b> до <b>%s</b> <i>(%s)</i>", + access_denied="Доступ запрещён", + + guard_disable="Снять с охраны", + guard_enable="Поставить на охрану", + guard_status="Статус охраны", + guard_user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> %s.', + guard_user_action_enable="включил охрану ✅", + guard_user_action_disable="выключил охрану ❌", + guard_status_enabled="Включена ✅", + guard_status_disabled="Выключена ❌", + + done="Готово 👌", + + sound_sensors="Датчики звука", + sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.", + sound_sensors_no_24h_data="За последние 24 часа данных нет.", + sound_sensors_show_anything="Показать, что есть", + + cameras="Камеры", + select_option="Выберите опцию", + w_flash="Со вспышкой", + wo_flash="Без вспышки", +) + +bot.lang.en( + start_message="Select command on the keyboard", + unknown_command="Unknown command", + settings="Mixer settings", + record="Record", + unexpected_callback_data="Unexpected callback data", + loading="Loading...", + select_place="Select place:", + invalid_location="Invalid place", + invalid_interval="Invalid duration", + unsupported_action="Unsupported action", + # select_control="Select control to adjust its parameters:", + control_state="%s control state", + incr="vol +", + decr="vol -", + back="◀️ Back", + n_min="%d min.", + n_sec="%d s.", + select_interval="Select duration:", + place="Place", + beginning="Started", + end="Ended", + record_result="Result", + record_started='Recording started!', + record_error="Recording error", + files="Local files", + remote_files="Remote files", + file_line="— From <b>%s</b> to <b>%s</b> <i>(%s)</i>", + access_denied="Access denied", + + guard_disable="Disable guard", + guard_enable="Enable guard", + guard_status="Guard status", + guard_user_action_notification='User <a href="tg://user?id=%d">%s</a> %s.', + guard_user_action_enable="turned the guard ON ✅", + guard_user_action_disable="turn the guard OFF ❌", + guard_status_enabled="Active ✅", + guard_status_disabled="Disabled ❌", + done="Done 👌", + + sound_sensors="Sound sensors", + sound_sensors_info="Here you can get information about last sound sensors hits.", + sound_sensors_no_24h_data="No data for the last 24 hours.", + sound_sensors_show_anything="Show me at least something", + + cameras="Cameras", + select_option="Select option", + w_flash="With flash", + wo_flash="Without flash", +) + +logger = logging.getLogger(__name__) +RenderedContent = Tuple[str, Optional[InlineKeyboardMarkup]] +record_client: Optional[SoundRecordClient] = None +node_client_links: Dict[str, SoundNodeClient] = {} +cam_client_links: Dict[str, CameraNodeClient] = {} + + +def node_client(node: str) -> SoundNodeClient: + if node not in node_client_links: + node_client_links[node] = SoundNodeClient(Addr.fromstring(config['nodes'][node]['addr'])) + return node_client_links[node] + + +def camera_client(cam: str) -> CameraNodeClient: + if cam not in node_client_links: + cam_client_links[cam] = CameraNodeClient(Addr.fromstring(config['cameras'][cam]['addr'])) + return cam_client_links[cam] + + +def node_exists(node: str) -> bool: + return node in config['nodes'] + + +def camera_exists(name: str) -> bool: + return name in config['cameras'] + + +def camera_settings(name: str) -> Optional[dict]: + try: + return config['cameras'][name]['settings'] + except KeyError: + return None + + +def have_cameras() -> bool: + return 'cameras' in config and config['cameras'] + + +def sound_sensor_exists(node: str) -> bool: + return node in config['sound_sensors'] + + +def interval_defined(interval: int) -> bool: + return interval in config['bot']['record_intervals'] + + +def callback_unpack(ctx: bot.Context) -> List[str]: + return ctx.callback_query.data[3:].split('/') + + +def manual_recording_allowed(user_id: int) -> bool: + return 'manual_record_allowlist' not in config['bot'] or user_id in config['bot']['manual_record_allowlist'] + + +def guard_client() -> SoundSensorServerGuardClient: + return SoundSensorServerGuardClient(Addr.fromstring(config['bot']['guard_server'])) + + +# message renderers +# ----------------- + +class Renderer: + @classmethod + def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup: + buttons = [] + for node, nodeconfig in config['nodes'].items(): + buttons.append([InlineKeyboardButton(nodeconfig['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{node}')]) + return InlineKeyboardMarkup(buttons) + + @classmethod + def back_button(cls, + ctx: bot.Context, + buttons: list, + callback_data: str): + buttons.append([ + InlineKeyboardButton(ctx.lang('back'), callback_data=callback_data) + ]) + + +class SettingsRenderer(Renderer): + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("settings")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='s0') + + @classmethod + def node(cls, ctx: bot.Context, + controls: List[dict]) -> RenderedContent: + node, = callback_unpack(ctx) + + html = [] + buttons = [] + for control in controls: + html.append(f'<b>{control["name"]}</b>\n{escape(control["info"])}') + buttons.append([ + InlineKeyboardButton(control['name'], callback_data=f's1/{node}/{control["name"]}') + ]) + + html = "\n\n".join(html) + cls.back_button(ctx, buttons, callback_data='s0') + + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def control(cls, ctx: bot.Context, data) -> RenderedContent: + node, control, *rest = callback_unpack(ctx) + + html = '<b>' + ctx.lang('control_state', control) + '</b>\n\n' + html += escape(data['info']) + buttons = [] + callback_prefix = f's2/{node}/{control}' + for cap in data['caps']: + if cap == 'mute': + muted = 'dB] [off]' in data['info'] + act = 'unmute' if muted else 'mute' + buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')]) + + elif cap == 'cap': + cap_dis = 'Capture [off]' in data['info'] + act = 'cap' if cap_dis else 'nocap' + buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')]) + + elif cap == 'volume': + buttons.append( + list(map(lambda s: InlineKeyboardButton(ctx.lang(s), callback_data=f'{callback_prefix}/{s}'), + ['decr', 'incr'])) + ) + + cls.back_button(ctx, buttons, callback_data=f's0/{node}') + + return html, InlineKeyboardMarkup(buttons) + + +class RecordRenderer(Renderer): + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("record")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='r0') + + @classmethod + def node(cls, ctx: bot.Context, durations: List[int]) -> RenderedContent: + node, = callback_unpack(ctx) + + html = ctx.lang('select_interval') + + buttons = [] + for s in durations: + if s >= 60: + m = int(s / 60) + label = ctx.lang('n_min', m) + else: + label = ctx.lang('n_sec', s) + buttons.append(InlineKeyboardButton(label, callback_data=f'r1/{node}/{s}')) + buttons = list(chunks(buttons, 3)) + cls.back_button(ctx, buttons, callback_data=f'r0') + + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent: + node, *rest = callback_unpack(ctx) + + place = config['nodes'][node]['label'][ctx.user_lang] + + html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})' + return html, None + + @classmethod + def record_done(cls, info: dict, node: str, uid: int) -> str: + ulang = bot.db.get_user_lang(uid) + + def lang(key, *args): + return bot.lang.get(key, ulang, *args) + + rid = info['id'] + fmt = '%d.%m.%y %H:%M:%S' + start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt) + stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt) + + place = config['nodes'][node]['label'][ulang] + + html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n' + html += f'<b>{lang("beginning")}</b>: {start_time}\n' + html += f'<b>{lang("end")}</b>: {stop_time}' + + return html + + @classmethod + def record_error(cls, info: dict, node: str, uid: int) -> str: + ulang = bot.db.get_user_lang(uid) + + def lang(key, *args): + return bot.lang.get(key, ulang, *args) + + place = config['nodes'][node]['label'][ulang] + rid = info['id'] + + html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})' + if 'error' in info: + html += '\n'+str(info['error']) + + return html + + +class FilesRenderer(Renderer): + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("files")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='f0') + + @classmethod + def filelist(cls, ctx: bot.Context, files: List[SoundRecordFile]) -> RenderedContent: + node, = callback_unpack(ctx) + + html_files = map(lambda file: cls.file(ctx, file, node), files) + html = '\n\n'.join(html_files) + + buttons = [] + cls.back_button(ctx, buttons, callback_data='f0') + + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def file(cls, ctx: bot.Context, file: SoundRecordFile, node: str) -> str: + html = ctx.lang('file_line', file.start_humantime, file.stop_humantime, filesize_fmt(file.filesize)) + if file.file_id is not None: + html += f'/audio_{node}_{file.file_id}' + return html + + +class RemoteFilesRenderer(FilesRenderer): + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("remote_files")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='g0') + + +class SoundSensorRenderer(Renderer): + @classmethod + def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup: + buttons = [] + for sensor, sensor_label in config['sound_sensors'].items(): + buttons.append( + [InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')]) + return InlineKeyboardMarkup(buttons) + + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'{ctx.lang("sound_sensors_info")}\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='S0') + + @classmethod + def hits(cls, ctx: bot.Context, data, is_last=False) -> RenderedContent: + node, = callback_unpack(ctx) + buttons = [] + + if not data: + html = ctx.lang('sound_sensors_no_24h_data') + if not is_last: + buttons.append([InlineKeyboardButton(ctx.lang('sound_sensors_show_anything'), callback_data=f'S1/{node}')]) + else: + html = '' + prev_date = None + for item in data: + item_date = item['time'].strftime('%d.%m.%y') + if prev_date is None or prev_date != item_date: + if html != '': + html += '\n\n' + html += f'<b>{item_date}</b>' + prev_date = item_date + html += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})' + cls.back_button(ctx, buttons, callback_data='S0') + return html, InlineKeyboardMarkup(buttons) + + @classmethod + def hits_plain(cls, ctx: bot.Context, data, is_last=False) -> bytes: + node, = callback_unpack(ctx) + + text = '' + prev_date = None + for item in data: + item_date = item['time'].strftime('%d.%m.%y') + if prev_date is None or prev_date != item_date: + if text != '': + text += '\n\n' + text += item_date + prev_date = item_date + text += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})' + + return text.encode() + + +class CamerasRenderer(Renderer): + @classmethod + def index(cls, ctx: bot.Context) -> RenderedContent: + html = f'<b>{ctx.lang("cameras")}</b>\n\n' + html += ctx.lang('select_place') + return html, cls.places_markup(ctx, callback_prefix='c0') + + @classmethod + def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup: + buttons = [] + for camera_name, camera_data in config['cameras'].items(): + buttons.append( + [InlineKeyboardButton(camera_data['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{camera_name}')]) + return InlineKeyboardMarkup(buttons) + + @classmethod + def camera(cls, ctx: bot.Context, flash_available: bool) -> RenderedContent: + node, = callback_unpack(ctx) + + html = ctx.lang('select_option') + + buttons = [] + if flash_available: + buttons.append(InlineKeyboardButton(ctx.lang('w_flash'), callback_data=f'c1/{node}/1')) + buttons.append(InlineKeyboardButton(ctx.lang('wo_flash'), callback_data=f'c1/{node}/0')) + + cls.back_button(ctx, [buttons], callback_data=f'c0') + + return html, InlineKeyboardMarkup([buttons]) + # + # @classmethod + # def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent: + # node, *rest = callback_unpack(ctx) + # + # place = config['nodes'][node]['label'][ctx.user_lang] + # + # html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})' + # return html, None + # + # @classmethod + # def record_done(cls, info: dict, node: str, uid: int) -> str: + # ulang = bot.db.get_user_lang(uid) + # + # def lang(key, *args): + # return bot.lang.get(key, ulang, *args) + # + # rid = info['id'] + # fmt = '%d.%m.%y %H:%M:%S' + # start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt) + # stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt) + # + # place = config['nodes'][node]['label'][ulang] + # + # html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n' + # html += f'<b>{lang("beginning")}</b>: {start_time}\n' + # html += f'<b>{lang("end")}</b>: {stop_time}' + # + # return html + # + # @classmethod + # def record_error(cls, info: dict, node: str, uid: int) -> str: + # ulang = bot.db.get_user_lang(uid) + # + # def lang(key, *args): + # return bot.lang.get(key, ulang, *args) + # + # place = config['nodes'][node]['label'][ulang] + # rid = info['id'] + # + # html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})' + # if 'error' in info: + # html += '\n'+str(info['error']) + # + # return html + + +# cameras handlers +# ---------------- + +@bot.handler(message='cameras', callback=r'^c0$') +def cameras(ctx: bot.Context): + """ List of cameras """ + + text, markup = CamerasRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +@bot.callbackhandler(callback=r'^c0/.*') +def camera_options(ctx: bot.Context) -> None: + """ List of options (with/without flash etc) """ + + cam, = callback_unpack(ctx) + if not camera_exists(cam): + ctx.answer(ctx.lang('invalid_location')) + return + + ctx.answer() + flash_available = 'flash_available' in config['cameras'][cam] and config['cameras'][cam]['flash_available'] is True + + text, markup = CamerasRenderer.camera(ctx, flash_available) + ctx.edit(text, markup) + + +@bot.callbackhandler(callback=r'^c1/.*') +def camera_capture(ctx: bot.Context) -> None: + """ Cheese """ + + cam, flash = callback_unpack(ctx) + flash = int(flash) + if not camera_exists(cam): + ctx.answer(ctx.lang('invalid_location')) + return + + ctx.answer() + + client = camera_client(cam) + fd = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') + fd.close() + + client.capture(fd.name, with_flash=bool(flash)) + logger.debug(f'captured photo ({cam}), saved to {fd.name}') + + camera_config = config['cameras'][cam] + if 'rotate' in camera_config: + im = Image.open(fd.name) + im.rotate(camera_config['rotate'], expand=True) + # im.show() + im.save(fd.name) + logger.debug(f"rotated image {camera_config['rotate']} degrees") + + try: + with open(fd.name, 'rb') as f: + bot.send_photo(ctx.user_id, photo=f) + except TelegramError as exc: + logger.exception(exc) + + try: + os.unlink(fd.name) + except OSError as exc: + logger.exception(exc) + + +# settings handlers +# ----------------- + +@bot.handler(message='settings', callback=r'^s0$') +def settings(ctx: bot.Context): + """ List of nodes """ + + text, markup = SettingsRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +@bot.callbackhandler(callback=r'^s0/.*') +def settings_place(ctx: bot.Context): + """ List of controls """ + + node, = callback_unpack(ctx) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + + cl = node_client(node) + controls = cl.amixer_get_all() + + ctx.answer() + + text, markup = SettingsRenderer.node(ctx, controls) + ctx.edit(text, markup) + + +@bot.callbackhandler(callback=r'^s1/.*') +def settings_place_control(ctx: bot.Context): + """ List of available tunes for control """ + + node, control = callback_unpack(ctx) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + + cl = node_client(node) + control_data = cl.amixer_get(control) + + ctx.answer() + + text, markup = SettingsRenderer.control(ctx, control_data) + ctx.edit(text, markup) + + +@bot.callbackhandler(callback=r'^s2/.*') +def settings_place_control_action(ctx: bot.Context): + """ Tuning """ + + node, control, action = callback_unpack(ctx) + if not node_exists(node): + return + + cl = node_client(node) + if not hasattr(cl, f'amixer_{action}'): + ctx.answer(ctx.lang('invalid_action')) + return + + func = getattr(cl, f'amixer_{action}') + control_data = func(control) + + ctx.answer() + + text, markup = SettingsRenderer.control(ctx, control_data) + ctx.edit(text, markup) + + +# recording handlers +# ------------------ + +@bot.handler(message='record', callback=r'^r0$') +def record(ctx: bot.Context): + """ List of nodes """ + + if not manual_recording_allowed(ctx.user_id): + return ctx.reply(ctx.lang('access_denied')) + + text, markup = RecordRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +@bot.callbackhandler(callback=r'^r0/.*') +def record_place(ctx: bot.Context): + """ List of available intervals """ + + node, = callback_unpack(ctx) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + + ctx.answer() + + text, markup = RecordRenderer.node(ctx, config['bot']['record_intervals']) + ctx.edit(text, markup) + + +@bot.callbackhandler(callback=r'^r1/.*') +def record_place_interval(ctx: bot.Context): + """ Do record! """ + + node, interval = callback_unpack(ctx) + interval = int(interval) + if not node_exists(node): + ctx.answer(ctx.lang('invalid_location')) + return + if not interval_defined(interval): + ctx.answer(ctx.lang('invalid_interval')) + return + + try: + record_id = record_client.record(node, interval, {'user_id': ctx.user_id, 'node': node}) + except ApiResponseError as e: + ctx.answer(e.error_message) + logger.error(e) + return + + ctx.answer() + + html, markup = RecordRenderer.record_started(ctx, record_id) + ctx.edit(html, markup) + + +# sound sensor handlers +# --------------------- + +@bot.handler(message='sound_sensors', callback=r'^S0$') +def sound_sensors(ctx: bot.Context): + """ List of places """ + + text, markup = SoundSensorRenderer.index(ctx) + if not ctx.is_callback_context(): + return ctx.reply(text, markup=markup) + else: + ctx.answer() + return ctx.edit(text, markup=markup) + + +@bot.callbackhandler(callback=r'^S0/.*') +def sound_sensors_last_24h(ctx: bot.Context): + """ Last 24h log """ + + node, = callback_unpack(ctx) + if not sound_sensor_exists(node): + ctx.answer(ctx.lang('invalid location')) + return + + ctx.answer() + + cl = WebApiClient() + data = cl.get_sound_sensor_hits(location=SoundSensorLocation[node.upper()], + after=datetime.now() - timedelta(hours=24)) + + text, markup = SoundSensorRenderer.hits(ctx, data) + if len(text) > 4096: + plain = SoundSensorRenderer.hits_plain(ctx, data) + bot.send_file(ctx.user_id, document=plain, filename='data.txt') + else: + ctx.edit(text, markup=markup) + + +@bot.callbackhandler(callback=r'^S1/.*') +def sound_sensors_last_anything(ctx: bot.Context): + """ Last _something_ """ + + node, = callback_unpack(ctx) + if not sound_sensor_exists(node): + ctx.answer(ctx.lang('invalid location')) + return + + ctx.answer() + + cl = WebApiClient() + data = cl.get_last_sound_sensor_hits(location=SoundSensorLocation[node.upper()], + last=20) + + text, markup = SoundSensorRenderer.hits(ctx, data, is_last=True) + if len(text) > 4096: + plain = SoundSensorRenderer.hits_plain(ctx, data) + bot.send_file(ctx.user_id, document=plain, filename='data.txt') + else: + ctx.edit(text, markup=markup) + + +# guard enable/disable handlers +# ----------------------------- + +class GuardUserAction(Enum): + ENABLE = 'enable' + DISABLE = 'disable' + + +if 'guard_server' in config['bot']: + @bot.handler(message='guard_status') + def guard_status(ctx: bot.Context): + guard = guard_client() + resp = guard.guard_status() + + key = 'enabled' if resp['enabled'] is True else 'disabled' + ctx.reply(ctx.lang(f'guard_status_{key}')) + + + @bot.handler(message='guard_enable') + def guard_enable(ctx: bot.Context): + guard = guard_client() + guard.guard_enable() + ctx.reply(ctx.lang('done')) + + _guard_notify(ctx.user, GuardUserAction.ENABLE) + + + @bot.handler(message='guard_disable') + def guard_disable(ctx: bot.Context): + guard = guard_client() + guard.guard_disable() + ctx.reply(ctx.lang('done')) + + _guard_notify(ctx.user, GuardUserAction.DISABLE) + + + def _guard_notify(user: User, action: GuardUserAction): + def text_getter(lang: str): + action_name = bot.lang.get(f'guard_user_action_{action.value}', lang) + user_name = bot.user_any_name(user) + return 'ℹ ' + bot.lang.get('guard_user_action_notification', lang, + user.id, user_name, action_name) + + bot.notify_all(text_getter, exclude=(user.id,)) + + +@bot.defaultreplymarkup +def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: + buttons = [ + [ctx.lang('record'), ctx.lang('settings')], + # [ctx.lang('files'), ctx.lang('remote_files')], + ] + if 'guard_server' in config['bot']: + buttons.append([ + ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status') + ]) + buttons.append([ctx.lang('sound_sensors')]) + if have_cameras(): + buttons.append([ctx.lang('cameras')]) + return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) + + +# record client callbacks +# ----------------------- + +def record_onerror(info: dict, userdata: dict): + uid = userdata['user_id'] + node = userdata['node'] + + html = RecordRenderer.record_error(info, node, uid) + try: + bot.notify_user(userdata['user_id'], html) + except TelegramError as exc: + logger.exception(exc) + finally: + record_client.forget(node, info['id']) + + +def record_onfinished(info: dict, fn: str, userdata: dict): + logger.info('record finished: ' + str(info)) + + uid = userdata['user_id'] + node = userdata['node'] + + html = RecordRenderer.record_done(info, node, uid) + bot.notify_user(uid, html) + + try: + # sending audiofile to telegram + with open(fn, 'rb') as f: + bot.send_audio(uid, audio=f, filename='audio.mp3') + + # deleting temp file + try: + os.unlink(fn) + except OSError as exc: + logger.exception(exc) + bot.notify_user(uid, exc) + + # remove the recording from sound_node's history + record_client.forget(node, info['id']) + + # remove file from storage + # node_client(node).storage_delete(info['file']['fileid']) + except Exception as e: + logger.exception(e) + + +if __name__ == '__main__': + record_client = SoundRecordClient(nodes, + error_handler=record_onerror, + finished_handler=record_onfinished, + download_on_finish=True) + + bot.run() + record_client.stop() |