summaryrefslogtreecommitdiff
path: root/src/sound_bot.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/sound_bot.py')
-rwxr-xr-xsrc/sound_bot.py889
1 files changed, 0 insertions, 889 deletions
diff --git a/src/sound_bot.py b/src/sound_bot.py
deleted file mode 100755
index 186337a..0000000
--- a/src/sound_bot.py
+++ /dev/null
@@ -1,889 +0,0 @@
-#!/usr/bin/env python3
-import logging
-import os
-import tempfile
-
-from enum import Enum
-from datetime import datetime, timedelta
-from html import escape
-from typing import Optional, List, Dict, Tuple
-
-from home.config import config
-from home.api import WebAPIClient
-from home.api.types import SoundSensorLocation, BotType
-from home.api.errors import ApiResponseError
-from home.media import SoundNodeClient, SoundRecordClient, SoundRecordFile, CameraNodeClient
-from home.soundsensor import SoundSensorServerGuardClient
-from home.util import parse_addr, chunks, filesize_fmt
-
-from home.telegram import bot
-
-from telegram.error import TelegramError
-from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton, User
-
-from PIL import Image
-
-config.load('sound_bot')
-
-nodes = {}
-for nodename, nodecfg in config['nodes'].items():
- nodes[nodename] = parse_addr(nodecfg['addr'])
-
-bot.initialize()
-bot.lang.ru(
- start_message="Выберите команду на клавиатуре",
- unknown_command="Неизвестная команда",
- unexpected_callback_data="Ошибка: неверные данные",
- settings="Настройки микшера",
- record="Запись",
- loading="Загрузка...",
- select_place="Выберите место:",
- invalid_location="Неверное место",
- invalid_interval="Неверная длительность",
- unsupported_action="Неподдерживаемое действие",
- # select_control="Выберите контрол для изменения настроек:",
- control_state="Состояние контрола %s",
- incr="громкость +",
- decr="громкость -",
- back="◀️ Назад",
- n_min="%d мин.",
- n_sec="%d сек.",
- select_interval="Выберите длительность:",
- place="Место",
- beginning="Начало",
- end="Конец",
- record_result="Результат записи",
- record_started='Запись запущена!',
- record_error="Ошибка записи",
- files="Локальные файлы",
- remote_files="Файлы на сервере",
- file_line="— Запись с <b>%s</b> до <b>%s</b> <i>(%s)</i>",
- access_denied="Доступ запрещён",
-
- guard_disable="Снять с охраны",
- guard_enable="Поставить на охрану",
- guard_status="Статус охраны",
- guard_user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> %s.',
- guard_user_action_enable="включил охрану ✅",
- guard_user_action_disable="выключил охрану ❌",
- guard_status_enabled="Включена ✅",
- guard_status_disabled="Выключена ❌",
-
- done="Готово 👌",
-
- sound_sensors="Датчики звука",
- sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.",
- sound_sensors_no_24h_data="За последние 24 часа данных нет.",
- sound_sensors_show_anything="Показать, что есть",
-
- cameras="Камеры",
- select_option="Выберите опцию",
- w_flash="Со вспышкой",
- wo_flash="Без вспышки",
-)
-
-bot.lang.en(
- start_message="Select command on the keyboard",
- unknown_command="Unknown command",
- settings="Mixer settings",
- record="Record",
- unexpected_callback_data="Unexpected callback data",
- loading="Loading...",
- select_place="Select place:",
- invalid_location="Invalid place",
- invalid_interval="Invalid duration",
- unsupported_action="Unsupported action",
- # select_control="Select control to adjust its parameters:",
- control_state="%s control state",
- incr="vol +",
- decr="vol -",
- back="◀️ Back",
- n_min="%d min.",
- n_sec="%d s.",
- select_interval="Select duration:",
- place="Place",
- beginning="Started",
- end="Ended",
- record_result="Result",
- record_started='Recording started!',
- record_error="Recording error",
- files="Local files",
- remote_files="Remote files",
- file_line="— From <b>%s</b> to <b>%s</b> <i>(%s)</i>",
- access_denied="Access denied",
-
- guard_disable="Disable guard",
- guard_enable="Enable guard",
- guard_status="Guard status",
- guard_user_action_notification='User <a href="tg://user?id=%d">%s</a> %s.',
- guard_user_action_enable="turned the guard ON ✅",
- guard_user_action_disable="turn the guard OFF ❌",
- guard_status_enabled="Active ✅",
- guard_status_disabled="Disabled ❌",
- done="Done 👌",
-
- sound_sensors="Sound sensors",
- sound_sensors_info="Here you can get information about last sound sensors hits.",
- sound_sensors_no_24h_data="No data for the last 24 hours.",
- sound_sensors_show_anything="Show me at least something",
-
- cameras="Cameras",
- select_option="Select option",
- w_flash="With flash",
- wo_flash="Without flash",
-)
-
-logger = logging.getLogger(__name__)
-RenderedContent = Tuple[str, Optional[InlineKeyboardMarkup]]
-record_client: Optional[SoundRecordClient] = None
-node_client_links: Dict[str, SoundNodeClient] = {}
-cam_client_links: Dict[str, CameraNodeClient] = {}
-
-
-def node_client(node: str) -> SoundNodeClient:
- if node not in node_client_links:
- node_client_links[node] = SoundNodeClient(parse_addr(config['nodes'][node]['addr']))
- return node_client_links[node]
-
-
-def camera_client(cam: str) -> CameraNodeClient:
- if cam not in node_client_links:
- cam_client_links[cam] = CameraNodeClient(parse_addr(config['cameras'][cam]['addr']))
- return cam_client_links[cam]
-
-
-def node_exists(node: str) -> bool:
- return node in config['nodes']
-
-
-def camera_exists(name: str) -> bool:
- return name in config['cameras']
-
-
-def camera_settings(name: str) -> Optional[dict]:
- try:
- return config['cameras'][name]['settings']
- except KeyError:
- return None
-
-
-def have_cameras() -> bool:
- return 'cameras' in config and config['cameras']
-
-
-def sound_sensor_exists(node: str) -> bool:
- return node in config['sound_sensors']
-
-
-def interval_defined(interval: int) -> bool:
- return interval in config['bot']['record_intervals']
-
-
-def callback_unpack(ctx: bot.Context) -> List[str]:
- return ctx.callback_query.data[3:].split('/')
-
-
-def manual_recording_allowed(user_id: int) -> bool:
- return 'manual_record_allowlist' not in config['bot'] or user_id in config['bot']['manual_record_allowlist']
-
-
-def guard_client() -> SoundSensorServerGuardClient:
- return SoundSensorServerGuardClient(parse_addr(config['bot']['guard_server']))
-
-
-# message renderers
-# -----------------
-
-class Renderer:
- @classmethod
- def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup:
- buttons = []
- for node, nodeconfig in config['nodes'].items():
- buttons.append([InlineKeyboardButton(nodeconfig['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{node}')])
- return InlineKeyboardMarkup(buttons)
-
- @classmethod
- def back_button(cls,
- ctx: bot.Context,
- buttons: list,
- callback_data: str):
- buttons.append([
- InlineKeyboardButton(ctx.lang('back'), callback_data=callback_data)
- ])
-
-
-class SettingsRenderer(Renderer):
- @classmethod
- def index(cls, ctx: bot.Context) -> RenderedContent:
- html = f'<b>{ctx.lang("settings")}</b>\n\n'
- html += ctx.lang('select_place')
- return html, cls.places_markup(ctx, callback_prefix='s0')
-
- @classmethod
- def node(cls, ctx: bot.Context,
- controls: List[dict]) -> RenderedContent:
- node, = callback_unpack(ctx)
-
- html = []
- buttons = []
- for control in controls:
- html.append(f'<b>{control["name"]}</b>\n{escape(control["info"])}')
- buttons.append([
- InlineKeyboardButton(control['name'], callback_data=f's1/{node}/{control["name"]}')
- ])
-
- html = "\n\n".join(html)
- cls.back_button(ctx, buttons, callback_data='s0')
-
- return html, InlineKeyboardMarkup(buttons)
-
- @classmethod
- def control(cls, ctx: bot.Context, data) -> RenderedContent:
- node, control, *rest = callback_unpack(ctx)
-
- html = '<b>' + ctx.lang('control_state', control) + '</b>\n\n'
- html += escape(data['info'])
- buttons = []
- callback_prefix = f's2/{node}/{control}'
- for cap in data['caps']:
- if cap == 'mute':
- muted = 'dB] [off]' in data['info']
- act = 'unmute' if muted else 'mute'
- buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')])
-
- elif cap == 'cap':
- cap_dis = 'Capture [off]' in data['info']
- act = 'cap' if cap_dis else 'nocap'
- buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')])
-
- elif cap == 'volume':
- buttons.append(
- list(map(lambda s: InlineKeyboardButton(ctx.lang(s), callback_data=f'{callback_prefix}/{s}'),
- ['decr', 'incr']))
- )
-
- cls.back_button(ctx, buttons, callback_data=f's0/{node}')
-
- return html, InlineKeyboardMarkup(buttons)
-
-
-class RecordRenderer(Renderer):
- @classmethod
- def index(cls, ctx: bot.Context) -> RenderedContent:
- html = f'<b>{ctx.lang("record")}</b>\n\n'
- html += ctx.lang('select_place')
- return html, cls.places_markup(ctx, callback_prefix='r0')
-
- @classmethod
- def node(cls, ctx: bot.Context, durations: List[int]) -> RenderedContent:
- node, = callback_unpack(ctx)
-
- html = ctx.lang('select_interval')
-
- buttons = []
- for s in durations:
- if s >= 60:
- m = int(s / 60)
- label = ctx.lang('n_min', m)
- else:
- label = ctx.lang('n_sec', s)
- buttons.append(InlineKeyboardButton(label, callback_data=f'r1/{node}/{s}'))
- buttons = list(chunks(buttons, 3))
- cls.back_button(ctx, buttons, callback_data=f'r0')
-
- return html, InlineKeyboardMarkup(buttons)
-
- @classmethod
- def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent:
- node, *rest = callback_unpack(ctx)
-
- place = config['nodes'][node]['label'][ctx.user_lang]
-
- html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})'
- return html, None
-
- @classmethod
- def record_done(cls, info: dict, node: str, uid: int) -> str:
- ulang = bot.db.get_user_lang(uid)
-
- def lang(key, *args):
- return bot.lang.get(key, ulang, *args)
-
- rid = info['id']
- fmt = '%d.%m.%y %H:%M:%S'
- start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt)
- stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt)
-
- place = config['nodes'][node]['label'][ulang]
-
- html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n'
- html += f'<b>{lang("beginning")}</b>: {start_time}\n'
- html += f'<b>{lang("end")}</b>: {stop_time}'
-
- return html
-
- @classmethod
- def record_error(cls, info: dict, node: str, uid: int) -> str:
- ulang = bot.db.get_user_lang(uid)
-
- def lang(key, *args):
- return bot.lang.get(key, ulang, *args)
-
- place = config['nodes'][node]['label'][ulang]
- rid = info['id']
-
- html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})'
- if 'error' in info:
- html += '\n'+str(info['error'])
-
- return html
-
-
-class FilesRenderer(Renderer):
- @classmethod
- def index(cls, ctx: bot.Context) -> RenderedContent:
- html = f'<b>{ctx.lang("files")}</b>\n\n'
- html += ctx.lang('select_place')
- return html, cls.places_markup(ctx, callback_prefix='f0')
-
- @classmethod
- def filelist(cls, ctx: bot.Context, files: List[SoundRecordFile]) -> RenderedContent:
- node, = callback_unpack(ctx)
-
- html_files = map(lambda file: cls.file(ctx, file, node), files)
- html = '\n\n'.join(html_files)
-
- buttons = []
- cls.back_button(ctx, buttons, callback_data='f0')
-
- return html, InlineKeyboardMarkup(buttons)
-
- @classmethod
- def file(cls, ctx: bot.Context, file: SoundRecordFile, node: str) -> str:
- html = ctx.lang('file_line', file.start_humantime, file.stop_humantime, filesize_fmt(file.filesize))
- if file.file_id is not None:
- html += f'/audio_{node}_{file.file_id}'
- return html
-
-
-class RemoteFilesRenderer(FilesRenderer):
- @classmethod
- def index(cls, ctx: bot.Context) -> RenderedContent:
- html = f'<b>{ctx.lang("remote_files")}</b>\n\n'
- html += ctx.lang('select_place')
- return html, cls.places_markup(ctx, callback_prefix='g0')
-
-
-class SoundSensorRenderer(Renderer):
- @classmethod
- def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup:
- buttons = []
- for sensor, sensor_label in config['sound_sensors'].items():
- buttons.append(
- [InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')])
- return InlineKeyboardMarkup(buttons)
-
- @classmethod
- def index(cls, ctx: bot.Context) -> RenderedContent:
- html = f'{ctx.lang("sound_sensors_info")}\n\n'
- html += ctx.lang('select_place')
- return html, cls.places_markup(ctx, callback_prefix='S0')
-
- @classmethod
- def hits(cls, ctx: bot.Context, data, is_last=False) -> RenderedContent:
- node, = callback_unpack(ctx)
- buttons = []
-
- if not data:
- html = ctx.lang('sound_sensors_no_24h_data')
- if not is_last:
- buttons.append([InlineKeyboardButton(ctx.lang('sound_sensors_show_anything'), callback_data=f'S1/{node}')])
- else:
- html = ''
- prev_date = None
- for item in data:
- item_date = item['time'].strftime('%d.%m.%y')
- if prev_date is None or prev_date != item_date:
- if html != '':
- html += '\n\n'
- html += f'<b>{item_date}</b>'
- prev_date = item_date
- html += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})'
- cls.back_button(ctx, buttons, callback_data='S0')
- return html, InlineKeyboardMarkup(buttons)
-
- @classmethod
- def hits_plain(cls, ctx: bot.Context, data, is_last=False) -> bytes:
- node, = callback_unpack(ctx)
-
- text = ''
- prev_date = None
- for item in data:
- item_date = item['time'].strftime('%d.%m.%y')
- if prev_date is None or prev_date != item_date:
- if text != '':
- text += '\n\n'
- text += item_date
- prev_date = item_date
- text += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})'
-
- return text.encode()
-
-
-class CamerasRenderer(Renderer):
- @classmethod
- def index(cls, ctx: bot.Context) -> RenderedContent:
- html = f'<b>{ctx.lang("cameras")}</b>\n\n'
- html += ctx.lang('select_place')
- return html, cls.places_markup(ctx, callback_prefix='c0')
-
- @classmethod
- def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup:
- buttons = []
- for camera_name, camera_data in config['cameras'].items():
- buttons.append(
- [InlineKeyboardButton(camera_data['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{camera_name}')])
- return InlineKeyboardMarkup(buttons)
-
- @classmethod
- def camera(cls, ctx: bot.Context, flash_available: bool) -> RenderedContent:
- node, = callback_unpack(ctx)
-
- html = ctx.lang('select_option')
-
- buttons = []
- if flash_available:
- buttons.append(InlineKeyboardButton(ctx.lang('w_flash'), callback_data=f'c1/{node}/1'))
- buttons.append(InlineKeyboardButton(ctx.lang('wo_flash'), callback_data=f'c1/{node}/0'))
-
- cls.back_button(ctx, [buttons], callback_data=f'c0')
-
- return html, InlineKeyboardMarkup([buttons])
- #
- # @classmethod
- # def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent:
- # node, *rest = callback_unpack(ctx)
- #
- # place = config['nodes'][node]['label'][ctx.user_lang]
- #
- # html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})'
- # return html, None
- #
- # @classmethod
- # def record_done(cls, info: dict, node: str, uid: int) -> str:
- # ulang = bot.db.get_user_lang(uid)
- #
- # def lang(key, *args):
- # return bot.lang.get(key, ulang, *args)
- #
- # rid = info['id']
- # fmt = '%d.%m.%y %H:%M:%S'
- # start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt)
- # stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt)
- #
- # place = config['nodes'][node]['label'][ulang]
- #
- # html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n'
- # html += f'<b>{lang("beginning")}</b>: {start_time}\n'
- # html += f'<b>{lang("end")}</b>: {stop_time}'
- #
- # return html
- #
- # @classmethod
- # def record_error(cls, info: dict, node: str, uid: int) -> str:
- # ulang = bot.db.get_user_lang(uid)
- #
- # def lang(key, *args):
- # return bot.lang.get(key, ulang, *args)
- #
- # place = config['nodes'][node]['label'][ulang]
- # rid = info['id']
- #
- # html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})'
- # if 'error' in info:
- # html += '\n'+str(info['error'])
- #
- # return html
-
-
-# cameras handlers
-# ----------------
-
-@bot.handler(message='cameras', callback=r'^c0$')
-def cameras(ctx: bot.Context):
- """ List of cameras """
-
- text, markup = CamerasRenderer.index(ctx)
- if not ctx.is_callback_context():
- return ctx.reply(text, markup=markup)
- else:
- ctx.answer()
- return ctx.edit(text, markup=markup)
-
-
-@bot.callbackhandler(callback=r'^c0/.*')
-def camera_options(ctx: bot.Context) -> None:
- """ List of options (with/without flash etc) """
-
- cam, = callback_unpack(ctx)
- if not camera_exists(cam):
- ctx.answer(ctx.lang('invalid_location'))
- return
-
- ctx.answer()
- flash_available = 'flash_available' in config['cameras'][cam] and config['cameras'][cam]['flash_available'] is True
-
- text, markup = CamerasRenderer.camera(ctx, flash_available)
- ctx.edit(text, markup)
-
-
-@bot.callbackhandler(callback=r'^c1/.*')
-def camera_capture(ctx: bot.Context) -> None:
- """ Cheese """
-
- cam, flash = callback_unpack(ctx)
- flash = int(flash)
- if not camera_exists(cam):
- ctx.answer(ctx.lang('invalid_location'))
- return
-
- ctx.answer()
-
- client = camera_client(cam)
- fd = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
- fd.close()
-
- client.capture(fd.name, with_flash=bool(flash))
- logger.debug(f'captured photo ({cam}), saved to {fd.name}')
-
- camera_config = config['cameras'][cam]
- if 'rotate' in camera_config:
- im = Image.open(fd.name)
- im.rotate(camera_config['rotate'], expand=True)
- # im.show()
- im.save(fd.name)
- logger.debug(f"rotated image {camera_config['rotate']} degrees")
-
- try:
- with open(fd.name, 'rb') as f:
- bot.send_photo(ctx.user_id, photo=f)
- except TelegramError as exc:
- logger.exception(exc)
-
- try:
- os.unlink(fd.name)
- except OSError as exc:
- logger.exception(exc)
-
-
-# settings handlers
-# -----------------
-
-@bot.handler(message='settings', callback=r'^s0$')
-def settings(ctx: bot.Context):
- """ List of nodes """
-
- text, markup = SettingsRenderer.index(ctx)
- if not ctx.is_callback_context():
- return ctx.reply(text, markup=markup)
- else:
- ctx.answer()
- return ctx.edit(text, markup=markup)
-
-
-@bot.callbackhandler(callback=r'^s0/.*')
-def settings_place(ctx: bot.Context):
- """ List of controls """
-
- node, = callback_unpack(ctx)
- if not node_exists(node):
- ctx.answer(ctx.lang('invalid_location'))
- return
-
- cl = node_client(node)
- controls = cl.amixer_get_all()
-
- ctx.answer()
-
- text, markup = SettingsRenderer.node(ctx, controls)
- ctx.edit(text, markup)
-
-
-@bot.callbackhandler(callback=r'^s1/.*')
-def settings_place_control(ctx: bot.Context):
- """ List of available tunes for control """
-
- node, control = callback_unpack(ctx)
- if not node_exists(node):
- ctx.answer(ctx.lang('invalid_location'))
- return
-
- cl = node_client(node)
- control_data = cl.amixer_get(control)
-
- ctx.answer()
-
- text, markup = SettingsRenderer.control(ctx, control_data)
- ctx.edit(text, markup)
-
-
-@bot.callbackhandler(callback=r'^s2/.*')
-def settings_place_control_action(ctx: bot.Context):
- """ Tuning """
-
- node, control, action = callback_unpack(ctx)
- if not node_exists(node):
- return
-
- cl = node_client(node)
- if not hasattr(cl, f'amixer_{action}'):
- ctx.answer(ctx.lang('invalid_action'))
- return
-
- func = getattr(cl, f'amixer_{action}')
- control_data = func(control)
-
- ctx.answer()
-
- text, markup = SettingsRenderer.control(ctx, control_data)
- ctx.edit(text, markup)
-
-
-# recording handlers
-# ------------------
-
-@bot.handler(message='record', callback=r'^r0$')
-def record(ctx: bot.Context):
- """ List of nodes """
-
- if not manual_recording_allowed(ctx.user_id):
- return ctx.reply(ctx.lang('access_denied'))
-
- text, markup = RecordRenderer.index(ctx)
- if not ctx.is_callback_context():
- return ctx.reply(text, markup=markup)
- else:
- ctx.answer()
- return ctx.edit(text, markup=markup)
-
-
-@bot.callbackhandler(callback=r'^r0/.*')
-def record_place(ctx: bot.Context):
- """ List of available intervals """
-
- node, = callback_unpack(ctx)
- if not node_exists(node):
- ctx.answer(ctx.lang('invalid_location'))
- return
-
- ctx.answer()
-
- text, markup = RecordRenderer.node(ctx, config['bot']['record_intervals'])
- ctx.edit(text, markup)
-
-
-@bot.callbackhandler(callback=r'^r1/.*')
-def record_place_interval(ctx: bot.Context):
- """ Do record! """
-
- node, interval = callback_unpack(ctx)
- interval = int(interval)
- if not node_exists(node):
- ctx.answer(ctx.lang('invalid_location'))
- return
- if not interval_defined(interval):
- ctx.answer(ctx.lang('invalid_interval'))
- return
-
- try:
- record_id = record_client.record(node, interval, {'user_id': ctx.user_id, 'node': node})
- except ApiResponseError as e:
- ctx.answer(e.error_message)
- logger.error(e)
- return
-
- ctx.answer()
-
- html, markup = RecordRenderer.record_started(ctx, record_id)
- ctx.edit(html, markup)
-
-
-# sound sensor handlers
-# ---------------------
-
-@bot.handler(message='sound_sensors', callback=r'^S0$')
-def sound_sensors(ctx: bot.Context):
- """ List of places """
-
- text, markup = SoundSensorRenderer.index(ctx)
- if not ctx.is_callback_context():
- return ctx.reply(text, markup=markup)
- else:
- ctx.answer()
- return ctx.edit(text, markup=markup)
-
-
-@bot.callbackhandler(callback=r'^S0/.*')
-def sound_sensors_last_24h(ctx: bot.Context):
- """ Last 24h log """
-
- node, = callback_unpack(ctx)
- if not sound_sensor_exists(node):
- ctx.answer(ctx.lang('invalid location'))
- return
-
- ctx.answer()
-
- cl = WebAPIClient()
- data = cl.get_sound_sensor_hits(location=SoundSensorLocation[node.upper()],
- after=datetime.now() - timedelta(hours=24))
-
- text, markup = SoundSensorRenderer.hits(ctx, data)
- if len(text) > 4096:
- plain = SoundSensorRenderer.hits_plain(ctx, data)
- bot.send_file(ctx.user_id, document=plain, filename='data.txt')
- else:
- ctx.edit(text, markup=markup)
-
-
-@bot.callbackhandler(callback=r'^S1/.*')
-def sound_sensors_last_anything(ctx: bot.Context):
- """ Last _something_ """
-
- node, = callback_unpack(ctx)
- if not sound_sensor_exists(node):
- ctx.answer(ctx.lang('invalid location'))
- return
-
- ctx.answer()
-
- cl = WebAPIClient()
- data = cl.get_last_sound_sensor_hits(location=SoundSensorLocation[node.upper()],
- last=20)
-
- text, markup = SoundSensorRenderer.hits(ctx, data, is_last=True)
- if len(text) > 4096:
- plain = SoundSensorRenderer.hits_plain(ctx, data)
- bot.send_file(ctx.user_id, document=plain, filename='data.txt')
- else:
- ctx.edit(text, markup=markup)
-
-
-# guard enable/disable handlers
-# -----------------------------
-
-class GuardUserAction(Enum):
- ENABLE = 'enable'
- DISABLE = 'disable'
-
-
-if 'guard_server' in config['bot']:
- @bot.handler(message='guard_status')
- def guard_status(ctx: bot.Context):
- guard = guard_client()
- resp = guard.guard_status()
-
- key = 'enabled' if resp['enabled'] is True else 'disabled'
- ctx.reply(ctx.lang(f'guard_status_{key}'))
-
-
- @bot.handler(message='guard_enable')
- def guard_enable(ctx: bot.Context):
- guard = guard_client()
- guard.guard_enable()
- ctx.reply(ctx.lang('done'))
-
- _guard_notify(ctx.user, GuardUserAction.ENABLE)
-
-
- @bot.handler(message='guard_disable')
- def guard_disable(ctx: bot.Context):
- guard = guard_client()
- guard.guard_disable()
- ctx.reply(ctx.lang('done'))
-
- _guard_notify(ctx.user, GuardUserAction.DISABLE)
-
-
- def _guard_notify(user: User, action: GuardUserAction):
- def text_getter(lang: str):
- action_name = bot.lang.get(f'guard_user_action_{action.value}', lang)
- user_name = bot.user_any_name(user)
- return 'ℹ ' + bot.lang.get('guard_user_action_notification', lang,
- user.id, user_name, action_name)
-
- bot.notify_all(text_getter, exclude=(user.id,))
-
-
-@bot.defaultreplymarkup
-def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
- buttons = [
- [ctx.lang('record'), ctx.lang('settings')],
- # [ctx.lang('files'), ctx.lang('remote_files')],
- ]
- if 'guard_server' in config['bot']:
- buttons.append([
- ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status')
- ])
- buttons.append([ctx.lang('sound_sensors')])
- if have_cameras():
- buttons.append([ctx.lang('cameras')])
- return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
-
-
-# record client callbacks
-# -----------------------
-
-def record_onerror(info: dict, userdata: dict):
- uid = userdata['user_id']
- node = userdata['node']
-
- html = RecordRenderer.record_error(info, node, uid)
- try:
- bot.notify_user(userdata['user_id'], html)
- except TelegramError as exc:
- logger.exception(exc)
- finally:
- record_client.forget(node, info['id'])
-
-
-def record_onfinished(info: dict, fn: str, userdata: dict):
- logger.info('record finished: ' + str(info))
-
- uid = userdata['user_id']
- node = userdata['node']
-
- html = RecordRenderer.record_done(info, node, uid)
- bot.notify_user(uid, html)
-
- try:
- # sending audiofile to telegram
- with open(fn, 'rb') as f:
- bot.send_audio(uid, audio=f, filename='audio.mp3')
-
- # deleting temp file
- try:
- os.unlink(fn)
- except OSError as exc:
- logger.exception(exc)
- bot.notify_user(uid, exc)
-
- # remove the recording from sound_node's history
- record_client.forget(node, info['id'])
-
- # remove file from storage
- # node_client(node).storage_delete(info['file']['fileid'])
- except Exception as e:
- logger.exception(e)
-
-
-if __name__ == '__main__':
- record_client = SoundRecordClient(nodes,
- error_handler=record_onerror,
- finished_handler=record_onfinished,
- download_on_finish=True)
-
- if 'api' in config:
- bot.enable_logging(BotType.SOUND)
- bot.run()
- record_client.stop()