#!/usr/bin/env python3 import logging import os from enum import Enum from datetime import datetime, timedelta from html import escape from typing import Optional from home.config import config from home.bot import Wrapper, Context, text_filter, user_any_name from home.api.types import BotType from home.api.errors import ApiResponseError from home.sound import SoundNodeClient, RecordClient, RecordFile from home.soundsensor import SoundSensorServerGuardClient from home.util import parse_addr, chunks, filesize_fmt from home.api import WebAPIClient from home.api.types import SoundSensorLocation from telegram.error import TelegramError from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton, User from telegram.ext import ( CallbackQueryHandler, MessageHandler ) logger = logging.getLogger(__name__) RenderedContent = tuple[str, Optional[InlineKeyboardMarkup]] record_client: Optional[RecordClient] = None bot: Optional[Wrapper] = None node_client_links: dict[str, SoundNodeClient] = {} def node_client(node: str) -> SoundNodeClient: if node not in node_client_links: node_client_links[node] = SoundNodeClient(parse_addr(config['nodes'][node]['addr'])) return node_client_links[node] def node_exists(node: str) -> bool: return node in config['nodes'] def sound_sensor_exists(node: str) -> bool: return node in config['sound_sensors'] def interval_defined(interval: int) -> bool: return interval in config['bot']['record_intervals'] def callback_unpack(ctx: Context) -> list[str]: return ctx.callback_query.data[3:].split('/') def manual_recording_allowed(user_id: int) -> bool: return 'manual_record_allowlist' not in config['bot'] or user_id in config['bot']['manual_record_allowlist'] def guard_client() -> SoundSensorServerGuardClient: return SoundSensorServerGuardClient(parse_addr(config['bot']['guard_server'])) # message renderers # ----------------- class Renderer: @classmethod def places_markup(cls, ctx: Context, callback_prefix: str) -> InlineKeyboardMarkup: buttons = [] for node, nodeconfig in config['nodes'].items(): buttons.append([InlineKeyboardButton(nodeconfig['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{node}')]) return InlineKeyboardMarkup(buttons) @classmethod def back_button(cls, ctx: Context, buttons: list, callback_data: str): buttons.append([ InlineKeyboardButton(ctx.lang('back'), callback_data=callback_data) ]) class SettingsRenderer(Renderer): @classmethod def index(cls, ctx: Context) -> RenderedContent: html = f'{ctx.lang("settings")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='s0') @classmethod def node(cls, ctx: Context, controls: list[dict]) -> RenderedContent: node, = callback_unpack(ctx) html = [] buttons = [] for control in controls: html.append(f'{control["name"]}\n{escape(control["info"])}') buttons.append([ InlineKeyboardButton(control['name'], callback_data=f's1/{node}/{control["name"]}') ]) html = "\n\n".join(html) cls.back_button(ctx, buttons, callback_data='s0') return html, InlineKeyboardMarkup(buttons) @classmethod def control(cls, ctx: Context, data) -> RenderedContent: node, control, *rest = callback_unpack(ctx) html = '' + ctx.lang('control_state', control) + '\n\n' html += escape(data['info']) buttons = [] callback_prefix = f's2/{node}/{control}' for cap in data['caps']: if cap == 'mute': muted = 'dB] [off]' in data['info'] act = 'unmute' if muted else 'mute' buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')]) elif cap == 'cap': cap_dis = 'Capture [off]' in data['info'] act = 'cap' if cap_dis else 'nocap' buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')]) elif cap == 'volume': buttons.append( list(map(lambda s: InlineKeyboardButton(ctx.lang(s), callback_data=f'{callback_prefix}/{s}'), ['decr', 'incr'])) ) cls.back_button(ctx, buttons, callback_data=f's0/{node}') return html, InlineKeyboardMarkup(buttons) class RecordRenderer(Renderer): @classmethod def index(cls, ctx: Context) -> RenderedContent: html = f'{ctx.lang("record")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='r0') @classmethod def node(cls, ctx: Context, durations: list[int]) -> RenderedContent: node, = callback_unpack(ctx) html = ctx.lang('select_interval') buttons = [] for s in durations: if s >= 60: m = int(s / 60) label = ctx.lang('n_min', m) else: label = ctx.lang('n_sec', s) buttons.append(InlineKeyboardButton(label, callback_data=f'r1/{node}/{s}')) buttons = list(chunks(buttons, 3)) cls.back_button(ctx, buttons, callback_data=f'r0') return html, InlineKeyboardMarkup(buttons) @classmethod def record_started(cls, ctx: Context, rid: int) -> RenderedContent: node, *rest = callback_unpack(ctx) place = config['nodes'][node]['label'][ctx.user_lang] html = f'{ctx.lang("record_started")} ({place}, id={rid})' return html, None @classmethod def record_done(cls, info: dict, node: str, uid: int) -> str: ulang = bot.store.get_user_lang(uid) def lang(key, *args): return bot.lang.get(key, ulang, *args) rid = info['id'] fmt = '%d.%m.%y %H:%M:%S' start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt) stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt) place = config['nodes'][node]['label'][ulang] html = f'{lang("record_result")} ({place}, id={rid})\n\n' html += f'{lang("beginning")}: {start_time}\n' html += f'{lang("end")}: {stop_time}' return html @classmethod def record_error(cls, info: dict, node: str, uid: int) -> str: ulang = bot.store.get_user_lang(uid) def lang(key, *args): return bot.lang.get(key, ulang, *args) place = config['nodes'][node]['label'][ulang] rid = info['id'] html = f'{lang("record_error")} ({place}, id={rid})' if 'error' in info: html += '\n'+str(info['error']) return html class FilesRenderer(Renderer): @classmethod def index(cls, ctx: Context) -> RenderedContent: html = f'{ctx.lang("files")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='f0') @classmethod def filelist(cls, ctx: Context, files: list[RecordFile]) -> RenderedContent: node, = callback_unpack(ctx) html_files = map(lambda file: cls.file(ctx, file, node), files) html = '\n\n'.join(html_files) buttons = [] cls.back_button(ctx, buttons, callback_data='f0') return html, InlineKeyboardMarkup(buttons) @classmethod def file(cls, ctx: Context, file: RecordFile, node: str) -> str: html = ctx.lang('file_line', file.start_humantime, file.stop_humantime, filesize_fmt(file.filesize)) if file.file_id is not None: html += f'/audio_{node}_{file.file_id}' return html class RemoteFilesRenderer(FilesRenderer): @classmethod def index(cls, ctx: Context) -> RenderedContent: html = f'{ctx.lang("remote_files")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='g0') class SoundSensorRenderer(Renderer): @classmethod def places_markup(cls, ctx: Context, callback_prefix: str) -> InlineKeyboardMarkup: buttons = [] for sensor, sensor_label in config['sound_sensors'].items(): buttons.append( [InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')]) return InlineKeyboardMarkup(buttons) @classmethod def index(cls, ctx: Context) -> RenderedContent: html = f'{ctx.lang("sound_sensors_info")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='S0') @classmethod def hits(cls, ctx: Context, data, is_last=False) -> RenderedContent: node, = callback_unpack(ctx) buttons = [] if not data: html = ctx.lang('sound_sensors_no_24h_data') if not is_last: buttons.append([InlineKeyboardButton(ctx.lang('sound_sensors_show_anything'), callback_data=f'S1/{node}')]) else: html = '' prev_date = None for item in data: item_date = item['time'].strftime('%d.%m.%y') if prev_date is None or prev_date != item_date: if html != '': html += '\n\n' html += f'{item_date}' prev_date = item_date html += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})' cls.back_button(ctx, buttons, callback_data='S0') return html, InlineKeyboardMarkup(buttons) @classmethod def hits_plain(cls, ctx: Context, data, is_last=False) -> bytes: node, = callback_unpack(ctx) text = '' prev_date = None for item in data: item_date = item['time'].strftime('%d.%m.%y') if prev_date is None or prev_date != item_date: if text != '': text += '\n\n' text += item_date prev_date = item_date text += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})' return text.encode() # settings handlers # ----------------- def settings(ctx: Context): text, markup = SettingsRenderer.index(ctx) if not ctx.is_callback_context(): return ctx.reply(text, markup=markup) else: ctx.answer() return ctx.edit(text, markup=markup) def settings_place(ctx: Context) -> None: node, = callback_unpack(ctx) if not node_exists(node): ctx.answer(ctx.lang('invalid_location')) return cl = node_client(node) controls = cl.amixer_get_all() ctx.answer() text, markup = SettingsRenderer.node(ctx, controls) ctx.edit(text, markup) def settings_place_control(ctx: Context) -> None: node, control = callback_unpack(ctx) if not node_exists(node): ctx.answer(ctx.lang('invalid_location')) return cl = node_client(node) control_data = cl.amixer_get(control) ctx.answer() text, markup = SettingsRenderer.control(ctx, control_data) ctx.edit(text, markup) def settings_place_control_action(ctx: Context) -> None: node, control, action = callback_unpack(ctx) if not node_exists(node): return cl = node_client(node) if not hasattr(cl, f'amixer_{action}'): ctx.answer(ctx.lang('invalid_action')) return func = getattr(cl, f'amixer_{action}') control_data = func(control) ctx.answer() text, markup = SettingsRenderer.control(ctx, control_data) ctx.edit(text, markup) # recording handlers # ------------------ def record(ctx: Context): if not manual_recording_allowed(ctx.user_id): return ctx.reply(ctx.lang('access_denied')) text, markup = RecordRenderer.index(ctx) if not ctx.is_callback_context(): return ctx.reply(text, markup=markup) else: ctx.answer() return ctx.edit(text, markup=markup) def record_place(ctx: Context) -> None: node, = callback_unpack(ctx) if not node_exists(node): ctx.answer(ctx.lang('invalid_location')) return ctx.answer() text, markup = RecordRenderer.node(ctx, config['bot']['record_intervals']) ctx.edit(text, markup) def record_place_interval(ctx: Context) -> None: node, interval = callback_unpack(ctx) interval = int(interval) if not node_exists(node): ctx.answer(ctx.lang('invalid_location')) return if not interval_defined(interval): ctx.answer(ctx.lang('invalid_interval')) return try: record_id = record_client.record(node, interval, {'user_id': ctx.user_id, 'node': node}) except ApiResponseError as e: ctx.answer(e.error_message) logger.error(e) return ctx.answer() html, markup = RecordRenderer.record_started(ctx, record_id) ctx.edit(html, markup) # files handlers # -------------- # def files(ctx: Context, remote=False): # renderer = RemoteFilesRenderer if remote else FilesRenderer # text, markup = renderer.index(ctx) # if not ctx.is_callback_context(): # return ctx.reply(text, markup=markup) # else: # ctx.answer() # return ctx.edit(text, markup=markup) # # # def files_list(ctx: Context): # node, = callback_unpack(ctx) # if not node_exists(node): # ctx.answer(ctx.lang('invalid_location')) # return # # ctx.answer() # # cl = node_client(node) # files = cl.storage_list(extended=True, as_objects=True) # # text, markup = FilesRenderer.filelist(ctx, files) # ctx.edit(text, markup) # sound sensor handlers # --------------------- def sound_sensors(ctx: Context): text, markup = SoundSensorRenderer.index(ctx) if not ctx.is_callback_context(): return ctx.reply(text, markup=markup) else: ctx.answer() return ctx.edit(text, markup=markup) def sound_sensors_last_24h(ctx: Context): node, = callback_unpack(ctx) if not sound_sensor_exists(node): ctx.answer(ctx.lang('invalid location')) return ctx.answer() cl = WebAPIClient() data = cl.get_sound_sensor_hits(location=SoundSensorLocation[node.upper()], after=datetime.now() - timedelta(hours=24)) text, markup = SoundSensorRenderer.hits(ctx, data) if len(text) > 4096: plain = SoundSensorRenderer.hits_plain(ctx, data) bot.send_file(ctx.user_id, document=plain, filename='data.txt') else: ctx.edit(text, markup=markup) def sound_sensors_last_anything(ctx: Context): node, = callback_unpack(ctx) if not sound_sensor_exists(node): ctx.answer(ctx.lang('invalid location')) return ctx.answer() cl = WebAPIClient() data = cl.get_last_sound_sensor_hits(location=SoundSensorLocation[node.upper()], last=20) text, markup = SoundSensorRenderer.hits(ctx, data, is_last=True) if len(text) > 4096: plain = SoundSensorRenderer.hits_plain(ctx, data) bot.send_file(ctx.user_id, document=plain, filename='data.txt') else: ctx.edit(text, markup=markup) # guard enable/disable handlers # ----------------------------- class GuardUserAction(Enum): ENABLE = 'enable' DISABLE = 'disable' def guard_status(ctx: Context): guard = guard_client() resp = guard.guard_status() key = 'enabled' if resp['enabled'] is True else 'disabled' ctx.reply(ctx.lang(f'guard_status_{key}')) def guard_enable(ctx: Context): guard = guard_client() guard.guard_enable() ctx.reply(ctx.lang('done')) _guard_notify(ctx.user, GuardUserAction.ENABLE) def guard_disable(ctx: Context): guard = guard_client() guard.guard_disable() ctx.reply(ctx.lang('done')) _guard_notify(ctx.user, GuardUserAction.DISABLE) def _guard_notify(user: User, action: GuardUserAction): def text_getter(lang: str): action_name = bot.lang.get(f'guard_user_action_{action.value}', lang) user_name = user_any_name(user) return 'ℹ ' + bot.lang.get('guard_user_action_notification', lang, user.id, user_name, action_name) bot.notify_all(text_getter, exclude=(user.id,)) # record client callbacks # ----------------------- def record_onerror(info: dict, userdata: dict): uid = userdata['user_id'] node = userdata['node'] html = RecordRenderer.record_error(info, node, uid) try: bot.notify_user(userdata['user_id'], html) except TelegramError as exc: logger.exception(exc) finally: record_client.forget(node, info['id']) def record_onfinished(info: dict, fn: str, userdata: dict): logger.info('record finished: ' + str(info)) uid = userdata['user_id'] node = userdata['node'] html = RecordRenderer.record_done(info, node, uid) bot.notify_user(uid, html) try: # sending audiofile to telegram with open(fn, 'rb') as f: bot.send_audio(uid, audio=f, filename='audio.mp3') # deleting temp file try: os.unlink(fn) except OSError as exc: logger.exception(exc) bot.notify_user(uid, exc) # remove the recording from sound_node's history record_client.forget(node, info['id']) # remove file from storage # node_client(node).storage_delete(info['file']['fileid']) except Exception as e: logger.exception(e) class SoundBot(Wrapper): def __init__(self): super().__init__() self.lang.ru( start_message="Выберите команду на клавиатуре", unknown_command="Неизвестная команда", unexpected_callback_data="Ошибка: неверные данные", settings="Настройки микшера", record="Запись", loading="Загрузка...", select_place="Выберите место:", invalid_location="Неверное место", invalid_interval="Неверная длительность", unsupported_action="Неподдерживаемое действие", # select_control="Выберите контрол для изменения настроек:", control_state="Состояние контрола %s", incr="громкость +", decr="громкость -", back="◀️ Назад", n_min="%d мин.", n_sec="%d сек.", select_interval="Выберите длительность:", place="Место", beginning="Начало", end="Конец", record_result="Результат записи", record_started='Запись запущена!', record_error="Ошибка записи", files="Локальные файлы", remote_files="Файлы на сервере", file_line="— Запись с %s до %s (%s)", access_denied="Доступ запрещён", guard_disable="Снять с охраны", guard_enable="Поставить на охрану", guard_status="Статус охраны", guard_user_action_notification='Пользователь %s %s.', guard_user_action_enable="включил охрану ✅", guard_user_action_disable="выключил охрану ❌", guard_status_enabled="Включена ✅", guard_status_disabled="Выключена ❌", done="Готово 👌", sound_sensors="Датчики звука", sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.", sound_sensors_no_24h_data="За последние 24 часа данных нет.", sound_sensors_show_anything="Показать, что есть" ) self.lang.en( start_message="Select command on the keyboard", unknown_command="Unknown command", settings="Mixer settings", record="Record", unexpected_callback_data="Unexpected callback data", loading="Loading...", select_place="Select place:", invalid_location="Invalid place", invalid_interval="Invalid duration", unsupported_action="Unsupported action", # select_control="Select control to adjust its parameters:", control_state="%s control state", incr="vol +", decr="vol -", back="◀️ Back", n_min="%d min.", n_sec="%d s.", select_interval="Select duration:", place="Place", beginning="Started", end="Ended", record_result="Result", record_started='Recording started!', record_error="Recording error", files="Local files", remote_files="Remote files", file_line="— From %s to %s (%s)", access_denied="Access denied", guard_disable="Disable guard", guard_enable="Enable guard", guard_status="Guard status", guard_user_action_notification='User %s %s.', guard_user_action_enable="turned the guard ON ✅", guard_user_action_disable="turn the guard OFF ❌", guard_status_enabled="Active ✅", guard_status_disabled="Disabled ❌", done="Done 👌", sound_sensors="Sound sensors", sound_sensors_info="Here you can get information about last sound sensors hits.", sound_sensors_no_24h_data="No data for the last 24 hours.", sound_sensors_show_anything="Show me at least something" ) # ------ # settings # ------------- # list of nodes self.add_handler(MessageHandler(text_filter(self.lang.all('settings')), self.wrap(settings))) self.add_handler(CallbackQueryHandler(self.wrap(settings), pattern=r'^s0$')) # list of controls self.add_handler(CallbackQueryHandler(self.wrap(settings_place), pattern=r'^s0/.*')) # list of available tunes for control self.add_handler(CallbackQueryHandler(self.wrap(settings_place_control), pattern=r'^s1/.*')) # tuning self.add_handler(CallbackQueryHandler(self.wrap(settings_place_control_action), pattern=r'^s2/.*')) # ------ # recording # -------------- # list of nodes self.add_handler(MessageHandler(text_filter(self.lang.all('record')), self.wrap(record))) self.add_handler(CallbackQueryHandler(self.wrap(record), pattern=r'^r0$')) # list of available intervals self.add_handler(CallbackQueryHandler(self.wrap(record_place), pattern=r'^r0/.*')) # do record! self.add_handler(CallbackQueryHandler(self.wrap(record_place_interval), pattern=r'^r1/.*')) # --------- # sound sensors # ------------------ # list of places self.add_handler(MessageHandler(text_filter(self.lang.all('sound_sensors')), self.wrap(sound_sensors))) self.add_handler(CallbackQueryHandler(self.wrap(sound_sensors), pattern=r'^S0$')) # last 24h log self.add_handler(CallbackQueryHandler(self.wrap(sound_sensors_last_24h), pattern=r'^S0/.*')) # last _something_ self.add_handler(CallbackQueryHandler(self.wrap(sound_sensors_last_anything), pattern=r'^S1/.*')) # ------------- # guard enable/disable # ------------------------- if 'guard_server' in config['bot']: self.add_handler(MessageHandler(text_filter(self.lang.all('guard_enable')), self.wrap(guard_enable))) self.add_handler(MessageHandler(text_filter(self.lang.all('guard_disable')), self.wrap(guard_disable))) self.add_handler(MessageHandler(text_filter(self.lang.all('guard_status')), self.wrap(guard_status))) # -------- # local files # ---------------- # list of nodes # self.add_handler(MessageHandler(text_filter(self.lang.all('files')), self.wrap(partial(files, remote=False)))) # self.add_handler(CallbackQueryHandler(self.wrap(partial(files, remote=False)), pattern=r'^f0$')) # list of specific node's files # self.add_handler(CallbackQueryHandler(self.wrap(files_list), pattern=r'^f0/.*')) # -------- # remote files # ----------------- # list of nodes # self.add_handler(MessageHandler(text_filter(self.lang.all('remote_files')), self.wrap(partial(files, remote=True)))) # self.add_handler(CallbackQueryHandler(self.wrap(partial(files, remote=True)), pattern=r'^g0$')) # list of specific node's files # self.add_handler(CallbackQueryHandler(self.wrap(files_list), pattern=r'^g0/.*')) def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]: buttons = [ [ctx.lang('record'), ctx.lang('settings')], # [ctx.lang('files'), ctx.lang('remote_files')], ] if 'guard_server' in config['bot']: buttons.append([ ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status') ]) buttons.append([ctx.lang('sound_sensors')]) return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) if __name__ == '__main__': config.load('sound_bot') nodes = {} for nodename, nodecfg in config['nodes'].items(): nodes[nodename] = parse_addr(nodecfg['addr']) record_client = RecordClient(nodes, error_handler=record_onerror, finished_handler=record_onfinished, download_on_finish=True) bot = SoundBot() if 'api' in config: bot.enable_logging(BotType.SOUND) bot.run() record_client.stop()