diff --git a/pyheadermilter/__init__.py b/pyheadermilter/__init__.py index 56ac12d..434eae3 100644 --- a/pyheadermilter/__init__.py +++ b/pyheadermilter/__init__.py @@ -2,18 +2,17 @@ # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. -# +# # PyHeader-Milter is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with PyHeader-Milter. If not, see . # __all__ = ["HeaderRule", "HeaderMilter"] -name = "pyheadermilter" import Milter import argparse @@ -34,7 +33,8 @@ from netaddr import IPAddress, IPNetwork, AddrFormatError class HeaderRule: """HeaderRule to implement a rule to apply on e-mail headers.""" - def __init__(self, name, action, header, search="", value="", ignore_hosts=[], ignore_envfrom=None, only_hosts=[], log=True): + def __init__(self, name, action, header, search="", value="", + ignore_hosts=[], ignore_envfrom=None, only_hosts=[], log=True): self.logger = logging.getLogger(__name__) self.name = name self.action = action @@ -49,38 +49,46 @@ class HeaderRule: if action in ["del", "mod"]: # compile header regex try: - self.header = re.compile(header, re.MULTILINE + re.DOTALL + re.IGNORECASE) + self.header = re.compile( + header, re.MULTILINE + re.DOTALL + re.IGNORECASE) except re.error as e: - raise RuntimeError("unable to parse option 'header' of rule '{}': {}".format(name, e)) + raise RuntimeError( + f"unable to parse option 'header' of rule '{name}': {e}") if action == "mod": # compile search regex try: - self.search = re.compile(search, re.MULTILINE + re.DOTALL + re.IGNORECASE) + self.search = re.compile( + search, re.MULTILINE + re.DOTALL + re.IGNORECASE) except re.error as e: - raise RuntimeError("unable to parse option 'search' of rule '{}': {}".format(name, e)) + raise RuntimeError( + f"unable to parse option 'search' of rule '{name}': {e}") if action in ["add", "mod"] and not value: raise RuntimeError("value of option 'value' is empty") - # replace strings in ignore_hosts and only_hosts with IPNetwork instances + # replace strings in ignore_hosts and only_hosts with IPNetwork + # instances try: for index, ignore in enumerate(ignore_hosts): self.ignore_hosts[index] = IPNetwork(ignore) except AddrFormatError as e: - raise RuntimeError("unable to parse option 'ignore_hosts' of rule '{}': {}".format(name, e)) + raise RuntimeError( + f"unable to parse option 'ignore_hosts' of rule '{name}': {e}") if self.ignore_envfrom: try: self.ignore_envfrom = re.compile(ignore_envfrom, re.IGNORECASE) except re.error as e: - raise RuntimeError("unable to parse option 'ignore_envfrom' of rule '{}': {}".format(name, e)) + raise RuntimeError( + f"unable to parse option 'ignore_envfrom' of rule '{name}': {e}") try: for index, only in enumerate(only_hosts): self.only_hosts[index] = IPNetwork(only) except AddrFormatError as e: - raise RuntimeError("unable to parse option 'only_hosts' of rule '{}': {}".format(name, e)) + raise RuntimeError( + f"unable to parse option 'only_hosts' of rule '{name}': {e}") def ignore_host(self, host): ip = IPAddress(host) @@ -101,7 +109,7 @@ class HeaderRule: break if ignore: - self.logger.debug("host {} is ignored by rule {}".format(host, self.name)) + self.logger.debug(f"host {host} is ignored by rule {self.name}") return ignore def ignore_from(self, envfrom): @@ -110,7 +118,8 @@ class HeaderRule: if self.ignore_envfrom: if self.ignore_envfrom.search(envfrom): ignore = True - self.logger.debug("envelope-from {} is ignored by rule {}".format(envfrom, self.name)) + self.logger.debug( + f"envelope-from {envfrom} is ignored by rule {self.name}") return ignore def execute(self, headers): @@ -123,25 +132,26 @@ class HeaderRule: occurrences = {} # iterate headers - for name, hdr in headers: - # keep track of the occurrence of each header, needed by Milter.Base.chgheader + for name, header in headers: + # keep track of the occurrence of each header, needed by + # Milter.Base.chgheader if name not in occurrences.keys(): occurrences[name] = 1 else: occurrences[name] += 1 # check if header line matches regex - value = hdr[name] - if self.header.search("{}: {}".format(name, value)): + value = header[name] + if self.header.search(f"{name}: {value}"): if self.action == "del": # set an empty value to delete the header new_value = "" else: new_value = self.search.sub(self.value, value) if value != new_value: - hdr = EmailMessage(policy=default_policy) - hdr.add_header(name, new_value) - modified.append((name, hdr, index, occurrences[name])) + header = EmailMessage(policy=default_policy) + header.add_header(name, new_value) + modified.append((name, header, index, occurrences[name])) index += 1 return modified @@ -161,7 +171,8 @@ class HeaderMilter(Milter.Base): self.rules = HeaderMilter._rules.copy() def connect(self, IPname, family, hostaddr): - self.logger.debug("accepted milter connection from {} port {}".format(*hostaddr)) + self.logger.debug( + f"accepted milter connection from {hostaddr[0]} port {hostaddr[1]}") ip = IPAddress(hostaddr[0]) # remove rules which ignore this host @@ -170,9 +181,10 @@ class HeaderMilter(Milter.Base): self.rules.remove(rule) if not self.rules: - self.logger.debug("host {} is ignored by all rules, skip further processing".format(hostaddr[0])) + self.logger.debug( + f"host {hostaddr[0]} is ignored by all rules, skip further processing") return Milter.ACCEPT - return Milter.CONTINUE + return Milter.CONTINUE def envfrom(self, mailfrom, *str): mailfrom = "@".join(parse_addr(mailfrom)).lower() @@ -181,14 +193,15 @@ class HeaderMilter(Milter.Base): self.rules.remove(rule) if not self.rules: - self.logger.debug("mail from {} is ignored by all rules, skip further processing".format(mailfrom)) + self.logger.debug( + f"mail from {mailfrom} is ignored by all rules, skip further processing") return Milter.ACCEPT - return Milter.CONTINUE + return Milter.CONTINUE @Milter.noreply def data(self): - self.queueid = self.getsymval('i') - self.logger.debug("{}: received queue-id from MTA".format(self.queueid)) + self.qid = self.getsymval('i') + self.logger.debug(f"{self.qid}: received queue-id from MTA") self.headers = [] return Milter.CONTINUE @@ -196,64 +209,89 @@ class HeaderMilter(Milter.Base): def header(self, name, value): # remove surrogates from value value = value.encode(errors="surrogateescape").decode(errors="replace") - self.logger.debug(f"{self.queueid}: received header: {name}: {value}") - hdr = HeaderParser(policy=default_policy).parsestr(f"{name}: {value}") - self.logger.debug(f"{self.queueid}: decoded header: {name}: {hdr[name]}") - self.headers.append((name, hdr)) + self.logger.debug(f"{self.qid}: received header: {name}: {value}") + header = HeaderParser( + policy=default_policy).parsestr(f"{name}: {value}") + self.logger.debug( + f"{self.qid}: decoded header: {name}: {header[name]}") + self.headers.append((name, header)) return Milter.CONTINUE def eom(self): try: for rule in self.rules: - self.logger.debug("{}: executing rule '{}'".format(self.queueid, rule.name)) + self.logger.debug(f"{self.qid}: executing rule '{rule.name}'") modified = rule.execute(self.headers) - for name, hdr, index, occurrence in modified: - value = hdr[name] + for name, header, index, occurrence in modified: + value = header[name] # remove illegal characters, pymilter does not like them - encoded_value = hdr.as_string().replace("\r", "").replace("\n", "").replace("\x00", "").split(":", 1)[1].strip() - mod_header = "{}: {}".format(name, value) + enc_value = header.as_string().replace( + "\r", "").replace( + "\n", "").replace( + "\x00", "").split( + ":", 1)[1].strip() + mod_header = f"{name}: {value}" if rule.action == "add": if rule.log: - self.logger.info("{}: add: header: {}".format(self.queueid, mod_header[0:70])) + self.logger.info( + f"{self.qid}: add: header: {mod_header[0:70]}") else: - self.logger.debug("{}: add: header: {}".format(self.queueid, mod_header)) - self.headers.insert(0, (name, hdr)) - self.addheader(name, encoded_value, 1) + self.logger.debug( + f"{self.qid}: add: header: {mod_header}") + self.headers.insert(0, (name, header)) + self.addheader(name, enc_value, 1) else: if rule.action == "mod": old_value = self.headers[index][1][name] - old_header = "{}: {}".format(name, old_value) + old_header = f"{name}: {old_value}" if rule.log: - self.logger.info("{}: modify: header: {}: {}".format( - self.queueid, old_header[0:70], mod_header[0:70])) + self.logger.info(f"{self.qid}: modify: header: {old_header[0:70]}: {mod_header[0:70]}") else: - self.logger.debug("{}: modify: header (occ. {}): {}: {}".format( - self.queueid, occurrence, old_header, mod_header)) - self.headers[index] = (name, hdr) + self.logger.debug( + f"{self.qid}: modify: header (occ. {occurrence}): {old_header}: {mod_header}") + self.headers[index] = (name, header) elif rule.action == "del": if rule.log: - self.logger.info("{}: delete: header: {}".format(self.queueid, mod_header[0:70])) + self.logger.info( + f"{self.qid}: delete: header: {mod_header[0:70]}") else: - self.logger.debug("{}: delete: header (occ. {}): {}".format(self.queueid, occurrence, mod_header)) + self.logger.debug( + f"{self.qid}: delete: header (occ. {occurrence}): {mod_header}") del self.headers[index] - self.chgheader(name, occurrence, encoded_value) + self.chgheader(name, occurrence, enc_value) return Milter.ACCEPT except Exception as e: - self.logger.exception("an exception occured in eom function: {}".format(e)) + self.logger.exception(f"an exception occured in eom function: {e}") return Milter.TEMPFAIL def main(): "Run PyHeader-Milter." # parse command line - parser = argparse.ArgumentParser(description="PyHeader milter daemon", - formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=45, width=140)) - parser.add_argument("-c", "--config", help="Config file to read.", default="/etc/pyheader-milter.conf") - parser.add_argument("-s", "--socket", help="Socket used to communicate with the MTA.", required=True) - parser.add_argument("-d", "--debug", help="Log debugging messages.", action="store_true") - parser.add_argument("-t", "--test", help="Check configuration.", action="store_true") + parser = argparse.ArgumentParser( + description="PyHeader milter daemon", + formatter_class=lambda prog: argparse.HelpFormatter( + prog, max_help_position=45, width=140)) + parser.add_argument( + "-c", "--config", help="Config file to read.", + default="/etc/pyheader-milter.conf") + parser.add_argument( + "-s", + "--socket", + help="Socket used to communicate with the MTA.", + required=True) + parser.add_argument( + "-d", + "--debug", + help="Log debugging messages.", + action="store_true") + parser.add_argument( + "-t", + "--test", + help="Check configuration.", + action="store_true") args = parser.parse_args() # setup logging @@ -262,8 +300,8 @@ def main(): syslog_name = logname if args.debug: loglevel = logging.DEBUG - logname = "{}[%(name)s]".format(logname) - syslog_name = "{}: [%(name)s] %(levelname)s".format(syslog_name) + logname = f"{logname}[%(name)s]" + syslog_name = f"{syslog_name}: [%(name)s] %(levelname)s" # set config files for milter class root_logger = logging.getLogger() @@ -272,11 +310,11 @@ def main(): # setup console log stdouthandler = logging.StreamHandler(sys.stdout) stdouthandler.setLevel(logging.DEBUG) - formatter = logging.Formatter("%(message)s".format(logname)) + formatter = logging.Formatter("%(message)s") stdouthandler.setFormatter(formatter) root_logger.addHandler(stdouthandler) logger = logging.getLogger(__name__) - + try: # read config file parser = configparser.ConfigParser() @@ -285,21 +323,25 @@ def main(): # check if mandatory config options in global section are present if "global" not in parser.sections(): - raise RuntimeError("mandatory section 'global' not present in config file") + raise RuntimeError( + "mandatory section 'global' not present in config file") for option in ["rules"]: if not parser.has_option("global", option): - raise RuntimeError("mandatory option '{}' not present in config section 'global'".format(option)) + raise RuntimeError( + f"mandatory option '{option}' not present in config section 'global'") # read global config section global_config = dict(parser.items("global")) # read active rules - active_rules = [ r.strip() for r in global_config["rules"].split(",") ] + active_rules = [r.strip() for r in global_config["rules"].split(",")] if len(active_rules) != len(set(active_rules)): - raise RuntimeError("at least one rule is specified multiple times in 'rules' option") + raise RuntimeError( + "at least one rule is specified multiple times in 'rules' option") if "global" in active_rules: active_rules.remove("global") - logger.warning("removed illegal rule name 'global' from list of active rules") + logger.warning( + "removed illegal rule name 'global' from list of active rules") if not active_rules: raise RuntimeError("no rules configured") @@ -307,9 +349,10 @@ def main(): rules = [] # iterate active rules for rule_name in active_rules: - # check if config section exists + # check if config section exists if rule_name not in parser.sections(): - raise RuntimeError("config section '{}' does not exist".format(rule_name)) + raise RuntimeError( + f"config section '{rule_name}' does not exist") config = dict(parser.items(rule_name)) # check if mandatory option action is present in config @@ -318,10 +361,12 @@ def main(): option in global_config.keys(): config[option] = global_config[option] if option not in config.keys(): - raise RuntimeError("mandatory option '{}' not specified for rule '{}'".format(option, rule_name)) + raise RuntimeError( + f"mandatory option '{option}' not specified for rule '{rule_name}'") config["action"] = config["action"].lower() if config["action"] not in ["add", "del", "mod"]: - raise RuntimeError("invalid action specified for rule '{}'".format(rule_name)) + raise RuntimeError( + f"invalid action specified for rule '{rule_name}'") # check if mandatory options are present in config mandatory = ["header"] @@ -334,7 +379,8 @@ def main(): option in global_config.keys(): config[option] = global_config[option] if option not in config.keys(): - raise RuntimeError("mandatory option '{}' not specified for rule '{}'".format(option, rule_name)) + raise RuntimeError( + f"mandatory option '{option}' not specified for rule '{rule_name}'") # check if optional config options are present in config defaults = { @@ -350,19 +396,22 @@ def main(): if option not in config.keys(): config[option] = defaults[option] if config["ignore_hosts"]: - config["ignore_hosts"] = [ h.strip() for h in config["ignore_hosts"].split(",") ] + config["ignore_hosts"] = [h.strip() + for h in config["ignore_hosts"].split(",")] if config["only_hosts"]: - config["only_hosts"] = [ h.strip() for h in config["only_hosts"].split(",") ] + config["only_hosts"] = [h.strip() + for h in config["only_hosts"].split(",")] config["log"] = config["log"].lower() if config["log"] == "true": config["log"] = True elif config["log"] == "false": config["log"] = False else: - raise RuntimeError("invalid value specified for option 'log' for rule '{}'".format(rule_name)) + raise RuntimeError( + f"invalid value specified for option 'log' for rule '{rule_name}'") # add rule - logging.debug("adding rule '{}'".format(rule_name)) + logging.debug(f"adding rule '{rule_name}'") rules.append(HeaderRule(name=rule_name, **config)) except RuntimeError as e: @@ -374,13 +423,16 @@ def main(): sys.exit(0) # change log format for runtime - formatter = logging.Formatter("%(asctime)s {}: [%(levelname)s] %(message)s".format(logname), datefmt="%Y-%m-%d %H:%M:%S") + formatter = logging.Formatter( + f"%(asctime)s {logname}: [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S") stdouthandler.setFormatter(formatter) # setup syslog - sysloghandler = logging.handlers.SysLogHandler(address="/dev/log", facility=logging.handlers.SysLogHandler.LOG_MAIL) + sysloghandler = logging.handlers.SysLogHandler( + address="/dev/log", facility=logging.handlers.SysLogHandler.LOG_MAIL) sysloghandler.setLevel(loglevel) - formatter = logging.Formatter("{}: %(message)s".format(syslog_name)) + formatter = logging.Formatter(f"{syslog_name}: %(message)s") sysloghandler.setFormatter(formatter) root_logger.addHandler(sysloghandler)