import requests import json import threading import logging from collections import namedtuple from datetime import datetime from enum import Enum, auto from typing import Optional, Callable, Union from requests.auth import HTTPBasicAuth from .errors import ApiResponseError from .types import * from ..config import config from ..util import stringify from ..sound import RecordFile, SoundNodeClient logger = logging.getLogger(__name__) RequestParams = namedtuple('RequestParams', 'params, files, method') class HTTPMethod(Enum): GET = auto() POST = auto() class WebAPIClient: token: str timeout: Union[float, tuple[float, float]] basic_auth: Optional[HTTPBasicAuth] do_async: bool async_error_handler: Optional[Callable] async_success_handler: Optional[Callable] def __init__(self, timeout: Union[float, tuple[float, float]] = 5): self.token = config['api']['token'] self.timeout = timeout self.basic_auth = None self.do_async = False self.async_error_handler = None self.async_success_handler = None if 'basic_auth' in config['api']: ba = config['api']['basic_auth'] col = ba.index(':') user = ba[:col] pw = ba[col+1:] logger.debug(f'enabling basic auth: {user}:{pw}') self.basic_auth = HTTPBasicAuth(user, pw) # api methods # ----------- def log_bot_request(self, bot: BotType, user_id: int, message: str): return self._post('logs/bot-request/', { 'bot': bot.value, 'user_id': str(user_id), 'message': message }) def log_openwrt(self, lines: list[tuple[int, str]]): return self._post('logs/openwrt', { 'logs': stringify(lines) }) def get_sensors_data(self, sensor: TemperatureSensorLocation, hours: int): data = self._get('sensors/data/', { 'sensor': sensor.value, 'hours': hours }) return [(datetime.fromtimestamp(date), temp, hum) for date, temp, hum in data] def add_sound_sensor_hits(self, hits: list[tuple[str, int]]): return self._post('sound_sensors/hits/', { 'hits': stringify(hits) }) def get_sound_sensor_hits(self, location: SoundSensorLocation, after: datetime) -> list[dict]: return self._process_sound_sensor_hits_data(self._get('sound_sensors/hits/', { 'after': int(after.timestamp()), 'location': location.value })) def get_last_sound_sensor_hits(self, location: SoundSensorLocation, last: int): return self._process_sound_sensor_hits_data(self._get('sound_sensors/hits/', { 'last': last, 'location': location.value })) def recordings_list(self, extended=False, as_objects=False) -> Union[list[str], list[dict], list[RecordFile]]: files = self._get('recordings/list/', {'extended': int(extended)})['data'] if as_objects: return SoundNodeClient.record_list_from_serialized(files) return files def _process_sound_sensor_hits_data(self, data: list[dict]) -> list[dict]: for item in data: item['time'] = datetime.fromtimestamp(item['time']) return data # internal methods # ---------------- def _get(self, *args, **kwargs): return self._call(method=HTTPMethod.GET, *args, **kwargs) def _post(self, *args, **kwargs): return self._call(method=HTTPMethod.POST, *args, **kwargs) def _call(self, name: str, params: dict, method: HTTPMethod, files: Optional[dict[str, str]] = None): if not self.do_async: return self._make_request(name, params, method, files) else: t = threading.Thread(target=self._make_request_in_thread, args=(name, params, method, files)) t.start() return None def _make_request(self, name: str, params: dict, method: HTTPMethod = HTTPMethod.GET, files: Optional[dict[str, str]] = None) -> Optional[any]: domain = config['api']['host'] kwargs = {} if self.basic_auth is not None: kwargs['auth'] = self.basic_auth if method == HTTPMethod.GET: if files: raise RuntimeError('can\'t upload files using GET, please use me properly') kwargs['params'] = params f = requests.get else: kwargs['data'] = params f = requests.post fd = {} if files: for fname, fpath in files.items(): fd[fname] = open(fpath, 'rb') kwargs['files'] = fd try: r = f(f'https://{domain}/api/{name}', headers={'X-Token': self.token}, timeout=self.timeout, **kwargs) if r.headers['content-type'] != 'application/json': raise ApiResponseError(r.status_code, 'TypeError', 'content-type is not application/json') data = json.loads(r.text) if r.status_code != 200 or data['result'] == 'error': raise ApiResponseError(r.status_code, data['error']['type'], data['error']['message'], data['error']['stacktrace'] if 'stacktrace' in data['error'] else None) return data['data'] if 'data' in data else True finally: for fname, f in fd.items(): # logger.debug(f'closing file {fname} (fd={f})') try: f.close() except Exception as exc: logger.exception(exc) pass def _make_request_in_thread(self, name, params, method, files): try: result = self._make_request(name, params, method, files) self._report_async_success(result, name, RequestParams(params=params, method=method, files=files)) except Exception as e: logger.exception(e) self._report_async_error(e, name, RequestParams(params=params, method=method, files=files)) def enable_async(self, success_handler: Optional[Callable] = None, error_handler: Optional[Callable] = None): self.do_async = True if error_handler: self.async_error_handler = error_handler if success_handler: self.async_success_handler = success_handler def _report_async_error(self, *args): if self.async_error_handler: self.async_error_handler(*args) def _report_async_success(self, *args): if self.async_success_handler: self.async_success_handler(*args)