Use python library to extract body of emails
This commit is contained in:
@@ -19,6 +19,7 @@ import re
|
||||
from bs4 import BeautifulSoup
|
||||
from cgi import escape
|
||||
from collections import defaultdict
|
||||
from email import policy
|
||||
from email.header import decode_header, make_header
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
@@ -46,8 +47,6 @@ class BaseNotification(object):
|
||||
class EMailNotification(BaseNotification):
|
||||
"Notification class to send notifications via mail."
|
||||
notification_type = "email"
|
||||
_html_text = "text/html"
|
||||
_plain_text = "text/plain"
|
||||
_bad_tags = [
|
||||
"applet",
|
||||
"embed",
|
||||
@@ -213,32 +212,27 @@ class EMailNotification(BaseNotification):
|
||||
img.add_header("Content-ID", "<{}>".format(basename(img_path)))
|
||||
self.embedded_imgs.append(img)
|
||||
|
||||
def get_decoded_email_body(self, queueid, msg, preferred=_html_text):
|
||||
"Find and decode email body."
|
||||
def get_email_body_soup(self, queueid, msg):
|
||||
"Extract and decode email body and return it as BeautifulSoup object."
|
||||
# try to find the body part
|
||||
self.logger.debug("{}: trying to find email body".format(queueid))
|
||||
body = None
|
||||
for part in msg.walk():
|
||||
content_type = part.get_content_type()
|
||||
if content_type in [EMailNotification._plain_text,
|
||||
EMailNotification._html_text]:
|
||||
body = part
|
||||
if content_type == preferred:
|
||||
break
|
||||
body = msg.get_body(preferencelist=("html", "plain"))
|
||||
|
||||
if body is not None:
|
||||
if body:
|
||||
charset = body.get_content_charset() or "utf-8"
|
||||
content = body.get_payload(decode=True)
|
||||
try:
|
||||
content = content.decode(encoding=charset, errors="replace")
|
||||
except LookupError:
|
||||
self.logger.info("{}: unknown encoding '{}', falling back to UTF-8".format(
|
||||
queueid, charset))
|
||||
content = content.decode("utf-8", errors="replace")
|
||||
content_type = body.get_content_type()
|
||||
if content_type == EMailNotification._plain_text:
|
||||
if content_type == "text/plain":
|
||||
# convert text/plain to text/html
|
||||
self.logger.debug(
|
||||
"{}: content type is {}, converting to {}".format(
|
||||
queueid, content_type, EMailNotification._html_text))
|
||||
"{}: content type is {}, converting to text/html".format(
|
||||
queueid, content_type))
|
||||
content = re.sub(r"^(.*)$", r"\1<br/>",
|
||||
escape(content), flags=re.MULTILINE)
|
||||
else:
|
||||
@@ -250,7 +244,16 @@ class EMailNotification(BaseNotification):
|
||||
"{}: unable to find email body".format(queueid))
|
||||
content = "ERROR: unable to find email body"
|
||||
|
||||
return content
|
||||
# create BeautifulSoup object
|
||||
self.logger.debug(
|
||||
"{}: trying to create BeatufilSoup object with parser lib {}, "
|
||||
"text length is {} bytes".format(
|
||||
queueid, self.parser_lib, len(content)))
|
||||
soup = BeautifulSoup(content, self.parser_lib)
|
||||
self.logger.debug(
|
||||
"{}: sucessfully created BeautifulSoup object".format(queueid))
|
||||
|
||||
return soup
|
||||
|
||||
def sanitize(self, queueid, soup):
|
||||
"Sanitize mail html text."
|
||||
@@ -304,17 +307,8 @@ class EMailNotification(BaseNotification):
|
||||
synchronous)
|
||||
|
||||
# extract body from email
|
||||
content = self.get_decoded_email_body(
|
||||
queueid, email.message_from_binary_file(fp))
|
||||
|
||||
# create BeautifulSoup object
|
||||
self.logger.debug(
|
||||
"{}: trying to create BeatufilSoup object with parser lib {}, "
|
||||
"text length is {} bytes".format(
|
||||
queueid, self.parser_lib, len(content)))
|
||||
soup = BeautifulSoup(content, self.parser_lib)
|
||||
self.logger.debug(
|
||||
"{}: sucessfully created BeautifulSoup object".format(queueid))
|
||||
soup = self.get_email_body_soup(
|
||||
queueid, email.message_from_binary_file(fp, policy=policy.default))
|
||||
|
||||
# replace picture sources
|
||||
image_replaced = False
|
||||
|
||||
Reference in New Issue
Block a user