add whitelist functionality to quarantine

This commit is contained in:
2021-09-21 05:20:47 +02:00
parent f4bc545f9b
commit f4bb0d38eb
5 changed files with 354 additions and 3 deletions

View File

@@ -20,6 +20,7 @@ import re
from netaddr import IPAddress, IPNetwork, AddrFormatError
from pymodmilter import BaseConfig, CustomLogger
from pymodmilter.whitelist import DatabaseWhitelist
class ConditionsConfig(BaseConfig):
@@ -52,6 +53,33 @@ class ConditionsConfig(BaseConfig):
if "metavar" in cfg:
self.add_string_arg(cfg, "metavar")
if "whitelist" in cfg:
assert isinstance(cfg["whitelist"], dict), \
f"{self.name}: whitelist: invalid value, " \
f"should be dict"
whitelist = cfg["whitelist"]
assert "type" in whitelist, \
f"{self.name}: whitelist: mandatory parameter 'type' not found"
assert isinstance(whitelist["type"], str), \
f"{self.name}: whitelist: type: invalid value, " \
f"should be string"
self.args["whitelist"] = {
"type": whitelist["type"],
"name": f"{self.name}: whitelist"}
if whitelist["type"] == "db":
for arg in ["connection", "table"]:
assert arg in whitelist, \
f"{self.name}: whitelist: mandatory parameter " \
f"'{arg}' not found"
assert isinstance(whitelist[arg], str), \
f"{self.name}: whitelist: {arg}: invalid value, " \
f"should be string"
self.args["whitelist"][arg] = whitelist[arg]
else:
raise RuntimeError(
f"{self.name}: whitelist: type: invalid type")
self.logger.debug(f"{self.name}: "
f"loglevel={self.loglevel}, "
f"args={self.args}")
@@ -94,6 +122,13 @@ class Conditions:
except re.error as e:
raise RuntimeError(e)
if "whitelist" in cfg.args:
wl_cfg = cfg.args["whitelist"]
if wl_cfg["type"] == "db":
self.whitelist = DatabaseWhitelist(wl_cfg)
else:
raise RuntimeError("invalid storage type")
def match_host(self, host):
logger = CustomLogger(
self.logger, {"name": self.name})
@@ -134,6 +169,17 @@ class Conditions:
return True
def get_wl_rcpts(self, mailfrom, rcpts):
if not self.whitelist:
return {}
wl_rcpts = []
for rcpt in rcpts:
if self.whitelist.check(mailfrom, rcpt):
wl_rcpts.append(rcpt)
return wl_rcpts
def match(self, milter):
logger = CustomLogger(
self.logger, {"qid": milter.qid, "name": self.name})