diff options
Diffstat (limited to 'mqtt_message.py')
-rw-r--r-- | mqtt_message.py | 273 |
1 files changed, 0 insertions, 273 deletions
diff --git a/mqtt_message.py b/mqtt_message.py deleted file mode 100644 index 6efbd83..0000000 --- a/mqtt_message.py +++ /dev/null @@ -1,273 +0,0 @@ -from mitmproxy.utils import strutils -from mitmproxy import ctx -from mitmproxy import tcp - -import struct - - -class MQTTControlPacket: - # Packet types - ( - CONNECT, - CONNACK, - PUBLISH, - PUBACK, - PUBREC, - PUBREL, - PUBCOMP, - SUBSCRIBE, - SUBACK, - UNSUBSCRIBE, - UNSUBACK, - PINGREQ, - PINGRESP, - DISCONNECT, - ) = range(1, 15) - - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.1_- - Names = [ - "reserved", - "CONNECT", - "CONNACK", - "PUBLISH", - "PUBACK", - "PUBREC", - "PUBREL", - "PUBCOMP", - "SUBSCRIBE", - "SUBACK", - "UNSUBSCRIBE", - "UNSUBACK", - "PINGREQ", - "PINGRESP", - "DISCONNECT", - "reserved", - ] - - PACKETS_WITH_IDENTIFIER = [ - PUBACK, - PUBREC, - PUBREL, - PUBCOMP, - SUBSCRIBE, - SUBACK, - UNSUBSCRIBE, - UNSUBACK, - ] - - def __init__(self, packet): - self._packet = packet - # Fixed header - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718020 - self.packet_type = self._parse_packet_type() - self.packet_type_human = self.Names[self.packet_type] - self.dup, self.qos, self.retain = self._parse_flags() - self.remaining_length = self._parse_remaining_length() - # Variable header & Payload - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718024 - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718026 - if self.packet_type == self.CONNECT: - self._parse_connect_variable_headers() - self._parse_connect_payload() - elif self.packet_type == self.PUBLISH: - self._parse_publish_variable_headers() - self._parse_publish_payload() - elif self.packet_type == self.SUBSCRIBE: - self._parse_subscribe_variable_headers() - self._parse_subscribe_payload() - elif self.packet_type == self.SUBACK: - pass - elif self.packet_type == self.UNSUBSCRIBE: - pass - else: - self.payload = None - - def pprint(self): - s = f"[{self.Names[self.packet_type]}]" - - if self.packet_type == self.CONNECT: - s += f""" - -Client Id: {self.payload['ClientId']} -Will Topic: {self.payload.get('WillTopic')} -Will Message: {strutils.bytes_to_escaped_str(self.payload.get('WillMessage', b'None'))} -User Name: {self.payload.get('UserName')} -Password: {strutils.bytes_to_escaped_str(self.payload.get('Password', b'None'))} -""" - elif self.packet_type == self.SUBSCRIBE: - s += " sent topic filters: " - s += ", ".join([f"'{tf}'" for tf in self.topic_filters]) - elif self.packet_type == self.PUBLISH: - topic_name = strutils.bytes_to_escaped_str(self.topic_name) - payload = strutils.bytes_to_escaped_str(self.payload) - - s += f" '{payload}' to topic '{topic_name}'" - elif self.packet_type in [self.PINGREQ, self.PINGRESP]: - pass - else: - s = f"Packet type {self.Names[self.packet_type]} is not supported yet!" - - return s - - def _parse_length_prefixed_bytes(self, offset): - field_length_bytes = self._packet[offset : offset + 2] - field_length = struct.unpack("!H", field_length_bytes)[0] - - field_content_bytes = self._packet[offset + 2 : offset + 2 + field_length] - - return field_length + 2, field_content_bytes - - def _parse_publish_variable_headers(self): - offset = len(self._packet) - self.remaining_length - - field_length, field_content_bytes = self._parse_length_prefixed_bytes(offset) - self.topic_name = field_content_bytes - - if self.qos in [0x01, 0x02]: - offset += field_length - self.packet_identifier = self._packet[offset : offset + 2] - - def _parse_publish_payload(self): - fixed_header_length = len(self._packet) - self.remaining_length - variable_header_length = 2 + len(self.topic_name) - - if self.qos in [0x01, 0x02]: - variable_header_length += 2 - - offset = fixed_header_length + variable_header_length - - self.payload = self._packet[offset:] - - def _parse_subscribe_variable_headers(self): - self._parse_packet_identifier() - - def _parse_subscribe_payload(self): - offset = len(self._packet) - self.remaining_length + 2 - - self.topic_filters = {} - - while len(self._packet) - offset > 0: - field_length, topic_filter_bytes = self._parse_length_prefixed_bytes(offset) - offset += field_length - - qos = self._packet[offset : offset + 1] - offset += 1 - - topic_filter = topic_filter_bytes.decode("utf-8") - self.topic_filters[topic_filter] = {"qos": qos} - - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030 - def _parse_connect_variable_headers(self): - offset = len(self._packet) - self.remaining_length - - self.variable_headers = {} - self.connect_flags = {} - - self.variable_headers["ProtocolName"] = self._packet[offset : offset + 6] - self.variable_headers["ProtocolLevel"] = self._packet[offset + 6 : offset + 7] - self.variable_headers["ConnectFlags"] = self._packet[offset + 7 : offset + 8] - self.variable_headers["KeepAlive"] = self._packet[offset + 8 : offset + 10] - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349229 - self.connect_flags["CleanSession"] = bool( - self.variable_headers["ConnectFlags"][0] & 0x02 - ) - self.connect_flags["Will"] = bool( - self.variable_headers["ConnectFlags"][0] & 0x04 - ) - self.will_qos = (self.variable_headers["ConnectFlags"][0] >> 3) & 0x03 - self.connect_flags["WillRetain"] = bool( - self.variable_headers["ConnectFlags"][0] & 0x20 - ) - self.connect_flags["Password"] = bool( - self.variable_headers["ConnectFlags"][0] & 0x40 - ) - self.connect_flags["UserName"] = bool( - self.variable_headers["ConnectFlags"][0] & 0x80 - ) - - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031 - def _parse_connect_payload(self): - fields = [] - offset = len(self._packet) - self.remaining_length + 10 - - while len(self._packet) - offset > 0: - field_length, field_content = self._parse_length_prefixed_bytes(offset) - fields.append(field_content) - offset += field_length - - self.payload = {} - - for f in fields: - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349242 - if "ClientId" not in self.payload: - self.payload["ClientId"] = f.decode("utf-8") - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349243 - elif self.connect_flags["Will"] and "WillTopic" not in self.payload: - self.payload["WillTopic"] = f.decode("utf-8") - elif self.connect_flags["Will"] and "WillMessage" not in self.payload: - self.payload["WillMessage"] = f - elif self.connect_flags["UserName"] and "UserName" not in self.payload: - self.payload["UserName"] = f.decode("utf-8") - elif self.connect_flags["Password"] and "Password" not in self.payload: - self.payload["Password"] = f - else: - raise Exception("") - - def _parse_packet_type(self): - return self._packet[0] >> 4 - - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022 - def _parse_flags(self): - dup = None - qos = None - retain = None - - if self.packet_type == self.PUBLISH: - dup = (self._packet[0] >> 3) & 0x01 - qos = (self._packet[0] >> 1) & 0x03 - retain = self._packet[0] & 0x01 - - return dup, qos, retain - - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.4_Size - def _parse_remaining_length(self): - multiplier = 1 - value = 0 - i = 1 - - while True: - encodedByte = self._packet[i] - value += (encodedByte & 127) * multiplier - multiplier *= 128 - - if multiplier > 128 * 128 * 128: - raise Exception("Malformed Remaining Length") - - if encodedByte & 128 == 0: - break - - i += 1 - - return value - - # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.5_- - def _parse_packet_identifier(self): - offset = len(self._packet) - self.remaining_length - self.packet_identifier = self._packet[offset : offset + 2] - - -def tcp_message(flow: tcp.TCPFlow): - message = flow.messages[-1] - - mqtt_packet = MQTTControlPacket(message.content) - - log_message = mqtt_packet.pprint() - ctx.log.info(log_message) - - # This way we can save topics - # if mqtt_packet.packet_type == mqtt_packet.PUBLISH: - # with open("topics.txt", "a") as f: - # f.write(f"{mqtt_packet.topic_name}\n") - # elif mqtt_packet.packet_type == mqtt_packet.SUBSCRIBE: - # with open("topics.txt", "a") as f: - # f.write(f"{mqtt_packet.topic_filters}\n") |