#!/usr/bin/env python3 import __py_include import threading from time import sleep from homekit.config import config from homekit.api import WebApiClient from homekit.api.types import SoundSensorLocation from typing import List, Tuple interrupted = False class HitCounter: def __init__(self): self.sensors = {} self.lock = threading.Lock() self._reset_sensors() def _reset_sensors(self): for loc in SoundSensorLocation: self.sensors[loc.name.lower()] = 0 def add(self, name: str, hits: int): if name not in self.sensors: raise ValueError(f'sensor {name} not found') with self.lock: self.sensors[name] += hits def get_all(self) -> List[Tuple[str, int]]: vals = [] with self.lock: for name, hits in self.sensors.items(): if hits > 0: vals.append((name, hits)) self._reset_sensors() return vals def hits_sender(): while True: try: all_hits = hc.get_all() if all_hits: api.add_sound_sensor_hits(all_hits) sleep(5) except (KeyboardInterrupt, SystemExit): return if __name__ == '__main__': config.load_app('test_api') hc = HitCounter() api = WebApiClient() hc.add('spb1', 1) # hc.add('big_house', 123) hits_sender()