from __future__ import annotations import json import socket import time import subprocess import traceback import logging import string import random import re import os import ipaddress from collections import namedtuple from enum import Enum from datetime import datetime from typing import Optional, List from zlib import adler32 logger = logging.getLogger(__name__) def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bool: if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', address): parts = address.split('.') if all(0 <= int(part) < 256 for part in parts): return True else: if raise_exception: raise ValueError(f"invalid IPv4 address: {address}") return False if re.match(r'^[a-zA-Z0-9.-]+$', address): return True else: if raise_exception: raise ValueError(f"invalid hostname: {address}") return False def validate_ipv4(address: str) -> bool: try: ipaddress.IPv4Address(address) return True except ipaddress.AddressValueError: return False def validate_mac_address(mac_address: str) -> bool: mac_pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$' if re.match(mac_pattern, mac_address): return True else: return False class Addr: host: str port: Optional[int] def __init__(self, host: str, port: Optional[int] = None): self.host = host self.port = port @classmethod def fromstring(cls, addr: str, port_required=True) -> Addr: if port_required: colons = addr.count(':') if colons != 1: raise ValueError('invalid host:port format') if not colons: host = addr port = None else: host, port = addr.split(':') else: port = None host = addr validate_ipv4_or_hostname(host, raise_exception=True) if port is not None: port = int(port) if not 0 <= port <= 65535: raise ValueError(f'invalid port {port}') return Addr(host, port) @classmethod def fromipstring(cls, addr: str) -> Addr: return cls.fromstring(addr, port_required=False) def __str__(self): buf = self.host if self.port is not None: buf += ':'+str(self.port) return buf def __repr__(self): return self.__str__() def __iter__(self): yield self.host yield self.port # https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks def chunks(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): yield lst[i:i + n] def json_serial(obj): """JSON serializer for datetime objects""" if isinstance(obj, datetime): return obj.timestamp() if isinstance(obj, Enum): return obj.value raise TypeError("Type %s not serializable" % type(obj)) def stringify(v) -> str: return json.dumps(v, separators=(',', ':'), default=json_serial) def ipv4_valid(ip: str) -> bool: try: socket.inet_aton(ip) return True except socket.error: return False def strgen(n: int): return ''.join(random.choices(string.ascii_letters + string.digits, k=n)) class MySimpleSocketClient: host: str port: int def __init__(self, host: str, port: int): self.host = host self.port = port self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) self.sock.settimeout(5) def __del__(self): self.sock.close() def write(self, line: str) -> None: self.sock.sendall((line + '\r\n').encode()) def read(self) -> str: buf = bytearray() while True: buf.extend(self.sock.recv(256)) if b'\r\n' in buf: break response = buf.decode().strip() return response def send_datagram(message: str, addr: Addr) -> None: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.sendto(message.encode(), addr) def format_tb(exc) -> Optional[List[str]]: tb = traceback.format_tb(exc.__traceback__) if not tb: return None tb = list(map(lambda s: s.strip(), tb)) tb.reverse() if tb[0][-1:] == ':': tb[0] = tb[0][:-1] return tb class ChildProcessInfo: pid: int cmd: str def __init__(self, pid: int, cmd: str): self.pid = pid self.cmd = cmd def find_child_processes(ppid: int) -> List[ChildProcessInfo]: p = subprocess.run(['pgrep', '-P', str(ppid), '--list-full'], capture_output=True) if p.returncode != 0: raise OSError(f'pgrep returned {p.returncode}') children = [] lines = p.stdout.decode().strip().split('\n') for line in lines: try: space_idx = line.index(' ') except ValueError as exc: logger.exception(exc) continue pid = int(line[0:space_idx]) cmd = line[space_idx+1:] children.append(ChildProcessInfo(pid, cmd)) return children class Stopwatch: elapsed: float time_started: Optional[float] def __init__(self): self.elapsed = 0 self.time_started = None def go(self): if self.time_started is not None: raise StopwatchError('stopwatch was already started') self.time_started = time.time() def pause(self): if self.time_started is None: raise StopwatchError('stopwatch was paused') self.elapsed += time.time() - self.time_started self.time_started = None def get_elapsed_time(self): elapsed = self.elapsed if self.time_started is not None: elapsed += time.time() - self.time_started return elapsed def reset(self): self.time_started = None self.elapsed = 0 def is_paused(self): return self.time_started is None class StopwatchError(RuntimeError): pass def filesize_fmt(num, suffix="B") -> str: for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: return f"{num:3.1f} {unit}{suffix}" num /= 1024.0 return f"{num:.1f} Yi{suffix}" def seconds_to_human_readable_string(seconds: int) -> str: days, remainder = divmod(seconds, 86400) hours, remainder = divmod(remainder, 3600) minutes, seconds = divmod(remainder, 60) parts = [] if days > 0: parts.append(f"{int(days)} day{'s' if days > 1 else ''}") if hours > 0: parts.append(f"{int(hours)} hour{'s' if hours > 1 else ''}") if minutes > 0: parts.append(f"{int(minutes)} minute{'s' if minutes > 1 else ''}") if seconds > 0: parts.append(f"{int(seconds)} second{'s' if seconds > 1 else ''}") return ' '.join(parts) class HashableEnum(Enum): def hash(self) -> int: return adler32(self.name.encode()) def next_tick_gen(freq): t = time.time() while True: t += freq yield max(t - time.time(), 0) def homekit_path(*args) -> str: return os.path.realpath( os.path.join(os.path.dirname(__file__), '..', '..', '..', *args) )