From 6dcfeb05f610048ef0bfce6d1d7faa5f60039bd9 Mon Sep 17 00:00:00 2001 From: Thomas Oettli Date: Fri, 10 Sep 2021 03:20:02 +0200 Subject: [PATCH] add notifications.py --- pymodmilter/notifications.py | 309 +++++++++++++++++++++++++++++++++++ pymodmilter/storages.py | 4 + 2 files changed, 313 insertions(+) create mode 100644 pymodmilter/notifications.py diff --git a/pymodmilter/notifications.py b/pymodmilter/notifications.py new file mode 100644 index 0000000..6211187 --- /dev/null +++ b/pymodmilter/notifications.py @@ -0,0 +1,309 @@ +# PyMod-Milter is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# PyMod-Milter is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with PyMod-Milter. If not, see . +# + +import email +import logging +import re + +from bs4 import BeautifulSoup +from collections import defaultdict +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.mime.image import MIMEImage +from html import escape +from os.path import basename +from urllib.parse import quote + +from pymodmilter import mailer + + +class BaseNotification(object): + "Notification base class" + + def __init__(self): + return + + def notify(self, msg, qid, mailfrom, recipients, + template_vars=defaultdict(str), synchronous=False): + return + + def execute(self, milter, pretend=False, + logger=logging.getLogger(__name__)): + return + + +class EMailNotification(BaseNotification): + "Notification class to send notifications via mail." + notification_type = "email" + _bad_tags = [ + "applet", + "embed", + "frame", + "frameset", + "head", + "iframe", + "script" + ] + _good_tags = [ + "a", + "b", + "br", + "center", + "div", + "font", + "h1", + "h2", + "h3", + "h4", + "h5", + "h6", + "i", + "img", + "li", + "p", + "pre", + "span", + "table", + "td", + "th", + "tr", + "tt", + "u", + "ul" + ] + _good_attributes = [ + "align", + "alt", + "bgcolor", + "border", + "cellpadding", + "cellspacing", + "class", + "color", + "colspan", + "dir", + "face", + "headers", + "height", + "id", + "name", + "rowspan", + "size", + "src", + "style", + "title", + "type", + "valign", + "value", + "width" + ] + + def __init__(self, smtp_host, smtp_port, envelope_from, from_header, + subject, template, embed_imgs=[], repl_img=None, + strip_imgs=False, parser_lib="lxml"): + super().__init__() + + self.smtp_host = smtp_host + self.smtp_port = smtp_port + self.mailfrom = envelope_from + self.from_header = from_header + self.subject = subject + self.template = open(template, "r").read() + self.embedded_imgs = [] + for img_path in embed_imgs: + img = MIMEImage(open(img_path, "rb").read()) + filename = basename(img_path) + img.add_header("Content-ID", f"<{filename}>") + self.embedded_imgs.append(img) + + self.replacement_img = repl_img + self.strip_images = strip_imgs + + if not strip_imgs and repl_img: + self.replacement_img = MIMEImage( + open(repl_img, "rb").read()) + self.replacement_img.add_header( + "Content-ID", "") + + self.parser_lib = parser_lib + + def get_email_body_soup(self, msg, logger=None): + "Extract and decode email body and return it as BeautifulSoup object." + if logger is None: + logger = self.logger + + # try to find the body part + logger.debug("trying to find email body") + try: + body = msg.get_body(preferencelist=("html", "plain")) + except Exception as e: + logger.error( + f"an error occured in email.message.EmailMessage.get_body: " + f"{e}") + body = None + + if body: + charset = body.get_content_charset() or "utf-8" + content = body.get_payload(decode=True) + try: + content = content.decode(encoding=charset, errors="replace") + except LookupError: + logger.info( + f"unknown encoding '{charset}', falling back to UTF-8") + content = content.decode("utf-8", errors="replace") + content_type = body.get_content_type() + if content_type == "text/plain": + # convert text/plain to text/html + logger.debug( + f"content type is {content_type}, " + f"converting to text/html") + content = re.sub(r"^(.*)$", r"\1
", + escape(content, quote=False), + flags=re.MULTILINE) + else: + logger.debug(f"content type is {content_type}") + else: + logger.error("unable to find email body") + content = "ERROR: unable to find email body" + + # create BeautifulSoup object + length = len(content) + logger.debug( + f"trying to create BeatufilSoup object with " + f"parser lib {self.parser_lib}, " + f"text length is {length} bytes") + soup = BeautifulSoup(content, self.parser_lib) + logger.debug("sucessfully created BeautifulSoup object") + + return soup + + def sanitize(self, soup, logger=None): + "Sanitize mail html text." + if logger is None: + logger = self.logger + + logger.debug("sanitizing email text") + + # completly remove bad elements + for element in soup(EMailNotification._bad_tags): + logger.debug( + f"removing dangerous tag '{element.name}' " + f"and its content") + element.extract() + + # remove not whitelisted elements, but keep their content + for element in soup.find_all(True): + if element.name not in EMailNotification._good_tags: + logger.debug( + f"removing tag '{element.name}', keep its content") + element.replaceWithChildren() + + # remove not whitelisted attributes + for element in soup.find_all(True): + for attribute in list(element.attrs.keys()): + if attribute not in EMailNotification._good_attributes: + if element.name == "a" and attribute == "href": + logger.debug( + f"setting attribute href to '#' " + f"on tag '{element.name}'") + element["href"] = "#" + else: + logger.debug( + f"removing attribute '{attribute}' " + f"from tag '{element.name}'") + del(element.attrs[attribute]) + return soup + + def notify(self, msg, qid, mailfrom, recipients, + template_vars=defaultdict(str), synchronous=False, + logger=None): + "Notify recipients via email." + super().notify( + msg, qid, mailfrom, recipients, template_vars, synchronous, logger) + + if logger is None: + logger = self.logger + + # extract body from email + soup = self.get_email_body_soup(msg, logger) + + # replace picture sources + image_replaced = False + if self.strip_images: + logger.debug("looking for images to strip") + for element in soup("img"): + if "src" in element.attrs.keys(): + logger.debug(f"strip image: {element['src']}") + element.extract() + elif self.replacement_img: + logger.debug("looking for images to replace") + for element in soup("img"): + if "src" in element.attrs.keys(): + logger.debug(f"replacing image: {element['src']}") + element["src"] = "cid:removed_for_security_reasons" + image_replaced = True + + # sanitizing email text of original email + sanitized_text = self.sanitize(soup, logger) + del soup + + # sending email notifications + for recipient in recipients: + logger.debug(f"generating notification email for '{recipient}'") + logger.debug("parsing email template") + + # generate dict containing all template variables + template_vars.update({ + "HTML_TEXT": sanitized_text, + "FROM": escape(msg["from"], quote=False), + "ENVELOPE_FROM": escape(mailfrom, quote=False), + "EMAIL_ENVELOPE_FROM_URL": escape(quote(mailfrom), + quote=False), + "TO": escape(msg["to"], quote=False), + "ENVELOPE_TO": escape(recipient, quote=False), + "ENVELOPE_TO_URL": escape(quote(recipient)), + "SUBJECT": escape(msg["subject"], quote=False)}) + + # parse template + htmltext = self.template.format_map(template_vars) + + msg = MIMEMultipart('related') + msg["From"] = self.from_header.format_map( + defaultdict(str, EMAIL_FROM=msg["from"])) + msg["To"] = msg["to"] + msg["Subject"] = self.subject.format_map( + defaultdict(str, EMAIL_SUBJECT=msg["subject"])) + msg["Date"] = email.utils.formatdate() + msg.attach(MIMEText(htmltext, "html", 'UTF-8')) + + if image_replaced: + logger.debug("attaching notification_replacement_img") + msg.attach(self.replacement_img) + + for img in self.embedded_imgs: + logger.debug("attaching imgage") + msg.attach(img) + + logger.debug(f"sending notification email to: {recipient}") + if synchronous: + try: + mailer.smtp_send(self.smtp_host, self.smtp_port, + self.mailfrom, recipient, msg.as_string()) + except Exception as e: + raise RuntimeError( + f"error while sending email to '{recipient}': {e}") + else: + mailer.sendmail(self.smtp_host, self.smtp_port, qid, + self.mailfrom, recipient, msg.as_string(), + "notification email") diff --git a/pymodmilter/storages.py b/pymodmilter/storages.py index 07fcd50..f36cb6e 100644 --- a/pymodmilter/storages.py +++ b/pymodmilter/storages.py @@ -31,6 +31,10 @@ class BaseMailStorage(object): "Add email to storage." return ("", "") + def execute(self, milter, pretend=False, + logger=logging.getLogger(__name__)): + return + def find(self, mailfrom=None, recipients=None, older_than=None): "Find emails in storage." return