diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2023-05-31 09:22:00 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2023-06-03 01:01:27 +0300 |
commit | 0f0a5fd44867b684bcbafd102b6b769ae61229b4 (patch) | |
tree | d4845bd3295159c06041b25753af4eb2d7b59461 /src/home/mqtt/_payload.py | |
parent | 3e3753d726f8a02d98368f20f77dd9fa739e3d80 (diff) |
wip
Diffstat (limited to 'src/home/mqtt/_payload.py')
-rw-r--r-- | src/home/mqtt/_payload.py | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/src/home/mqtt/_payload.py b/src/home/mqtt/_payload.py new file mode 100644 index 0000000..58eeae3 --- /dev/null +++ b/src/home/mqtt/_payload.py @@ -0,0 +1,145 @@ +import struct +import abc +import re + +from typing import Optional, Tuple + + +def pldstr(self) -> str: + attrs = [] + for field in self.__class__.__annotations__: + if hasattr(self, field): + attr = getattr(self, field) + attrs.append(f'{field}={attr}') + if attrs: + attrs_s = ' ' + attrs_s += ', '.join(attrs) + else: + attrs_s = '' + return f'<%s{attrs_s}>' % (self.__class__.__name__,) + + +class MqttPayload(abc.ABC): + FORMAT = '' + PACKER = {} + UNPACKER = {} + + def __init__(self, **kwargs): + for field in self.__class__.__annotations__: + setattr(self, field, kwargs[field]) + + def pack(self): + args = [] + bf_number = -1 + bf_arg = 0 + bf_progress = 0 + + for field, field_type in self.__class__.__annotations__.items(): + bfp = _bit_field_params(field_type) + if bfp: + n, s, b = bfp + if n != bf_number: + if bf_number != -1: + args.append(bf_arg) + bf_number = n + bf_progress = 0 + bf_arg = 0 + bf_arg |= (getattr(self, field) & (2 ** b - 1)) << bf_progress + bf_progress += b + + else: + if bf_number != -1: + args.append(bf_arg) + bf_number = -1 + bf_progress = 0 + bf_arg = 0 + + args.append(self._pack_field(field)) + + if bf_number != -1: + args.append(bf_arg) + + return struct.pack(self.FORMAT, *args) + + @classmethod + def unpack(cls, buf: bytes): + data = struct.unpack(cls.FORMAT, buf) + kwargs = {} + i = 0 + bf_number = -1 + bf_progress = 0 + + for field, field_type in cls.__annotations__.items(): + bfp = _bit_field_params(field_type) + if bfp: + n, s, b = bfp + if n != bf_number: + bf_number = n + bf_progress = 0 + kwargs[field] = (data[i] >> bf_progress) & (2 ** b - 1) + bf_progress += b + continue # don't increment i + + if bf_number != -1: + bf_number = -1 + i += 1 + + if issubclass(field_type, MqttPayloadCustomField): + kwargs[field] = field_type.unpack(data[i]) + else: + kwargs[field] = cls._unpack_field(field, data[i]) + i += 1 + + return cls(**kwargs) + + def _pack_field(self, name): + val = getattr(self, name) + if self.PACKER and name in self.PACKER: + return self.PACKER[name](val) + else: + return val + + @classmethod + def _unpack_field(cls, name, val): + if isinstance(val, MqttPayloadCustomField): + return + if cls.UNPACKER and name in cls.UNPACKER: + return cls.UNPACKER[name](val) + else: + return val + + def __str__(self): + return pldstr(self) + + +class MqttPayloadCustomField(abc.ABC): + def __init__(self, **kwargs): + for field in self.__class__.__annotations__: + setattr(self, field, kwargs[field]) + + @abc.abstractmethod + def __index__(self): + pass + + @classmethod + @abc.abstractmethod + def unpack(cls, *args, **kwargs): + pass + + def __str__(self): + return pldstr(self) + + +def bit_field(seq_no: int, total_bits: int, bits: int): + return type(f'MQTTPayloadBitField_{seq_no}_{total_bits}_{bits}', (object,), { + 'seq_no': seq_no, + 'total_bits': total_bits, + 'bits': bits + }) + + +def _bit_field_params(cl) -> Optional[Tuple[int, ...]]: + match = re.match(r'MQTTPayloadBitField_(\d+)_(\d+)_(\d)$', cl.__name__) + if match is not None: + return tuple([int(match.group(i)) for i in range(1, 4)]) + return None
\ No newline at end of file |