diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/msg_parser/email_builder.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/msg_parser/email_builder.py | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/msg_parser/email_builder.py b/.venv/lib/python3.12/site-packages/msg_parser/email_builder.py new file mode 100644 index 00000000..7939c63a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/msg_parser/email_builder.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +import codecs +import os +import unicodedata +from email import encoders +from email.header import Header +from email.mime.audio import MIMEAudio +from email.mime.base import MIMEBase +from email.mime.image import MIMEImage +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + + +class EmailFormatter(object): + def __init__(self, msg_object): + self.msg_obj = msg_object + self.message = MIMEMultipart() + self.message.set_charset("utf-8") + + def build_email(self): + + # Setting Message ID + self.message.set_param("Message-ID", self.msg_obj.message_id) + + # Encoding for unicode subject + self.message["Subject"] = Header(self.msg_obj.subject, charset="UTF-8") + + # Setting Date Time + # Returns a date string as specified by RFC 2822, e.g.: Fri, 09 Nov 2001 01:08:47 -0000 + self.message["Date"] = str(self.msg_obj.sent_date) + + # At least one recipient is required + # Required fromAddress + from_address = flatten_list(self.msg_obj.sender) + if from_address: + self.message["From"] = from_address + + to_address = flatten_list(self.msg_obj.header_dict.get("To")) + if to_address: + self.message["To"] = to_address + + cc_address = flatten_list(self.msg_obj.header_dict.get("CC")) + if cc_address: + self.message["CC"] = cc_address + + bcc_address = flatten_list(self.msg_obj.header_dict.get("BCC")) + if bcc_address: + self.message["BCC"] = bcc_address + + # Add reply-to + reply_to = flatten_list(self.msg_obj.reply_to) + if reply_to: + self.message.add_header("reply-to", reply_to) + else: + self.message.add_header("reply-to", from_address) + + # Required Email body content + body_content = self.msg_obj.body + if body_content: + if "<html>" in body_content: + body_type = "html" + else: + body_type = "plain" + + body = MIMEText(_text=body_content, _subtype=body_type, _charset="UTF-8") + self.message.attach(body) + else: + raise KeyError("Missing email body") + + # Add message preamble + self.message.preamble = "You will not see this in a MIME-aware mail reader.\n" + + # Optional attachments + attachments = self.msg_obj.attachments + if len(attachments) > 0: + # Some issues here, where data is None or is bytes-like object. + self._process_attachments(self.msg_obj.attachments) + + # composed email + composed = self.message.as_string() + + return composed + + def save_file(self, file_path): + + eml_content = self.build_email() + + file_name = str(self.message["Subject"]) + ".eml" + + eml_file_path = os.path.join(file_path, file_name) + + with codecs.open(eml_file_path, mode="wb+", encoding="utf-8") as eml_file: + eml_file.write(eml_content) + + return eml_file_path + + def _process_attachments(self, attachments): + for attachment in attachments: + ctype = attachment.AttachMimeTag + data = attachment.data + filename = attachment.Filename + maintype, subtype = ctype.split("/", 1) + + if data is None: + continue + + if isinstance(data, bytes): + data = data.decode("utf-8", "ignore") + + if maintype == "text" or "message" in maintype: + attach = MIMEText(data, _subtype=subtype) + elif maintype == "image": + attach = MIMEImage(data, _subtype=subtype) + elif maintype == "audio": + attach = MIMEAudio(data, _subtype=subtype) + else: + attach = MIMEBase(maintype, subtype) + attach.set_payload(data) + + # Encode the payload using Base64 + encoders.encode_base64(attach) + # Set the filename parameter + base_filename = os.path.basename(filename) + attach.add_header("Content-ID", "<{}>".format(base_filename)) + attach.add_header( + "Content-Disposition", "attachment", filename=base_filename + ) + self.message.attach(attach) + + +def flatten_list(string_list): + if string_list and isinstance(string_list, list): + string = ",".join(string_list) + return string + return None + + +def normalize(input_str): + if not input_str: + return input_str + try: + if isinstance(input_str, list): + input_str = [s.decode("ascii") for s in input_str] + else: + input_str.decode("ascii") + return input_str + except UnicodeError: + if isinstance(input_str, bytes): + input_str = input_str.decode("utf-8", "ignore") + normalized = unicodedata.normalize("NFKD", input_str) + if not normalized.strip(): + normalized = input_str.encode("unicode-escape").decode("utf-8") + + return normalized |