# PyQuarantine-Milter is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyQuarantine-Milter is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyQuarantineMilter.  If not, see <http://www.gnu.org/licenses/>.
#

import Milter
import configparser
import logging
import os
import re

from Milter.utils import parse_addr
from collections import defaultdict
from email.charset import Charset
from email.header import Header, decode_header
from io import BytesIO
from itertools import groupby
from netaddr import IPAddress, IPNetwork, AddrFormatError
from pyquarantine import mailer
from pyquarantine import notifications
from pyquarantine import storages
from pyquarantine import whitelists

__all__ = [
    "make_header",
    "Quarantine",
    "QuarantineMilter",
    "setup_milter",
    "reload_config",
    "cli",
    "mailer",
    "notifications",
    "storages",
    "run",
    "whitelists"]

__version__ = "1.0.6"


def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' ', errors='strict'):
    """Create a Header from a sequence of pairs as returned by decode_header()

    decode_header() takes a header value string and returns a sequence of
    pairs of the format (decoded_string, charset) where charset is the string
    name of the character set.

    This function takes one of those sequence of pairs and returns a Header
    instance.  Optional maxlinelen, header_name, and continuation_ws are as in
    the Header constructor.  The errors argument is handed to Header.append()
    for every pair.
    """
    h = Header(maxlinelen=maxlinelen, header_name=header_name,
               continuation_ws=continuation_ws)
    for s, charset in decoded_seq:
        # None means us-ascii but we can simply pass it on to h.append()
        if charset is not None and not isinstance(charset, Charset):
            charset = Charset(charset)
        h.append(s, charset, errors=errors)
    return h


class Quarantine(object):
    """Quarantine class suitable for QuarantineMilter

    The class holds all the objects and functions needed for QuarantineMilter
    quarantine.
    """

    # list of possible actions
    _actions = {
        "ACCEPT": Milter.ACCEPT,
        "REJECT": Milter.REJECT,
        "DISCARD": Milter.DISCARD}

    def __init__(self, name, index=0, regex=None, storage=None,
                 whitelist=None, host_whitelist=None, notification=None,
                 action="ACCEPT", reject_reason=None):
        """Initialize a quarantine.

        name is used in log and error messages, index is the precedence of
        the quarantine (lower index means higher precedence). action has to
        be one of the keys of _actions (case-insensitive).

        Raises RuntimeError if action is unknown.
        """
        self.logger = logging.getLogger(__name__)
        self.name = name
        self.index = index
        self.regex = regex
        # always define self.re, setup_from_cfg() replaces it later on
        self.re = re.compile(
            regex, re.MULTILINE + re.DOTALL + re.IGNORECASE) if regex else None
        self.storage = storage
        self.whitelist = whitelist
        # do not share one default list object between all instances
        self.host_whitelist = [] if host_whitelist is None else host_whitelist
        self.notification = notification
        action = action.upper()
        # explicit check instead of assert, asserts vanish with python -O
        if action not in self._actions:
            raise RuntimeError(
                f"{self.name}: unknown action '{action}'")
        self.action = action
        self.milter_action = self._actions[action]
        self.reject_reason = reject_reason

    def setup_from_cfg(self, global_cfg, cfg, test=False):
        """Configure this quarantine from config sections.

        Options missing in cfg are taken from global_cfg or from built-in
        defaults and are written back into cfg. global_cfg, cfg and test are
        handed on to the storage, whitelist and notification constructors.

        Raises RuntimeError if a mandatory option is missing, a type or
        action is unknown or host_whitelist can not be parsed.
        """
        defaults = {
            "action": "accept",
            "reject_reason": "Message rejected",
            "storage_type": "none",
            "notification_type": "none",
            "whitelist_type": "none",
            "host_whitelist": ""
        }

        # check config
        for opt in ["regex", "smtp_host", "smtp_port"] + list(defaults):
            if opt in cfg:
                continue
            if opt in global_cfg:
                cfg[opt] = global_cfg[opt]
            elif opt in defaults:
                cfg[opt] = defaults[opt]
            else:
                raise RuntimeError(
                    f"mandatory option '{opt}' not present in "
                    f"config section '{self.name}' or 'global'")

        # pre-compile regex
        self.logger.debug(
            f"{self.name}: compiling regex '{cfg['regex']}'")
        self.re = re.compile(
            cfg["regex"], re.MULTILINE + re.DOTALL + re.IGNORECASE)

        self.smtp_host = cfg["smtp_host"]
        self.smtp_port = cfg["smtp_port"]

        # create storage instance
        storage_type = cfg["storage_type"].lower()
        if storage_type in storages.TYPES:
            self.logger.debug(
                f"{self.name}: initializing storage "
                f"type '{storage_type.upper()}'")
            self.storage = storages.TYPES[storage_type](
                self.name, global_cfg, cfg, test)
        elif storage_type == "none":
            self.logger.debug(f"{self.name}: storage is NONE")
            self.storage = None
        else:
            raise RuntimeError(
                f"{self.name}: unknown storage type '{storage_type}'")

        # create whitelist instance
        whitelist_type = cfg["whitelist_type"].lower()
        if whitelist_type in whitelists.TYPES:
            self.logger.debug(
                f"{self.name}: initializing whitelist "
                f"type '{whitelist_type.upper()}'")
            self.whitelist = whitelists.TYPES[whitelist_type](
                self.name, global_cfg, cfg, test)
        elif whitelist_type == "none":
            self.logger.debug(f"{self.name}: whitelist is NONE")
            self.whitelist = None
        else:
            raise RuntimeError(
                f"{self.name}: unknown whitelist type '{whitelist_type}'")

        # create notification instance
        notification_type = cfg["notification_type"].lower()
        if notification_type in notifications.TYPES:
            self.logger.debug(
                f"{self.name}: initializing notification "
                f"type '{notification_type.upper()}'")
            self.notification = notifications.TYPES[notification_type](
                self.name, global_cfg, cfg, test)
        elif notification_type == "none":
            self.logger.debug(f"{self.name}: notification is NONE")
            self.notification = None
        else:
            raise RuntimeError(
                f"{self.name}: unknown notification "
                f"type '{notification_type}'")

        # determining milter action for this quarantine
        action = cfg["action"].upper()
        if action in self._actions:
            self.logger.debug(f"{self.name}: action is {action}")
            self.action = action
            self.milter_action = self._actions[action]
        else:
            raise RuntimeError(
                f"{self.name}: unknown action '{action}'")

        self.reject_reason = cfg["reject_reason"]

        # create host/network whitelist, entries consisting of whitespace
        # only are dropped and duplicates are merged
        self.host_whitelist = []
        host_whitelist = {
            p.strip() for p in cfg["host_whitelist"].split(",") if p.strip()}
        for host in host_whitelist:
            # parse network notation
            try:
                net = IPNetwork(host)
            except AddrFormatError as e:
                raise RuntimeError(
                    f"{self.name}: error parsing host_whitelist: {e}") from e
            else:
                self.host_whitelist.append(net)
        if self.host_whitelist:
            whitelist = ", ".join([str(ip) for ip in host_whitelist])
            self.logger.debug(
                f"{self.name}: host whitelist: {whitelist}")

    def notify(self, storage_id, recipient=None, synchronous=True):
        """Notify recipient about email in storage.

        If recipient is None, all recipients saved in the metadata of the
        email are notified.

        Raises RuntimeError if storage or notification is not configured or
        if recipient is not a recipient of the stored email.
        """
        if not self.storage:
            raise RuntimeError(
                "storage type is set to None, unable to send notification")

        if not self.notification:
            raise RuntimeError(
                "notification type is set to None, "
                "unable to send notification")

        fp, metadata = self.storage.get_mail(storage_id)

        # make sure fp is closed even if the notification fails
        try:
            if recipient is not None:
                if recipient not in metadata["recipients"]:
                    raise RuntimeError(f"invalid recipient '{recipient}'")
                recipients = [recipient]
            else:
                recipients = metadata["recipients"]

            self.notification.notify(
                metadata["queue_id"], storage_id, metadata["mailfrom"],
                recipients, metadata["headers"], fp,
                metadata["subgroups"], metadata["named_subgroups"],
                synchronous)
        finally:
            fp.close()

    def release(self, storage_id, recipients=None):
        """Release email from storage.

        recipients may be a single address (str) or an iterable of
        addresses. If it is None or empty, the email is released for all
        recipients saved in its metadata. After sending, storage.delete()
        is called for every released recipient.

        Raises RuntimeError if storage is not configured, the email can not
        be read, a recipient is invalid or sending fails.
        """
        if not self.storage:
            raise RuntimeError(
                "storage type is set to None, unable to release email")

        fp, metadata = self.storage.get_mail(storage_id)
        try:
            try:
                mail = fp.read()
            finally:
                fp.close()
        except IOError as e:
            raise RuntimeError(f"unable to read data file: {e}") from e

        if not recipients:
            recipients = metadata["recipients"]
        elif isinstance(recipients, str):
            recipients = [recipients]
        # any other iterable of recipients is used as given

        # validate all recipients before sending anything, otherwise the
        # email could be released partially
        for recipient in recipients:
            if recipient not in metadata["recipients"]:
                raise RuntimeError(f"invalid recipient '{recipient}'")

        for recipient in recipients:
            try:
                mailer.smtp_send(
                    self.smtp_host,
                    self.smtp_port,
                    metadata["mailfrom"],
                    recipient,
                    mail)
            except Exception as e:
                raise RuntimeError(
                    f"error while sending email to '{recipient}': {e}") from e
            self.storage.delete(storage_id, recipient)

    def get_storage(self):
        "Return the storage instance or None."
        return self.storage

    def get_notification(self):
        "Return the notification instance or None."
        return self.notification

    def get_whitelist(self):
        "Return the whitelist instance or None."
        return self.whitelist

    def host_in_whitelist(self, hostaddr):
        """Return True if the host is part of the host whitelist.

        Only hostaddr[0] is used, it is parsed with netaddr.IPAddress.
        """
        ip = IPAddress(hostaddr[0])
        return any(ip in entry for entry in self.host_whitelist)

    def match(self, header):
        "Search header with the compiled regex and return the result."
        return self.re.search(header)


class QuarantineMilter(Milter.Base):
    """QuarantineMilter based on Milter.Base to implement milter communication

    The class variable quarantines needs to be filled by running the
    setup_milter function.
    """
    quarantines = []
    preferred_action = "first"

    # list of default config files
    _cfg_files = [
        "/etc/pyquarantine/pyquarantine.conf",
        os.path.expanduser('~/pyquarantine.conf'),
        "pyquarantine.conf"]

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # save runtime config, it must not change during runtime.
        # a copy is used because connect() narrows the list per connection
        # and that must never touch the list shared by all instances
        self.quarantines = list(QuarantineMilter.quarantines)

    def _get_preferred_quarantine(self):
        """Return the preferred quarantine out of all matching quarantines.

        This is the quarantine with the lowest index if preferred_action is
        "first", otherwise the one with the highest index.
        """
        matching_quarantines = [
            q for q in self.rcpts_quarantines.values() if q]
        pick = min if self.preferred_action == "first" else max
        return pick(matching_quarantines, key=lambda q: q.index)

    @staticmethod
    def get_cfg_files():
        "Return the list of config files."
        return QuarantineMilter._cfg_files

    @staticmethod
    def set_cfg_files(cfg_files):
        "Set the list of config files."
        QuarantineMilter._cfg_files = cfg_files

    def connect(self, hostname, family, hostaddr):
        """Skip quarantines which have the connecting host whitelisted.

        Returns Milter.ACCEPT if no quarantine is left, otherwise
        Milter.CONTINUE.
        """
        self.hostaddr = hostaddr
        self.logger.debug(
            f"accepted milter connection from {hostaddr[0]} "
            f"port {hostaddr[1]}")
        # build a new list instead of removing entries in place, the
        # original list object may be shared with other instances
        remaining = []
        for quarantine in self.quarantines:
            if quarantine.host_in_whitelist(hostaddr):
                self.logger.debug(
                    f"host {hostaddr[0]} is in whitelist of "
                    f"quarantine {quarantine.name}")
            else:
                remaining.append(quarantine)
        self.quarantines = remaining

        if not self.quarantines:
            self.logger.debug(
                f"host {hostaddr[0]} is in whitelist of all "
                f"quarantines, skip further processing")
            return Milter.ACCEPT
        return Milter.CONTINUE

    @Milter.noreply
    def envfrom(self, mailfrom, *str):
        "Save the lowercased envelope sender and reset the recipients."
        self.mailfrom = "@".join(parse_addr(mailfrom)).lower()
        self.recipients = set()
        return Milter.CONTINUE

    @Milter.noreply
    def envrcpt(self, to, *str):
        "Add the lowercased envelope recipient to the set of recipients."
        self.recipients.add("@".join(parse_addr(to)).lower())
        return Milter.CONTINUE

    @Milter.noreply
    def data(self):
        "Fetch the queue-id and prepare the buffers for headers and body."
        self.qid = self.getsymval('i')
        self.logger.debug(
            f"{self.qid}: received queue-id from MTA")
        # from here on the recipients are kept as a list
        self.recipients = list(self.recipients)
        self.logger.debug(
            f"{self.qid}: initializing memory buffer to save email data")
        # initialize memory buffer to save email data
        self.fp = BytesIO()
        self.headers = []
        return Milter.CONTINUE

    def header(self, name, value):
        """Save the raw header to the buffer and remember its decoded value.

        Returns Milter.TEMPFAIL if an exception occurs.
        """
        try:
            # remove surrogates from value
            value = value.encode(
                errors="surrogateescape").decode(errors="replace")
            self.logger.debug(f"{self.qid}: received header: {name}: {value}")
            # write email header to memory buffer
            self.fp.write(f"{name}: {value}\r\n".encode(
                encoding="ascii", errors="replace"))
            header = make_header(decode_header(value), errors="replace")
            value = str(header).replace("\x00", "")
            self.logger.debug(
                f"{self.qid}: decoded header: {name}: {value}")
            self.headers.append((name, value))
            return Milter.CONTINUE
        except Exception as e:
            self.logger.exception(
                f"an exception occured in header function: {e}")
            return Milter.TEMPFAIL

    def eoh(self):
        """Check all headers against the quarantines for every recipient.

        Fills self.rcpts_quarantines (recipient -> quarantine) and
        self.matches (quarantine name -> regex match).

        Returns Milter.ACCEPT if no quarantine matched, Milter.CONTINUE if
        the body is needed by a storage or notification, otherwise the
        milter action of the preferred quarantine. Returns Milter.TEMPFAIL
        if the whitelist query fails or an exception occurs.
        """
        try:
            self.fp.write("\r\n".encode(encoding="ascii"))
            self.wl_cache = whitelists.WhitelistCache()

            # initialize dicts to set quarantines per recipient and keep
            # matches
            self.rcpts_quarantines = {}
            self.matches = {}

            # iterate email headers
            rcpts_to_check = self.recipients.copy()
            for name, value in self.headers:
                header = f"{name}: {value}"
                self.logger.debug(
                    f"{self.qid}: checking header against configured "
                    f"quarantines: {header}")
                # iterate quarantines
                for quarantine in self.quarantines:
                    if len(self.rcpts_quarantines) == len(
                            self.recipients):
                        # every recipient matched a quarantine already
                        if quarantine.index >= max(
                                [q.index for q in
                                    self.rcpts_quarantines.values()]):
                            # all recipients matched a quarantine with at
                            # least the same precedence already, skip checks
                            # against quarantines with lower precedence
                            self.logger.debug(
                                f"{self.qid}: {quarantine.name}: skip further "
                                f"checks of this header")
                            break

                    # check email header against quarantine regex
                    self.logger.debug(
                        f"{self.qid}: {quarantine.name}: checking header "
                        f"against regex '{str(quarantine.regex)}'")
                    match = quarantine.match(header)
                    if match:
                        self.logger.debug(
                            f"{self.qid}: {quarantine.name}: "
                            f"header matched regex")
                        # check for whitelisted recipients
                        whitelist = quarantine.get_whitelist()
                        if whitelist:
                            try:
                                wl_recipients = self.wl_cache.get_recipients(
                                    whitelist,
                                    self.mailfrom,
                                    rcpts_to_check)
                            except RuntimeError as e:
                                self.logger.error(
                                    f"{self.qid}: {quarantine.name}: unable "
                                    f"to query whitelist: {e}")
                                return Milter.TEMPFAIL
                        else:
                            wl_recipients = {}

                        # iterate recipients, a copy is needed because
                        # recipients may be removed inside the loop
                        for rcpt in rcpts_to_check.copy():
                            if rcpt in wl_recipients:
                                # recipient is whitelisted in this quarantine
                                self.logger.debug(
                                    f"{self.qid}: {quarantine.name}: "
                                    f"recipient '{rcpt}' is whitelisted")
                                continue

                            if rcpt not in self.rcpts_quarantines or \
                                    self.rcpts_quarantines[rcpt].index > \
                                    quarantine.index:
                                self.logger.debug(
                                    f"{self.qid}: {quarantine.name}: set "
                                    f"quarantine for recipient '{rcpt}'")
                                # save match for later use as template
                                # variables
                                self.matches[quarantine.name] = match
                                self.rcpts_quarantines[rcpt] = quarantine
                                if quarantine.index == 0:
                                    # we do not need to check recipients which
                                    # matched the quarantine with the highest
                                    # precedence already
                                    rcpts_to_check.remove(rcpt)
                            else:
                                self.logger.debug(
                                    f"{self.qid}: {quarantine.name}: a "
                                    f"quarantine with same or higher "
                                    f"precedence matched already for "
                                    f"recipient '{rcpt}'")

                if not rcpts_to_check:
                    self.logger.debug(
                        f"{self.qid}: all recipients matched the first "
                        f"quarantine, skipping all remaining header checks")
                    break

            # check if no quarantine has matched for all recipients
            if not self.rcpts_quarantines:
                # accept email
                self.logger.info(
                    f"{self.qid}: passed clean for all recipients")
                return Milter.ACCEPT

            # check if the mail body is needed
            for quarantine in self.rcpts_quarantines.values():
                if quarantine.get_storage() or quarantine.get_notification():
                    # mail body is needed, continue processing
                    return Milter.CONTINUE

            # quarantine and notification are disabled on all matching
            # quarantines, just return configured action
            quarantine = self._get_preferred_quarantine()
            self.logger.info(
                f"{self.qid}: {self.preferred_action} "
                f"matching quarantine is '{quarantine.name}', performing "
                f"milter action {quarantine.action}")
            if quarantine.action == "REJECT":
                self.setreply("554", "5.7.0", quarantine.reject_reason)
            return quarantine.milter_action

        except Exception as e:
            self.logger.exception(
                f"an exception occured in eoh function: {e}")
            return Milter.TEMPFAIL

    def body(self, chunk):
        "Append the body chunk to the memory buffer."
        try:
            # save received body chunk
            self.fp.write(chunk)
        except Exception as e:
            self.logger.exception(
                f"an exception occured in body function: {e}")
            return Milter.TEMPFAIL
        return Milter.CONTINUE

    def eom(self):
        """Store the email and send notifications per matching quarantine.

        Recipients of matching quarantines are removed from the email.
        Returns Milter.ACCEPT if at least one recipient is left, otherwise
        the milter action of the preferred quarantine. Returns
        Milter.TEMPFAIL if storing or notifying fails or an exception
        occurs.
        """
        try:
            # processing recipients grouped by quarantines, groupby needs
            # its input sorted by the grouping criteria
            quarantines = []
            for quarantine, recipients in groupby(
                    sorted(self.rcpts_quarantines,
                           key=lambda x: self.rcpts_quarantines[x].index),
                    lambda x: self.rcpts_quarantines[x]):
                quarantines.append((quarantine, list(recipients)))

            # iterate quarantines sorted by index
            for quarantine, recipients in sorted(
                    quarantines, key=lambda x: x[0].index):
                headers = defaultdict(str)
                for name, value in self.headers:
                    headers[name.lower()] = value
                subgroups = self.matches[quarantine.name].groups(
                    default="")
                named_subgroups = self.matches[quarantine.name].groupdict(
                    default="")

                rcpts = ", ".join(recipients)

                # check if a storage is configured
                storage_id = ""
                storage = quarantine.get_storage()
                if storage:
                    # add email to quarantine
                    self.logger.info(
                        f"{self.qid}: adding to quarantine "
                        f"'{quarantine.name}' for: {rcpts}")
                    try:
                        storage_id = storage.add(
                            self.qid, self.mailfrom, recipients, headers,
                            self.fp, subgroups, named_subgroups)
                    except RuntimeError as e:
                        self.logger.error(
                            f"{self.qid}: unable to add to quarantine "
                            f"'{quarantine.name}': {e}")
                        return Milter.TEMPFAIL

                # check if a notification is configured
                notification = quarantine.get_notification()
                if notification:
                    # notify
                    self.logger.info(
                        f"{self.qid}: sending notification to: {rcpts}")
                    try:
                        notification.notify(
                            self.qid, storage_id,
                            self.mailfrom, recipients, headers, self.fp,
                            subgroups, named_subgroups)
                    except RuntimeError as e:
                        self.logger.error(
                            f"{self.qid}: unable to send notification: {e}")
                        return Milter.TEMPFAIL

                # remove processed recipient
                for recipient in recipients:
                    self.delrcpt(recipient)
                    self.recipients.remove(recipient)

            self.fp.close()

            # email passed clean for at least one recipient, accepting email
            if self.recipients:
                # log the recipients which are left, not the ones of the
                # quarantine processed last
                rcpts = ", ".join(self.recipients)
                self.logger.info(
                    f"{self.qid}: passed clean for: {rcpts}")
                return Milter.ACCEPT

            # return configured action
            quarantine = self._get_preferred_quarantine()
            self.logger.info(
                f"{self.qid}: {self.preferred_action} matching "
                f"quarantine is '{quarantine.name}', performing milter "
                f"action {quarantine.action}")
            if quarantine.action == "REJECT":
                self.setreply("554", "5.7.0", quarantine.reject_reason)
            return quarantine.milter_action

        except Exception as e:
            self.logger.exception(
                f"an exception occured in eom function: {e}")
            return Milter.TEMPFAIL

    def close(self):
        "Log the disconnect of the client."
        self.logger.debug(
            f"disconnect from {self.hostaddr[0]} port {self.hostaddr[1]}")
        return Milter.CONTINUE


def setup_milter(test=False, cfg_files=None):
    """Generate the configuration for QuarantineMilter class.

    If cfg_files is empty or None, the config files returned by
    QuarantineMilter.get_cfg_files() are read. The working directory is
    changed to the directory of the first config file which was read.
    test is handed on to Quarantine.setup_from_cfg().

    Raises RuntimeError if the configuration is missing or invalid.
    """
    logger = logging.getLogger(__name__)

    # read config file
    parser = configparser.ConfigParser()
    if not cfg_files:
        cfg_files = parser.read(QuarantineMilter.get_cfg_files())
    else:
        cfg_files = parser.read(cfg_files)
    if not cfg_files:
        raise RuntimeError("config file not found")

    # use absolute paths, relative ones would not be found again on reload
    # after the working directory has changed. furthermore the dirname of
    # a bare file name is empty and os.chdir("") fails
    cfg_files = [os.path.abspath(cfg_file) for cfg_file in cfg_files]
    QuarantineMilter.set_cfg_files(cfg_files)
    os.chdir(os.path.dirname(cfg_files[0]))

    # check if mandatory config options in global section are present
    if "global" not in parser.sections():
        raise RuntimeError(
            "mandatory section 'global' not present in config file")
    for option in ["quarantines", "preferred_quarantine_action"]:
        if not parser.has_option("global", option):
            raise RuntimeError(
                f"mandatory option '{option}' not present in config "
                f"section 'global'")

    # read global config section
    global_cfg = dict(parser.items("global"))
    preferred_action = global_cfg["preferred_quarantine_action"].lower()
    if preferred_action not in ["first", "last"]:
        raise RuntimeError(
            "option preferred_quarantine_action has illegal value")

    # read active quarantine names
    quarantines = [
        q.strip() for q in global_cfg["quarantines"].split(",")]
    if len(quarantines) != len(set(quarantines)):
        raise RuntimeError(
            "at least one quarantine is specified multiple times "
            "in quarantines option")
    if "global" in quarantines:
        quarantines.remove("global")
        logger.warning(
            "removed illegal quarantine name 'global' from list of "
            "active quarantines")
    if not quarantines:
        raise RuntimeError("no quarantines configured")

    milter_quarantines = []

    logger.debug("preparing milter configuration ...")
    # iterate quarantine names
    for index, name in enumerate(quarantines):
        # check if config section for current quarantine exists
        if name not in parser.sections():
            raise RuntimeError(
                f"config section '{name}' does not exist")

        cfg = dict(parser.items(name))
        quarantine = Quarantine(name, index)
        quarantine.setup_from_cfg(global_cfg, cfg, test)
        milter_quarantines.append(quarantine)

    # the class attributes are replaced at the very end, a failure above
    # leaves the previous configuration in place
    QuarantineMilter.preferred_action = preferred_action
    QuarantineMilter.quarantines = milter_quarantines


def reload_config():
    """Reload the configuration of QuarantineMilter class.

    A RuntimeError raised by setup_milter() is logged and not propagated.
    """
    logger = logging.getLogger(__name__)

    try:
        setup_milter()
    except RuntimeError as e:
        logger.info(e)
        logger.info("daemon is still running with previous configuration")
    else:
        logger.info("reloaded configuration")