summaryrefslogtreecommitdiff
path: root/src/polaris/kettle.py
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2022-06-28 03:22:30 +0300
committerEvgeny Zinoviev <me@ch1p.io>2022-06-30 03:47:49 +0300
commit8f20c9b825cabab7a3f0f5dd2cfe000cc7f72c28 (patch)
treeb5d7446e7b2fcfd42b1e5029aeef33ecb5f9715f /src/polaris/kettle.py
parentee09bc98aedfc6a65a5026432b399345a30a39c8 (diff)
polaris pwk 1725cgld full support
- significant improvements, correctnesses and stability fixes in protocol implementation - correct handling of device appearances and disappearances - flawlessly functioning telegram bot that re-renders kettle's state (temperature and other) in real time
Diffstat (limited to 'src/polaris/kettle.py')
-rw-r--r--src/polaris/kettle.py271
1 files changed, 206 insertions, 65 deletions
diff --git a/src/polaris/kettle.py b/src/polaris/kettle.py
index 37f6813..1b995fd 100644
--- a/src/polaris/kettle.py
+++ b/src/polaris/kettle.py
@@ -1,97 +1,238 @@
-# SPDX-License-Identifier: BSD-3-Clause
+# Polaris PWK 1725CGLD smart kettle python library
+# ------------------------------------------------
+# Copyright (C) Evgeny Zinoviev, 2022
+# License: BSD-3c
from __future__ import annotations
+import threading
import logging
import zeroconf
-import cryptography.hazmat.primitives._serialization
-from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
-from cryptography.hazmat.primitives import hashes
-
-from functools import partial
-from abc import ABC
-from ipaddress import ip_address
-from typing import Optional
+from abc import abstractmethod
+from ipaddress import ip_address, IPv4Address, IPv6Address
+from typing import Optional, List, Union
from .protocol import (
- Connection,
+ UDPConnection,
ModeMessage,
- HandshakeMessage,
TargetTemperatureMessage,
- Message,
- PowerType
+ PowerType,
+ ConnectionStatus,
+ ConnectionStatusListener,
+ WrappedMessage
)
-_logger = logging.getLogger(__name__)
-
-# Polaris PWK 1725CGLD IoT kettle
-class Kettle(zeroconf.ServiceListener, ABC):
- macaddr: str
- device_token: str
- sb: Optional[zeroconf.ServiceBrowser]
- found_device: Optional[zeroconf.ServiceInfo]
- conn: Optional[Connection]
+class DeviceDiscover(threading.Thread, zeroconf.ServiceListener):
+ si: Optional[zeroconf.ServiceInfo]
+ _mac: str
+ _sb: Optional[zeroconf.ServiceBrowser]
+ _zc: Optional[zeroconf.Zeroconf]
+ _listeners: List[DeviceListener]
+ _valid_addresses: List[Union[IPv4Address, IPv6Address]]
+ _only_ipv4: bool
- def __init__(self, mac: str, device_token: str):
+ def __init__(self, mac: str,
+ listener: Optional[DeviceListener] = None,
+ only_ipv4=True):
super().__init__()
- self.zeroconf = zeroconf.Zeroconf()
- self.sb = None
- self.macaddr = mac
- self.device_token = device_token
- self.found_device = None
- self.conn = None
+ self.si = None
+ self._mac = mac
+ self._zc = None
+ self._sb = None
+ self._only_ipv4 = only_ipv4
+ self._valid_addresses = []
+ self._listeners = []
+ if isinstance(listener, DeviceListener):
+ self._listeners.append(listener)
+ self._logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')
+
+ def add_listener(self, listener: DeviceListener):
+ if listener not in self._listeners:
+ self._listeners.append(listener)
+ else:
+ self._logger.warning(f'add_listener: listener {listener} already in the listeners list')
+
+ def set_info(self, info: zeroconf.ServiceInfo):
+ valid_addresses = self._get_valid_addresses(info)
+ if not valid_addresses:
+ raise ValueError('no valid addresses')
+ self._valid_addresses = valid_addresses
+ self.si = info
+ for f in self._listeners:
+ try:
+ f.device_updated()
+ except Exception as exc:
+ self._logger.error(f'set_info: error while calling device_updated on {f}')
+ self._logger.exception(exc)
- def find(self) -> zeroconf.ServiceInfo:
- self.sb = zeroconf.ServiceBrowser(self.zeroconf, "_syncleo._udp.local.", self)
- self.sb.join()
+ def add_service(self, zc: zeroconf.Zeroconf, type_: str, name: str) -> None:
+ self._add_update_service('add_service', zc, type_, name)
- return self.found_device
+ def update_service(self, zc: zeroconf.Zeroconf, type_: str, name: str) -> None:
+ self._add_update_service('update_service', zc, type_, name)
- # zeroconf.ServiceListener implementation
- def add_service(self,
- zc: zeroconf.Zeroconf,
- type_: str,
- name: str) -> None:
- if name.startswith(f'{self.macaddr}.'):
- info = zc.get_service_info(type_, name)
+ def _add_update_service(self, method: str, zc: zeroconf.Zeroconf, type_: str, name: str) -> None:
+ info = zc.get_service_info(type_, name)
+ if name.startswith(f'{self._mac}.'):
+ self._logger.info(f'{method}: type={type_} name={name}')
+ try:
+ self.set_info(info)
+ except ValueError as exc:
+ self._logger.error(f'{method}: rejected: {str(exc)}')
+ else:
+ self._logger.debug(f'{method}: mac not matched: {info}')
+
+ def remove_service(self, zc: zeroconf.Zeroconf, type_: str, name: str) -> None:
+ if name.startswith(f'{self._mac}.'):
+ self._logger.info(f'remove_service: type={type_} name={name}')
+ # TODO what to do here?!
+
+ def run(self):
+ self._logger.info('starting zeroconf service browser')
+ ip_version = zeroconf.IPVersion.V4Only if self._only_ipv4 else zeroconf.IPVersion.All
+ self._zc = zeroconf.Zeroconf(ip_version=ip_version)
+ self._sb = zeroconf.ServiceBrowser(self._zc, "_syncleo._udp.local.", self)
+ self._sb.join()
+
+ def stop(self):
+ if self._sb:
try:
- self.sb.cancel()
+ self._sb.cancel()
except RuntimeError:
pass
- self.zeroconf.close()
- self.found_device = info
+ self._sb = None
+ self._zc.close()
+ self._zc = None
+
+ def _get_valid_addresses(self, si: zeroconf.ServiceInfo) -> List[Union[IPv4Address, IPv6Address]]:
+ valid = []
+ for addr in map(ip_address, si.addresses):
+ if self._only_ipv4 and not isinstance(addr, IPv4Address):
+ continue
+ if isinstance(addr, IPv4Address) and str(addr).startswith('169.254.'):
+ continue
+ valid.append(addr)
+ return valid
- assert self.device_curve == 29, f'curve type {self.device_curve} is not implemented'
-
- def start_server(self, callback: callable):
- addresses = list(map(ip_address, self.found_device.addresses))
- self.conn = Connection(addr=addresses[0],
- port=int(self.found_device.port),
- device_pubkey=self.device_pubkey,
- device_token=bytes.fromhex(self.device_token))
+ @property
+ def pubkey(self) -> bytes:
+ return bytes.fromhex(self.si.properties[b'public'].decode())
- # shake the kettle's hand
- self._pass_message(HandshakeMessage(), callback)
- self.conn.start()
+ @property
+ def curve(self) -> int:
+ return int(self.si.properties[b'curve'].decode())
- def stop_server(self):
- self.conn.interrupted = True
+ @property
+ def addr(self) -> Union[IPv4Address, IPv6Address]:
+ return self._valid_addresses[0]
@property
- def device_pubkey(self) -> bytes:
- return bytes.fromhex(self.found_device.properties[b'public'].decode())
+ def port(self) -> int:
+ return int(self.si.port)
@property
- def device_curve(self) -> int:
- return int(self.found_device.properties[b'curve'].decode())
+ def protocol(self) -> int:
+ return int(self.si.properties[b'protocol'].decode())
+
+
+class DeviceListener:
+ @abstractmethod
+ def device_updated(self):
+ pass
+
+
+class Kettle(DeviceListener, ConnectionStatusListener):
+ mac: str
+ device: Optional[DeviceDiscover]
+ device_token: str
+ conn: Optional[UDPConnection]
+ conn_status: Optional[ConnectionStatus]
+ _logger: logging.Logger
+ _find_evt: threading.Event
+
+ def __init__(self, mac: str, device_token: str):
+ super().__init__()
+ self.mac = mac
+ self.device = None
+ self.device_token = device_token
+ self.conn = None
+ self.conn_status = None
+ self._find_evt = threading.Event()
+ self._logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')
+
+ def device_updated(self):
+ self._find_evt.set()
+ self._logger.info(f'device updated, service info: {self.device.si}')
+
+ def connection_status_updated(self, status: ConnectionStatus):
+ self.conn_status = status
+
+ def discover(self, wait=True, timeout=None, listener=None) -> Optional[zeroconf.ServiceInfo]:
+ do_start = False
+ if not self.device:
+ self.device = DeviceDiscover(self.mac, listener=self, only_ipv4=True)
+ do_start = True
+ self._logger.debug('discover: started device discovery')
+ else:
+ self._logger.warning('discover: already started')
+
+ if listener is not None:
+ self.device.add_listener(listener)
+
+ if do_start:
+ self.device.start()
+
+ if wait:
+ self._find_evt.clear()
+ try:
+ self._find_evt.wait(timeout=timeout)
+ except KeyboardInterrupt:
+ self.device.stop()
+ return None
+ return self.device.si
+
+ def start_server_if_needed(self,
+ incoming_message_listener=None,
+ connection_status_listener=None):
+ if self.conn:
+ self._logger.warning('start_server_if_needed: server is already started!')
+ self.conn.set_address(self.device.addr, self.device.port)
+ self.conn.set_device_pubkey(self.device.pubkey)
+ return
+
+ assert self.device.curve == 29, f'curve type {self.device.curve} is not implemented'
+ assert self.device.protocol == 2, f'protocol {self.device.protocol} is not supported'
+
+ self.conn = UDPConnection(addr=self.device.addr,
+ port=self.device.port,
+ device_pubkey=self.device.pubkey,
+ device_token=bytes.fromhex(self.device_token))
+ if incoming_message_listener:
+ self.conn.add_incoming_message_listener(incoming_message_listener)
+
+ self.conn.add_connection_status_listener(self)
+ if connection_status_listener:
+ self.conn.add_connection_status_listener(connection_status_listener)
+
+ self.conn.start()
+
+ def stop_all(self):
+ # when we stop server, we should also stop device discovering service
+ if self.conn:
+ self.conn.interrupted = True
+ self.conn = None
+ self.device.stop()
+ self.device = None
+
+ def is_connected(self) -> bool:
+ return self.conn is not None and self.conn_status == ConnectionStatus.CONNECTED
def set_power(self, power_type: PowerType, callback: callable):
- self._pass_message(ModeMessage(power_type), callback)
+ message = ModeMessage(power_type)
+ self.conn.enqueue_message(WrappedMessage(message, handler=callback, ack=True))
def set_target_temperature(self, temp: int, callback: callable):
- self._pass_message(TargetTemperatureMessage(temp), callback)
-
- def _pass_message(self, message: Message, callback: callable):
- self.conn.send_message(message, partial(callback, self))
+ message = TargetTemperatureMessage(temp)
+ self.conn.enqueue_message(WrappedMessage(message, handler=callback, ack=True))