diff options
Diffstat (limited to 'src/home/util.py')
-rw-r--r-- | src/home/util.py | 255 |
1 files changed, 0 insertions, 255 deletions
diff --git a/src/home/util.py b/src/home/util.py deleted file mode 100644 index 11e7116..0000000 --- a/src/home/util.py +++ /dev/null @@ -1,255 +0,0 @@ -from __future__ import annotations - -import json -import socket -import time -import subprocess -import traceback -import logging -import string -import random -import re - -from enum import Enum -from datetime import datetime -from typing import Optional, List -from zlib import adler32 - -logger = logging.getLogger(__name__) - - -def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bool: - if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', address): - parts = address.split('.') - if all(0 <= int(part) < 256 for part in parts): - return True - else: - if raise_exception: - raise ValueError(f"invalid IPv4 address: {address}") - return False - - if re.match(r'^[a-zA-Z0-9.-]+$', address): - return True - else: - if raise_exception: - raise ValueError(f"invalid hostname: {address}") - return False - - -def validate_mac_address(mac_address: str) -> bool: - mac_pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$' - if re.match(mac_pattern, mac_address): - return True - else: - return False - - -class Addr: - host: str - port: Optional[int] - - def __init__(self, host: str, port: Optional[int] = None): - self.host = host - self.port = port - - @staticmethod - def fromstring(addr: str) -> Addr: - colons = addr.count(':') - if colons != 1: - raise ValueError('invalid host:port format') - - if not colons: - host = addr - port= None - else: - host, port = addr.split(':') - - validate_ipv4_or_hostname(host, raise_exception=True) - - if port is not None: - port = int(port) - if not 0 <= port <= 65535: - raise ValueError(f'invalid port {port}') - - return Addr(host, port) - - def __str__(self): - buf = self.host - if self.port is not None: - buf += ':'+str(self.port) - return buf - - def __iter__(self): - yield self.host - yield self.port - - -# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks -def chunks(lst, n): - """Yield successive n-sized chunks from lst.""" - for i in range(0, len(lst), n): - yield lst[i:i + n] - - -def json_serial(obj): - """JSON serializer for datetime objects""" - if isinstance(obj, datetime): - return obj.timestamp() - if isinstance(obj, Enum): - return obj.value - raise TypeError("Type %s not serializable" % type(obj)) - - -def stringify(v) -> str: - return json.dumps(v, separators=(',', ':'), default=json_serial) - - -def ipv4_valid(ip: str) -> bool: - try: - socket.inet_aton(ip) - return True - except socket.error: - return False - - -def strgen(n: int): - return ''.join(random.choices(string.ascii_letters + string.digits, k=n)) - - -class MySimpleSocketClient: - host: str - port: int - - def __init__(self, host: str, port: int): - self.host = host - self.port = port - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.connect((self.host, self.port)) - self.sock.settimeout(5) - - def __del__(self): - self.sock.close() - - def write(self, line: str) -> None: - self.sock.sendall((line + '\r\n').encode()) - - def read(self) -> str: - buf = bytearray() - while True: - buf.extend(self.sock.recv(256)) - if b'\r\n' in buf: - break - - response = buf.decode().strip() - return response - - -def send_datagram(message: str, addr: Addr) -> None: - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.sendto(message.encode(), addr) - - -def format_tb(exc) -> Optional[List[str]]: - tb = traceback.format_tb(exc.__traceback__) - if not tb: - return None - - tb = list(map(lambda s: s.strip(), tb)) - tb.reverse() - if tb[0][-1:] == ':': - tb[0] = tb[0][:-1] - - return tb - - -class ChildProcessInfo: - pid: int - cmd: str - - def __init__(self, - pid: int, - cmd: str): - self.pid = pid - self.cmd = cmd - - -def find_child_processes(ppid: int) -> List[ChildProcessInfo]: - p = subprocess.run(['pgrep', '-P', str(ppid), '--list-full'], capture_output=True) - if p.returncode != 0: - raise OSError(f'pgrep returned {p.returncode}') - - children = [] - - lines = p.stdout.decode().strip().split('\n') - for line in lines: - try: - space_idx = line.index(' ') - except ValueError as exc: - logger.exception(exc) - continue - - pid = int(line[0:space_idx]) - cmd = line[space_idx+1:] - - children.append(ChildProcessInfo(pid, cmd)) - - return children - - -class Stopwatch: - elapsed: float - time_started: Optional[float] - - def __init__(self): - self.elapsed = 0 - self.time_started = None - - def go(self): - if self.time_started is not None: - raise StopwatchError('stopwatch was already started') - - self.time_started = time.time() - - def pause(self): - if self.time_started is None: - raise StopwatchError('stopwatch was paused') - - self.elapsed += time.time() - self.time_started - self.time_started = None - - def get_elapsed_time(self): - elapsed = self.elapsed - if self.time_started is not None: - elapsed += time.time() - self.time_started - return elapsed - - def reset(self): - self.time_started = None - self.elapsed = 0 - - def is_paused(self): - return self.time_started is None - - -class StopwatchError(RuntimeError): - pass - - -def filesize_fmt(num, suffix="B") -> str: - for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: - if abs(num) < 1024.0: - return f"{num:3.1f} {unit}{suffix}" - num /= 1024.0 - return f"{num:.1f} Yi{suffix}" - - -class HashableEnum(Enum): - def hash(self) -> int: - return adler32(self.name.encode()) - - -def next_tick_gen(freq): - t = time.time() - while True: - t += freq - yield max(t - time.time(), 0)
\ No newline at end of file |