From 0023b7f79922141cd071466c0602a7832a8be80e Mon Sep 17 00:00:00 2001 From: Nikita Stupin <18281368+nikitastupin@users.noreply.github.com> Date: Wed, 22 Apr 2020 22:35:31 +0300 Subject: Init --- README.md | 23 +++++ mqtt_message.py | 273 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 296 insertions(+) create mode 100644 README.md create mode 100644 mqtt_message.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..2684b09 --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +This script is ad-hoc solution for inspecting **MQTT over TLS** traffic. As far as I know Wireshark solves this problem but obtaining TLS master keys from IoT device might be a pain. + +## Usage + +Run mitmproxy as `mitmproxy --mode transparent --tcp '.*' -s mqtt_message.py`. Messages will be displayed at the event log (press `shift + e`). + +Of course before that you have to prepare a target device and your host running mitmproxy: +* Install mitmproxy's root certificate on a target device. +* Route device's traffic to mitmproxy. See https://docs.mitmproxy.org/stable/ for the details. +* https://docs.mitmproxy.org/stable/howto-transparent/. + +If server requires x509 client authentication `--set client_certs=cert.pem` mitmproxy's option might be useful. + +## Roadmap + +- [ ] [Add support for non-HTTP flows to the UI](https://github.com/mitmproxy/mitmproxy/issues/1020). +- [ ] Implement parsing of MQTT packet types other than `CONNECT`, `PUBLISH` and `SUBSCRIBE`. +- [ ] Add support for MQTT to mitmproxy including interception, modification and replay. + +## Credits + +* https://github.com/mitmproxy/mitmproxy/blob/master/examples/complex/tcp_message_buffer.py +* https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/client.py diff --git a/mqtt_message.py b/mqtt_message.py new file mode 100644 index 0000000..846c823 --- /dev/null +++ b/mqtt_message.py @@ -0,0 +1,273 @@ +from mitmproxy.utils import strutils +from mitmproxy import ctx +from mitmproxy import tcp + +import struct + + +class MQTTControlPacket: + # Packet types + ( + CONNECT, + CONNACK, + PUBLISH, + PUBACK, + PUBREC, + PUBREL, + PUBCOMP, + SUBSCRIBE, + SUBACK, + UNSUBSCRIBE, + UNSUBACK, + PINGREQ, + PINGRESP, + DISCONNECT, + ) = range(1, 15) + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.1_- + Names = [ + "reserved", + "CONNECT", + "CONNACK", + "PUBLISH", + "PUBACK", + "PUBREC", + "PUBREL", + "PUBCOMP", + "SUBSCRIBE", + "SUBACK", + "UNSUBSCRIBE", + "UNSUBACK", + "PINGREQ", + "PINGRESP", + "DISCONNECT", + "reserved", + ] + + PACKETS_WITH_IDENTIFIER = [ + PUBACK, + PUBREC, + PUBREL, + PUBCOMP, + SUBSCRIBE, + SUBACK, + UNSUBSCRIBE, + UNSUBACK, + ] + + def __init__(self, packet): + self._packet = packet + # Fixed header + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718020 + self.packet_type = self._parse_packet_type() + self.packet_type_human = self.Names[self.packet_type] + self.dup, self.qos, self.retain = self._parse_flags() + self.remaining_length = self._parse_remaining_length() + # Variable header & Payload + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718024 + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718026 + if self.packet_type == self.CONNECT: + self._parse_connect_variable_headers() + self._parse_connect_payload() + elif self.packet_type == self.PUBLISH: + self._parse_publish_variable_headers() + self._parse_publish_payload() + elif self.packet_type == self.SUBSCRIBE: + self._parse_subscribe_variable_headers() + self._parse_subscribe_payload() + elif self.packet_type == self.SUBACK: + pass + elif self.packet_type == self.UNSUBSCRIBE: + pass + else: + self.payload = None + + def pprint(self): + s = f"[{self.Names[self.packet_type]}]" + + if self.packet_type == self.CONNECT: + s += f""" + +Client Id: {self.payload['ClientId']} +Will Topic: {self.payload.get('WillTopic')} +Will Message: {strutils.bytes_to_escaped_str(self.payload.get('WillMessage', b'None'))} +User Name: {self.payload.get('UserName')} +Password: {strutils.bytes_to_escaped_str(self.payload.get('Password', b'None'))} +""" + elif self.packet_type == self.SUBSCRIBE: + s += " sent topic filters: " + s += ", ".join([f"'{tf}'" for tf in self.topic_filters]) + elif self.packet_type == self.PUBLISH: + topic_name = strutils.bytes_to_escaped_str(self.topic_name) + payload = strutils.bytes_to_escaped_str(self.payload) + + s += f" '{payload}' to topic '{topic_name}'" + elif self.packet_type in [self.PINGREQ, self.PINGRESP]: + pass + else: + s = f"Packet type {self.Names[self.packet_type]} is not supported yet!" + + return s + + def _parse_length_prefixed_bytes(self, offset): + field_length_bytes = self._packet[offset : offset + 2] + field_length = struct.unpack("!H", field_length_bytes)[0] + + field_content_bytes = self._packet[offset + 2 : offset + 2 + field_length] + + return field_length + 2, field_content_bytes + + def _parse_publish_variable_headers(self): + offset = len(self._packet) - self.remaining_length + + field_length, field_content_bytes = self._parse_length_prefixed_bytes(offset) + self.topic_name = field_content_bytes + + if self.qos in [0x01, 0x02]: + offset += field_length + self.packet_identifier = self._packet[offset : offset + 2] + + def _parse_publish_payload(self): + fixed_header_length = len(self._packet) - self.remaining_length + variable_header_length = 2 + len(self.topic_name) + + if self.qos in [0x01, 0x02]: + variable_header_length += 2 + + offset = fixed_header_length + variable_header_length + + self.payload = self._packet[offset:] + + def _parse_subscribe_variable_headers(self): + self._parse_packet_identifier() + + def _parse_subscribe_payload(self): + offset = len(self._packet) - self.remaining_length + 2 + + self.topic_filters = {} + + while len(self._packet) - offset > 0: + field_length, topic_filter_bytes = self._parse_length_prefixed_bytes(offset) + offset += field_length + + qos = self._packet[offset : offset + 1] + offset += 1 + + topic_filter = topic_filter_bytes.decode("utf-8") + self.topic_filters[topic_filter] = {"qos": qos} + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030 + def _parse_connect_variable_headers(self): + offset = len(self._packet) - self.remaining_length + + self.variable_headers = {} + self.connect_flags = {} + + self.variable_headers["ProtocolName"] = self._packet[offset : offset + 6] + self.variable_headers["ProtocolLevel"] = self._packet[offset + 6 : offset + 7] + self.variable_headers["ConnectFlags"] = self._packet[offset + 7 : offset + 8] + self.variable_headers["KeepAlive"] = self._packet[offset + 8 : offset + 10] + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349229 + self.connect_flags["CleanSession"] = bool( + self.variable_headers["ConnectFlags"][0] & 0x02 + ) + self.connect_flags["Will"] = bool( + self.variable_headers["ConnectFlags"][0] & 0x04 + ) + self.will_qos = (self.variable_headers["ConnectFlags"][0] >> 3) & 0x03 + self.connect_flags["WillRetain"] = bool( + self.variable_headers["ConnectFlags"][0] & 0x20 + ) + self.connect_flags["Password"] = bool( + self.variable_headers["ConnectFlags"][0] & 0x40 + ) + self.connect_flags["UserName"] = bool( + self.variable_headers["ConnectFlags"][0] & 0x80 + ) + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031 + def _parse_connect_payload(self): + fields = [] + offset = len(self._packet) - self.remaining_length + 10 + + while len(self._packet) - offset > 0: + field_length, field_content = self._parse_length_prefixed_bytes(offset) + fields.append(field_content) + offset += field_length + + self.payload = {} + + for f in fields: + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349242 + if "ClientId" not in self.payload: + self.payload["ClientId"] = f.decode("utf-8") + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349243 + elif self.connect_flags["Will"] and "WillTopic" not in self.payload: + self.payload["WillTopic"] = f.decode("utf-8") + elif self.connect_flags["Will"] and "WillMessage" not in self.payload: + self.payload["WillMessage"] = f + elif self.connect_flags["UserName"] and "UserName" not in self.payload: + self.payload["UserName"] = f.decode("utf-8") + elif self.connect_flags["Password"] and "Password" not in self.payload: + self.payload["Password"] = f + else: + raise Exception("") + + def _parse_packet_type(self): + return self._packet[0] >> 4 + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022 + def _parse_flags(self): + dup = None + qos = None + retain = None + + if self.packet_type == self.PUBLISH: + dup = (self._packet[0] >> 3) & 0x01 + qos = (self._packet[0] >> 1) & 0x03 + retain = self._packet[0] & 0x01 + + return dup, qos, retain + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.4_Size + def _parse_remaining_length(self): + multiplier = 1 + value = 0 + i = 1 + + while True: + encodedByte = self._packet[i] + value += (encodedByte & 127) * multiplier + multiplier *= 128 + + if multiplier > 128 * 128 * 128: + raise Exception("Malformed Remaining Length") + + if encodedByte & 128 == 0: + break + + i += 1 + + return value + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.5_- + def _parse_packet_identifier(self): + offset = len(self._packet) - self.remaining_length + self.packet_identifier = self._packet[offset : offset + 2] + + +def tcp_message(flow: tcp.TCPFlow): + message = flow.messages[-1] + + mqtt_packet = MQTTControlPacket(message.content) + + log_message = mqtt_packet.pprint() + ctx.log.info(log_message) + + # This way we can save topics + # if mqtt_message.packet_type == mqtt_message.PUBLISH: + # with open("topics.txt", "a") as f: + # f.write(f"{mqtt_message.topic_name}\n") + # elif mqtt_message.packet_type == mqtt_message.SUBSCRIBE: + # with open("topics.txt", "a") as f: + # f.write(f"{mqtt_message.topic_filters}\n") -- cgit v1.2.3