Files
pyquarantine-milter/pymodmilter/modify.py
2021-09-10 04:12:14 +02:00

321 lines
10 KiB
Python

# PyMod-Milter is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyMod-Milter is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyMod-Milter. If not, see <http://www.gnu.org/licenses/>.
#
# Names exported by a star-import of this module: the message
# modification action classes defined below.
__all__ = [
"AddHeader",
"ModHeader",
"DelHeader",
"AddDisclaimer",
"RewriteLinks"]
import logging
from base64 import b64encode
from bs4 import BeautifulSoup
from collections import defaultdict
from copy import copy
from email.message import MIMEPart
from pymodmilter import replace_illegal_chars
class AddHeader:
    """Add a mail header field.

    Attributes:
        field: Name of the header field to add.
        value: Value of the header field to add.
    """

    def __init__(self, field, value):
        self.field = field
        self.value = value

    def execute(self, milter, pretend=False,
                logger=logging.getLogger(__name__)):
        """Add the configured header to the message held by *milter*.

        Args:
            milter: Object exposing ``msg`` (providing ``add_header``) and
                an ``addheader`` method.
            pretend: If true, only ``milter.msg`` is updated and
                ``milter.addheader`` is not called.
            logger: Logger used to report the action.
        """
        header = f"{self.field}: {self.value}"
        # isEnabledFor() is also true for effective levels below DEBUG,
        # which an equality test against logging.DEBUG would miss.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"add_header: {header}")
        else:
            # Outside of debugging only the first 70 characters are logged.
            logger.info(f"add_header: {header[0:70]}")

        milter.msg.add_header(self.field, self.value)
        if not pretend:
            milter.addheader(self.field, self.value)
class ModHeader:
    """Change the value of a mail header field.

    Attributes:
        field: Pattern object whose ``match`` method selects the header
            field names to modify.
        value: Replacement value, or the replacement passed to
            ``search.sub`` when *search* is set.
        search: Optional pattern object; when set, only the parts of the
            current value it matches are substituted.
    """

    def __init__(self, field, value, search=None):
        self.field = field
        self.value = value
        self.search = search

    def execute(self, milter, pretend=False,
                logger=logging.getLogger(__name__)):
        """Rewrite every matching header of the message held by *milter*.

        Args:
            milter: Object exposing ``msg`` (providing ``items`` and
                ``replace_header``) and a ``chgheader`` method.
            pretend: If true, only ``milter.msg`` is updated and
                ``milter.chgheader`` is not called.
            logger: Logger used to report the action.
        """
        # isEnabledFor() is also true for effective levels below DEBUG,
        # which an equality test against logging.DEBUG would miss.
        debug = logger.isEnabledFor(logging.DEBUG)

        # Occurrence counter per lower-cased field name; the first header
        # of a given name gets index 1.
        idx = defaultdict(int)

        for field, value in milter.msg.items():
            field_lower = field.lower()
            idx[field_lower] += 1

            if not self.field.match(field):
                continue

            if self.search is not None:
                new_value = self.search.sub(self.value, value).strip()
            else:
                new_value = self.value

            if not new_value:
                logger.warning(
                    "mod_header: resulting value is empty, "
                    "skip modification")
                continue

            # Nothing to do when the value would not change.
            if new_value == value:
                continue

            header = f"{field}: {value}"
            new_header = f"{field}: {new_value}"
            if debug:
                logger.debug(f"mod_header: {header}: {new_header}")
            else:
                # Outside of debugging both headers are cut to 70 chars.
                logger.info(
                    f"mod_header: {header[0:70]}: {new_header[0:70]}")

            milter.msg.replace_header(
                field, replace_illegal_chars(new_value),
                idx=idx[field_lower])
            if not pretend:
                milter.chgheader(field, new_value, idx=idx[field_lower])
class DelHeader:
    """Delete a mail header field.

    Attributes:
        field: Pattern object whose ``match`` method selects the header
            field names to delete.
        value: Optional pattern object; when set, a header is only deleted
            if ``value.search`` finds a match in the header value.
    """

    def __init__(self, field, value=None):
        self.field = field
        self.value = value

    def execute(self, milter, pretend=False,
                logger=logging.getLogger(__name__)):
        """Delete every matching header of the message held by *milter*.

        Args:
            milter: Object exposing ``msg`` (providing ``items`` and
                ``remove_header``) and a ``chgheader`` method.
            pretend: If true, only ``milter.msg`` is updated and
                ``milter.chgheader`` is not called.
            logger: Logger used to report the action.
        """
        # isEnabledFor() is also true for effective levels below DEBUG,
        # which an equality test against logging.DEBUG would miss.
        debug = logger.isEnabledFor(logging.DEBUG)

        # Occurrence counter per lower-cased field name; the first header
        # of a given name gets index 1.
        idx = defaultdict(int)

        for field, value in milter.msg.items():
            field_lower = field.lower()
            idx[field_lower] += 1

            if not self.field.match(field):
                continue

            if self.value is not None and not self.value.search(value):
                continue

            header = f"{field}: {value}"
            if debug:
                logger.debug(f"del_header: {header}")
            else:
                # Outside of debugging only the first 70 chars are logged.
                logger.info(f"del_header: {header[0:70]}")

            milter.msg.remove_header(field, idx=idx[field_lower])
            if not pretend:
                # An empty value is passed to chgheader for the deletion.
                milter.chgheader(field, "", idx=idx[field_lower])

            # One header of this name is gone, so later occurrences move
            # up by one position.
            idx[field_lower] -= 1
def _get_body_content(msg, pref):
    """Return the body part of text subtype *pref* and its content.

    Args:
        msg: Message object providing ``is_multipart``,
            ``get_content_type`` and ``get_body``.
        pref: Text subtype to look for, e.g. ``"plain"`` or ``"html"``.

    Returns:
        A ``(part, content)`` tuple. Both elements are ``None`` when no
        matching body part is found.
    """
    if not msg.is_multipart() and msg.get_content_type() == f"text/{pref}":
        # The message itself is the requested body.
        part = msg
    else:
        # preferencelist has to be a sequence of subtypes. A bare string
        # would be tested with substring semantics, so a part such as
        # text/tm would wrongly satisfy a request for "html".
        part = msg.get_body(preferencelist=(pref,))

    content = part.get_content() if part is not None else None
    return (part, content)
def _has_content_before_body_tag(soup):
    """Tell whether *soup* has text outside its head and body elements.

    The check runs on a copy, so the passed document is left untouched.
    Returns True if non-whitespace text remains once every "head" and
    "body" element has been extracted, otherwise False.
    """
    remainder = copy(soup)
    wrapped = remainder.find_all("head") + remainder.find_all("body")
    for node in wrapped:
        node.extract()
    leftover = remainder.text.strip()
    return len(leftover) > 0
def _inject_body(milter):
    """Give ``milter.msg`` an empty plain and html body.

    The parts reported by ``iter_attachments()`` are collected first, the
    message content is then replaced by an empty text body with an empty
    html alternative, and finally the collected parts are attached again.
    """
    msg = milter.msg

    if not msg.is_multipart():
        msg.make_mixed()

    kept = list(msg.iter_attachments())
    for part in kept:
        # Collected parts without a disposition are marked as attachment.
        if "content-disposition" not in part:
            part["Content-Disposition"] = "attachment"

    # Rebuild the content: empty text body plus empty html alternative.
    msg.clear_content()
    msg.set_content("")
    msg.add_alternative("", subtype="html")
    msg.make_mixed()

    for part in kept:
        msg.attach(part)
def _wrap_message(milter):
    """Replace the content of ``milter.msg`` by a notice plus the original.

    The serialized original message is attached as ``<qid>.eml``, and the
    message content is replaced by a short text/html notice pointing to
    that attachment.

    Args:
        milter: Object exposing ``msg`` (the message to wrap) and ``qid``
            (used to build the attachment file name).
    """
    eml_name = f"{milter.qid}.eml"

    attachment = MIMEPart()
    # maintype/subtype were previously swapped, which produced the invalid
    # content type "plain/text".
    attachment.set_content(milter.msg.as_bytes(),
                           maintype="text", subtype="plain",
                           disposition="attachment",
                           filename=eml_name,
                           params={"name": eml_name})

    # The original is serialized above, so its content can be dropped now.
    milter.msg.clear_content()
    milter.msg.set_content(
        "Please see the original email attached.")
    milter.msg.add_alternative(
        "<html><body>Please see the original email attached.</body></html>",
        subtype="html")
    milter.msg.make_mixed()
    milter.msg.attach(attachment)
class AddDisclaimer:
    """Append or prepend a disclaimer to the mail body.

    Attributes:
        text_template: Disclaimer text combined with the text/plain body.
        html_template: Disclaimer object inserted into the parsed html
            body.
        action: ``"prepend"`` puts the disclaimer first; any other value
            appends it.
        error_policy: ``"ignore"`` or ``"reject"`` decide how ``execute``
            reacts when the body cannot be patched; with any other value
            the original message is wrapped in a new one.
    """

    def __init__(self, text_template, html_template, action, error_policy):
        self.text_template = text_template
        self.html_template = html_template
        self.action = action
        self.error_policy = error_policy

    def patch_message_body(self, milter, logger):
        """Insert the disclaimer into the text and html body parts.

        Args:
            milter: Object exposing the message as ``msg``.
            logger: Logger used to report the action.

        Raises:
            RuntimeError: If the message has neither a text/plain nor a
                text/html body part.
        """
        text_body, text_content = _get_body_content(milter.msg, "plain")
        html_body, html_content = _get_body_content(milter.msg, "html")

        if text_content is None and html_content is None:
            raise RuntimeError("message does not contain any body part")

        if text_content is not None:
            logger.info(f"{self.action} text disclaimer")

            if self.action == "prepend":
                content = f"{self.text_template}{text_content}"
            else:
                content = f"{text_content}{self.text_template}"

            text_body.set_content(
                content.encode(), maintype="text", subtype="plain")
            text_body.set_param("charset", "UTF-8", header="Content-Type")
            # Drop MIME-Version from the part (presumably re-added by
            # set_content() -- confirm against the message class in use).
            del text_body["MIME-Version"]

        if html_content is not None:
            logger.info(f"{self.action} html disclaimer")

            soup = BeautifulSoup(html_content, "html.parser")
            body = soup.find("body")
            # Use the document root when there is no body element or when
            # text exists outside of the head/body elements.
            if not body or _has_content_before_body_tag(soup):
                body = soup

            # Always insert a copy: inserting the shared template object
            # itself would hand it over to this message's document and
            # mutate state used for later messages.
            disclaimer = copy(self.html_template)
            if self.action == "prepend":
                body.insert(0, disclaimer)
            else:
                body.append(disclaimer)

            html_body.set_content(
                str(soup).encode(), maintype="text", subtype="html")
            html_body.set_param("charset", "UTF-8", header="Content-Type")
            # Same MIME-Version handling as for the text part above.
            del html_body["MIME-Version"]

    def execute(self, milter, pretend=False,
                logger=logging.getLogger(__name__)):
        """Add the disclaimer to the message held by *milter*.

        If the message has no body part, an empty plain/html body is
        injected and patching is retried. Any remaining failure is handled
        according to ``error_policy``.

        Args:
            milter: Object exposing ``msg``, ``qid``, ``update_headers``
                and ``replacebody``.
            pretend: If true, ``milter.update_headers`` and
                ``milter.replacebody`` are not called.
            logger: Logger used to report the action.

        Returns:
            ``None`` normally, or ``[("reject", "Message rejected due to
            error")]`` when patching failed and the policy is "reject".

        Raises:
            Exception: If patching failed and wrapping the original
                message failed as well.
        """
        # Header snapshot taken before the message is modified; passed to
        # milter.update_headers() once patching succeeded.
        old_headers = milter.msg.items()

        try:
            try:
                self.patch_message_body(milter, logger)
            except RuntimeError as e:
                logger.info(f"{e}, inject empty plain and html body")
                _inject_body(milter)
                self.patch_message_body(milter, logger)
        except Exception as e:
            logger.warning(e)
            if self.error_policy == "ignore":
                logger.info(
                    "unable to add disclaimer to message body, "
                    "ignore error according to policy")
                return
            elif self.error_policy == "reject":
                logger.info(
                    "unable to add disclaimer to message body, "
                    "reject message according to policy")
                return [
                    ("reject", "Message rejected due to error")]

            logger.info("wrap original message in a new message envelope")
            try:
                _wrap_message(milter)
                self.patch_message_body(milter, logger)
            except Exception as e:
                logger.error(e)
                # Chain the cause explicitly so the traceback shows what
                # made the wrapping fail.
                raise Exception(
                    "unable to wrap message in a new message envelope, "
                    "give up ...") from e

        if not pretend:
            milter.update_headers(old_headers)
            milter.replacebody()
class RewriteLinks:
    """Rewrite link targets in the mail html body.

    Attributes:
        repl: New link target. Every occurrence of the placeholder
            ``{URL_B64}`` is replaced by the base64 encoded original
            target of the link being rewritten.
    """

    def __init__(self, repl):
        self.repl = repl

    def execute(self, milter, pretend=False,
                logger=logging.getLogger(__name__)):
        """Point every non-empty ``<a href>`` of the html body to ``repl``.

        The html part is only rewritten, and ``milter.replacebody`` only
        called (unless *pretend* is true), when at least one link was
        changed.
        """
        html_body, html_content = _get_body_content(milter.msg, "html")
        if html_content is None:
            return

        placeholder = "{URL_B64}"
        soup = BeautifulSoup(html_content, "html.parser")

        count = 0
        for anchor in soup.find_all("a", href=True):
            href = anchor["href"]
            if not href:
                continue

            target = self.repl
            if placeholder in target:
                encoded = b64encode(href.encode()).decode()
                target = target.replace(placeholder, encoded)

            anchor["href"] = target
            count += 1

        if not count:
            return

        logger.info(f"rewrote {count} link(s) in html body")

        html_body.set_content(
            str(soup).encode(), maintype="text", subtype="html")
        html_body.set_param("charset", "UTF-8", header="Content-Type")
        del html_body["MIME-Version"]

        if not pretend:
            milter.replacebody()