# PyMod-Milter is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # PyMod-Milter is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with PyMod-Milter. If not, see . # __all__ = [ "CustomLogger", "BaseConfig", "MilterMessage", "replace_illegal_chars"] import logging from email.message import MIMEPart class CustomLogger(logging.LoggerAdapter): def process(self, msg, kwargs): if "name" in self.extra: msg = f"{self.extra['name']}: {msg}" if "qid" in self.extra: msg = f"{self.extra['qid']}: {msg}" if self.logger.getEffectiveLevel() != logging.DEBUG: msg = msg.replace("\n", "").replace("\r", "") return msg, kwargs class BaseConfig: def __init__(self, cfg={}, debug=False): if "name" in cfg: assert isinstance(cfg["name"], str), \ "name: invalid value, should be string" self.name = cfg["name"] else: self.name = __name__ self.logger = logging.getLogger(self.name) if debug: self.loglevel = logging.DEBUG elif "loglevel" in cfg: if isinstance(cfg["loglevel"], int): self.loglevel = cfg["loglevel"] else: level = getattr(logging, cfg["loglevel"].upper(), None) assert isinstance(level, int), \ f"{self.name}: loglevel: invalid value" self.loglevel = level else: self.loglevel = logging.INFO self.logger.setLevel(self.loglevel) # the keys/values in args are used as parameters # to initialize action classes self.args = {} def add_string_arg(self, cfg, args): if isinstance(args, str): args = [args] for arg in args: assert arg in cfg, \ f"{self.name}: mandatory parameter '{arg}' not found" assert isinstance(cfg[arg], str), \ f"{self.name}: {arg}: invalid value, should be string" self.args[arg] = cfg[arg] def add_bool_arg(self, cfg, args): if isinstance(args, str): args = [args] for arg in args: assert arg in cfg, \ f"{self.name}: mandatory parameter '{arg}' not found" assert isinstance(cfg[arg], bool), \ f"{self.name}: {arg}: invalid value, should be bool" self.args[arg] = cfg[arg] def add_int_arg(self, cfg, args): if isinstance(args, str): args = [args] for arg in args: assert arg in cfg, \ f"{self.name}: mandatory parameter '{arg}' not found" assert isinstance(cfg[arg], int), \ f"{self.name}: {arg}: invalid value, should be integer" self.args[arg] = cfg[arg] class MilterMessage(MIMEPart): def replace_header(self, _name, _value, idx=None): _name = _name.lower() counter = 0 for i, (k, v) in zip(range(len(self._headers)), self._headers): if k.lower() == _name: counter += 1 if not idx or counter == idx: self._headers[i] = self.policy.header_store_parse( k, _value) break else: raise KeyError(_name) def remove_header(self, name, idx=None): name = name.lower() newheaders = [] counter = 0 for k, v in self._headers: if k.lower() == name: counter += 1 if counter != idx: newheaders.append((k, v)) else: newheaders.append((k, v)) self._headers = newheaders def replace_illegal_chars(string): """Remove illegal characters from header values.""" return "".join(string.replace("\x00", "").splitlines())