summaryrefslogtreecommitdiff
path: root/src/home/util.py
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2023-09-27 00:54:57 +0300
committerEvgeny Zinoviev <me@ch1p.io>2023-09-27 00:54:57 +0300
commitd3a295872c49defb55fc8e4e43e55550991e0927 (patch)
treeb9dca15454f9027d5a9dad0d4443a20de04dbc5d /src/home/util.py
parentb7cbc2571c1870b4582ead45277d0aa7f961bec8 (diff)
parentbdbb296697f55f4c3a07af43c9aaf7a9ea86f3d0 (diff)
Merge branch 'master' of ch1p.io:homekit
Diffstat (limited to 'src/home/util.py')
-rw-r--r--src/home/util.py196
1 files changed, 0 insertions, 196 deletions
diff --git a/src/home/util.py b/src/home/util.py
deleted file mode 100644
index 93a9d8f..0000000
--- a/src/home/util.py
+++ /dev/null
@@ -1,196 +0,0 @@
-import json
-import socket
-import time
-import subprocess
-import traceback
-import logging
-import string
-import random
-
-from enum import Enum
-from datetime import datetime
-from typing import Tuple, Optional, List
-from zlib import adler32
-
-Addr = Tuple[str, int] # network address type (host, port)
-
-logger = logging.getLogger(__name__)
-
-
-# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
-def chunks(lst, n):
- """Yield successive n-sized chunks from lst."""
- for i in range(0, len(lst), n):
- yield lst[i:i + n]
-
-
-def json_serial(obj):
- """JSON serializer for datetime objects"""
- if isinstance(obj, datetime):
- return obj.timestamp()
- if isinstance(obj, Enum):
- return obj.value
- raise TypeError("Type %s not serializable" % type(obj))
-
-
-def stringify(v) -> str:
- return json.dumps(v, separators=(',', ':'), default=json_serial)
-
-
-def ipv4_valid(ip: str) -> bool:
- try:
- socket.inet_aton(ip)
- return True
- except socket.error:
- return False
-
-
-def parse_addr(addr: str) -> Addr:
- if addr.count(':') != 1:
- raise ValueError('invalid host:port format')
-
- host, port = addr.split(':')
- if not ipv4_valid(host):
- raise ValueError('invalid ipv4 address')
-
- port = int(port)
- if not 0 <= port <= 65535:
- raise ValueError('invalid port')
-
- return host, port
-
-
-def strgen(n: int):
- return ''.join(random.choices(string.ascii_letters + string.digits, k=n))
-
-
-class MySimpleSocketClient:
- host: str
- port: int
-
- def __init__(self, host: str, port: int):
- self.host = host
- self.port = port
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.connect((self.host, self.port))
- self.sock.settimeout(5)
-
- def __del__(self):
- self.sock.close()
-
- def write(self, line: str) -> None:
- self.sock.sendall((line + '\r\n').encode())
-
- def read(self) -> str:
- buf = bytearray()
- while True:
- buf.extend(self.sock.recv(256))
- if b'\r\n' in buf:
- break
-
- response = buf.decode().strip()
- return response
-
-
-def send_datagram(message: str, addr: Addr) -> None:
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- sock.sendto(message.encode(), addr)
-
-
-def format_tb(exc) -> Optional[List[str]]:
- tb = traceback.format_tb(exc.__traceback__)
- if not tb:
- return None
-
- tb = list(map(lambda s: s.strip(), tb))
- tb.reverse()
- if tb[0][-1:] == ':':
- tb[0] = tb[0][:-1]
-
- return tb
-
-
-class ChildProcessInfo:
- pid: int
- cmd: str
-
- def __init__(self,
- pid: int,
- cmd: str):
- self.pid = pid
- self.cmd = cmd
-
-
-def find_child_processes(ppid: int) -> List[ChildProcessInfo]:
- p = subprocess.run(['pgrep', '-P', str(ppid), '--list-full'], capture_output=True)
- if p.returncode != 0:
- raise OSError(f'pgrep returned {p.returncode}')
-
- children = []
-
- lines = p.stdout.decode().strip().split('\n')
- for line in lines:
- try:
- space_idx = line.index(' ')
- except ValueError as exc:
- logger.exception(exc)
- continue
-
- pid = int(line[0:space_idx])
- cmd = line[space_idx+1:]
-
- children.append(ChildProcessInfo(pid, cmd))
-
- return children
-
-
-class Stopwatch:
- elapsed: float
- time_started: Optional[float]
-
- def __init__(self):
- self.elapsed = 0
- self.time_started = None
-
- def go(self):
- if self.time_started is not None:
- raise StopwatchError('stopwatch was already started')
-
- self.time_started = time.time()
-
- def pause(self):
- if self.time_started is None:
- raise StopwatchError('stopwatch was paused')
-
- self.elapsed += time.time() - self.time_started
- self.time_started = None
-
- def get_elapsed_time(self):
- elapsed = self.elapsed
- if self.time_started is not None:
- elapsed += time.time() - self.time_started
- return elapsed
-
- def reset(self):
- self.time_started = None
- self.elapsed = 0
-
- def is_paused(self):
- return self.time_started is None
-
-
-class StopwatchError(RuntimeError):
- pass
-
-
-def filesize_fmt(num, suffix="B") -> str:
- for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
- if abs(num) < 1024.0:
- return f"{num:3.1f} {unit}{suffix}"
- num /= 1024.0
- return f"{num:.1f} Yi{suffix}"
-
-
-class HashableEnum(Enum):
- def hash(self) -> int:
- return adler32(self.name.encode()) \ No newline at end of file