#!/usr/bin/env python3 import logging import os import tempfile from enum import Enum from datetime import datetime, timedelta from html import escape from typing import Optional, List, Dict, Tuple from home.config import config from home.api import WebAPIClient from home.api.types import SoundSensorLocation, BotType from home.api.errors import ApiResponseError from home.media import SoundNodeClient, SoundRecordClient, SoundRecordFile, CameraNodeClient from home.soundsensor import SoundSensorServerGuardClient from home.util import parse_addr, chunks, filesize_fmt from home.telegram import bot from telegram.error import TelegramError from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton, User from PIL import Image config.load_app('sound_bot') nodes = {} for nodename, nodecfg in config['nodes'].items(): nodes[nodename] = parse_addr(nodecfg['addr']) bot.initialize() bot.lang.ru( start_message="Выберите команду на клавиатуре", unknown_command="Неизвестная команда", unexpected_callback_data="Ошибка: неверные данные", settings="Настройки микшера", record="Запись", loading="Загрузка...", select_place="Выберите место:", invalid_location="Неверное место", invalid_interval="Неверная длительность", unsupported_action="Неподдерживаемое действие", # select_control="Выберите контрол для изменения настроек:", control_state="Состояние контрола %s", incr="громкость +", decr="громкость -", back="◀️ Назад", n_min="%d мин.", n_sec="%d сек.", select_interval="Выберите длительность:", place="Место", beginning="Начало", end="Конец", record_result="Результат записи", record_started='Запись запущена!', record_error="Ошибка записи", files="Локальные файлы", remote_files="Файлы на сервере", file_line="— Запись с %s до %s (%s)", access_denied="Доступ запрещён", guard_disable="Снять с охраны", guard_enable="Поставить на охрану", guard_status="Статус охраны", guard_user_action_notification='Пользователь %s %s.', guard_user_action_enable="включил охрану ✅", guard_user_action_disable="выключил охрану ❌", guard_status_enabled="Включена ✅", guard_status_disabled="Выключена ❌", done="Готово 👌", sound_sensors="Датчики звука", sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.", sound_sensors_no_24h_data="За последние 24 часа данных нет.", sound_sensors_show_anything="Показать, что есть", cameras="Камеры", select_option="Выберите опцию", w_flash="Со вспышкой", wo_flash="Без вспышки", ) bot.lang.en( start_message="Select command on the keyboard", unknown_command="Unknown command", settings="Mixer settings", record="Record", unexpected_callback_data="Unexpected callback data", loading="Loading...", select_place="Select place:", invalid_location="Invalid place", invalid_interval="Invalid duration", unsupported_action="Unsupported action", # select_control="Select control to adjust its parameters:", control_state="%s control state", incr="vol +", decr="vol -", back="◀️ Back", n_min="%d min.", n_sec="%d s.", select_interval="Select duration:", place="Place", beginning="Started", end="Ended", record_result="Result", record_started='Recording started!', record_error="Recording error", files="Local files", remote_files="Remote files", file_line="— From %s to %s (%s)", access_denied="Access denied", guard_disable="Disable guard", guard_enable="Enable guard", guard_status="Guard status", guard_user_action_notification='User %s %s.', guard_user_action_enable="turned the guard ON ✅", guard_user_action_disable="turn the guard OFF ❌", guard_status_enabled="Active ✅", guard_status_disabled="Disabled ❌", done="Done 👌", sound_sensors="Sound sensors", sound_sensors_info="Here you can get information about last sound sensors hits.", sound_sensors_no_24h_data="No data for the last 24 hours.", sound_sensors_show_anything="Show me at least something", cameras="Cameras", select_option="Select option", w_flash="With flash", wo_flash="Without flash", ) logger = logging.getLogger(__name__) RenderedContent = Tuple[str, Optional[InlineKeyboardMarkup]] record_client: Optional[SoundRecordClient] = None node_client_links: Dict[str, SoundNodeClient] = {} cam_client_links: Dict[str, CameraNodeClient] = {} def node_client(node: str) -> SoundNodeClient: if node not in node_client_links: node_client_links[node] = SoundNodeClient(parse_addr(config['nodes'][node]['addr'])) return node_client_links[node] def camera_client(cam: str) -> CameraNodeClient: if cam not in node_client_links: cam_client_links[cam] = CameraNodeClient(parse_addr(config['cameras'][cam]['addr'])) return cam_client_links[cam] def node_exists(node: str) -> bool: return node in config['nodes'] def camera_exists(name: str) -> bool: return name in config['cameras'] def camera_settings(name: str) -> Optional[dict]: try: return config['cameras'][name]['settings'] except KeyError: return None def have_cameras() -> bool: return 'cameras' in config and config['cameras'] def sound_sensor_exists(node: str) -> bool: return node in config['sound_sensors'] def interval_defined(interval: int) -> bool: return interval in config['bot']['record_intervals'] def callback_unpack(ctx: bot.Context) -> List[str]: return ctx.callback_query.data[3:].split('/') def manual_recording_allowed(user_id: int) -> bool: return 'manual_record_allowlist' not in config['bot'] or user_id in config['bot']['manual_record_allowlist'] def guard_client() -> SoundSensorServerGuardClient: return SoundSensorServerGuardClient(parse_addr(config['bot']['guard_server'])) # message renderers # ----------------- class Renderer: @classmethod def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup: buttons = [] for node, nodeconfig in config['nodes'].items(): buttons.append([InlineKeyboardButton(nodeconfig['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{node}')]) return InlineKeyboardMarkup(buttons) @classmethod def back_button(cls, ctx: bot.Context, buttons: list, callback_data: str): buttons.append([ InlineKeyboardButton(ctx.lang('back'), callback_data=callback_data) ]) class SettingsRenderer(Renderer): @classmethod def index(cls, ctx: bot.Context) -> RenderedContent: html = f'{ctx.lang("settings")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='s0') @classmethod def node(cls, ctx: bot.Context, controls: List[dict]) -> RenderedContent: node, = callback_unpack(ctx) html = [] buttons = [] for control in controls: html.append(f'{control["name"]}\n{escape(control["info"])}') buttons.append([ InlineKeyboardButton(control['name'], callback_data=f's1/{node}/{control["name"]}') ]) html = "\n\n".join(html) cls.back_button(ctx, buttons, callback_data='s0') return html, InlineKeyboardMarkup(buttons) @classmethod def control(cls, ctx: bot.Context, data) -> RenderedContent: node, control, *rest = callback_unpack(ctx) html = '' + ctx.lang('control_state', control) + '\n\n' html += escape(data['info']) buttons = [] callback_prefix = f's2/{node}/{control}' for cap in data['caps']: if cap == 'mute': muted = 'dB] [off]' in data['info'] act = 'unmute' if muted else 'mute' buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')]) elif cap == 'cap': cap_dis = 'Capture [off]' in data['info'] act = 'cap' if cap_dis else 'nocap' buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')]) elif cap == 'volume': buttons.append( list(map(lambda s: InlineKeyboardButton(ctx.lang(s), callback_data=f'{callback_prefix}/{s}'), ['decr', 'incr'])) ) cls.back_button(ctx, buttons, callback_data=f's0/{node}') return html, InlineKeyboardMarkup(buttons) class RecordRenderer(Renderer): @classmethod def index(cls, ctx: bot.Context) -> RenderedContent: html = f'{ctx.lang("record")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='r0') @classmethod def node(cls, ctx: bot.Context, durations: List[int]) -> RenderedContent: node, = callback_unpack(ctx) html = ctx.lang('select_interval') buttons = [] for s in durations: if s >= 60: m = int(s / 60) label = ctx.lang('n_min', m) else: label = ctx.lang('n_sec', s) buttons.append(InlineKeyboardButton(label, callback_data=f'r1/{node}/{s}')) buttons = list(chunks(buttons, 3)) cls.back_button(ctx, buttons, callback_data=f'r0') return html, InlineKeyboardMarkup(buttons) @classmethod def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent: node, *rest = callback_unpack(ctx) place = config['nodes'][node]['label'][ctx.user_lang] html = f'{ctx.lang("record_started")} ({place}, id={rid})' return html, None @classmethod def record_done(cls, info: dict, node: str, uid: int) -> str: ulang = bot.db.get_user_lang(uid) def lang(key, *args): return bot.lang.get(key, ulang, *args) rid = info['id'] fmt = '%d.%m.%y %H:%M:%S' start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt) stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt) place = config['nodes'][node]['label'][ulang] html = f'{lang("record_result")} ({place}, id={rid})\n\n' html += f'{lang("beginning")}: {start_time}\n' html += f'{lang("end")}: {stop_time}' return html @classmethod def record_error(cls, info: dict, node: str, uid: int) -> str: ulang = bot.db.get_user_lang(uid) def lang(key, *args): return bot.lang.get(key, ulang, *args) place = config['nodes'][node]['label'][ulang] rid = info['id'] html = f'{lang("record_error")} ({place}, id={rid})' if 'error' in info: html += '\n'+str(info['error']) return html class FilesRenderer(Renderer): @classmethod def index(cls, ctx: bot.Context) -> RenderedContent: html = f'{ctx.lang("files")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='f0') @classmethod def filelist(cls, ctx: bot.Context, files: List[SoundRecordFile]) -> RenderedContent: node, = callback_unpack(ctx) html_files = map(lambda file: cls.file(ctx, file, node), files) html = '\n\n'.join(html_files) buttons = [] cls.back_button(ctx, buttons, callback_data='f0') return html, InlineKeyboardMarkup(buttons) @classmethod def file(cls, ctx: bot.Context, file: SoundRecordFile, node: str) -> str: html = ctx.lang('file_line', file.start_humantime, file.stop_humantime, filesize_fmt(file.filesize)) if file.file_id is not None: html += f'/audio_{node}_{file.file_id}' return html class RemoteFilesRenderer(FilesRenderer): @classmethod def index(cls, ctx: bot.Context) -> RenderedContent: html = f'{ctx.lang("remote_files")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='g0') class SoundSensorRenderer(Renderer): @classmethod def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup: buttons = [] for sensor, sensor_label in config['sound_sensors'].items(): buttons.append( [InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')]) return InlineKeyboardMarkup(buttons) @classmethod def index(cls, ctx: bot.Context) -> RenderedContent: html = f'{ctx.lang("sound_sensors_info")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='S0') @classmethod def hits(cls, ctx: bot.Context, data, is_last=False) -> RenderedContent: node, = callback_unpack(ctx) buttons = [] if not data: html = ctx.lang('sound_sensors_no_24h_data') if not is_last: buttons.append([InlineKeyboardButton(ctx.lang('sound_sensors_show_anything'), callback_data=f'S1/{node}')]) else: html = '' prev_date = None for item in data: item_date = item['time'].strftime('%d.%m.%y') if prev_date is None or prev_date != item_date: if html != '': html += '\n\n' html += f'{item_date}' prev_date = item_date html += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})' cls.back_button(ctx, buttons, callback_data='S0') return html, InlineKeyboardMarkup(buttons) @classmethod def hits_plain(cls, ctx: bot.Context, data, is_last=False) -> bytes: node, = callback_unpack(ctx) text = '' prev_date = None for item in data: item_date = item['time'].strftime('%d.%m.%y') if prev_date is None or prev_date != item_date: if text != '': text += '\n\n' text += item_date prev_date = item_date text += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})' return text.encode() class CamerasRenderer(Renderer): @classmethod def index(cls, ctx: bot.Context) -> RenderedContent: html = f'{ctx.lang("cameras")}\n\n' html += ctx.lang('select_place') return html, cls.places_markup(ctx, callback_prefix='c0') @classmethod def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup: buttons = [] for camera_name, camera_data in config['cameras'].items(): buttons.append( [InlineKeyboardButton(camera_data['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{camera_name}')]) return InlineKeyboardMarkup(buttons) @classmethod def camera(cls, ctx: bot.Context, flash_available: bool) -> RenderedContent: node, = callback_unpack(ctx) html = ctx.lang('select_option') buttons = [] if flash_available: buttons.append(InlineKeyboardButton(ctx.lang('w_flash'), callback_data=f'c1/{node}/1')) buttons.append(InlineKeyboardButton(ctx.lang('wo_flash'), callback_data=f'c1/{node}/0')) cls.back_button(ctx, [buttons], callback_data=f'c0') return html, InlineKeyboardMarkup([buttons]) # # @classmethod # def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent: # node, *rest = callback_unpack(ctx) # # place = config['nodes'][node]['label'][ctx.user_lang] # # html = f'{ctx.lang("record_started")} ({place}, id={rid})' # return html, None # # @classmethod # def record_done(cls, info: dict, node: str, uid: int) -> str: # ulang = bot.db.get_user_lang(uid) # # def lang(key, *args): # return bot.lang.get(key, ulang, *args) # # rid = info['id'] # fmt = '%d.%m.%y %H:%M:%S' # start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt) # stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt) # # place = config['nodes'][node]['label'][ulang] # # html = f'{lang("record_result")} ({place}, id={rid})\n\n' # html += f'{lang("beginning")}: {start_time}\n' # html += f'{lang("end")}: {stop_time}' # # return html # # @classmethod # def record_error(cls, info: dict, node: str, uid: int) -> str: # ulang = bot.db.get_user_lang(uid) # # def lang(key, *args): # return bot.lang.get(key, ulang, *args) # # place = config['nodes'][node]['label'][ulang] # rid = info['id'] # # html = f'{lang("record_error")} ({place}, id={rid})' # if 'error' in info: # html += '\n'+str(info['error']) # # return html # cameras handlers # ---------------- @bot.handler(message='cameras', callback=r'^c0$') def cameras(ctx: bot.Context): """ List of cameras """ text, markup = CamerasRenderer.index(ctx) if not ctx.is_callback_context(): return ctx.reply(text, markup=markup) else: ctx.answer() return ctx.edit(text, markup=markup) @bot.callbackhandler(callback=r'^c0/.*') def camera_options(ctx: bot.Context) -> None: """ List of options (with/without flash etc) """ cam, = callback_unpack(ctx) if not camera_exists(cam): ctx.answer(ctx.lang('invalid_location')) return ctx.answer() flash_available = 'flash_available' in config['cameras'][cam] and config['cameras'][cam]['flash_available'] is True text, markup = CamerasRenderer.camera(ctx, flash_available) ctx.edit(text, markup) @bot.callbackhandler(callback=r'^c1/.*') def camera_capture(ctx: bot.Context) -> None: """ Cheese """ cam, flash = callback_unpack(ctx) flash = int(flash) if not camera_exists(cam): ctx.answer(ctx.lang('invalid_location')) return ctx.answer() client = camera_client(cam) fd = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') fd.close() client.capture(fd.name, with_flash=bool(flash)) logger.debug(f'captured photo ({cam}), saved to {fd.name}') camera_config = config['cameras'][cam] if 'rotate' in camera_config: im = Image.open(fd.name) im.rotate(camera_config['rotate'], expand=True) # im.show() im.save(fd.name) logger.debug(f"rotated image {camera_config['rotate']} degrees") try: with open(fd.name, 'rb') as f: bot.send_photo(ctx.user_id, photo=f) except TelegramError as exc: logger.exception(exc) try: os.unlink(fd.name) except OSError as exc: logger.exception(exc) # settings handlers # ----------------- @bot.handler(message='settings', callback=r'^s0$') def settings(ctx: bot.Context): """ List of nodes """ text, markup = SettingsRenderer.index(ctx) if not ctx.is_callback_context(): return ctx.reply(text, markup=markup) else: ctx.answer() return ctx.edit(text, markup=markup) @bot.callbackhandler(callback=r'^s0/.*') def settings_place(ctx: bot.Context): """ List of controls """ node, = callback_unpack(ctx) if not node_exists(node): ctx.answer(ctx.lang('invalid_location')) return cl = node_client(node) controls = cl.amixer_get_all() ctx.answer() text, markup = SettingsRenderer.node(ctx, controls) ctx.edit(text, markup) @bot.callbackhandler(callback=r'^s1/.*') def settings_place_control(ctx: bot.Context): """ List of available tunes for control """ node, control = callback_unpack(ctx) if not node_exists(node): ctx.answer(ctx.lang('invalid_location')) return cl = node_client(node) control_data = cl.amixer_get(control) ctx.answer() text, markup = SettingsRenderer.control(ctx, control_data) ctx.edit(text, markup) @bot.callbackhandler(callback=r'^s2/.*') def settings_place_control_action(ctx: bot.Context): """ Tuning """ node, control, action = callback_unpack(ctx) if not node_exists(node): return cl = node_client(node) if not hasattr(cl, f'amixer_{action}'): ctx.answer(ctx.lang('invalid_action')) return func = getattr(cl, f'amixer_{action}') control_data = func(control) ctx.answer() text, markup = SettingsRenderer.control(ctx, control_data) ctx.edit(text, markup) # recording handlers # ------------------ @bot.handler(message='record', callback=r'^r0$') def record(ctx: bot.Context): """ List of nodes """ if not manual_recording_allowed(ctx.user_id): return ctx.reply(ctx.lang('access_denied')) text, markup = RecordRenderer.index(ctx) if not ctx.is_callback_context(): return ctx.reply(text, markup=markup) else: ctx.answer() return ctx.edit(text, markup=markup) @bot.callbackhandler(callback=r'^r0/.*') def record_place(ctx: bot.Context): """ List of available intervals """ node, = callback_unpack(ctx) if not node_exists(node): ctx.answer(ctx.lang('invalid_location')) return ctx.answer() text, markup = RecordRenderer.node(ctx, config['bot']['record_intervals']) ctx.edit(text, markup) @bot.callbackhandler(callback=r'^r1/.*') def record_place_interval(ctx: bot.Context): """ Do record! """ node, interval = callback_unpack(ctx) interval = int(interval) if not node_exists(node): ctx.answer(ctx.lang('invalid_location')) return if not interval_defined(interval): ctx.answer(ctx.lang('invalid_interval')) return try: record_id = record_client.record(node, interval, {'user_id': ctx.user_id, 'node': node}) except ApiResponseError as e: ctx.answer(e.error_message) logger.error(e) return ctx.answer() html, markup = RecordRenderer.record_started(ctx, record_id) ctx.edit(html, markup) # sound sensor handlers # --------------------- @bot.handler(message='sound_sensors', callback=r'^S0$') def sound_sensors(ctx: bot.Context): """ List of places """ text, markup = SoundSensorRenderer.index(ctx) if not ctx.is_callback_context(): return ctx.reply(text, markup=markup) else: ctx.answer() return ctx.edit(text, markup=markup) @bot.callbackhandler(callback=r'^S0/.*') def sound_sensors_last_24h(ctx: bot.Context): """ Last 24h log """ node, = callback_unpack(ctx) if not sound_sensor_exists(node): ctx.answer(ctx.lang('invalid location')) return ctx.answer() cl = WebAPIClient() data = cl.get_sound_sensor_hits(location=SoundSensorLocation[node.upper()], after=datetime.now() - timedelta(hours=24)) text, markup = SoundSensorRenderer.hits(ctx, data) if len(text) > 4096: plain = SoundSensorRenderer.hits_plain(ctx, data) bot.send_file(ctx.user_id, document=plain, filename='data.txt') else: ctx.edit(text, markup=markup) @bot.callbackhandler(callback=r'^S1/.*') def sound_sensors_last_anything(ctx: bot.Context): """ Last _something_ """ node, = callback_unpack(ctx) if not sound_sensor_exists(node): ctx.answer(ctx.lang('invalid location')) return ctx.answer() cl = WebAPIClient() data = cl.get_last_sound_sensor_hits(location=SoundSensorLocation[node.upper()], last=20) text, markup = SoundSensorRenderer.hits(ctx, data, is_last=True) if len(text) > 4096: plain = SoundSensorRenderer.hits_plain(ctx, data) bot.send_file(ctx.user_id, document=plain, filename='data.txt') else: ctx.edit(text, markup=markup) # guard enable/disable handlers # ----------------------------- class GuardUserAction(Enum): ENABLE = 'enable' DISABLE = 'disable' if 'guard_server' in config['bot']: @bot.handler(message='guard_status') def guard_status(ctx: bot.Context): guard = guard_client() resp = guard.guard_status() key = 'enabled' if resp['enabled'] is True else 'disabled' ctx.reply(ctx.lang(f'guard_status_{key}')) @bot.handler(message='guard_enable') def guard_enable(ctx: bot.Context): guard = guard_client() guard.guard_enable() ctx.reply(ctx.lang('done')) _guard_notify(ctx.user, GuardUserAction.ENABLE) @bot.handler(message='guard_disable') def guard_disable(ctx: bot.Context): guard = guard_client() guard.guard_disable() ctx.reply(ctx.lang('done')) _guard_notify(ctx.user, GuardUserAction.DISABLE) def _guard_notify(user: User, action: GuardUserAction): def text_getter(lang: str): action_name = bot.lang.get(f'guard_user_action_{action.value}', lang) user_name = bot.user_any_name(user) return 'ℹ ' + bot.lang.get('guard_user_action_notification', lang, user.id, user_name, action_name) bot.notify_all(text_getter, exclude=(user.id,)) @bot.defaultreplymarkup def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: buttons = [ [ctx.lang('record'), ctx.lang('settings')], # [ctx.lang('files'), ctx.lang('remote_files')], ] if 'guard_server' in config['bot']: buttons.append([ ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status') ]) buttons.append([ctx.lang('sound_sensors')]) if have_cameras(): buttons.append([ctx.lang('cameras')]) return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) # record client callbacks # ----------------------- def record_onerror(info: dict, userdata: dict): uid = userdata['user_id'] node = userdata['node'] html = RecordRenderer.record_error(info, node, uid) try: bot.notify_user(userdata['user_id'], html) except TelegramError as exc: logger.exception(exc) finally: record_client.forget(node, info['id']) def record_onfinished(info: dict, fn: str, userdata: dict): logger.info('record finished: ' + str(info)) uid = userdata['user_id'] node = userdata['node'] html = RecordRenderer.record_done(info, node, uid) bot.notify_user(uid, html) try: # sending audiofile to telegram with open(fn, 'rb') as f: bot.send_audio(uid, audio=f, filename='audio.mp3') # deleting temp file try: os.unlink(fn) except OSError as exc: logger.exception(exc) bot.notify_user(uid, exc) # remove the recording from sound_node's history record_client.forget(node, info['id']) # remove file from storage # node_client(node).storage_delete(info['file']['fileid']) except Exception as e: logger.exception(e) if __name__ == '__main__': record_client = SoundRecordClient(nodes, error_handler=record_onerror, finished_handler=record_onfinished, download_on_finish=True) if 'api' in config: bot.enable_logging(BotType.SOUND) bot.run() record_client.stop()