# -*- coding: utf-8 -*-
"""Build a MIME e-mail (``.eml``) from an already parsed message object."""
import codecs
import os
import unicodedata
from email import encoders
from email.header import Header
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

# MIME type used when an attachment has no usable ``AttachMimeTag``.
_FALLBACK_MAINTYPE = "application"
_FALLBACK_SUBTYPE = "octet-stream"


class EmailFormatter(object):
    """Compose a multipart e-mail from a parsed message object.

    The message object is read for these attributes: ``message_id``,
    ``subject``, ``sent_date``, ``sender``, ``header_dict``, ``reply_to``,
    ``body`` and ``attachments``.
    """

    def __init__(self, msg_object):
        """Store ``msg_object`` and create the empty multipart container."""
        self.msg_obj = msg_object
        self.message = MIMEMultipart()
        # NOTE(review): setting a charset on the multipart container itself
        # also affects its Content-Type / transfer-encoding headers; kept
        # as-is for output compatibility -- confirm this is intended.
        self.message.set_charset("utf-8")

    def build_email(self):
        """Populate ``self.message`` and return it serialised as a string.

        Headers are only emitted when a value is available.  The body is
        attached as ``text/html`` when it contains an ``<html`` tag and as
        ``text/plain`` otherwise.

        Returns:
            The composed message as text.

        Raises:
            KeyError: if the message object has no body.
        """
        msg = self.msg_obj

        # Message-ID must be a real header; ``set_param`` would instead add
        # a parameter to the Content-Type header.
        if msg.message_id:
            self.message["Message-ID"] = msg.message_id

        # Encoding for unicode subject
        self.message["Subject"] = Header(msg.subject, charset="UTF-8")

        # NOTE(review): ``sent_date`` is only stringified here; presumably it
        # already is an RFC 2822 date such as
        # "Fri, 09 Nov 2001 01:08:47 -0000" -- verify against the caller.
        self.message["Date"] = str(msg.sent_date)

        from_address = flatten_list(msg.sender)
        if from_address:
            self.message["From"] = from_address

        for header_name in ("To", "CC", "BCC"):
            addresses = flatten_list(msg.header_dict.get(header_name))
            if addresses:
                self.message[header_name] = addresses

        # Reply-To falls back to the sender; skip the header entirely when
        # neither is known rather than emitting an empty one.
        reply_to = flatten_list(msg.reply_to) or from_address
        if reply_to:
            self.message.add_header("reply-to", reply_to)

        # Required Email body content
        body_content = msg.body
        if not body_content:
            raise KeyError("Missing email body")
        body_type = "html" if "<html" in body_content.lower() else "plain"
        self.message.attach(
            MIMEText(_text=body_content, _subtype=body_type, _charset="UTF-8")
        )

        # Add message preamble
        self.message.preamble = (
            "You will not see this in a MIME-aware mail reader.\n"
        )

        # Optional attachments
        attachments = msg.attachments
        if attachments:
            self._process_attachments(attachments)

        return self.message.as_string()

    def save_file(self, file_path):
        """Build the e-mail and write it to ``<file_path>/<subject>.eml``.

        Path separators and NUL characters in the subject are replaced with
        ``_`` so that the subject cannot redirect the file outside of
        ``file_path``.

        Args:
            file_path: directory in which the file is created.

        Returns:
            The full path of the written file.
        """
        eml_content = self.build_email()

        subject = str(self.message["Subject"])
        for unsafe_char in ("/", "\\", "\x00"):
            subject = subject.replace(unsafe_char, "_")

        eml_file_path = os.path.join(file_path, subject + ".eml")
        with codecs.open(eml_file_path, mode="wb", encoding="utf-8") as eml_file:
            eml_file.write(eml_content)
        return eml_file_path

    def _process_attachments(self, attachments):
        """Attach every attachment that carries data to ``self.message``.

        Each attachment is read for ``AttachMimeTag``, ``data`` and
        ``Filename``.  Attachments whose ``data`` is ``None`` are skipped.
        Text-like parts are attached as text; everything else is kept as raw
        bytes so that binary content is not altered.
        """
        for attachment in attachments:
            data = attachment.data
            if data is None:
                continue

            maintype, subtype = self._split_mime_type(attachment.AttachMimeTag)

            if maintype == "text" or "message" in maintype:
                # MIMEText needs text, so bytes are decoded here only.
                if isinstance(data, bytes):
                    data = data.decode("utf-8", "ignore")
                attach = MIMEText(data, _subtype=subtype)
            else:
                # Binary parts need bytes; decoding them would drop data.
                if not isinstance(data, bytes):
                    data = data.encode("utf-8")
                if maintype == "image":
                    attach = MIMEImage(data, _subtype=subtype)
                elif maintype == "audio":
                    attach = MIMEAudio(data, _subtype=subtype)
                else:
                    attach = MIMEBase(maintype, subtype)
                    attach.set_payload(data)
                    # Encode the payload using Base64
                    encoders.encode_base64(attach)

            filename = attachment.Filename
            if filename:
                # Set the filename parameter
                base_filename = os.path.basename(filename)
                attach.add_header("Content-ID", "<{}>".format(base_filename))
                attach.add_header(
                    "Content-Disposition", "attachment", filename=base_filename
                )
            else:
                attach.add_header("Content-Disposition", "attachment")
            self.message.attach(attach)

    @staticmethod
    def _split_mime_type(ctype):
        """Split ``"main/sub"``; fall back to a generic binary type."""
        maintype, _, subtype = (ctype or "").partition("/")
        if not maintype or not subtype:
            return _FALLBACK_MAINTYPE, _FALLBACK_SUBTYPE
        return maintype, subtype


def flatten_list(string_list):
    """Join a non-empty list of strings with commas.

    Returns ``None`` for anything that is not a non-empty list.
    """
    if not string_list or not isinstance(string_list, list):
        return None
    return ",".join(string_list)


def _is_ascii(value):
    """Return True when ``value`` (bytes or text) is pure ASCII."""
    try:
        if isinstance(value, bytes):
            value.decode("ascii")
        else:
            value.encode("ascii")
    except UnicodeError:
        return False
    return True


def _as_text(value):
    """Decode bytes as UTF-8, dropping undecodable bytes; pass text through."""
    if isinstance(value, bytes):
        return value.decode("utf-8", "ignore")
    return value


def _normalize_non_ascii(value):
    """NFKD-normalise ``value``; escape it if only whitespace would remain."""
    text = _as_text(value)
    normalized = unicodedata.normalize("NFKD", text)
    if not normalized.strip():
        normalized = text.encode("unicode-escape").decode("utf-8")
    return normalized


def normalize(input_str):
    """Normalise non-ASCII text with Unicode NFKD.

    Args:
        input_str: bytes, text, or a list of either.

    Returns:
        * falsy input: returned unchanged;
        * pure ASCII bytes/text: returned unchanged;
        * non-ASCII bytes/text: the NFKD-normalised text (bytes are decoded
          as UTF-8 first); when normalisation leaves only whitespace the
          ``unicode-escape`` form of the text is returned instead;
        * list: a new list of text items, each handled as above.
    """
    if not input_str:
        return input_str

    if isinstance(input_str, list):
        return [
            _as_text(item) if _is_ascii(item) else _normalize_non_ascii(item)
            for item in input_str
        ]

    if _is_ascii(input_str):
        return input_str
    return _normalize_non_ascii(input_str)