Fix header decoding and switch to python f-strings

This commit is contained in:
2020-02-29 00:33:29 +01:00
parent 400c65eec8
commit d8e9dd2685
8 changed files with 181 additions and 250 deletions

View File

@@ -20,7 +20,7 @@ from bs4 import BeautifulSoup
from cgi import escape
from collections import defaultdict
from email import policy
from email.header import decode_header, make_header
from email.header import decode_header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
@@ -38,7 +38,7 @@ class BaseNotification(object):
self.name = name
self.logger = logging.getLogger(__name__)
def notify(self, queueid, storage_id, mailfrom, recipients, headers,
def notify(self, qid, storage_id, mailfrom, recipients, headers,
fp, subgroups=None, named_subgroups=None, synchronous=False):
fp.seek(0)
pass
@@ -136,8 +136,7 @@ class EMailNotification(BaseNotification):
cfg[opt] = defaults[opt]
else:
raise RuntimeError(
"mandatory option '{}' not present in config section '{}' or 'global'".format(
opt, self.name))
f"mandatory option '{opt}' not present in config section '{self.name}' or 'global'")
self.smtp_host = cfg["notification_email_smtp_host"]
self.smtp_port = cfg["notification_email_smtp_port"]
@@ -152,14 +151,14 @@ class EMailNotification(BaseNotification):
self.from_header.format_map(testvars)
except ValueError as e:
raise RuntimeError(
"error parsing notification_email_from: {}".format(e))
f"error parsing notification_email_from: {e}")
# test-parse subject
try:
self.subject.format_map(testvars)
except ValueError as e:
raise RuntimeError(
"error parsing notification_email_subject: {}".format(e))
f"error parsing notification_email_subject: {e}")
# read and parse email notification template
try:
@@ -167,9 +166,9 @@ class EMailNotification(BaseNotification):
cfg["notification_email_template"], "r").read()
self.template.format_map(testvars)
except IOError as e:
raise RuntimeError("error reading template: {}".format(e))
raise RuntimeError(f"error reading template: {e}")
except ValueError as e:
raise RuntimeError("error parsing template: {}".format(e))
raise RuntimeError(f"error parsing template: {e}")
strip_images = cfg["notification_email_strip_images"].strip().upper()
if strip_images in ["TRUE", "ON", "YES"]:
@@ -191,7 +190,7 @@ class EMailNotification(BaseNotification):
open(replacement_img, "rb").read())
except IOError as e:
raise RuntimeError(
"error reading replacement image: {}".format(e))
f"error reading replacement image: {e}")
else:
self.replacement_img.add_header(
"Content-ID", "<removed_for_security_reasons>")
@@ -207,20 +206,20 @@ class EMailNotification(BaseNotification):
try:
img = MIMEImage(open(img_path, "rb").read())
except IOError as e:
raise RuntimeError("error reading image: {}".format(e))
raise RuntimeError(f"error reading image: {e}")
else:
img.add_header("Content-ID", "<{}>".format(basename(img_path)))
filename = basename(img_path)
img.add_header(f"Content-ID", f"<{filename}>")
self.embedded_imgs.append(img)
def get_email_body_soup(self, queueid, msg):
def get_email_body_soup(self, qid, msg):
"Extract and decode email body and return it as BeautifulSoup object."
# try to find the body part
self.logger.debug("{}: trying to find email body".format(queueid))
self.logger.debug(f"{qid}: trying to find email body")
try:
body = msg.get_body(preferencelist=("html", "plain"))
except Exception as e:
self.logger.error("{}: an error occured in email.message.EmailMessage.get_body: {}".format(
queueid, e))
self.logger.error(f"{qid}: an error occured in email.message.EmailMessage.get_body: {e}")
body = None
if body:
@@ -229,54 +228,49 @@ class EMailNotification(BaseNotification):
try:
content = content.decode(encoding=charset, errors="replace")
except LookupError:
self.logger.info("{}: unknown encoding '{}', falling back to UTF-8".format(
queueid, charset))
self.logger.info(f"{qid}: unknown encoding '{charset}', falling back to UTF-8")
content = content.decode("utf-8", errors="replace")
content_type = body.get_content_type()
if content_type == "text/plain":
# convert text/plain to text/html
self.logger.debug(
"{}: content type is {}, converting to text/html".format(
queueid, content_type))
f"{qid}: content type is {content_type}, converting to text/html")
content = re.sub(r"^(.*)$", r"\1<br/>",
escape(content), flags=re.MULTILINE)
else:
self.logger.debug(
"{}: content type is {}".format(
queueid, content_type))
f"{qid}: content type is {content_type}")
else:
self.logger.error(
"{}: unable to find email body".format(queueid))
f"{qid}: unable to find email body")
content = "ERROR: unable to find email body"
# create BeautifulSoup object
length = len(content)
self.logger.debug(
"{}: trying to create BeatufilSoup object with parser lib {}, "
"text length is {} bytes".format(
queueid, self.parser_lib, len(content)))
f"{qid}: trying to create BeatufilSoup object with parser lib {self.parser_lib}, "
f"text length is {length} bytes")
soup = BeautifulSoup(content, self.parser_lib)
self.logger.debug(
"{}: sucessfully created BeautifulSoup object".format(queueid))
f"{qid}: sucessfully created BeautifulSoup object")
return soup
def sanitize(self, queueid, soup):
def sanitize(self, qid, soup):
"Sanitize mail html text."
self.logger.debug("{}: sanitizing email text".format(queueid))
self.logger.debug(f"{qid}: sanitizing email text")
# completly remove bad elements
for element in soup(EMailNotification._bad_tags):
self.logger.debug(
"{}: removing dangerous tag '{}' and its content".format(
queueid, element.name))
f"{qid}: removing dangerous tag '{element_name}' and its content")
element.extract()
# remove not whitelisted elements, but keep their content
for element in soup.find_all(True):
if element.name not in EMailNotification._good_tags:
self.logger.debug(
"{}: removing tag '{}', keep its content".format(
queueid, element.name))
f"{qid}: removing tag '{element.name}', keep its content")
element.replaceWithChildren()
# remove not whitelisted attributes
@@ -285,23 +279,21 @@ class EMailNotification(BaseNotification):
if attribute not in EMailNotification._good_attributes:
if element.name == "a" and attribute == "href":
self.logger.debug(
"{}: setting attribute href to '#' on tag '{}'".format(
queueid, element.name))
f"{qid}: setting attribute href to '#' on tag '{element.name}'")
element["href"] = "#"
else:
self.logger.debug(
"{}: removing attribute '{}' from tag '{}'".format(
queueid, attribute, element.name))
f"{qid}: removing attribute '{attribute}' from tag '{element.name}'")
del(element.attrs[attribute])
return soup
def notify(self, queueid, storage_id, mailfrom, recipients, headers, fp,
def notify(self, qid, storage_id, mailfrom, recipients, headers, fp,
subgroups=None, named_subgroups=None, synchronous=False):
"Notify recipients via email."
super(
EMailNotification,
self).notify(
queueid,
qid,
storage_id,
mailfrom,
recipients,
@@ -313,66 +305,54 @@ class EMailNotification(BaseNotification):
# extract body from email
soup = self.get_email_body_soup(
queueid, email.message_from_binary_file(fp, policy=policy.default))
qid, email.message_from_binary_file(fp, policy=policy.default))
# replace picture sources
image_replaced = False
if self.strip_images:
self.logger.debug(
"{}: looking for images to strip".format(queueid))
f"{qid}: looking for images to strip")
for element in soup("img"):
if "src" in element.attrs.keys():
self.logger.debug(
"{}: strip image: {}".format(
queueid, element["src"]))
f"{qid}: strip image: {element['src']}")
element.extract()
elif self.replacement_img:
self.logger.debug(
"{}: looking for images to replace".format(queueid))
f"{qid}: looking for images to replace")
for element in soup("img"):
if "src" in element.attrs.keys():
self.logger.debug(
"{}: replacing image: {}".format(
queueid, element["src"]))
f"{qid}: replacing image: {element['src']}")
element["src"] = "cid:removed_for_security_reasons"
image_replaced = True
# sanitizing email text of original email
sanitized_text = self.sanitize(queueid, soup)
sanitized_text = self.sanitize(qid, soup)
del soup
# sending email notifications
for recipient in recipients:
self.logger.debug(
"{}: generating notification email for '{}'".format(
queueid, recipient))
self.logger.debug("{}: parsing email template".format(queueid))
# decode some headers
decoded_headers = {}
for header in ["from", "to", "subject"]:
if header in headers:
decoded_headers[header] = str(
make_header(decode_header(headers[header])))
else:
headers[header] = ""
decoded_headers[header] = ""
f"{qid}: generating notification email for '{recipient}'")
self.logger.debug(f"{qid}: parsing email template")
# generate dict containing all template variables
variables = defaultdict(str,
EMAIL_HTML_TEXT=sanitized_text,
EMAIL_FROM=escape(decoded_headers["from"]),
EMAIL_FROM=escape(headers["from"]),
EMAIL_ENVELOPE_FROM=escape(mailfrom),
EMAIL_ENVELOPE_FROM_URL=escape(quote(mailfrom)),
EMAIL_TO=escape(decoded_headers["to"]),
EMAIL_TO=escape(headers["to"]),
EMAIL_ENVELOPE_TO=escape(recipient),
EMAIL_ENVELOPE_TO_URL=escape(quote(recipient)),
EMAIL_SUBJECT=escape(decoded_headers["subject"]),
EMAIL_SUBJECT=escape(headers["subject"]),
EMAIL_QUARANTINE_ID=storage_id)
if subgroups:
number = 0
for subgroup in subgroups:
variables["SUBGROUP_{}".format(number)] = escape(subgroup)
variables[f"SUBGROUP_{number}"] = escape(subgroup)
if named_subgroups:
for key, value in named_subgroups.items():
named_subgroups[key] = escape(value)
@@ -392,26 +372,24 @@ class EMailNotification(BaseNotification):
if image_replaced:
self.logger.debug(
"{}: attaching notification_replacement_img".format(queueid))
f"{qid}: attaching notification_replacement_img")
msg.attach(self.replacement_img)
for img in self.embedded_imgs:
self.logger.debug("{}: attaching imgage".format(queueid))
self.logger.debug(f"{qid}: attaching imgage")
msg.attach(img)
self.logger.debug(
"{}: sending notification email to: {}".format(
queueid, recipient))
f"{qid}: sending notification email to: {recipient}")
if synchronous:
try:
mailer.smtp_send(self.smtp_host, self.smtp_port,
self.mailfrom, recipient, msg.as_string())
except Exception as e:
raise RuntimeError(
"error while sending email to '{}': {}".format(
recipient, e))
f"error while sending email to '{recipient}': {e}")
else:
mailer.sendmail(self.smtp_host, self.smtp_port, queueid,
mailer.sendmail(self.smtp_host, self.smtp_port, qid,
self.mailfrom, recipient, msg.as_string(),
"notification email")