diff options
Diffstat (limited to 'src/home/camera/esp32.py')
-rw-r--r-- | src/home/camera/esp32.py | 226 |
1 files changed, 0 insertions, 226 deletions
diff --git a/src/home/camera/esp32.py b/src/home/camera/esp32.py deleted file mode 100644 index fe6de0e..0000000 --- a/src/home/camera/esp32.py +++ /dev/null @@ -1,226 +0,0 @@ -import logging -import requests -import json -import asyncio -import aioshutil - -from io import BytesIO -from functools import partial -from typing import Union, Optional -from enum import Enum -from ..api.errors import ApiResponseError -from ..util import Addr - - -class FrameSize(Enum): - UXGA_1600x1200 = 13 - SXGA_1280x1024 = 12 - HD_1280x720 = 11 - XGA_1024x768 = 10 - SVGA_800x600 = 9 - VGA_640x480 = 8 - HVGA_480x320 = 7 - CIF_400x296 = 6 - QVGA_320x240 = 5 - N_240x240 = 4 - HQVGA_240x176 = 3 - QCIF_176x144 = 2 - QQVGA_160x120 = 1 - N_96x96 = 0 - - -class WBMode(Enum): - AUTO = 0 - SUNNY = 1 - CLOUDY = 2 - OFFICE = 3 - HOME = 4 - - -def _assert_bounds(n: int, min: int, max: int): - if not min <= n <= max: - raise ValueError(f'value must be between {min} and {max}') - - -class WebClient: - def __init__(self, - addr: Addr): - self.endpoint = f'http://{addr[0]}:{addr[1]}' - self.logger = logging.getLogger(self.__class__.__name__) - self.delay = 0 - self.isfirstrequest = True - - async def syncsettings(self, settings) -> bool: - status = await self.getstatus() - self.logger.debug(f'syncsettings: status={status}') - - changed_anything = False - - for name, value in settings.items(): - server_name = name - if name == 'aec_dsp': - server_name = 'aec2' - - if server_name not in status: - # legacy compatibility - if server_name != 'vflip': - self.logger.warning(f'syncsettings: field `{server_name}` not found in camera status') - continue - - try: - # server returns 0 or 1 for bool values - if type(value) is bool: - value = int(value) - - if status[server_name] == value: - continue - except KeyError as exc: - if name != 'vflip': - self.logger.error(exc) - - try: - # fix for cases like when field is called raw_gma, but method is setrawgma() - name = name.replace('_', '') - - func = getattr(self, f'set{name}') - self.logger.debug(f'syncsettings: calling set{name}({value})') - - await func(value) - - changed_anything = True - except AttributeError as exc: - self.logger.exception(exc) - self.logger.error(f'syncsettings: method set{name}() not found') - - return changed_anything - - def setdelay(self, delay: int): - self.delay = delay - - async def capture(self, output: Optional[str] = None) -> Union[BytesIO, bool]: - kw = {} - if output: - kw['save_to'] = output - else: - kw['as_bytes'] = True - return await self._call('capture', **kw) - - async def getstatus(self): - return json.loads(await self._call('status')) - - async def setflash(self, enable: bool): - await self._control('flash', int(enable)) - - async def setframesize(self, fs: Union[int, FrameSize]): - if type(fs) is int: - fs = FrameSize(fs) - await self._control('framesize', fs.value) - - async def sethmirror(self, enable: bool): - await self._control('hmirror', int(enable)) - - async def setvflip(self, enable: bool): - await self._control('vflip', int(enable)) - - async def setawb(self, enable: bool): - await self._control('awb', int(enable)) - - async def setawbgain(self, enable: bool): - await self._control('awb_gain', int(enable)) - - async def setwbmode(self, mode: WBMode): - await self._control('wb_mode', mode.value) - - async def setaecsensor(self, enable: bool): - await self._control('aec', int(enable)) - - async def setaecdsp(self, enable: bool): - await self._control('aec2', int(enable)) - - async def setagc(self, enable: bool): - await self._control('agc', int(enable)) - - async def setagcgain(self, gain: int): - _assert_bounds(gain, 1, 31) - await self._control('agc_gain', gain) - - async def setgainceiling(self, gainceiling: int): - _assert_bounds(gainceiling, 2, 128) - await self._control('gainceiling', gainceiling) - - async def setbpc(self, enable: bool): - await self._control('bpc', int(enable)) - - async def setwpc(self, enable: bool): - await self._control('wpc', int(enable)) - - async def setrawgma(self, enable: bool): - await self._control('raw_gma', int(enable)) - - async def setlenscorrection(self, enable: bool): - await self._control('lenc', int(enable)) - - async def setdcw(self, enable: bool): - await self._control('dcw', int(enable)) - - async def setcolorbar(self, enable: bool): - await self._control('colorbar', int(enable)) - - async def setquality(self, q: int): - _assert_bounds(q, 4, 63) - await self._control('quality', q) - - async def setbrightness(self, brightness: int): - _assert_bounds(brightness, -2, -2) - await self._control('brightness', brightness) - - async def setcontrast(self, contrast: int): - _assert_bounds(contrast, -2, 2) - await self._control('contrast', contrast) - - async def setsaturation(self, saturation: int): - _assert_bounds(saturation, -2, 2) - await self._control('saturation', saturation) - - async def _control(self, var: str, value: Union[int, str]): - return await self._call('control', params={'var': var, 'val': value}) - - async def _call(self, - method: str, - params: Optional[dict] = None, - save_to: Optional[str] = None, - as_bytes=False) -> Union[str, bool, BytesIO]: - loop = asyncio.get_event_loop() - - if not self.isfirstrequest and self.delay > 0: - sleeptime = self.delay / 1000 - self.logger.debug(f'sleeping for {sleeptime}') - - await asyncio.sleep(sleeptime) - - self.isfirstrequest = False - - url = f'{self.endpoint}/{method}' - self.logger.debug(f'calling {url}, params: {params}') - - kwargs = {} - if params: - kwargs['params'] = params - if save_to: - kwargs['stream'] = True - - r = await loop.run_in_executor(None, - partial(requests.get, url, **kwargs)) - if r.status_code != 200: - raise ApiResponseError(status_code=r.status_code) - - if as_bytes: - return BytesIO(r.content) - - if save_to: - r.raise_for_status() - with open(save_to, 'wb') as f: - await aioshutil.copyfileobj(r.raw, f) - return True - - return r.text |