summaryrefslogtreecommitdiff
path: root/src/syncleo/kettle.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/syncleo/kettle.py')
-rw-r--r--src/syncleo/kettle.py243
1 files changed, 243 insertions, 0 deletions
diff --git a/src/syncleo/kettle.py b/src/syncleo/kettle.py
new file mode 100644
index 0000000..d6e0dd6
--- /dev/null
+++ b/src/syncleo/kettle.py
@@ -0,0 +1,243 @@
+# Polaris PWK 1725CGLD smart kettle python library
+# ------------------------------------------------
+# Copyright (C) Evgeny Zinoviev, 2022
+# License: BSD-3c
+
+from __future__ import annotations
+
+import threading
+import logging
+import zeroconf
+
+from abc import abstractmethod
+from ipaddress import ip_address, IPv4Address, IPv6Address
+from typing import Optional, List, Union
+
+from .protocol import (
+ UDPConnection,
+ ModeMessage,
+ TargetTemperatureMessage,
+ PowerType,
+ ConnectionStatus,
+ ConnectionStatusListener,
+ WrappedMessage
+)
+
+
+class DeviceDiscover(threading.Thread, zeroconf.ServiceListener):
+ si: Optional[zeroconf.ServiceInfo]
+ _mac: str
+ _sb: Optional[zeroconf.ServiceBrowser]
+ _zc: Optional[zeroconf.Zeroconf]
+ _listeners: List[DeviceListener]
+ _valid_addresses: List[Union[IPv4Address, IPv6Address]]
+ _only_ipv4: bool
+
+ def __init__(self, mac: str,
+ listener: Optional[DeviceListener] = None,
+ only_ipv4=True):
+ super().__init__()
+ self.si = None
+ self._mac = mac
+ self._zc = None
+ self._sb = None
+ self._only_ipv4 = only_ipv4
+ self._valid_addresses = []
+ self._listeners = []
+ if isinstance(listener, DeviceListener):
+ self._listeners.append(listener)
+ self._logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')
+
+ def add_listener(self, listener: DeviceListener):
+ if listener not in self._listeners:
+ self._listeners.append(listener)
+ else:
+ self._logger.warning(f'add_listener: listener {listener} already in the listeners list')
+
+ def set_info(self, info: zeroconf.ServiceInfo):
+ valid_addresses = self._get_valid_addresses(info)
+ if not valid_addresses:
+ raise ValueError('no valid addresses')
+ self._valid_addresses = valid_addresses
+ self.si = info
+ for f in self._listeners:
+ try:
+ f.device_updated()
+ except Exception as exc:
+ self._logger.error(f'set_info: error while calling device_updated on {f}')
+ self._logger.exception(exc)
+
+ def add_service(self, zc: zeroconf.Zeroconf, type_: str, name: str) -> None:
+ self._add_update_service('add_service', zc, type_, name)
+
+ def update_service(self, zc: zeroconf.Zeroconf, type_: str, name: str) -> None:
+ self._add_update_service('update_service', zc, type_, name)
+
+ def _add_update_service(self, method: str, zc: zeroconf.Zeroconf, type_: str, name: str) -> None:
+ info = zc.get_service_info(type_, name)
+ if name.startswith(f'{self._mac}.'):
+ self._logger.info(f'{method}: type={type_} name={name}')
+ try:
+ self.set_info(info)
+ except ValueError as exc:
+ self._logger.error(f'{method}: rejected: {str(exc)}')
+ else:
+ self._logger.debug(f'{method}: mac not matched: {info}')
+
+ def remove_service(self, zc: zeroconf.Zeroconf, type_: str, name: str) -> None:
+ if name.startswith(f'{self._mac}.'):
+ self._logger.info(f'remove_service: type={type_} name={name}')
+ # TODO what to do here?!
+
+ def run(self):
+ self._logger.debug('starting zeroconf service browser')
+ ip_version = zeroconf.IPVersion.V4Only if self._only_ipv4 else zeroconf.IPVersion.All
+ self._zc = zeroconf.Zeroconf(ip_version=ip_version)
+ self._sb = zeroconf.ServiceBrowser(self._zc, "_syncleo._udp.local.", self)
+ self._sb.join()
+
+ def stop(self):
+ if self._sb:
+ try:
+ self._sb.cancel()
+ except RuntimeError:
+ pass
+ self._sb = None
+ self._zc.close()
+ self._zc = None
+
+ def _get_valid_addresses(self, si: zeroconf.ServiceInfo) -> List[Union[IPv4Address, IPv6Address]]:
+ valid = []
+ for addr in map(ip_address, si.addresses):
+ if self._only_ipv4 and not isinstance(addr, IPv4Address):
+ continue
+ if isinstance(addr, IPv4Address) and str(addr).startswith('169.254.'):
+ continue
+ valid.append(addr)
+ return valid
+
+ @property
+ def pubkey(self) -> bytes:
+ return bytes.fromhex(self.si.properties[b'public'].decode())
+
+ @property
+ def curve(self) -> int:
+ return int(self.si.properties[b'curve'].decode())
+
+ @property
+ def addr(self) -> Union[IPv4Address, IPv6Address]:
+ return self._valid_addresses[0]
+
+ @property
+ def port(self) -> int:
+ return int(self.si.port)
+
+ @property
+ def protocol(self) -> int:
+ return int(self.si.properties[b'protocol'].decode())
+
+
+class DeviceListener:
+ @abstractmethod
+ def device_updated(self):
+ pass
+
+
+class Kettle(DeviceListener, ConnectionStatusListener):
+ mac: str
+ device: Optional[DeviceDiscover]
+ device_token: str
+ conn: Optional[UDPConnection]
+ conn_status: Optional[ConnectionStatus]
+ _read_timeout: Optional[int]
+ _logger: logging.Logger
+ _find_evt: threading.Event
+
+ def __init__(self, mac: str, device_token: str, read_timeout: Optional[int] = None):
+ super().__init__()
+ self.mac = mac
+ self.device = None
+ self.device_token = device_token
+ self.conn = None
+ self.conn_status = None
+ self._read_timeout = read_timeout
+ self._find_evt = threading.Event()
+ self._logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')
+
+ def device_updated(self):
+ self._find_evt.set()
+ self._logger.info(f'device updated, service info: {self.device.si}')
+
+ def connection_status_updated(self, status: ConnectionStatus):
+ self.conn_status = status
+
+ def discover(self, wait=True, timeout=None, listener=None) -> Optional[zeroconf.ServiceInfo]:
+ do_start = False
+ if not self.device:
+ self.device = DeviceDiscover(self.mac, listener=self, only_ipv4=True)
+ do_start = True
+ self._logger.debug('discover: started device discovery')
+ else:
+ self._logger.warning('discover: already started')
+
+ if listener is not None:
+ self.device.add_listener(listener)
+
+ if do_start:
+ self.device.start()
+
+ if wait:
+ self._find_evt.clear()
+ try:
+ self._find_evt.wait(timeout=timeout)
+ except KeyboardInterrupt:
+ self.device.stop()
+ return None
+ return self.device.si
+
+ def start_server_if_needed(self,
+ incoming_message_listener=None,
+ connection_status_listener=None):
+ if self.conn:
+ self._logger.warning('start_server_if_needed: server is already started!')
+ self.conn.set_address(self.device.addr, self.device.port)
+ self.conn.set_device_pubkey(self.device.pubkey)
+ return
+
+ assert self.device.curve == 29, f'curve type {self.device.curve} is not implemented'
+ assert self.device.protocol == 2, f'protocol {self.device.protocol} is not supported'
+
+ kw = {}
+ if self._read_timeout is not None:
+ kw['read_timeout'] = self._read_timeout
+ self.conn = UDPConnection(addr=self.device.addr,
+ port=self.device.port,
+ device_pubkey=self.device.pubkey,
+ device_token=bytes.fromhex(self.device_token), **kw)
+ if incoming_message_listener:
+ self.conn.add_incoming_message_listener(incoming_message_listener)
+
+ self.conn.add_connection_status_listener(self)
+ if connection_status_listener:
+ self.conn.add_connection_status_listener(connection_status_listener)
+
+ self.conn.start()
+
+ def stop_all(self):
+ # when we stop server, we should also stop device discovering service
+ if self.conn:
+ self.conn.interrupted = True
+ self.conn = None
+ self.device.stop()
+ self.device = None
+
+ def is_connected(self) -> bool:
+ return self.conn is not None and self.conn_status == ConnectionStatus.CONNECTED
+
+ def set_power(self, power_type: PowerType, callback: callable):
+ message = ModeMessage(power_type)
+ self.conn.enqueue_message(WrappedMessage(message, handler=callback, ack=True))
+
+ def set_target_temperature(self, temp: int, callback: callable):
+ message = TargetTemperatureMessage(temp)
+ self.conn.enqueue_message(WrappedMessage(message, handler=callback, ack=True))