diff options
Diffstat (limited to 'src/home/api/web_api_client.py')
-rw-r--r-- | src/home/api/web_api_client.py | 225 |
1 files changed, 0 insertions, 225 deletions
diff --git a/src/home/api/web_api_client.py b/src/home/api/web_api_client.py deleted file mode 100644 index 6677182..0000000 --- a/src/home/api/web_api_client.py +++ /dev/null @@ -1,225 +0,0 @@ -import requests -import json -import threading -import logging - -from collections import namedtuple -from datetime import datetime -from enum import Enum, auto -from typing import Optional, Callable, Union, List, Tuple, Dict -from requests.auth import HTTPBasicAuth - -from .errors import ApiResponseError -from .types import * -from ..config import config -from ..util import stringify -from ..media import RecordFile, MediaNodeClient - -logger = logging.getLogger(__name__) - - -RequestParams = namedtuple('RequestParams', 'params, files, method') - - -class HTTPMethod(Enum): - GET = auto() - POST = auto() - - -class WebAPIClient: - token: str - timeout: Union[float, Tuple[float, float]] - basic_auth: Optional[HTTPBasicAuth] - do_async: bool - async_error_handler: Optional[Callable] - async_success_handler: Optional[Callable] - - def __init__(self, timeout: Union[float, Tuple[float, float]] = 5): - self.token = config['api']['token'] - self.timeout = timeout - self.basic_auth = None - self.do_async = False - self.async_error_handler = None - self.async_success_handler = None - - if 'basic_auth' in config['api']: - ba = config['api']['basic_auth'] - col = ba.index(':') - - user = ba[:col] - pw = ba[col+1:] - - logger.debug(f'enabling basic auth: {user}:{pw}') - self.basic_auth = HTTPBasicAuth(user, pw) - - # api methods - # ----------- - - def log_bot_request(self, - bot: BotType, - user_id: int, - message: str): - return self._post('log/bot_request/', { - 'bot': bot.value, - 'user_id': str(user_id), - 'message': message - }) - - def log_openwrt(self, - lines: List[Tuple[int, str]], - access_point: int): - return self._post('log/openwrt/', { - 'logs': stringify(lines), - 'ap': access_point - }) - - def get_sensors_data(self, - sensor: TemperatureSensorLocation, - hours: int): - data = self._get('sensors/data/', { - 'sensor': sensor.value, - 'hours': hours - }) - return [(datetime.fromtimestamp(date), temp, hum) for date, temp, hum in data] - - def add_sound_sensor_hits(self, - hits: List[Tuple[str, int]]): - return self._post('sound_sensors/hits/', { - 'hits': stringify(hits) - }) - - def get_sound_sensor_hits(self, - location: SoundSensorLocation, - after: datetime) -> List[dict]: - return self._process_sound_sensor_hits_data(self._get('sound_sensors/hits/', { - 'after': int(after.timestamp()), - 'location': location.value - })) - - def get_last_sound_sensor_hits(self, location: SoundSensorLocation, last: int): - return self._process_sound_sensor_hits_data(self._get('sound_sensors/hits/', { - 'last': last, - 'location': location.value - })) - - def recordings_list(self, extended=False, as_objects=False) -> Union[List[str], List[dict], List[RecordFile]]: - files = self._get('recordings/list/', {'extended': int(extended)})['data'] - if as_objects: - return MediaNodeClient.record_list_from_serialized(files) - return files - - def inverter_get_consumed_energy(self, s_from: str, s_to: str): - return self._get('inverter/consumed_energy/', { - 'from': s_from, - 'to': s_to - }) - - def inverter_get_grid_consumed_energy(self, s_from: str, s_to: str): - return self._get('inverter/grid_consumed_energy/', { - 'from': s_from, - 'to': s_to - }) - - @staticmethod - def _process_sound_sensor_hits_data(data: List[dict]) -> List[dict]: - for item in data: - item['time'] = datetime.fromtimestamp(item['time']) - return data - - # internal methods - # ---------------- - - def _get(self, *args, **kwargs): - return self._call(method=HTTPMethod.GET, *args, **kwargs) - - def _post(self, *args, **kwargs): - return self._call(method=HTTPMethod.POST, *args, **kwargs) - - def _call(self, - name: str, - params: dict, - method: HTTPMethod, - files: Optional[Dict[str, str]] = None): - if not self.do_async: - return self._make_request(name, params, method, files) - else: - t = threading.Thread(target=self._make_request_in_thread, args=(name, params, method, files)) - t.start() - return None - - def _make_request(self, - name: str, - params: dict, - method: HTTPMethod = HTTPMethod.GET, - files: Optional[Dict[str, str]] = None) -> Optional[any]: - domain = config['api']['host'] - kwargs = {} - - if self.basic_auth is not None: - kwargs['auth'] = self.basic_auth - - if method == HTTPMethod.GET: - if files: - raise RuntimeError('can\'t upload files using GET, please use me properly') - kwargs['params'] = params - f = requests.get - else: - kwargs['data'] = params - f = requests.post - - fd = {} - if files: - for fname, fpath in files.items(): - fd[fname] = open(fpath, 'rb') - kwargs['files'] = fd - - try: - r = f(f'https://{domain}/{name}', - headers={'X-Token': self.token}, - timeout=self.timeout, - **kwargs) - - if not r.headers['content-type'].startswith('application/json'): - raise ApiResponseError(r.status_code, 'TypeError', 'content-type is not application/json') - - data = json.loads(r.text) - if r.status_code != 200: - raise ApiResponseError(r.status_code, - data['error'], - data['message'], - data['stacktrace'] if 'stacktrace' in data['error'] else None) - - return data['response'] if 'response' in data else True - finally: - for fname, f in fd.items(): - # logger.debug(f'closing file {fname} (fd={f})') - try: - f.close() - except Exception as exc: - logger.exception(exc) - pass - - def _make_request_in_thread(self, name, params, method, files): - try: - result = self._make_request(name, params, method, files) - self._report_async_success(result, name, RequestParams(params=params, method=method, files=files)) - except Exception as e: - logger.exception(e) - self._report_async_error(e, name, RequestParams(params=params, method=method, files=files)) - - def enable_async(self, - success_handler: Optional[Callable] = None, - error_handler: Optional[Callable] = None): - self.do_async = True - if error_handler: - self.async_error_handler = error_handler - if success_handler: - self.async_success_handler = success_handler - - def _report_async_error(self, *args): - if self.async_error_handler: - self.async_error_handler(*args) - - def _report_async_success(self, *args): - if self.async_success_handler: - self.async_success_handler(*args)
\ No newline at end of file |