1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
import asyncio
import json
import logging
import threading
from aiohttp import web
from ..database.sqlite import SQLiteBase
from ..config import config
from .. import http
from typing import Type
from ..util import Addr
logger = logging.getLogger(__name__)
class SoundSensorHitHandler(asyncio.DatagramProtocol):
def datagram_received(self, data, addr):
try:
data = json.loads(data)
except json.JSONDecodeError as e:
logger.error('failed to parse json datagram')
logger.exception(e)
return
try:
name, hits = data
except (ValueError, IndexError) as e:
logger.error('failed to unpack data')
logger.exception(e)
return
self.handler(name, hits)
def handler(self, name: str, hits: int):
pass
class Database(SQLiteBase):
SCHEMA = 1
def __init__(self):
super().__init__(dbname='sound_sensor_server')
def schema_init(self, version: int) -> None:
cursor = self.cursor()
if version < 1:
cursor.execute("CREATE TABLE IF NOT EXISTS status (guard_enabled INTEGER NOT NULL)")
cursor.execute("INSERT INTO status (guard_enabled) VALUES (-1)")
self.commit()
def get_guard_enabled(self) -> int:
cur = self.cursor()
cur.execute("SELECT guard_enabled FROM status LIMIT 1")
return int(cur.fetchone()[0])
def set_guard_enabled(self, enabled: bool) -> None:
cur = self.cursor()
cur.execute("UPDATE status SET guard_enabled=?", (int(enabled),))
self.commit()
class SoundSensorServer:
def __init__(self,
addr: Addr,
handler_impl: Type[SoundSensorHitHandler]):
self.addr = addr
self.impl = handler_impl
self.db = Database()
self._recording_lock = threading.Lock()
self._recording_enabled = True
if self.guard_control_enabled():
current_status = self.db.get_guard_enabled()
if current_status == -1:
self.set_recording(config['server']['guard_recording_default']
if 'guard_recording_default' in config['server']
else False,
update=False)
else:
self.set_recording(bool(current_status), update=False)
@staticmethod
def guard_control_enabled() -> bool:
return 'guard_control' in config['server'] and config['server']['guard_control'] is True
def set_recording(self, enabled: bool, update=True):
with self._recording_lock:
self._recording_enabled = enabled
if update:
self.db.set_guard_enabled(enabled)
def is_recording_enabled(self) -> bool:
with self._recording_lock:
return self._recording_enabled
def run(self):
if self.guard_control_enabled():
t = threading.Thread(target=self.run_guard_server)
t.daemon = True
t.start()
loop = asyncio.get_event_loop()
t = loop.create_datagram_endpoint(self.impl, local_addr=self.addr)
loop.run_until_complete(t)
loop.run_forever()
def run_guard_server(self):
routes = web.RouteTableDef()
@routes.post('/guard/enable')
async def guard_enable(request):
self.set_recording(True)
return http.ajax_ok()
@routes.post('/guard/disable')
async def guard_disable(request):
self.set_recording(False)
return http.ajax_ok()
@routes.get('/guard/status')
async def guard_status(request):
return http.ajax_ok({'enabled': self.is_recording_enabled()})
asyncio.set_event_loop(asyncio.new_event_loop()) # need to create new event loop in new thread
http.serve(self.addr, handle_signals=False) # handle_signals=True doesn't work in separate thread
|