diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2023-09-27 00:54:57 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2023-09-27 00:54:57 +0300 |
commit | d3a295872c49defb55fc8e4e43e55550991e0927 (patch) | |
tree | b9dca15454f9027d5a9dad0d4443a20de04dbc5d /src/home/soundsensor/node.py | |
parent | b7cbc2571c1870b4582ead45277d0aa7f961bec8 (diff) | |
parent | bdbb296697f55f4c3a07af43c9aaf7a9ea86f3d0 (diff) |
Merge branch 'master' of ch1p.io:homekit
Diffstat (limited to 'src/home/soundsensor/node.py')
-rw-r--r-- | src/home/soundsensor/node.py | 75 |
1 files changed, 0 insertions, 75 deletions
diff --git a/src/home/soundsensor/node.py b/src/home/soundsensor/node.py deleted file mode 100644 index 292452f..0000000 --- a/src/home/soundsensor/node.py +++ /dev/null @@ -1,75 +0,0 @@ -import logging -import threading - -from typing import Optional -from time import sleep -from ..util import stringify, send_datagram, Addr - -from pyA20.gpio import gpio -from pyA20.gpio import port as gpioport - -logger = logging.getLogger(__name__) - - -class SoundSensorNode: - def __init__(self, - name: str, - pinname: str, - server_addr: Optional[Addr], - threshold: int = 1, - delay=0.005): - - if not hasattr(gpioport, pinname): - raise ValueError(f'invalid pin {pinname}') - - self.pin = getattr(gpioport, pinname) - self.name = name - self.delay = delay - self.threshold = threshold - - self.server_addr = server_addr - - self.hits = 0 - self.hitlock = threading.Lock() - - self.interrupted = False - - def run(self): - try: - t = threading.Thread(target=self.sensor_reader) - t.daemon = True - t.start() - - while True: - with self.hitlock: - hits = self.hits - self.hits = 0 - - if hits >= self.threshold: - try: - if self.server_addr is not None: - send_datagram(stringify([self.name, hits]), self.server_addr) - else: - logger.debug(f'server reporting disabled, skipping reporting {hits} hits') - except OSError as exc: - logger.exception(exc) - - sleep(1) - - except (KeyboardInterrupt, SystemExit) as e: - self.interrupted = True - logger.info(str(e)) - - def sensor_reader(self): - gpio.init() - gpio.setcfg(self.pin, gpio.INPUT) - gpio.pullup(self.pin, gpio.PULLUP) - - while not self.interrupted: - state = gpio.input(self.pin) - sleep(self.delay) - - if not state: - with self.hitlock: - logger.debug('got a hit') - self.hits += 1 |