aboutsummaryrefslogtreecommitdiff
path: root/mqtt_message.py
diff options
context:
space:
mode:
Diffstat (limited to 'mqtt_message.py')
-rw-r--r--mqtt_message.py273
1 files changed, 0 insertions, 273 deletions
diff --git a/mqtt_message.py b/mqtt_message.py
deleted file mode 100644
index 6efbd83..0000000
--- a/mqtt_message.py
+++ /dev/null
@@ -1,273 +0,0 @@
-from mitmproxy.utils import strutils
-from mitmproxy import ctx
-from mitmproxy import tcp
-
-import struct
-
-
-class MQTTControlPacket:
- # Packet types
- (
- CONNECT,
- CONNACK,
- PUBLISH,
- PUBACK,
- PUBREC,
- PUBREL,
- PUBCOMP,
- SUBSCRIBE,
- SUBACK,
- UNSUBSCRIBE,
- UNSUBACK,
- PINGREQ,
- PINGRESP,
- DISCONNECT,
- ) = range(1, 15)
-
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.1_-
- Names = [
- "reserved",
- "CONNECT",
- "CONNACK",
- "PUBLISH",
- "PUBACK",
- "PUBREC",
- "PUBREL",
- "PUBCOMP",
- "SUBSCRIBE",
- "SUBACK",
- "UNSUBSCRIBE",
- "UNSUBACK",
- "PINGREQ",
- "PINGRESP",
- "DISCONNECT",
- "reserved",
- ]
-
- PACKETS_WITH_IDENTIFIER = [
- PUBACK,
- PUBREC,
- PUBREL,
- PUBCOMP,
- SUBSCRIBE,
- SUBACK,
- UNSUBSCRIBE,
- UNSUBACK,
- ]
-
- def __init__(self, packet):
- self._packet = packet
- # Fixed header
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718020
- self.packet_type = self._parse_packet_type()
- self.packet_type_human = self.Names[self.packet_type]
- self.dup, self.qos, self.retain = self._parse_flags()
- self.remaining_length = self._parse_remaining_length()
- # Variable header & Payload
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718024
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718026
- if self.packet_type == self.CONNECT:
- self._parse_connect_variable_headers()
- self._parse_connect_payload()
- elif self.packet_type == self.PUBLISH:
- self._parse_publish_variable_headers()
- self._parse_publish_payload()
- elif self.packet_type == self.SUBSCRIBE:
- self._parse_subscribe_variable_headers()
- self._parse_subscribe_payload()
- elif self.packet_type == self.SUBACK:
- pass
- elif self.packet_type == self.UNSUBSCRIBE:
- pass
- else:
- self.payload = None
-
- def pprint(self):
- s = f"[{self.Names[self.packet_type]}]"
-
- if self.packet_type == self.CONNECT:
- s += f"""
-
-Client Id: {self.payload['ClientId']}
-Will Topic: {self.payload.get('WillTopic')}
-Will Message: {strutils.bytes_to_escaped_str(self.payload.get('WillMessage', b'None'))}
-User Name: {self.payload.get('UserName')}
-Password: {strutils.bytes_to_escaped_str(self.payload.get('Password', b'None'))}
-"""
- elif self.packet_type == self.SUBSCRIBE:
- s += " sent topic filters: "
- s += ", ".join([f"'{tf}'" for tf in self.topic_filters])
- elif self.packet_type == self.PUBLISH:
- topic_name = strutils.bytes_to_escaped_str(self.topic_name)
- payload = strutils.bytes_to_escaped_str(self.payload)
-
- s += f" '{payload}' to topic '{topic_name}'"
- elif self.packet_type in [self.PINGREQ, self.PINGRESP]:
- pass
- else:
- s = f"Packet type {self.Names[self.packet_type]} is not supported yet!"
-
- return s
-
- def _parse_length_prefixed_bytes(self, offset):
- field_length_bytes = self._packet[offset : offset + 2]
- field_length = struct.unpack("!H", field_length_bytes)[0]
-
- field_content_bytes = self._packet[offset + 2 : offset + 2 + field_length]
-
- return field_length + 2, field_content_bytes
-
- def _parse_publish_variable_headers(self):
- offset = len(self._packet) - self.remaining_length
-
- field_length, field_content_bytes = self._parse_length_prefixed_bytes(offset)
- self.topic_name = field_content_bytes
-
- if self.qos in [0x01, 0x02]:
- offset += field_length
- self.packet_identifier = self._packet[offset : offset + 2]
-
- def _parse_publish_payload(self):
- fixed_header_length = len(self._packet) - self.remaining_length
- variable_header_length = 2 + len(self.topic_name)
-
- if self.qos in [0x01, 0x02]:
- variable_header_length += 2
-
- offset = fixed_header_length + variable_header_length
-
- self.payload = self._packet[offset:]
-
- def _parse_subscribe_variable_headers(self):
- self._parse_packet_identifier()
-
- def _parse_subscribe_payload(self):
- offset = len(self._packet) - self.remaining_length + 2
-
- self.topic_filters = {}
-
- while len(self._packet) - offset > 0:
- field_length, topic_filter_bytes = self._parse_length_prefixed_bytes(offset)
- offset += field_length
-
- qos = self._packet[offset : offset + 1]
- offset += 1
-
- topic_filter = topic_filter_bytes.decode("utf-8")
- self.topic_filters[topic_filter] = {"qos": qos}
-
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030
- def _parse_connect_variable_headers(self):
- offset = len(self._packet) - self.remaining_length
-
- self.variable_headers = {}
- self.connect_flags = {}
-
- self.variable_headers["ProtocolName"] = self._packet[offset : offset + 6]
- self.variable_headers["ProtocolLevel"] = self._packet[offset + 6 : offset + 7]
- self.variable_headers["ConnectFlags"] = self._packet[offset + 7 : offset + 8]
- self.variable_headers["KeepAlive"] = self._packet[offset + 8 : offset + 10]
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349229
- self.connect_flags["CleanSession"] = bool(
- self.variable_headers["ConnectFlags"][0] & 0x02
- )
- self.connect_flags["Will"] = bool(
- self.variable_headers["ConnectFlags"][0] & 0x04
- )
- self.will_qos = (self.variable_headers["ConnectFlags"][0] >> 3) & 0x03
- self.connect_flags["WillRetain"] = bool(
- self.variable_headers["ConnectFlags"][0] & 0x20
- )
- self.connect_flags["Password"] = bool(
- self.variable_headers["ConnectFlags"][0] & 0x40
- )
- self.connect_flags["UserName"] = bool(
- self.variable_headers["ConnectFlags"][0] & 0x80
- )
-
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031
- def _parse_connect_payload(self):
- fields = []
- offset = len(self._packet) - self.remaining_length + 10
-
- while len(self._packet) - offset > 0:
- field_length, field_content = self._parse_length_prefixed_bytes(offset)
- fields.append(field_content)
- offset += field_length
-
- self.payload = {}
-
- for f in fields:
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349242
- if "ClientId" not in self.payload:
- self.payload["ClientId"] = f.decode("utf-8")
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349243
- elif self.connect_flags["Will"] and "WillTopic" not in self.payload:
- self.payload["WillTopic"] = f.decode("utf-8")
- elif self.connect_flags["Will"] and "WillMessage" not in self.payload:
- self.payload["WillMessage"] = f
- elif self.connect_flags["UserName"] and "UserName" not in self.payload:
- self.payload["UserName"] = f.decode("utf-8")
- elif self.connect_flags["Password"] and "Password" not in self.payload:
- self.payload["Password"] = f
- else:
- raise Exception("")
-
- def _parse_packet_type(self):
- return self._packet[0] >> 4
-
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022
- def _parse_flags(self):
- dup = None
- qos = None
- retain = None
-
- if self.packet_type == self.PUBLISH:
- dup = (self._packet[0] >> 3) & 0x01
- qos = (self._packet[0] >> 1) & 0x03
- retain = self._packet[0] & 0x01
-
- return dup, qos, retain
-
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.4_Size
- def _parse_remaining_length(self):
- multiplier = 1
- value = 0
- i = 1
-
- while True:
- encodedByte = self._packet[i]
- value += (encodedByte & 127) * multiplier
- multiplier *= 128
-
- if multiplier > 128 * 128 * 128:
- raise Exception("Malformed Remaining Length")
-
- if encodedByte & 128 == 0:
- break
-
- i += 1
-
- return value
-
- # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.5_-
- def _parse_packet_identifier(self):
- offset = len(self._packet) - self.remaining_length
- self.packet_identifier = self._packet[offset : offset + 2]
-
-
-def tcp_message(flow: tcp.TCPFlow):
- message = flow.messages[-1]
-
- mqtt_packet = MQTTControlPacket(message.content)
-
- log_message = mqtt_packet.pprint()
- ctx.log.info(log_message)
-
- # This way we can save topics
- # if mqtt_packet.packet_type == mqtt_packet.PUBLISH:
- # with open("topics.txt", "a") as f:
- # f.write(f"{mqtt_packet.topic_name}\n")
- # elif mqtt_packet.packet_type == mqtt_packet.SUBSCRIBE:
- # with open("topics.txt", "a") as f:
- # f.write(f"{mqtt_packet.topic_filters}\n")