From a10e34105673fde80c3da56487af967f51cde8ed Mon Sep 17 00:00:00 2001 From: Thomas Oettli Date: Mon, 3 Feb 2020 02:57:50 +0100 Subject: [PATCH] Use python library to extract body of emails --- pyquarantine/notifications.py | 50 +++++++++++++++-------------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/pyquarantine/notifications.py b/pyquarantine/notifications.py index db46167..92ad162 100644 --- a/pyquarantine/notifications.py +++ b/pyquarantine/notifications.py @@ -19,6 +19,7 @@ import re from bs4 import BeautifulSoup from cgi import escape from collections import defaultdict +from email import policy from email.header import decode_header, make_header from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText @@ -46,8 +47,6 @@ class BaseNotification(object): class EMailNotification(BaseNotification): "Notification class to send notifications via mail." notification_type = "email" - _html_text = "text/html" - _plain_text = "text/plain" _bad_tags = [ "applet", "embed", @@ -213,32 +212,27 @@ class EMailNotification(BaseNotification): img.add_header("Content-ID", "<{}>".format(basename(img_path))) self.embedded_imgs.append(img) - def get_decoded_email_body(self, queueid, msg, preferred=_html_text): - "Find and decode email body." + def get_email_body_soup(self, queueid, msg): + "Extract and decode email body and return it as BeautifulSoup object." # try to find the body part self.logger.debug("{}: trying to find email body".format(queueid)) - body = None - for part in msg.walk(): - content_type = part.get_content_type() - if content_type in [EMailNotification._plain_text, - EMailNotification._html_text]: - body = part - if content_type == preferred: - break + body = msg.get_body(preferencelist=("html", "plain")) - if body is not None: + if body: charset = body.get_content_charset() or "utf-8" content = body.get_payload(decode=True) try: content = content.decode(encoding=charset, errors="replace") except LookupError: + self.logger.info("{}: unknown encoding '{}', falling back to UTF-8".format( + queueid, charset)) content = content.decode("utf-8", errors="replace") content_type = body.get_content_type() - if content_type == EMailNotification._plain_text: + if content_type == "text/plain": # convert text/plain to text/html self.logger.debug( - "{}: content type is {}, converting to {}".format( - queueid, content_type, EMailNotification._html_text)) + "{}: content type is {}, converting to text/html".format( + queueid, content_type)) content = re.sub(r"^(.*)$", r"\1
", escape(content), flags=re.MULTILINE) else: @@ -250,7 +244,16 @@ class EMailNotification(BaseNotification): "{}: unable to find email body".format(queueid)) content = "ERROR: unable to find email body" - return content + # create BeautifulSoup object + self.logger.debug( + "{}: trying to create BeatufilSoup object with parser lib {}, " + "text length is {} bytes".format( + queueid, self.parser_lib, len(content))) + soup = BeautifulSoup(content, self.parser_lib) + self.logger.debug( + "{}: sucessfully created BeautifulSoup object".format(queueid)) + + return soup def sanitize(self, queueid, soup): "Sanitize mail html text." @@ -304,17 +307,8 @@ class EMailNotification(BaseNotification): synchronous) # extract body from email - content = self.get_decoded_email_body( - queueid, email.message_from_binary_file(fp)) - - # create BeautifulSoup object - self.logger.debug( - "{}: trying to create BeatufilSoup object with parser lib {}, " - "text length is {} bytes".format( - queueid, self.parser_lib, len(content))) - soup = BeautifulSoup(content, self.parser_lib) - self.logger.debug( - "{}: sucessfully created BeautifulSoup object".format(queueid)) + soup = self.get_email_body_soup( + queueid, email.message_from_binary_file(fp, policy=policy.default)) # replace picture sources image_replaced = False