blob: 7d8e2e34c660df0c070297cd625ee2198bb0ab41 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
import abc
import smbus
from .base import BaseSensor, SensorType
class I2CSensor(BaseSensor, abc.ABC):
def __init__(self, bus: int):
super().__init__()
self.bus = smbus.SMBus(bus)
class DHT12(I2CSensor):
i2c_addr = 0x5C
def _measure(self):
raw = self.bus.read_i2c_block_data(self.i2c_addr, 0, 5)
if (raw[0] + raw[1] + raw[2] + raw[3]) & 0xff != raw[4]:
raise ValueError("checksum error")
return raw
def temperature(self) -> float:
raw = self._measure()
temp = raw[2] + (raw[3] & 0x7f) * 0.1
if raw[3] & 0x80:
temp *= -1
return temp
def humidity(self) -> float:
raw = self._measure()
return raw[0] + raw[1] * 0.1
class Si7021(I2CSensor):
i2c_addr = 0x40
def temperature(self) -> float:
raw = self.bus.read_i2c_block_data(self.i2c_addr, 0xE3, 2)
return 175.72 * (raw[0] << 8 | raw[1]) / 65536.0 - 46.85
def humidity(self) -> float:
raw = self.bus.read_i2c_block_data(self.i2c_addr, 0xE5, 2)
return 125.0 * (raw[0] << 8 | raw[1]) / 65536.0 - 6.0
def create_sensor(type: SensorType, bus: int) -> BaseSensor:
if type == SensorType.Si7021:
return Si7021(bus)
elif type == SensorType.DHT12:
return DHT12(bus)
else:
raise ValueError('unexpected sensor type')
|