1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
# -*- coding: utf-8 -*-
import codecs
import os
import unicodedata
from email import encoders
from email.header import Header
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
class EmailFormatter(object):
    """Build a MIME e-mail from a parsed message object.

    The wrapped ``msg_object`` is read for the attributes ``message_id``,
    ``subject``, ``sent_date``, ``sender``, ``reply_to``, ``header_dict``
    (its ``To``/``CC``/``BCC`` entries), ``body`` and ``attachments``.
    """

    def __init__(self, msg_object):
        """Store the source message and create an empty multipart container."""
        self.msg_obj = msg_object
        self.message = self._new_message()

    @staticmethod
    def _new_message():
        """Return a fresh, empty multipart container with a utf-8 charset."""
        message = MIMEMultipart()
        message.set_charset("utf-8")
        return message

    @staticmethod
    def _format_date(sent_date):
        """Return the text for the ``Date`` header, or None when unset.

        ``datetime`` values are rendered in RFC 2822 form
        (e.g. ``Fri, 09 Nov 2001 01:08:47 -0000``); any other value is
        passed through ``str()`` unchanged.
        """
        import datetime
        from email.utils import format_datetime

        if sent_date is None:
            return None
        if isinstance(sent_date, datetime.datetime):
            return format_datetime(sent_date)
        return str(sent_date)

    @staticmethod
    def _safe_file_name(name):
        """Replace path separators and control characters in *name* by ``_``."""
        return "".join(
            "_" if ch in "/\\" or ord(ch) < 32 else ch for ch in name
        )

    def build_email(self):
        """Compose the e-mail and return it serialized as a string.

        The container in ``self.message`` is rebuilt on every call, so
        calling this method more than once (``save_file`` calls it as well)
        does not duplicate headers or parts.

        Returns:
            The result of ``as_string()`` on the composed message.

        Raises:
            KeyError: If the source message has an empty body.
        """
        self.message = self._new_message()
        source = self.msg_obj

        # Message-ID is a header of its own. ``set_param`` would store it as
        # a parameter of the Content-Type header instead.
        if source.message_id:
            self.message["Message-ID"] = source.message_id

        # Encoding for unicode subject
        self.message["Subject"] = Header(source.subject, charset="UTF-8")

        # A missing date is skipped rather than written as the text "None".
        date_value = self._format_date(source.sent_date)
        if date_value:
            self.message["Date"] = date_value

        from_address = flatten_list(source.sender)
        if from_address:
            self.message["From"] = from_address

        for header_name in ("To", "CC", "BCC"):
            addresses = flatten_list(source.header_dict.get(header_name))
            if addresses:
                self.message[header_name] = addresses

        # Reply-to falls back to the sender; with neither available the
        # header is omitted instead of being emitted empty.
        reply_to = flatten_list(source.reply_to) or from_address
        if reply_to:
            self.message.add_header("reply-to", reply_to)

        # Required e-mail body content
        body_content = source.body
        if not body_content:
            raise KeyError("Missing email body")
        if isinstance(body_content, bytes):
            body_content = body_content.decode("utf-8", "replace")
        # Case-insensitive and attribute-tolerant: also matches "<HTML>" and
        # "<html xmlns=...>".
        body_type = "html" if "<html" in body_content.lower() else "plain"
        self.message.attach(
            MIMEText(_text=body_content, _subtype=body_type, _charset="UTF-8")
        )

        self.message.preamble = "You will not see this in a MIME-aware mail reader.\n"

        # Optional attachments
        attachments = source.attachments
        if attachments:
            self._process_attachments(attachments)

        return self.message.as_string()

    def save_file(self, file_path):
        """Write the composed e-mail to ``<file_path>/<subject>.eml``.

        Path separators and control characters in the subject are replaced
        by ``_`` so the subject cannot redirect the write to another
        directory.

        Returns:
            The path of the written file.
        """
        eml_content = self.build_email()
        file_name = self._safe_file_name(str(self.message["Subject"])) + ".eml"
        eml_file_path = os.path.join(file_path, file_name)
        with codecs.open(eml_file_path, mode="wb+", encoding="utf-8") as eml_file:
            eml_file.write(eml_content)
        return eml_file_path

    def _process_attachments(self, attachments):
        """Attach every attachment that carries data to ``self.message``.

        Each attachment is read for ``AttachMimeTag``, ``data`` and
        ``Filename``. Attachments whose ``data`` is None are skipped. A
        missing or malformed MIME tag falls back to
        ``application/octet-stream``.
        """
        for attachment in attachments:
            data = attachment.data
            if data is None:
                continue

            mime_tag = attachment.AttachMimeTag or ""
            maintype, _, subtype = mime_tag.partition("/")
            if not maintype or not subtype:
                maintype, subtype = "application", "octet-stream"

            if maintype == "text" or "message" in maintype:
                # Only textual parts need str; binary parts keep their bytes
                # so the payload is not corrupted by a lossy decode.
                if isinstance(data, bytes):
                    data = data.decode("utf-8", "ignore")
                attach = MIMEText(data, _subtype=subtype)
            elif maintype == "image":
                attach = MIMEImage(data, _subtype=subtype)
            elif maintype == "audio":
                attach = MIMEAudio(data, _subtype=subtype)
            else:
                attach = MIMEBase(maintype, subtype)
                attach.set_payload(data)
                # Encode the payload using Base64
                encoders.encode_base64(attach)

            # Set the filename parameter when a file name is available.
            filename = attachment.Filename
            base_filename = os.path.basename(filename) if filename else ""
            if base_filename:
                attach.add_header("Content-ID", "<{}>".format(base_filename))
                attach.add_header(
                    "Content-Disposition", "attachment", filename=base_filename
                )
            else:
                attach.add_header("Content-Disposition", "attachment")
            self.message.attach(attach)
def flatten_list(string_list):
    """Join a collection of strings into one comma-separated string.

    Args:
        string_list: A list or tuple of strings, a single string, or None.

    Returns:
        The non-empty entries joined by ``","``; a single string is returned
        unchanged. None is returned for empty input, for input of any other
        type, and when no non-empty entry remains.
    """
    if not string_list:
        return None
    if isinstance(string_list, str):
        # A lone string is already flat; do not drop it.
        return string_list
    if isinstance(string_list, (list, tuple)):
        # Skip None/empty entries so join() neither fails nor leaves
        # dangling commas.
        joined = ",".join(item for item in string_list if item)
        return joined or None
    return None
def _normalize_text(text):
    """NFKD-normalize *text*; escape it if only whitespace would remain."""
    normalized = unicodedata.normalize("NFKD", text)
    if not normalized.strip():
        normalized = text.encode("unicode-escape").decode("utf-8")
    return normalized


def _normalize_item(value, decode_ascii):
    """Normalize one value; ASCII bytes are decoded only if *decode_ascii*."""
    if isinstance(value, bytes):
        try:
            decoded = value.decode("ascii")
        except UnicodeError:
            return _normalize_text(value.decode("utf-8", "ignore"))
        return decoded if decode_ascii else value
    try:
        value.encode("ascii")
    except AttributeError:
        # Not a text value (e.g. None or a number): leave it untouched.
        return value
    except UnicodeError:
        return _normalize_text(value)
    return value


def normalize(input_str):
    """Normalize non-ASCII text to its NFKD form.

    Args:
        input_str: A string, a bytes object, or a list of either.

    Returns:
        * Falsy input is returned unchanged.
        * Pure-ASCII strings and bytes are returned unchanged.
        * Non-ASCII bytes are decoded as UTF-8 (undecodable bytes dropped)
          and, like non-ASCII strings, NFKD-normalized. If the normalized
          text is empty or whitespace only, the ``unicode-escape``
          representation of the text is returned instead.
        * For a list, a new list with every item handled as above, except
          that ASCII bytes items are decoded to text.
    """
    if not input_str:
        return input_str
    if isinstance(input_str, list):
        return [_normalize_item(item, True) for item in input_str]
    return _normalize_item(input_str, False)
|