shorten file names
This commit is contained in:
320
pymodmilter/modify.py
Normal file
320
pymodmilter/modify.py
Normal file
@@ -0,0 +1,320 @@
|
||||
# PyMod-Milter is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# PyMod-Milter is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with PyMod-Milter. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
__all__ = [
|
||||
"AddHeader",
|
||||
"ModHeader",
|
||||
"DelHeader",
|
||||
"AddDisclaimer",
|
||||
"RewriteLinks"]
|
||||
|
||||
import logging
|
||||
|
||||
from base64 import b64encode
|
||||
from bs4 import BeautifulSoup
|
||||
from collections import defaultdict
|
||||
from copy import copy
|
||||
from email.message import MIMEPart
|
||||
|
||||
from pymodmilter import replace_illegal_chars
|
||||
|
||||
|
||||
class AddHeader:
|
||||
"""Add a mail header field."""
|
||||
def __init__(self, field, value):
|
||||
self.field = field
|
||||
self.value = value
|
||||
|
||||
def execute(self, milter, pretend=False,
|
||||
logger=logging.getLogger(__name__)):
|
||||
header = f"{self.field}: {self.value}"
|
||||
if logger.getEffectiveLevel() == logging.DEBUG:
|
||||
logger.debug(f"add_header: {header}")
|
||||
else:
|
||||
logger.info(f"add_header: {header[0:70]}")
|
||||
|
||||
milter.msg.add_header(self.field, self.value)
|
||||
|
||||
if not pretend:
|
||||
milter.addheader(self.field, self.value)
|
||||
|
||||
|
||||
class ModHeader:
|
||||
"""Change the value of a mail header field."""
|
||||
def __init__(self, field, value, search=None):
|
||||
self.field = field
|
||||
self.value = value
|
||||
self.search = search
|
||||
|
||||
def execute(self, milter, pretend=False,
|
||||
logger=logging.getLogger(__name__)):
|
||||
idx = defaultdict(int)
|
||||
|
||||
for i, (field, value) in enumerate(milter.msg.items()):
|
||||
field_lower = field.lower()
|
||||
idx[field_lower] += 1
|
||||
|
||||
if not self.field.match(field):
|
||||
continue
|
||||
|
||||
new_value = value
|
||||
if self.search is not None:
|
||||
new_value = self.search.sub(self.value, value).strip()
|
||||
else:
|
||||
new_value = self.value
|
||||
|
||||
if not new_value:
|
||||
logger.warning(
|
||||
"mod_header: resulting value is empty, "
|
||||
"skip modification")
|
||||
continue
|
||||
|
||||
if new_value == value:
|
||||
continue
|
||||
|
||||
header = f"{field}: {value}"
|
||||
new_header = f"{field}: {new_value}"
|
||||
|
||||
if logger.getEffectiveLevel() == logging.DEBUG:
|
||||
logger.debug(f"mod_header: {header}: {new_header}")
|
||||
else:
|
||||
logger.info(f"mod_header: {header[0:70]}: {new_header[0:70]}")
|
||||
|
||||
milter.msg.replace_header(
|
||||
field, replace_illegal_chars(new_value), idx=idx[field_lower])
|
||||
|
||||
if not pretend:
|
||||
milter.chgheader(field, new_value, idx=idx[field_lower])
|
||||
|
||||
|
||||
class DelHeader:
|
||||
"""Delete a mail header field."""
|
||||
def __init__(self, field, value=None):
|
||||
self.field = field
|
||||
self.value = value
|
||||
|
||||
def execute(self, milter, pretend=False,
|
||||
logger=logging.getLogger(__name__)):
|
||||
idx = defaultdict(int)
|
||||
|
||||
for field, value in milter.msg.items():
|
||||
field_lower = field.lower()
|
||||
idx[field_lower] += 1
|
||||
|
||||
if not self.field.match(field):
|
||||
continue
|
||||
|
||||
if self.value is not None and not self.value.search(value):
|
||||
continue
|
||||
|
||||
header = f"{field}: {value}"
|
||||
if logger.getEffectiveLevel() == logging.DEBUG:
|
||||
logger.debug(f"del_header: {header}")
|
||||
else:
|
||||
logger.info(f"del_header: {header[0:70]}")
|
||||
milter.msg.remove_header(field, idx=idx[field_lower])
|
||||
|
||||
if not pretend:
|
||||
milter.chgheader(field, "", idx=idx[field_lower])
|
||||
|
||||
idx[field_lower] -= 1
|
||||
|
||||
|
||||
def _get_body_content(msg, pref):
|
||||
part = None
|
||||
content = None
|
||||
if not msg.is_multipart() and msg.get_content_type() == f"text/{pref}":
|
||||
part = msg
|
||||
else:
|
||||
part = msg.get_body(preferencelist=(pref))
|
||||
|
||||
if part is not None:
|
||||
content = part.get_content()
|
||||
|
||||
return (part, content)
|
||||
|
||||
|
||||
def _has_content_before_body_tag(soup):
|
||||
s = copy(soup)
|
||||
for element in s.find_all("head") + s.find_all("body"):
|
||||
element.extract()
|
||||
|
||||
if len(s.text.strip()) > 0:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _inject_body(milter):
|
||||
if not milter.msg.is_multipart():
|
||||
milter.msg.make_mixed()
|
||||
|
||||
attachments = []
|
||||
for attachment in milter.msg.iter_attachments():
|
||||
if "content-disposition" not in attachment:
|
||||
attachment["Content-Disposition"] = "attachment"
|
||||
attachments.append(attachment)
|
||||
|
||||
milter.msg.clear_content()
|
||||
milter.msg.set_content("")
|
||||
milter.msg.add_alternative("", subtype="html")
|
||||
milter.msg.make_mixed()
|
||||
|
||||
for attachment in attachments:
|
||||
milter.msg.attach(attachment)
|
||||
|
||||
|
||||
def _wrap_message(milter):
|
||||
attachment = MIMEPart()
|
||||
attachment.set_content(milter.msg.as_bytes(),
|
||||
maintype="plain", subtype="text",
|
||||
disposition="attachment",
|
||||
filename=f"{milter.qid}.eml",
|
||||
params={"name": f"{milter.qid}.eml"})
|
||||
|
||||
milter.msg.clear_content()
|
||||
milter.msg.set_content(
|
||||
"Please see the original email attached.")
|
||||
milter.msg.add_alternative(
|
||||
"<html><body>Please see the original email attached.</body></html>",
|
||||
subtype="html")
|
||||
milter.msg.make_mixed()
|
||||
milter.msg.attach(attachment)
|
||||
|
||||
|
||||
class AddDisclaimer:
|
||||
"""Append or prepend a disclaimer to the mail body."""
|
||||
def __init__(self, text_template, html_template, action, error_policy):
|
||||
self.text_template = text_template
|
||||
self.html_template = html_template
|
||||
self.action = action
|
||||
self.error_policy = error_policy
|
||||
|
||||
def patch_message_body(self, milter, logger):
|
||||
text_body, text_content = _get_body_content(milter.msg, "plain")
|
||||
html_body, html_content = _get_body_content(milter.msg, "html")
|
||||
|
||||
if text_content is None and html_content is None:
|
||||
raise RuntimeError("message does not contain any body part")
|
||||
|
||||
if text_content is not None:
|
||||
logger.info(f"{self.action} text disclaimer")
|
||||
|
||||
if self.action == "prepend":
|
||||
content = f"{self.text_template}{text_content}"
|
||||
else:
|
||||
content = f"{text_content}{self.text_template}"
|
||||
|
||||
text_body.set_content(
|
||||
content.encode(), maintype="text", subtype="plain")
|
||||
text_body.set_param("charset", "UTF-8", header="Content-Type")
|
||||
del text_body["MIME-Version"]
|
||||
|
||||
if html_content is not None:
|
||||
logger.info(f"{self.action} html disclaimer")
|
||||
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
|
||||
body = soup.find('body')
|
||||
if not body:
|
||||
body = soup
|
||||
elif _has_content_before_body_tag(soup):
|
||||
body = soup
|
||||
|
||||
if self.action == "prepend":
|
||||
body.insert(0, copy(self.html_template))
|
||||
else:
|
||||
body.append(self.html_template)
|
||||
|
||||
html_body.set_content(
|
||||
str(soup).encode(), maintype="text", subtype="html")
|
||||
html_body.set_param("charset", "UTF-8", header="Content-Type")
|
||||
del html_body["MIME-Version"]
|
||||
|
||||
def execute(self, milter, pretend=False,
|
||||
logger=logging.getLogger(__name__)):
|
||||
old_headers = milter.msg.items()
|
||||
|
||||
try:
|
||||
try:
|
||||
self.patch_message_body(milter, logger)
|
||||
except RuntimeError as e:
|
||||
logger.info(f"{e}, inject empty plain and html body")
|
||||
_inject_body(milter)
|
||||
self.patch_message_body(milter, logger)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(e)
|
||||
if self.error_policy == "ignore":
|
||||
logger.info(
|
||||
"unable to add disclaimer to message body, "
|
||||
"ignore error according to policy")
|
||||
return
|
||||
elif self.error_policy == "reject":
|
||||
logger.info(
|
||||
"unable to add disclaimer to message body, "
|
||||
"reject message according to policy")
|
||||
return [
|
||||
("reject", "Message rejected due to error")]
|
||||
|
||||
logger.info("wrap original message in a new message envelope")
|
||||
try:
|
||||
_wrap_message(milter)
|
||||
self.patch_message_body(milter, logger)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
raise Exception(
|
||||
"unable to wrap message in a new message envelope, "
|
||||
"give up ...")
|
||||
|
||||
if not pretend:
|
||||
milter.update_headers(old_headers)
|
||||
milter.replacebody()
|
||||
|
||||
|
||||
class RewriteLinks:
|
||||
"""Rewrite link targets in the mail html body."""
|
||||
def __init__(self, repl):
|
||||
self.repl = repl
|
||||
|
||||
def execute(self, milter, pretend=False,
|
||||
logger=logging.getLogger(__name__)):
|
||||
html_body, html_content = _get_body_content(milter.msg, "html")
|
||||
if html_content is not None:
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
|
||||
rewritten = 0
|
||||
for link in soup.find_all("a", href=True):
|
||||
if not link["href"]:
|
||||
continue
|
||||
|
||||
if "{URL_B64}" in self.repl:
|
||||
url_b64 = b64encode(link["href"].encode()).decode()
|
||||
target = self.repl.replace("{URL_B64}", url_b64)
|
||||
else:
|
||||
target = self.repl
|
||||
|
||||
link["href"] = target
|
||||
rewritten += 1
|
||||
|
||||
if rewritten:
|
||||
logger.info(f"rewrote {rewritten} link(s) in html body")
|
||||
|
||||
html_body.set_content(
|
||||
str(soup).encode(), maintype="text", subtype="html")
|
||||
html_body.set_param("charset", "UTF-8", header="Content-Type")
|
||||
del html_body["MIME-Version"]
|
||||
|
||||
if not pretend:
|
||||
milter.replacebody()
|
||||
Reference in New Issue
Block a user