aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/msg_parser/msg_parser.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/msg_parser/msg_parser.py')
-rw-r--r--.venv/lib/python3.12/site-packages/msg_parser/msg_parser.py507
1 files changed, 507 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/msg_parser/msg_parser.py b/.venv/lib/python3.12/site-packages/msg_parser/msg_parser.py
new file mode 100644
index 00000000..c02f36af
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/msg_parser/msg_parser.py
@@ -0,0 +1,507 @@
+# -*- coding: utf-8 -*-
+# !/usr/bin/env python
+# Based on MS-OXMSG protocol specification
+# ref: https://blogs.msdn.microsoft.com/openspecification/2010/06/20/msg-file-format-rights-managed-email-message-part-2/
+# ref: https://msdn.microsoft.com/en-us/library/cc463912(v=EXCHG.80).aspx
+import email
+import os
+import re
+from pickle import dumps
+from struct import unpack
+
+from olefile import OleFileIO
+from olefile import isOleFile
+
+from .data_models import DataModel
+from .email_builder import EmailFormatter
+from .properties.ms_props_id_map import PROPS_ID_MAP
+
# Header sizes for the "__properties_version1.0" stream. Only
# EMBEDDED_MSG_HEADER_SIZE is referenced by the code in this module
# (Message._set_property_stream_info, where it is compared against a byte
# length); the other three are presumably the corresponding sizes for
# top-level, recipient and attachment property streams -- TODO confirm
# against the MS-OXMSG specification.
TOP_LEVEL_HEADER_SIZE = 32
RECIPIENT_HEADER_SIZE = 8
ATTACHMENT_HEADER_SIZE = 8
EMBEDDED_MSG_HEADER_SIZE = 24
# Matches a single newline, carriage return or tab character. In this module
# it is only mentioned in a commented-out line of MsOxMessage._set_properties.
CONTROL_CHARS = re.compile(r"[\n\r\t]")
+
+
class Message(object):
    """
    Class to store Message properties.

    An instance is built from a mapping of OLE directory-entry names to
    entries. On construction the entries are sorted into property,
    attachment and recipient streams, which are then decoded into the
    ``properties``, ``attachments`` and ``recipients`` dictionaries.
    """

    def __init__(self, directory_entries):
        """
        :param directory_entries: mapping of directory-entry name to entry,
            e.g. the ``kids_dict`` of an OLE storage.
        """
        self._streams = self._process_directory_entries(directory_entries)
        self._data_model = DataModel()
        # Not read anywhere in this class; kept as part of the instance state.
        self._nested_attachments_depth = 0
        self.properties = self._get_properties()
        self.attachments = self._get_attachments()
        self.recipients = self._get_recipients()

    def as_dict(self):
        """
        returns message attributes as a python dictionary.

        The result holds the ``"attachments"`` and ``"recipients"``
        dictionaries plus every decoded message property at the top level.
        A property that is itself named ``"attachments"`` or ``"recipients"``
        would overwrite the corresponding entry.

        :return: dict
        """
        message_dict = {"attachments": self.attachments, "recipients": self.recipients}
        message_dict.update(self.properties)
        return message_dict

    def _set_property_stream_info(self, ole_file, header_size):
        """
        Read the ``__properties_version1.0`` stream and record its counters.

        Sets ``property_entries_count`` and, when ``header_size`` is at least
        ``EMBEDDED_MSG_HEADER_SIZE``, also ``next_recipient_id``,
        ``next_attachment_id``, ``recipient_count`` and ``attachment_count``.

        :param ole_file: object exposing ``openstream(name)``; the returned
            stream must expose ``read()``.
        :param header_size: size in bytes of the header that precedes the
            16-byte property entries.
        :raises Exception: if the stream is empty, shorter than its header,
            or its entry area is not a multiple of 16 bytes.
        """
        version_stream_data = ole_file.openstream("__properties_version1.0").read()

        if not version_stream_data:
            raise Exception(
                "Invalid MSG file provided, 'properties_version1.0' stream data is empty."
            )

        if header_size >= EMBEDDED_MSG_HEADER_SIZE:
            header = version_stream_data[:24]
            # Check the length up front so a truncated header is reported with
            # the intended message instead of a bare struct.error.
            if len(header) < 24:
                raise Exception("'properties_version1.0' stream data is corrupted.")
            # "<" pins little-endian byte order with standard sizes and no
            # padding, so the result does not depend on the host platform.
            # Layout: 8 skipped bytes followed by four unsigned 32-bit counters.
            (
                _,
                self.next_recipient_id,
                self.next_attachment_id,
                self.recipient_count,
                self.attachment_count,
            ) = unpack("<8sIIII", header)

        entries_size = len(version_stream_data) - header_size
        if entries_size < 0:
            raise Exception("'properties_version1.0' stream data is corrupted.")
        if entries_size % 16 != 0:
            raise Exception(
                "Property Stream size less header is not exactly divisible by 16"
            )

        # Floor division keeps the count an int ("/" yields a float on Python 3).
        self.property_entries_count = entries_size // 16

    @staticmethod
    def _process_directory_entries(directory_entries):
        """
        Sort directory entries into property, attachment and recipient groups.

        Property entries are stored as-is; for attachment and recipient
        storages the entry's ``kids`` are stored instead. Entries whose name
        matches none of the known markers are ignored.

        :param directory_entries: mapping of entry name to directory entry.
        :return: dict with the keys ``"properties"``, ``"recipients"`` and
            ``"attachments"``, each mapping entry names to the stored value.
        """
        streams = {"properties": {}, "recipients": {}, "attachments": {}}
        for name, stream in directory_entries.items():
            if "__substg1.0_" in name:
                streams["properties"][name] = stream
            elif "__attach_" in name:
                streams["attachments"][name] = stream.kids
            elif "__recip_" in name:
                streams["recipients"][name] = stream.kids
            # any other stream name is unknown and skipped
        return streams

    def _get_properties(self):
        """
        Decode the message-level property streams.

        :return: dict mapping canonical property names to decoded values.
        """
        return self._collect_entries("properties", "__substg1.0_")

    def _get_recipients(self):
        """
        Decode every recipient storage.

        :return: dict mapping a recipient address to that recipient's
            property dict. The address is the ``"EmailAddress"`` property,
            else ``"SmtpAddress"``, else the storage's directory name.
        """
        return self._collect_entries(
            "recipients", "__recip_version1.0_", key_by_address=True
        )

    def _get_attachments(self):
        """
        Decode every attachment storage.

        :return: dict mapping the attachment storage name to its property
            dict. An attachment entry that has children is additionally
            parsed as a nested ``Message`` and exposed under
            ``"EmbeddedMessage"``.
        """
        return self._collect_entries(
            "attachments", "__attach_version1.0_", with_embedded=True
        )

    def _collect_entries(
        self, stream_key, name_filter, with_embedded=False, key_by_address=False
    ):
        """
        Shared walker behind the three ``_get_*`` methods.

        :param stream_key: key into ``self._streams`` to walk.
        :param name_filter: substring a directory name must contain.
        :param with_embedded: parse child storages as embedded messages.
        :param key_by_address: key list-valued entries by recipient address
            instead of by directory name.
        :return: dict of collected values.
        """
        collected = {}
        for directory_name, directory_entry in self._streams.get(stream_key).items():
            if name_filter not in directory_name or not directory_entry:
                continue

            if not isinstance(directory_entry, list):
                # A single stream: merge its property straight into the result.
                property_data = self._get_property_data(directory_name, directory_entry)
                if property_data:
                    collected.update(property_data)
                continue

            # A storage: gather the properties of all of its children.
            directory_values = self._read_entry_list(
                directory_name, directory_entry, with_embedded
            )
            key = directory_name
            if key_by_address:
                key = directory_values.get(
                    "EmailAddress", directory_values.get("SmtpAddress", directory_name)
                )
            collected[key] = directory_values
        return collected

    def _read_entry_list(self, directory_name, entries, with_embedded):
        """
        Decode each child entry of a storage into one property dict.

        :param directory_name: name of the storage holding ``entries``.
        :param entries: list of child directory entries.
        :param with_embedded: when true, a child that itself has children is
            parsed as a nested ``Message``.
        :return: dict of property name to value for the storage.
        """
        directory_values = {}
        for property_entry in entries:
            if with_embedded and property_entry.kids:
                embedded_message = Message(property_entry.kids_dict)
                directory_values["EmbeddedMessage"] = {
                    "properties": embedded_message.properties,
                    "recipients": embedded_message.recipients,
                    "attachments": embedded_message.attachments,
                }

            property_data = self._get_property_data(
                directory_name, property_entry, is_list=True
            )
            if property_data:
                directory_values.update(property_data)
        return directory_values

    def _get_property_data(self, directory_name, directory_entry, is_list=False):
        """
        Read and decode a single property stream.

        :param directory_name: name of the storage that contains the entry;
            only used to build the stream path when ``is_list`` is true.
        :param directory_entry: entry exposing ``name`` and ``olefile``.
        :param is_list: whether the entry is nested inside ``directory_name``.
        :return: ``{property_name: value}``, or ``None`` when the property id
            is unknown, has no data type, or decodes to a falsy value.
        """
        directory_entry_name = directory_entry.name
        if is_list:
            stream_name = [directory_name, directory_entry_name]
        else:
            stream_name = [directory_entry_name]

        property_details = self._get_canonical_property_name(directory_entry_name)
        if not property_details:
            return None

        property_type = property_details.get("data_type")
        if not property_type:
            return None

        try:
            raw_content = directory_entry.olefile.openstream(stream_name).read()
        except IOError:
            # Unreadable stream: the data model is handed None instead.
            raw_content = None

        property_value = self._data_model.get_value(
            raw_content, data_type=property_type
        )
        if not property_value:
            return None
        return {property_details.get("name"): property_value}

    @staticmethod
    def _get_canonical_property_name(dir_entry_name):
        """
        Look up the property details for a ``__substg1.0_`` stream name.

        The four characters following the ``__substg1.0_`` marker are used,
        prefixed with ``0x``, as the key into ``PROPS_ID_MAP``.

        :param dir_entry_name: directory-entry name, may be empty or ``None``.
        :return: the ``PROPS_ID_MAP`` entry, or ``None`` when the name is
            empty, lacks the marker, or the id is not in the map.
        """
        if not dir_entry_name or "__substg1.0_" not in dir_entry_name:
            return None

        name = dir_entry_name.replace("__substg1.0_", "")
        return PROPS_ID_MAP.get("0x" + name[0:4])

    def __repr__(self):
        """Identify the message by ``InternetMessageId``, else ``Subject``."""
        return "Message [%s]" % self.properties.get(
            "InternetMessageId", self.properties.get("Subject")
        )
+
+
class Recipient(object):
    """
    class to store recipient attributes

    Each attribute is copied from the recipient property dict and is
    ``None`` when the corresponding property is absent.
    """

    def __init__(self, recipients_properties):
        """
        :param recipients_properties: dict of recipient property names to
            values.
        """
        self.AddressType = recipients_properties.get("AddressType")
        self.Account = recipients_properties.get("Account")
        # Prefer the SMTP address. Fall back to "EmailAddress" so a recipient
        # that only carries that property still gets an address.
        self.EmailAddress = recipients_properties.get(
            "SmtpAddress"
        ) or recipients_properties.get("EmailAddress")
        self.DisplayName = recipients_properties.get("DisplayName")
        self.ObjectType = recipients_properties.get("ObjectType")
        self.RecipientType = recipients_properties.get("RecipientType")

    def __repr__(self):
        """Render as ``DisplayName (EmailAddress)``."""
        return "%s (%s)" % (self.DisplayName, self.EmailAddress)
+
+
class Attachment(object):
    """
    class to store attachment attributes

    ``Filename`` is derived from ``AttachLongFilename`` (preferred) or
    ``AttachFilename`` with any directory part removed; ``data`` holds the
    ``AttachDataObject`` property.
    """

    def __init__(self, attachment_properties):
        """
        :param attachment_properties: dict of attachment property names to
            values.
        """
        self.DisplayName = attachment_properties.get("DisplayName")
        self.AttachEncoding = attachment_properties.get("AttachEncoding")
        self.AttachContentId = attachment_properties.get("AttachContentId")
        self.AttachMethod = attachment_properties.get("AttachMethod")
        self.AttachmentSize = format_size(attachment_properties.get("AttachmentSize"))
        self.AttachFilename = attachment_properties.get("AttachFilename")
        self.AttachLongFilename = attachment_properties.get("AttachLongFilename")
        self.Filename = self._pick_filename(
            self.AttachLongFilename, self.AttachFilename, self.AttachMethod
        )
        self.data = attachment_properties.get("AttachDataObject")
        self.AttachMimeTag = attachment_properties.get(
            "AttachMimeTag", "application/octet-stream"
        )
        self.AttachExtension = attachment_properties.get("AttachExtension")

    @staticmethod
    def _pick_filename(long_name, short_name, attach_method):
        """
        Choose the bare file name for an attachment.

        :param long_name: value of ``AttachLongFilename`` (preferred).
        :param short_name: value of ``AttachFilename`` (fallback).
        :param attach_method: value of ``AttachMethod``; only used in the
            placeholder returned when no usable name exists.
        :return: the name without any directory part, or
            ``"[NoFilename_Method<attach_method>]"``.
        """
        placeholder = "[NoFilename_Method%s]" % attach_method
        name = long_name or short_name
        if not name:
            return placeholder

        # Normalise backslashes first: on POSIX os.path.basename only splits
        # on "/", so a Windows-style path would otherwise keep its directory
        # part in the file name.
        if isinstance(name, bytes):
            name = name.replace(b"\\", b"/")
        else:
            name = name.replace("\\", "/")
        # A name that ends in a separator has an empty basename.
        return os.path.basename(name) or placeholder

    def __repr__(self):
        """Render as ``Filename (AttachmentSize / len(data))``."""
        return "%s (%s / %s)" % (
            self.Filename,
            self.AttachmentSize,
            len(self.data or []),
        )
+
+
class MsOxMessage(object):
    """
    Base class for Microsoft Message Object

    Opens an MSG (OLE) file, parses it into a ``Message`` and exposes the
    commonly needed values (``subject``, ``sender``, ``to``, ``cc``,
    ``bcc``, ``body``, ``recipients``, ``attachments`` ...) as attributes.
    """

    def __init__(self, msg_file_path):
        """
        :param msg_file_path: location of the MSG file to parse.
        :raises Exception: if the file does not exist or is not an OLE file.
        """
        self.msg_file_path = msg_file_path
        self.include_attachment_data = False

        if not self.is_valid_msg_file():
            raise Exception(
                "Invalid file provided, please provide valid Microsoft’s Outlook MSG file."
            )

        with OleFileIO(msg_file_path) as ole_file:
            # process directory entries
            self._message = Message(ole_file.root.kids_dict)
            self._message_dict = self._message.as_dict()

            # process msg properties
            self._set_properties()

            # process msg recipients
            self._set_recipients()

            # process attachments
            self._set_attachments()

    def get_properties(self):
        """
        Return the message dictionary with parsed objects substituted in.

        Non-empty ``"attachments"`` / ``"recipients"`` entries are replaced
        by the ``Attachment`` / ``Recipient`` object lists; every other
        entry, including empty ones, is passed through unchanged.

        :return: dict
        """
        properties = {}
        for key, value in self._message_dict.items():
            if key == "attachments" and value:
                properties["attachments"] = self.attachments
            elif key == "recipients" and value:
                properties["recipients"] = self.recipients
            else:
                properties[key] = value
        return properties

    def get_properties_as_dict(self):
        """
        Return the underlying ``Message`` object.

        NOTE(review): despite the name this returns the ``Message`` instance,
        not a dict (``Message.as_dict()`` / ``get_properties()`` give dicts).
        Left unchanged because callers may rely on it -- confirm.
        """
        return self._message

    def get_message_as_json(self):
        """
        Serialise the message dictionary.

        Despite the name the result is a *pickle* byte string, because the
        message contains bytes objects that ``json.dumps`` cannot encode.
        Unless ``include_attachment_data`` is true, each attachment's
        ``"AttachDataObject"`` is replaced by ``{}`` in the output. This is
        done on a copy, so the parsed message keeps its attachment data.

        :return: pickled bytes, or ``None`` if pickling raises ``ValueError``.
        """
        message_dict = self._message_dict
        if not self.include_attachment_data:
            stripped = {}
            for name, attachment in (message_dict.get("attachments") or {}).items():
                if isinstance(attachment, dict):
                    attachment = dict(attachment, AttachDataObject={})
                stripped[name] = attachment
            message_dict = dict(message_dict, attachments=stripped)

        try:
            # Using Pickle to encode message. There is bytes-like objects in it.
            # Therefore cannot be treated by embed json.dumps method
            return dumps(message_dict)
        except ValueError:
            return None

    def get_email_mime_content(self):
        """Build the message through ``EmailFormatter`` and return the result."""
        email_obj = EmailFormatter(self)
        return email_obj.build_email()

    def save_email_file(self, file_path):
        """
        Write the message to ``file_path`` through ``EmailFormatter``.

        :return: ``True``
        """
        email_obj = EmailFormatter(self)
        email_obj.save_file(file_path)
        return True

    def _set_properties(self):
        """
        Copy generally required properties onto the instance.

        Header values take precedence where a transport header exists;
        otherwise the corresponding MSG property is used.
        """
        property_values = self._message.properties

        # setting generally required properties to easily access using MsOxMessage instance.
        self.subject = property_values.get("Subject")

        header = property_values.get("TransportMessageHeaders")
        self.header = parse_email_headers(header, True)
        self.header_dict = parse_email_headers(header) or {}

        self.created_date = property_values.get("CreationTime")
        self.received_date = property_values.get("ReceiptTime")

        self.sent_date = property_values.get("DeliverTime") or self.header_dict.get(
            "Date"
        )

        self.sender = self.header_dict.get("From") or property_values.get(
            "SenderRepresentingSmtpAddress"
        )

        self.reply_to = self.header_dict.get("Reply-To") or property_values.get(
            "ReplyRecipientNames"
        )

        self.message_id = property_values.get("InternetMessageId")

        # NOTE(review): parse_email_headers stores the parsed addresses under
        # "To", so "TO" only matches when the raw header is spelled that way
        # and this normally falls through to DisplayTo. Switching the key
        # would change ``self.to`` from a string to a list -- confirm with
        # the consumers of ``to`` before changing.
        self.to = (
            self.header_dict.get("TO")
            or property_values.get("DisplayTo")
            or property_values.get("ReceivedRepresentingSmtpAddress")
        )

        self.cc = self.header_dict.get("CC")
        self.bcc = self.header_dict.get("BCC")

        # prefer HTMl over plain text
        if "Html" in property_values:
            self.body = property_values.get("Html")
        else:
            self.body = property_values.get("Body")

        # Trying to decode body if is bytes obj. This is not the way to go. Quick-fix only.
        # See IMAP specs. Use charset-normalizer, cchardet or chardet as last resort.
        if isinstance(self.body, bytes):
            self.body = self.body.decode("utf-8", "ignore")

        if not self.body and "RtfCompressed" in property_values:
            # compressed_rtf is optional; without it the body stays empty.
            try:
                import compressed_rtf
            except ImportError:
                compressed_rtf = None
            if compressed_rtf:
                self.body = compressed_rtf.decompress(property_values["RtfCompressed"])

    def _set_recipients(self):
        """
        Build ``self.recipients`` as a list of ``Recipient`` objects.

        A recipient whose key is found in ``to``, ``cc``, ``bcc`` or
        ``reply_to`` has its ``"RecipientType"`` overwritten; later checks
        win, so the precedence is ReplyTo > BCC > CC > TO. The recipient
        dicts held by the parsed message are modified in place.
        """
        self.recipients = []
        tagged_fields = (
            (self.to, "TO"),
            (self.cc, "CC"),
            (self.bcc, "BCC"),
            (self.reply_to, "ReplyTo"),
        )
        for recipient_name, recipient in self._message.recipients.items():
            for field, recipient_type in tagged_fields:
                if field and recipient_name in field:
                    recipient["RecipientType"] = recipient_type
            self.recipients.append(Recipient(recipient))

    def _set_attachments(self):
        """Build ``self.attachments`` as a list of ``Attachment`` objects."""
        attachments = self._message.attachments
        self.attachments = [Attachment(attach) for attach in attachments.values()]

    def is_valid_msg_file(self):
        """
        Tell whether ``msg_file_path`` points at an existing OLE file.

        The existence check only applies to path-like values (``str`` or
        ``os.PathLike``); anything else is handed to ``isOleFile`` directly.
        Both conditions must hold for a path: the file has to exist *and*
        be an OLE file.

        :return: bool
        """
        source = self.msg_file_path
        if isinstance(source, (str, os.PathLike)) and not os.path.exists(source):
            return False
        return bool(isOleFile(source))
+
+
def format_size(num, suffix="B"):
    """
    Format a byte count using binary (1024-based) unit prefixes.

    :param num: size to format; any falsy value (``None``, ``0``) yields
        ``"unknown"``.
    :param suffix: unit suffix appended after the prefix.
    :return: string such as ``"1.5KiB"``, or ``"unknown"``.
    """
    if not num:
        return "unknown"
    for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        # Compare the value as it will be printed (one decimal place), so a
        # size just below the next unit is shown as "1.0<next>" rather than
        # "1024.0<this>".
        if abs(round(num, 1)) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, "Yi", suffix)
+
+
def parse_email_headers(header, raw=False):
    """
    Parse a transport-header block.

    :param header: header text; ``bytes`` are decoded as UTF-8, ignoring
        undecodable bytes. A falsy value yields ``None``.
    :param raw: when true, return the parsed ``email.message.Message``
        object instead of a dict.
    :return: ``None`` for an empty header; the message object when ``raw``;
        otherwise a dict of all headers in which ``"To"``, ``"From"``,
        ``"CC"``, ``"BCC"`` and ``"Reply-To"`` are always present as lists
        of ``"name <address>"`` strings (empty when the header is absent).
    """
    if not header:
        return None

    # Imported explicitly: "import email" alone does not guarantee that the
    # email.utils submodule is available as an attribute.
    from email.utils import getaddresses

    if isinstance(header, bytes):
        header = header.decode("utf-8", "ignore")

    headers = email.message_from_string(header)
    if raw:
        return headers

    email_address_headers = {
        "To": [],
        "From": [],
        "CC": [],
        "BCC": [],
        "Reply-To": [],
    }

    for addr, formatted in email_address_headers.items():
        for name, email_address in getaddresses(headers.get_all(addr, [])):
            formatted.append("{} <{}>".format(name, email_address))

    # Start from every raw header, then let the parsed address lists replace
    # the raw strings stored under the same key.
    parsed_headers = dict(headers)
    parsed_headers.update(email_address_headers)

    return parsed_headers