Files
pyquarantine-milter/pymodmilter/modify.py

358 lines
11 KiB
Python

# PyMod-Milter is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyMod-Milter is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyMod-Milter. If not, see <http://www.gnu.org/licenses/>.
#
__all__ = [
"AddHeader",
"ModHeader",
"DelHeader",
"AddDisclaimer",
"RewriteLinks"]
import logging
import re
from base64 import b64encode
from bs4 import BeautifulSoup
from collections import defaultdict
from copy import copy
from email.message import MIMEPart
from email.policy import SMTPUTF8
from pymodmilter import replace_illegal_chars
class AddHeader:
"""Add a mail header field."""
_headersonly = True
def __init__(self, field, value):
self.field = field
self.value = value
def execute(self, milter, pretend=False,
logger=logging.getLogger(__name__)):
header = f"{self.field}: {self.value}"
if logger.getEffectiveLevel() == logging.DEBUG:
logger.debug(f"add_header: {header}")
else:
logger.info(f"add_header: {header[0:70]}")
milter.msg.add_header(self.field, self.value)
if not pretend:
milter.addheader(self.field, self.value)
class ModHeader:
"""Change the value of a mail header field."""
_headersonly = True
def __init__(self, field, value, search=None):
self.value = value
try:
self.field = re.compile(field, re.IGNORECASE)
if search is not None:
self.search = re.compile(
search, re.MULTILINE + re.DOTALL + re.IGNORECASE)
else:
self.search = search
except re.error as e:
raise RuntimeError(e)
def execute(self, milter, pretend=False,
logger=logging.getLogger(__name__)):
idx = defaultdict(int)
for i, (field, value) in enumerate(milter.msg.items()):
field_lower = field.lower()
idx[field_lower] += 1
if not self.field.match(field):
continue
new_value = value
if self.search is not None:
new_value = self.search.sub(self.value, value).strip()
else:
new_value = self.value
if not new_value:
logger.warning(
"mod_header: resulting value is empty, "
"skip modification")
continue
if new_value == value:
continue
header = f"{field}: {value}"
new_header = f"{field}: {new_value}"
if logger.getEffectiveLevel() == logging.DEBUG:
logger.debug(f"mod_header: {header}: {new_header}")
else:
logger.info(f"mod_header: {header[0:70]}: {new_header[0:70]}")
milter.msg.replace_header(
field, replace_illegal_chars(new_value), idx=idx[field_lower])
if not pretend:
milter.chgheader(field, new_value, idx=idx[field_lower])
class DelHeader:
"""Delete a mail header field."""
_headersonly = True
def __init__(self, field, value=None):
try:
self.field = re.compile(field, re.IGNORECASE)
if value is not None:
self.value = re.compile(
value, re.MULTILINE + re.DOTALL + re.IGNORECASE)
else:
self.value = value
except re.error as e:
raise RuntimeError(e)
def execute(self, milter, pretend=False,
logger=logging.getLogger(__name__)):
idx = defaultdict(int)
for field, value in milter.msg.items():
field_lower = field.lower()
idx[field_lower] += 1
if not self.field.match(field):
continue
if self.value is not None and not self.value.search(value):
continue
header = f"{field}: {value}"
if logger.getEffectiveLevel() == logging.DEBUG:
logger.debug(f"del_header: {header}")
else:
logger.info(f"del_header: {header[0:70]}")
milter.msg.remove_header(field, idx=idx[field_lower])
if not pretend:
milter.chgheader(field, "", idx=idx[field_lower])
idx[field_lower] -= 1
def _get_body_content(msg, pref):
part = None
content = None
if not msg.is_multipart() and msg.get_content_type() == f"text/{pref}":
part = msg
else:
part = msg.get_body(preferencelist=(pref))
if part is not None:
content = part.get_content()
return (part, content)
def _has_content_before_body_tag(soup):
s = copy(soup)
for element in s.find_all("head") + s.find_all("body"):
element.extract()
if len(s.text.strip()) > 0:
return True
return False
def _inject_body(milter):
if not milter.msg.is_multipart():
milter.msg.make_mixed()
attachments = []
for attachment in milter.msg.iter_attachments():
if "content-disposition" not in attachment:
attachment["Content-Disposition"] = "attachment"
attachments.append(attachment)
milter.msg.clear_content()
milter.msg.set_content("")
milter.msg.add_alternative("", subtype="html")
milter.msg.make_mixed()
for attachment in attachments:
milter.msg.attach(attachment)
def _wrap_message(milter):
attachment = MIMEPart(policy=SMTPUTF8)
attachment.set_content(milter.msg.as_bytes(),
maintype="plain", subtype="text",
disposition="attachment",
filename=f"{milter.qid}.eml",
params={"name": f"{milter.qid}.eml"})
milter.msg.clear_content()
milter.msg.set_content(
"Please see the original email attached.")
milter.msg.add_alternative(
"<html><body>Please see the original email attached.</body></html>",
subtype="html")
milter.msg.make_mixed()
milter.msg.attach(attachment)
class AddDisclaimer:
"""Append or prepend a disclaimer to the mail body."""
_headersonly = False
def __init__(self, text_template, html_template, action, error_policy):
try:
with open(text_template, "r") as f:
self.text_template = f.read()
with open(html_template, "r") as f:
html = BeautifulSoup(f.read(), "html.parser")
except IOError as e:
raise RuntimeError(e)
body = html.find('body')
self.html_template = body or html
self.action = action
self.error_policy = error_policy
def patch_message_body(self, milter, logger):
text_body, text_content = _get_body_content(milter.msg, "plain")
html_body, html_content = _get_body_content(milter.msg, "html")
if text_content is None and html_content is None:
raise RuntimeError("message does not contain any body part")
if text_content is not None:
logger.info(f"{self.action} text disclaimer")
if self.action == "prepend":
content = f"{self.text_template}{text_content}"
else:
content = f"{text_content}{self.text_template}"
text_body.set_content(
content.encode(), maintype="text", subtype="plain")
text_body.set_param("charset", "UTF-8", header="Content-Type")
del text_body["MIME-Version"]
if html_content is not None:
logger.info(f"{self.action} html disclaimer")
soup = BeautifulSoup(html_content, "html.parser")
body = soup.find('body')
if not body:
body = soup
elif _has_content_before_body_tag(soup):
body = soup
if self.action == "prepend":
body.insert(0, copy(self.html_template))
else:
body.append(self.html_template)
html_body.set_content(
str(soup).encode(), maintype="text", subtype="html")
html_body.set_param("charset", "UTF-8", header="Content-Type")
del html_body["MIME-Version"]
def execute(self, milter, pretend=False,
logger=logging.getLogger(__name__)):
old_headers = milter.msg.items()
try:
try:
self.patch_message_body(milter, logger)
except RuntimeError as e:
logger.info(f"{e}, inject empty plain and html body")
_inject_body(milter)
self.patch_message_body(milter, logger)
except Exception as e:
logger.warning(e)
if self.error_policy == "ignore":
logger.info(
"unable to add disclaimer to message body, "
"ignore error according to policy")
return
elif self.error_policy == "reject":
logger.info(
"unable to add disclaimer to message body, "
"reject message according to policy")
return [
("reject", "Message rejected due to error")]
logger.info("wrap original message in a new message envelope")
try:
_wrap_message(milter)
self.patch_message_body(milter, logger)
except Exception as e:
logger.error(e)
raise Exception(
"unable to wrap message in a new message envelope, "
"give up ...")
if not pretend:
milter.update_headers(old_headers)
milter.replacebody()
class RewriteLinks:
"""Rewrite link targets in the mail html body."""
_headersonly = False
def __init__(self, repl):
self.repl = repl
def execute(self, milter, pretend=False,
logger=logging.getLogger(__name__)):
html_body, html_content = _get_body_content(milter.msg, "html")
if html_content is not None:
soup = BeautifulSoup(html_content, "html.parser")
rewritten = 0
for link in soup.find_all("a", href=True):
if not link["href"]:
continue
if "{URL_B64}" in self.repl:
url_b64 = b64encode(link["href"].encode()).decode()
target = self.repl.replace("{URL_B64}", url_b64)
else:
target = self.repl
link["href"] = target
rewritten += 1
if rewritten:
logger.info(f"rewrote {rewritten} link(s) in html body")
html_body.set_content(
str(soup).encode(), maintype="text", subtype="html")
html_body.set_param("charset", "UTF-8", header="Content-Type")
del html_body["MIME-Version"]
if not pretend:
milter.replacebody()