path: root/include/py/homekit/
diff options
authorEvgeny Zinoviev <>2024-02-17 03:08:25 +0300
committerEvgeny Zinoviev <>2024-02-17 03:08:25 +0300
commit0ce2e41a2bad790c5232fafb4b6ed631ca8cd957 (patch)
treefd401495b87cae8c95a4c4edf2c851c8177b6069 /include/py/homekit/
parente9fc2c1835f7ac8e072919df81a6661c6308dea9 (diff)
parentb7f1d55c9b4de4d21b11e5615a5dc8be0d4e883c (diff)
merge with master
Diffstat (limited to 'include/py/homekit/')
1 files changed, 301 insertions, 0 deletions
diff --git a/include/py/homekit/ b/include/py/homekit/
new file mode 100644
index 0000000..f718291
--- /dev/null
+++ b/include/py/homekit/
@@ -0,0 +1,301 @@
+from __future__ import annotations
+import json
+import socket
+import time
+import subprocess
+import traceback
+import logging
+import string
+import random
+import re
+import os
+import ipaddress
+from collections import namedtuple
+from enum import Enum
+from datetime import datetime
+from typing import Optional, List
+from zlib import adler32
+logger = logging.getLogger(__name__)
+def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bool:
+ if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', address):
+ parts = address.split('.')
+ if all(0 <= int(part) < 256 for part in parts):
+ return True
+ else:
+ if raise_exception:
+ raise ValueError(f"invalid IPv4 address: {address}")
+ return False
+ if re.match(r'^[a-zA-Z0-9.-]+$', address):
+ return True
+ else:
+ if raise_exception:
+ raise ValueError(f"invalid hostname: {address}")
+ return False
+def validate_ipv4(address: str) -> bool:
+ try:
+ ipaddress.IPv6Address(address)
+ return True
+ except ipaddress.AddressValueError:
+ return False
+def validate_mac_address(mac_address: str) -> bool:
+ mac_pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$'
+ if re.match(mac_pattern, mac_address):
+ return True
+ else:
+ return False
+class Addr:
+ host: str
+ port: Optional[int]
+ def __init__(self, host: str, port: Optional[int] = None):
+ = host
+ self.port = port
+ @classmethod
+ def fromstring(cls, addr: str, port_required=True) -> Addr:
+ if port_required:
+ colons = addr.count(':')
+ if colons != 1:
+ raise ValueError('invalid host:port format')
+ if not colons:
+ host = addr
+ port = None
+ else:
+ host, port = addr.split(':')
+ else:
+ port = None
+ host = addr
+ validate_ipv4_or_hostname(host, raise_exception=True)
+ if port is not None:
+ port = int(port)
+ if not 0 <= port <= 65535:
+ raise ValueError(f'invalid port {port}')
+ return Addr(host, port)
+ @classmethod
+ def fromipstring(cls, addr: str) -> Addr:
+ return cls.fromstring(addr, port_required=False)
+ def __str__(self):
+ buf =
+ if self.port is not None:
+ buf += ':'+str(self.port)
+ return buf
+ def __repr__(self):
+ return self.__str__()
+ def __iter__(self):
+ yield
+ yield self.port
+def chunks(lst, n):
+ """Yield successive n-sized chunks from lst."""
+ for i in range(0, len(lst), n):
+ yield lst[i:i + n]
+def json_serial(obj):
+ """JSON serializer for datetime objects"""
+ if isinstance(obj, datetime):
+ return obj.timestamp()
+ if isinstance(obj, Enum):
+ return obj.value
+ raise TypeError("Type %s not serializable" % type(obj))
+def stringify(v) -> str:
+ return json.dumps(v, separators=(',', ':'), default=json_serial)
+def ipv4_valid(ip: str) -> bool:
+ try:
+ socket.inet_aton(ip)
+ return True
+ except socket.error:
+ return False
+def strgen(n: int):
+ return ''.join(random.choices(string.ascii_letters + string.digits, k=n))
+class MySimpleSocketClient:
+ host: str
+ port: int
+ def __init__(self, host: str, port: int):
+ = host
+ self.port = port
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.connect((, self.port))
+ self.sock.settimeout(5)
+ def __del__(self):
+ self.sock.close()
+ def write(self, line: str) -> None:
+ self.sock.sendall((line + '\r\n').encode())
+ def read(self) -> str:
+ buf = bytearray()
+ while True:
+ buf.extend(self.sock.recv(256))
+ if b'\r\n' in buf:
+ break
+ response = buf.decode().strip()
+ return response
+def send_datagram(message: str, addr: Addr) -> None:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ sock.sendto(message.encode(), addr)
+def format_tb(exc) -> Optional[List[str]]:
+ tb = traceback.format_tb(exc.__traceback__)
+ if not tb:
+ return None
+ tb = list(map(lambda s: s.strip(), tb))
+ tb.reverse()
+ if tb[0][-1:] == ':':
+ tb[0] = tb[0][:-1]
+ return tb
+class ChildProcessInfo:
+ pid: int
+ cmd: str
+ def __init__(self,
+ pid: int,
+ cmd: str):
+ = pid
+ self.cmd = cmd
+def find_child_processes(ppid: int) -> List[ChildProcessInfo]:
+ p =['pgrep', '-P', str(ppid), '--list-full'], capture_output=True)
+ if p.returncode != 0:
+ raise OSError(f'pgrep returned {p.returncode}')
+ children = []
+ lines = p.stdout.decode().strip().split('\n')
+ for line in lines:
+ try:
+ space_idx = line.index(' ')
+ except ValueError as exc:
+ logger.exception(exc)
+ continue
+ pid = int(line[0:space_idx])
+ cmd = line[space_idx+1:]
+ children.append(ChildProcessInfo(pid, cmd))
+ return children
+class Stopwatch:
+ elapsed: float
+ time_started: Optional[float]
+ def __init__(self):
+ self.elapsed = 0
+ self.time_started = None
+ def go(self):
+ if self.time_started is not None:
+ raise StopwatchError('stopwatch was already started')
+ self.time_started = time.time()
+ def pause(self):
+ if self.time_started is None:
+ raise StopwatchError('stopwatch was paused')
+ self.elapsed += time.time() - self.time_started
+ self.time_started = None
+ def get_elapsed_time(self):
+ elapsed = self.elapsed
+ if self.time_started is not None:
+ elapsed += time.time() - self.time_started
+ return elapsed
+ def reset(self):
+ self.time_started = None
+ self.elapsed = 0
+ def is_paused(self):
+ return self.time_started is None
+class StopwatchError(RuntimeError):
+ pass
+def filesize_fmt(num, suffix="B") -> str:
+ for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
+ if abs(num) < 1024.0:
+ return f"{num:3.1f} {unit}{suffix}"
+ num /= 1024.0
+ return f"{num:.1f} Yi{suffix}"
+def seconds_to_human_readable_string(seconds: int) -> str:
+ days, remainder = divmod(seconds, 86400)
+ hours, remainder = divmod(remainder, 3600)
+ minutes, seconds = divmod(remainder, 60)
+ parts = []
+ if days > 0:
+ parts.append(f"{int(days)} day{'s' if days > 1 else ''}")
+ if hours > 0:
+ parts.append(f"{int(hours)} hour{'s' if hours > 1 else ''}")
+ if minutes > 0:
+ parts.append(f"{int(minutes)} minute{'s' if minutes > 1 else ''}")
+ if seconds > 0:
+ parts.append(f"{int(seconds)} second{'s' if seconds > 1 else ''}")
+ return ' '.join(parts)
+class HashableEnum(Enum):
+ def hash(self) -> int:
+ return adler32(
+def next_tick_gen(freq):
+ t = time.time()
+ while True:
+ t += freq
+ yield max(t - time.time(), 0)
+def homekit_path(*args) -> str:
+ return os.path.realpath(
+ os.path.join(os.path.dirname(__file__), '..', '..', '..', *args)
+ )