Make source PEP8 conform

This commit is contained in:
2019-10-17 22:25:10 +02:00
parent 6ea167bc52
commit 89a01d92c8
7 changed files with 865 additions and 366 deletions

View File

@@ -2,12 +2,12 @@
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
#
# PyQuarantine-Milter is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
#
# You should have received a copy of the GNU General Public License
# along with PyQuarantineMilter. If not, see <http://www.gnu.org/licenses/>.
#
@@ -26,15 +26,18 @@ from os.path import basename
from pyquarantine import mailer
class BaseNotification(object):
"Notification base class"
def __init__(self, global_config, config, configtest=False):
self.quarantine_name = config["name"]
self.global_config = global_config
self.config = config
self.logger = logging.getLogger(__name__)
def notify(self, queueid, quarantine_id, mailfrom, recipients, headers, fp, subgroups=None, named_subgroups=None, synchronous=False):
def notify(self, queueid, quarantine_id, mailfrom, recipients, headers,
fp, subgroups=None, named_subgroups=None, synchronous=False):
fp.seek(0)
pass
@@ -44,74 +47,85 @@ class EMailNotification(BaseNotification):
_html_text = "text/html"
_plain_text = "text/plain"
_bad_tags = [
"applet",
"embed",
"frame",
"frameset",
"head",
"iframe",
"script"
"applet",
"embed",
"frame",
"frameset",
"head",
"iframe",
"script"
]
_good_tags = [
"a",
"b",
"br",
"center",
"div",
"font",
"h1",
"h2",
"h3",
"h4",
"h5",
"h6",
"i",
"img",
"li",
"span",
"table",
"td",
"th",
"tr",
"tt",
"u",
"ul"
"a",
"b",
"br",
"center",
"div",
"font",
"h1",
"h2",
"h3",
"h4",
"h5",
"h6",
"i",
"img",
"li",
"span",
"table",
"td",
"th",
"tr",
"tt",
"u",
"ul"
]
good_attributes = [
"align",
"alt",
"bgcolor",
"border",
"cellpadding",
"cellspacing",
"color",
"colspan",
"dir",
"face",
"headers",
"height",
"id",
"name",
"rowspan",
"size",
"src",
"style",
"title",
"type",
"valign",
"value",
"width"
"align",
"alt",
"bgcolor",
"border",
"cellpadding",
"cellspacing",
"color",
"colspan",
"dir",
"face",
"headers",
"height",
"id",
"name",
"rowspan",
"size",
"src",
"style",
"title",
"type",
"valign",
"value",
"width"
]
def __init__(self, global_config, config, configtest=False):
super(EMailNotification, self).__init__(global_config, config, configtest)
super(EMailNotification, self).__init__(
global_config, config, configtest)
# check if mandatory options are present in config
for option in ["smtp_host", "smtp_port", "notification_email_envelope_from", "notification_email_from", "notification_email_subject", "notification_email_template", "notification_email_replacement_img", "notification_email_embedded_imgs"]:
for option in [
"smtp_host",
"smtp_port",
"notification_email_envelope_from",
"notification_email_from",
"notification_email_subject",
"notification_email_template",
"notification_email_replacement_img",
"notification_email_embedded_imgs"]:
if option not in self.config.keys() and option in self.global_config.keys():
self.config[option] = self.global_config[option]
if option not in self.config.keys():
raise RuntimeError("mandatory option '{}' not present in config section '{}' or 'global'".format(option, self.quarantine_name))
raise RuntimeError(
"mandatory option '{}' not present in config section '{}' or 'global'".format(
option, self.quarantine_name))
self.smtp_host = self.config["smtp_host"]
self.smtp_port = self.config["smtp_port"]
@@ -125,17 +139,20 @@ class EMailNotification(BaseNotification):
try:
self.from_header.format_map(testvars)
except ValueError as e:
raise RuntimeError("error parsing notification_email_from: {}".format(e))
raise RuntimeError(
"error parsing notification_email_from: {}".format(e))
# test-parse subject
try:
self.subject.format_map(testvars)
except ValueError as e:
raise RuntimeError("error parsing notification_email_subject: {}".format(e))
raise RuntimeError(
"error parsing notification_email_subject: {}".format(e))
# read and parse email notification template
try:
self.template = open(self.config["notification_email_template"], "r").read()
self.template = open(
self.config["notification_email_template"], "r").read()
self.template.format_map(testvars)
except IOError as e:
raise RuntimeError("error reading template: {}".format(e))
@@ -143,19 +160,24 @@ class EMailNotification(BaseNotification):
raise RuntimeError("error parsing template: {}".format(e))
# read email replacement image if specified
replacement_img_path = self.config["notification_email_replacement_img"].strip()
replacement_img_path = self.config["notification_email_replacement_img"].strip(
)
if replacement_img_path:
try:
self.replacement_img = MIMEImage(open(replacement_img_path, "rb").read())
self.replacement_img = MIMEImage(
open(replacement_img_path, "rb").read())
except IOError as e:
raise RuntimeError("error reading replacement image: {}".format(e))
raise RuntimeError(
"error reading replacement image: {}".format(e))
else:
self.replacement_img.add_header("Content-ID", "<removed_for_security_reasons>")
self.replacement_img.add_header(
"Content-ID", "<removed_for_security_reasons>")
else:
self.replacement_img = None
# read images to embed if specified
embedded_img_paths = [ p.strip() for p in self.config["notification_email_embedded_imgs"].split(",") if p]
embedded_img_paths = [
p.strip() for p in self.config["notification_email_embedded_imgs"].split(",") if p]
self.embedded_imgs = []
for img_path in embedded_img_paths:
# read image
@@ -167,19 +189,24 @@ class EMailNotification(BaseNotification):
img.add_header("Content-ID", "<{}>".format(basename(img_path)))
self.embedded_imgs.append(img)
def get_text(self, queueid, part):
"Get the mail text in html form from email part."
mimetype = part.get_content_type()
self.logger.debug("{}: extracting content of email text part".format(queueid))
self.logger.debug(
"{}: extracting content of email text part".format(queueid))
text = part.get_payload(decode=True)
if mimetype == EMailNotification._plain_text:
self.logger.debug("{}: content mimetype is {}, converting to {}".format(queueid, mimetype, self._html_text))
text = re.sub(r"^(.*)$", r"\1<br/>\n", text.decode(), flags=re.MULTILINE)
self.logger.debug(
"{}: content mimetype is {}, converting to {}".format(
queueid, mimetype, self._html_text))
text = re.sub(r"^(.*)$", r"\1<br/>\n",
text.decode(), flags=re.MULTILINE)
else:
self.logger.debug("{}: content mimetype is {}".format(queueid, mimetype))
self.logger.debug(
"{}: content mimetype is {}".format(
queueid, mimetype))
return BeautifulSoup(text, "lxml")
@@ -189,12 +216,13 @@ class EMailNotification(BaseNotification):
for part in msg.get_payload():
mimetype = part.get_content_type()
if mimetype in [EMailNotification._plain_text, EMailNotification._html_text]:
if mimetype in [EMailNotification._plain_text,
EMailNotification._html_text]:
soup = self.get_text(queueid, part)
elif mimetype.startswith("multipart"):
soup = self.get_text_multipart(queueid, part, preferred)
if soup != None and mimetype == preferred:
if soup is not None and mimetype == preferred:
break
return soup
@@ -204,13 +232,17 @@ class EMailNotification(BaseNotification):
# completly remove bad elements
for element in soup(EMailNotification._bad_tags):
self.logger.debug("{}: removing dangerous tag '{}' and its content".format(queueid, element.name))
self.logger.debug(
"{}: removing dangerous tag '{}' and its content".format(
queueid, element.name))
element.extract()
# remove not whitelisted elements, but keep their content
for element in soup.find_all(True):
if element.name not in EMailNotification._good_tags:
self.logger.debug("{}: removing tag '{}', keep its content".format(queueid, element.name))
self.logger.debug(
"{}: removing tag '{}', keep its content".format(
queueid, element.name))
element.replaceWithChildren()
# remove not whitelisted attributes
@@ -218,10 +250,14 @@ class EMailNotification(BaseNotification):
for attribute in element.attrs.keys():
if attribute not in EMailNotification.good_attributes:
if element.name == "a" and attribute == "href":
self.logger.debug("{}: setting attribute href to '#' on tag '{}'".format(queueid, element.name))
self.logger.debug(
"{}: setting attribute href to '#' on tag '{}'".format(
queueid, element.name))
element["href"] = "#"
else:
self.logger.debug("{}: removing attribute '{}' from tag '{}'".format(queueid, attribute, element.name))
self.logger.debug(
"{}: removing attribute '{}' from tag '{}'".format(
queueid, attribute, element.name))
del(element.attrs[attribute])
return soup
@@ -230,33 +266,52 @@ class EMailNotification(BaseNotification):
soup = None
mimetype = msg.get_content_type()
self.logger.debug("{}: trying to find text part of email".format(queueid))
if mimetype in [EMailNotification._plain_text, EMailNotification._html_text]:
self.logger.debug(
"{}: trying to find text part of email".format(queueid))
if mimetype in [EMailNotification._plain_text,
EMailNotification._html_text]:
soup = self.get_text(queueid, msg)
elif mimetype.startswith("multipart"):
soup = self.get_text_multipart(queueid, msg)
if soup == None:
self.logger.error("{}: unable to extract text part of email".format(queueid))
if soup is None:
self.logger.error(
"{}: unable to extract text part of email".format(queueid))
text = "ERROR: unable to extract text from email body"
soup = BeautifulSoup(text, "lxml", "UTF-8")
return soup
def notify(self, queueid, quarantine_id, mailfrom, recipients, headers, fp, subgroups=None, named_subgroups=None, synchronous=False):
def notify(self, queueid, quarantine_id, mailfrom, recipients, headers, fp,
subgroups=None, named_subgroups=None, synchronous=False):
"Notify recipients via email."
super(EMailNotification, self).notify(queueid, quarantine_id, mailfrom, recipients, headers, fp, subgroups, named_subgroups, synchronous)
super(
EMailNotification,
self).notify(
queueid,
quarantine_id,
mailfrom,
recipients,
headers,
fp,
subgroups,
named_subgroups,
synchronous)
# extract html text from email
self.logger.debug("{}: extraction email text from original email".format(queueid))
soup = self.get_html_text_part(queueid, email.message_from_binary_file(fp))
self.logger.debug(
"{}: extraction email text from original email".format(queueid))
soup = self.get_html_text_part(
queueid, email.message_from_binary_file(fp))
# replace picture sources
image_replaced = False
if self.replacement_img:
for element in soup("img"):
if "src" in element.attrs.keys():
self.logger.debug("{}: replacing image: {}".format(queueid, element["src"]))
self.logger.debug(
"{}: replacing image: {}".format(
queueid, element["src"]))
element["src"] = "cid:removed_for_security_reasons"
image_replaced = True
@@ -266,24 +321,27 @@ class EMailNotification(BaseNotification):
# sending email notifications
for recipient in recipients:
self.logger.debug("{}: generating notification email for '{}'".format(queueid, recipient))
self.logger.debug(
"{}: generating notification email for '{}'".format(
queueid, recipient))
self.logger.debug("{}: parsing email template".format(queueid))
# generate dict containing all template variables
variables = defaultdict(str,
EMAIL_HTML_TEXT=sanitized_text,
EMAIL_FROM=escape(headers["from"]),
EMAIL_ENVELOPE_FROM=escape(mailfrom),
EMAIL_TO=escape(recipient),
EMAIL_SUBJECT=escape(headers["subject"]),
EMAIL_QUARANTINE_ID=quarantine_id)
EMAIL_HTML_TEXT=sanitized_text,
EMAIL_FROM=escape(headers["from"]),
EMAIL_ENVELOPE_FROM=escape(mailfrom),
EMAIL_TO=escape(recipient),
EMAIL_SUBJECT=escape(headers["subject"]),
EMAIL_QUARANTINE_ID=quarantine_id)
if subgroups:
number = 0
for subgroup in subgroups:
variables["SUBGROUP_{}".format(number)] = escape(subgroup)
if named_subgroups:
for key, value in named_subgroups.items(): named_subgroups[key] = escape(value)
for key, value in named_subgroups.items():
named_subgroups[key] = escape(value)
variables.update(named_subgroups)
# parse template
@@ -297,21 +355,29 @@ class EMailNotification(BaseNotification):
msg.attach(MIMEText(htmltext, "html", 'UTF-8'))
if image_replaced:
self.logger.debug("{}: attaching notification_replacement_img".format(queueid))
self.logger.debug(
"{}: attaching notification_replacement_img".format(queueid))
msg.attach(self.replacement_img)
for img in self.embedded_imgs:
self.logger.debug("{}: attaching imgage".format(queueid))
msg.attach(img)
self.logger.debug("{}: sending notification email to: {}".format(queueid, recipient))
self.logger.debug(
"{}: sending notification email to: {}".format(
queueid, recipient))
if synchronous:
try:
mailer.smtp_send(self.smtp_host, self.smtp_port, self.mailfrom, recipient, msg.as_string())
mailer.smtp_send(self.smtp_host, self.smtp_port,
self.mailfrom, recipient, msg.as_string())
except Exception as e:
raise RuntimeError("error while sending email to '{}': {}".format(recipient, e))
raise RuntimeError(
"error while sending email to '{}': {}".format(
recipient, e))
else:
mailer.sendmail(self.smtp_host, self.smtp_port, queueid, self.mailfrom, recipient, msg.as_string(), "notification email")
mailer.sendmail(self.smtp_host, self.smtp_port, queueid,
self.mailfrom, recipient, msg.as_string(),
"notification email")
# list of notification types and their related notification classes