# PyQuarantine-Milter is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # PyQuarantine-Milter is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with PyQuarantineMilter. If not, see . # import email import logging import re from bs4 import BeautifulSoup from cgi import escape from collections import defaultdict from email.policy import default as default_policy from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.image import MIMEImage from os.path import basename from urllib.parse import quote from pyquarantine import mailer class BaseNotification(object): "Notification base class" notification_type = "base" def __init__(self, name, global_cfg, cfg, test=False): self.name = name self.logger = logging.getLogger(__name__) def notify(self, qid, storage_id, mailfrom, recipients, headers, fp, subgroups=None, named_subgroups=None, synchronous=False): fp.seek(0) pass class EMailNotification(BaseNotification): "Notification class to send notifications via mail." notification_type = "email" _bad_tags = [ "applet", "embed", "frame", "frameset", "head", "iframe", "script" ] _good_tags = [ "a", "b", "br", "center", "div", "font", "h1", "h2", "h3", "h4", "h5", "h6", "i", "img", "li", "p", "pre", "span", "table", "td", "th", "tr", "tt", "u", "ul" ] _good_attributes = [ "align", "alt", "bgcolor", "border", "cellpadding", "cellspacing", "class", "color", "colspan", "dir", "face", "headers", "height", "id", "name", "rowspan", "size", "src", "style", "title", "type", "valign", "value", "width" ] def __init__(self, name, global_cfg, cfg, test=False): super(EMailNotification, self).__init__( name, global_cfg, cfg, test) defaults = { "notification_email_replacement_img": "", "notification_email_strip_images": "false", "notification_email_parser_lib": "lxml" } # check config for opt in [ "notification_email_smtp_host", "notification_email_smtp_port", "notification_email_envelope_from", "notification_email_from", "notification_email_subject", "notification_email_template", "notification_email_embedded_imgs"] + list(defaults.keys()): if opt in cfg: continue if opt in global_cfg: cfg[opt] = global_cfg[opt] elif opt in defaults: cfg[opt] = defaults[opt] else: raise RuntimeError( f"mandatory option '{opt}' not present in config " f"section '{self.name}' or 'global'") self.smtp_host = cfg["notification_email_smtp_host"] self.smtp_port = cfg["notification_email_smtp_port"] self.mailfrom = cfg["notification_email_envelope_from"] self.from_header = cfg["notification_email_from"] self.subject = cfg["notification_email_subject"] testvars = defaultdict(str, test="TEST") # test-parse from header try: self.from_header.format_map(testvars) except ValueError as e: raise RuntimeError( f"error parsing notification_email_from: {e}") # test-parse subject try: self.subject.format_map(testvars) except ValueError as e: raise RuntimeError( f"error parsing notification_email_subject: {e}") # read and parse email notification template try: self.template = open( cfg["notification_email_template"], "r").read() self.template.format_map(testvars) except IOError as e: raise RuntimeError(f"error reading template: {e}") except ValueError as e: raise RuntimeError(f"error parsing template: {e}") strip_images = cfg["notification_email_strip_images"].strip().upper() if strip_images in ["TRUE", "ON", "YES"]: self.strip_images = True elif strip_images in ["FALSE", "OFF", "NO"]: self.strip_images = False else: raise RuntimeError( "error parsing notification_email_strip_images: unknown value") self.parser_lib = cfg["notification_email_parser_lib"].strip() if self.parser_lib not in ["lxml", "html.parser"]: raise RuntimeError( "error parsing notification_email_parser_lib: unknown value") # read email replacement image if specified replacement_img = cfg["notification_email_replacement_img"].strip() if not self.strip_images and replacement_img: try: self.replacement_img = MIMEImage( open(replacement_img, "rb").read()) except IOError as e: raise RuntimeError( f"error reading replacement image: {e}") else: self.replacement_img.add_header( "Content-ID", "") else: self.replacement_img = None # read images to embed if specified embedded_img_paths = [ p.strip() for p in cfg["notification_email_embedded_imgs"].split( ",") if p] self.embedded_imgs = [] for img_path in embedded_img_paths: # read image try: img = MIMEImage(open(img_path, "rb").read()) except IOError as e: raise RuntimeError(f"error reading image: {e}") else: filename = basename(img_path) img.add_header("Content-ID", f"<{filename}>") self.embedded_imgs.append(img) def get_email_body_soup(self, qid, msg): "Extract and decode email body and return it as BeautifulSoup object." # try to find the body part self.logger.debug(f"{qid}: trying to find email body") try: body = msg.get_body(preferencelist=("html", "plain")) except Exception as e: self.logger.error( f"{qid}: an error occured in " f"email.message.EmailMessage.get_body: {e}") body = None if body: charset = body.get_content_charset() or "utf-8" content = body.get_payload(decode=True) try: content = content.decode(encoding=charset, errors="replace") except LookupError: self.logger.info( f"{qid}: unknown encoding '{charset}', " f"falling back to UTF-8") content = content.decode("utf-8", errors="replace") content_type = body.get_content_type() if content_type == "text/plain": # convert text/plain to text/html self.logger.debug( f"{qid}: content type is {content_type}, " f"converting to text/html") content = re.sub(r"^(.*)$", r"\1
", escape(content), flags=re.MULTILINE) else: self.logger.debug( f"{qid}: content type is {content_type}") else: self.logger.error( f"{qid}: unable to find email body") content = "ERROR: unable to find email body" # create BeautifulSoup object length = len(content) self.logger.debug( f"{qid}: trying to create BeatufilSoup object with " f"parser lib {self.parser_lib}, " f"text length is {length} bytes") soup = BeautifulSoup(content, self.parser_lib) self.logger.debug( f"{qid}: sucessfully created BeautifulSoup object") return soup def sanitize(self, qid, soup): "Sanitize mail html text." self.logger.debug(f"{qid}: sanitizing email text") # completly remove bad elements for element in soup(EMailNotification._bad_tags): self.logger.debug( f"{qid}: removing dangerous tag '{element.name}' " f"and its content") element.extract() # remove not whitelisted elements, but keep their content for element in soup.find_all(True): if element.name not in EMailNotification._good_tags: self.logger.debug( f"{qid}: removing tag '{element.name}', keep its content") element.replaceWithChildren() # remove not whitelisted attributes for element in soup.find_all(True): for attribute in list(element.attrs.keys()): if attribute not in EMailNotification._good_attributes: if element.name == "a" and attribute == "href": self.logger.debug( f"{qid}: setting attribute href to '#' " f"on tag '{element.name}'") element["href"] = "#" else: self.logger.debug( f"{qid}: removing attribute '{attribute}' " f"from tag '{element.name}'") del(element.attrs[attribute]) return soup def notify(self, qid, storage_id, mailfrom, recipients, headers, fp, subgroups=None, named_subgroups=None, synchronous=False): "Notify recipients via email." super( EMailNotification, self).notify( qid, storage_id, mailfrom, recipients, headers, fp, subgroups, named_subgroups, synchronous) # extract body from email soup = self.get_email_body_soup( qid, email.message_from_binary_file(fp, policy=default_policy)) # replace picture sources image_replaced = False if self.strip_images: self.logger.debug( f"{qid}: looking for images to strip") for element in soup("img"): if "src" in element.attrs.keys(): self.logger.debug( f"{qid}: strip image: {element['src']}") element.extract() elif self.replacement_img: self.logger.debug( f"{qid}: looking for images to replace") for element in soup("img"): if "src" in element.attrs.keys(): self.logger.debug( f"{qid}: replacing image: {element['src']}") element["src"] = "cid:removed_for_security_reasons" image_replaced = True # sanitizing email text of original email sanitized_text = self.sanitize(qid, soup) del soup # sending email notifications for recipient in recipients: self.logger.debug( f"{qid}: generating notification email for '{recipient}'") self.logger.debug(f"{qid}: parsing email template") # generate dict containing all template variables variables = defaultdict( str, EMAIL_HTML_TEXT=sanitized_text, EMAIL_FROM=escape(headers["from"]), EMAIL_ENVELOPE_FROM=escape(mailfrom), EMAIL_ENVELOPE_FROM_URL=escape(quote(mailfrom)), EMAIL_TO=escape(headers["to"]), EMAIL_ENVELOPE_TO=escape(recipient), EMAIL_ENVELOPE_TO_URL=escape(quote(recipient)), EMAIL_SUBJECT=escape(headers["subject"]), EMAIL_QUARANTINE_ID=storage_id) if subgroups: number = 0 for subgroup in subgroups: variables[f"SUBGROUP_{number}"] = escape(subgroup) if named_subgroups: for key, value in named_subgroups.items(): named_subgroups[key] = escape(value) variables.update(named_subgroups) # parse template htmltext = self.template.format_map(variables) msg = MIMEMultipart('related') msg["From"] = self.from_header.format_map( defaultdict(str, EMAIL_FROM=headers["from"])) msg["To"] = headers["to"] msg["Subject"] = self.subject.format_map( defaultdict(str, EMAIL_SUBJECT=headers["subject"])) msg["Date"] = email.utils.formatdate() msg.attach(MIMEText(htmltext, "html", 'UTF-8')) if image_replaced: self.logger.debug( f"{qid}: attaching notification_replacement_img") msg.attach(self.replacement_img) for img in self.embedded_imgs: self.logger.debug(f"{qid}: attaching imgage") msg.attach(img) self.logger.debug( f"{qid}: sending notification email to: {recipient}") if synchronous: try: mailer.smtp_send(self.smtp_host, self.smtp_port, self.mailfrom, recipient, msg.as_string()) except Exception as e: raise RuntimeError( f"error while sending email to '{recipient}': {e}") else: mailer.sendmail(self.smtp_host, self.smtp_port, qid, self.mailfrom, recipient, msg.as_string(), "notification email") # list of notification types and their related notification classes TYPES = {"email": EMailNotification}