rename whitelist to allowlist

This commit is contained in:
2023-12-11 18:42:06 +01:00
parent 9a86e8cd24
commit bbd4d2c95b
7 changed files with 135 additions and 135 deletions

View File

@@ -24,7 +24,7 @@ __all__ = [
"rule",
"run",
"storage",
"whitelist",
"list",
"QuarantineMilter"]
__version__ = "2.0.8"

View File

@@ -42,12 +42,12 @@ def _get_notification(quarantines, name, debug):
return notification
def _get_whitelist(quarantines, name, debug):
whitelist = _get_quarantine(quarantines, name, debug).whitelist
if not whitelist:
def _get_allowlist(quarantines, name, debug):
allowlist = _get_quarantine(quarantines, name, debug).allowlist
if not allowlist:
raise RuntimeError(
"whitelist type is set to NONE")
return whitelist
"allowlist type is set to NONE")
return allowlist
def print_table(columns, rows):
@@ -105,10 +105,10 @@ def list_quarantines(quarantines, args):
else:
notification_type = "NONE"
if "whitelist" in cfg:
whitelist_type = cfg["whitelist"]["type"]
if "allowlist" in cfg:
allowlist_type = cfg["allowlist"]["type"]
else:
whitelist_type = "NONE"
allowlist_type = "NONE"
if "milter_action" in cfg:
milter_action = cfg["milter_action"]
@@ -119,14 +119,14 @@ def list_quarantines(quarantines, args):
"name": q["name"],
"storage": storage_type,
"notification": notification_type,
"whitelist": whitelist_type,
"allowlist": allowlist_type,
"action": milter_action})
print_table(
[("Name", "name"),
("Storage", "storage"),
("Notification", "notification"),
("Whitelist", "whitelist"),
("Allowlist", "allowlist"),
("Action", "action")],
qlist
)
@@ -180,16 +180,16 @@ def list_quarantine_emails(quarantines, args):
)
def list_whitelist(quarantines, args):
whitelist = _get_whitelist(quarantines, args.quarantine, args.debug)
def list_allowlist(quarantines, args):
allowlist = _get_allowlist(quarantines, args.quarantine, args.debug)
# find whitelist entries
entries = whitelist.find(
# find allowlist entries
entries = allowlist.find(
mailfrom=args.mailfrom,
recipients=args.recipients,
older_than=args.older_than)
if not entries:
print(f"whitelist of quarantine '{args.quarantine}' is empty")
print(f"allowlist of quarantine '{args.quarantine}' is empty")
return
# transform some values to strings
@@ -210,12 +210,12 @@ def list_whitelist(quarantines, args):
)
def add_whitelist_entry(quarantines, args):
def add_allowlist_entry(quarantines, args):
logger = logging.getLogger(__name__)
whitelist = _get_whitelist(quarantines, args.quarantine, args.debug)
allowlist = _get_allowlist(quarantines, args.quarantine, args.debug)
# check existing entries
entries = whitelist.check(args.mailfrom, args.recipient, logger)
entries = allowlist.check(args.mailfrom, args.recipient, logger)
if entries:
# check if the exact entry exists already
for entry in entries.values():
@@ -245,15 +245,15 @@ def add_whitelist_entry(quarantines, args):
"from/to combination is already covered by the entries above, "
"use --force to override.")
# add entry to whitelist
whitelist.add(args.mailfrom, args.recipient, args.comment, args.permanent)
print("whitelist entry added successfully")
# add entry to allowlist
allowlist.add(args.mailfrom, args.recipient, args.comment, args.permanent)
print("allowlist entry added successfully")
def delete_whitelist_entry(quarantines, args):
whitelist = _get_whitelist(quarantines, args.quarantine, args.debug)
whitelist.delete(args.whitelist_id)
print("whitelist entry deleted successfully")
def delete_allowlist_entry(quarantines, args):
allowlist = _get_allowlist(quarantines, args.quarantine, args.debug)
allowlist.delete(args.allowlist_id)
print("allowlist entry deleted successfully")
def notify(quarantines, args):
@@ -509,86 +509,86 @@ def main():
help="Quarantine ID.")
quar_metadata_parser.set_defaults(func=metadata)
# whitelist command group
whitelist_parser = subparsers.add_parser(
"whitelist",
description="Manage whitelists.",
help="Manage whitelists.",
# allowlist command group
allowlist_parser = subparsers.add_parser(
"allowlist",
description="Manage allowlists.",
help="Manage allowlists.",
formatter_class=formatter_class)
whitelist_parser.add_argument(
allowlist_parser.add_argument(
"quarantine",
metavar="QUARANTINE",
help="Quarantine name.")
whitelist_subparsers = whitelist_parser.add_subparsers(
allowlist_subparsers = allowlist_parser.add_subparsers(
dest="command",
title="Whitelist commands")
whitelist_subparsers.required = True
# whitelist list command
whitelist_list_parser = whitelist_subparsers.add_parser(
title="Allowlist commands")
allowlist_subparsers.required = True
# allowlist list command
allowlist_list_parser = allowlist_subparsers.add_parser(
"list",
description="List whitelist entries.",
help="List whitelist entries.",
description="List allowlist entries.",
help="List allowlist entries.",
formatter_class=formatter_class)
whitelist_list_parser.add_argument(
allowlist_list_parser.add_argument(
"-f", "--from",
dest="mailfrom",
help="Filter entries by from address.",
default=None,
nargs="+")
whitelist_list_parser.add_argument(
allowlist_list_parser.add_argument(
"-t", "--to",
dest="recipients",
help="Filter entries by recipient address.",
default=None,
nargs="+")
whitelist_list_parser.add_argument(
allowlist_list_parser.add_argument(
"-o", "--older-than",
dest="older_than",
help="Filter emails by last used date (days).",
default=None,
type=float)
whitelist_list_parser.set_defaults(func=list_whitelist)
# whitelist add command
whitelist_add_parser = whitelist_subparsers.add_parser(
allowlist_list_parser.set_defaults(func=list_allowlist)
# allowlist add command
allowlist_add_parser = allowlist_subparsers.add_parser(
"add",
description="Add whitelist entry.",
help="Add whitelist entry.",
description="Add allowlist entry.",
help="Add allowlist entry.",
formatter_class=formatter_class)
whitelist_add_parser.add_argument(
allowlist_add_parser.add_argument(
"-f", "--from",
dest="mailfrom",
help="From address.",
required=True)
whitelist_add_parser.add_argument(
allowlist_add_parser.add_argument(
"-t", "--to",
dest="recipient",
help="Recipient address.",
required=True)
whitelist_add_parser.add_argument(
allowlist_add_parser.add_argument(
"-c", "--comment",
help="Comment.",
default="added by CLI")
whitelist_add_parser.add_argument(
allowlist_add_parser.add_argument(
"-p", "--permanent",
help="Add a permanent entry.",
action="store_true")
whitelist_add_parser.add_argument(
allowlist_add_parser.add_argument(
"--force",
help="Force adding an entry, "
"even if already covered by another entry.",
action="store_true")
whitelist_add_parser.set_defaults(func=add_whitelist_entry)
# whitelist delete command
whitelist_delete_parser = whitelist_subparsers.add_parser(
allowlist_add_parser.set_defaults(func=add_allowlist_entry)
# allowlist delete command
allowlist_delete_parser = allowlist_subparsers.add_parser(
"delete",
description="Delete whitelist entry.",
help="Delete whitelist entry.",
description="Delete allowlist entry.",
help="Delete allowlist entry.",
formatter_class=formatter_class)
whitelist_delete_parser.add_argument(
"whitelist_id",
allowlist_delete_parser.add_argument(
"allowlist_id",
metavar="ID",
help="Whitelist ID.")
whitelist_delete_parser.set_defaults(func=delete_whitelist_entry)
help="List ID.")
allowlist_delete_parser.set_defaults(func=delete_allowlist_entry)
args = parser.parse_args()

View File

@@ -19,7 +19,7 @@ import re
from netaddr import IPAddress, IPNetwork, AddrFormatError
from pyquarantine import CustomLogger
from pyquarantine.whitelist import DatabaseWhitelist
from pyquarantine.list import DatabaseList
class Conditions:
@@ -62,14 +62,14 @@ class Conditions:
else:
setattr(self, arg, cfg[arg])
self.whitelist = cfg["whitelist"] if "whitelist" in cfg else None
if self.whitelist is not None:
self.whitelist["name"] = f"{cfg['name']}: whitelist"
self.whitelist["loglevel"] = cfg["loglevel"]
if self.whitelist["type"] == "db":
self.whitelist = DatabaseWhitelist(self.whitelist, debug)
self.allowlist = cfg["allowlist"] if "allowlist" in cfg else None
if self.allowlist is not None:
self.allowlist["name"] = f"{cfg['name']}: allowlist"
self.allowlist["loglevel"] = cfg["loglevel"]
if self.allowlist["type"] == "db":
self.allowlist = DatabaseList(self.allowlist, debug)
else:
raise RuntimeError("invalid whitelist type")
raise RuntimeError("invalid allowlist type")
def __str__(self):
cfg = []
@@ -77,12 +77,12 @@ class Conditions:
"var", "metavar"):
if arg in self.cfg:
cfg.append(f"{arg}={self.cfg[arg]}")
if self.whitelist is not None:
cfg.append(f"whitelist={self.whitelist}")
if self.allowlist is not None:
cfg.append(f"allowlist={self.allowlist}")
return "Conditions(" + ", ".join(cfg) + ")"
def get_whitelist(self):
return self.whitelist
def get_allowlist(self):
return self.allowlist
def match_host(self, host):
logger = CustomLogger(
@@ -124,12 +124,12 @@ class Conditions:
return True
def get_wl_rcpts(self, mailfrom, rcpts, logger):
if not self.whitelist:
if not self.allowlist:
return {}
wl_rcpts = []
for rcpt in rcpts:
if self.whitelist.check(mailfrom, rcpt, logger):
if self.allowlist.check(mailfrom, rcpt, logger):
wl_rcpts.append(rcpt)
return wl_rcpts

View File

@@ -22,7 +22,7 @@ __all__ = [
"RewriteLinksConfig",
"StoreConfig",
"NotifyConfig",
"WhitelistConfig",
"AllowListConfig",
"QuarantineConfig",
"ActionConfig",
"RuleConfig",
@@ -89,7 +89,7 @@ class BaseConfig:
return self._config
class WhitelistConfig(BaseConfig):
class AllowListConfig(BaseConfig):
JSON_SCHEMA = {
"type": "object",
"required": ["type"],
@@ -121,14 +121,14 @@ class ConditionsConfig(BaseConfig):
"headers": {"type": "array",
"items": {"type": "string"}},
"var": {"type": "string"},
"whitelist": {"type": "object"}}}
"allowlist": {"type": "object"}}}
def __init__(self, config, rec=True):
super().__init__(config)
if not rec:
return
if "whitelist" in self:
self["whitelist"] = WhitelistConfig(self["whitelist"])
if "allowlist" in self:
self["allowlist"] = AllowListConfig(self["allowlist"])
class AddHeaderConfig(BaseConfig):
@@ -242,7 +242,7 @@ class QuarantineConfig(BaseConfig):
"notify": {"type": "object"},
"milter_action": {"type": "string"},
"reject_reason": {"type": "string"},
"whitelist": {"type": "object"},
"allowlist": {"type": "object"},
"store": {"type": "object"},
"smtp_host": {"type": "string"},
"smtp_port": {"type": "number"}}}
@@ -256,9 +256,9 @@ class QuarantineConfig(BaseConfig):
self["store"] = StoreConfig(self["store"])
if "notify" in self:
self["notify"] = NotifyConfig(self["notify"])
if "whitelist" in self:
self["whitelist"] = ConditionsConfig(
{"whitelist": self["whitelist"]}, rec)
if "allowlist" in self:
self["allowlist"] = ConditionsConfig(
{"allowlist": self["allowlist"]}, rec)
class ActionConfig(BaseConfig):

View File

@@ -13,8 +13,8 @@
#
__all__ = [
"DatabaseWhitelist",
"WhitelistBase"]
"DatabaseList",
"ListBase"]
import logging
import peewee
@@ -24,8 +24,8 @@ from datetime import datetime
from playhouse.db_url import connect
class WhitelistBase:
"Whitelist base class"
class ListBase:
"List base class"
def __init__(self, cfg, debug):
self.cfg = cfg
self.logger = logging.getLogger(cfg["name"])
@@ -47,15 +47,15 @@ class WhitelistBase:
return self.batv_regex.sub(r"\g<LEFT_PART>@", addr, count=1)
def check(self, mailfrom, recipient):
"Check if mailfrom/recipient combination is whitelisted."
"Check if mailfrom/recipient combination is listed."
return
def find(self, mailfrom=None, recipients=None, older_than=None):
"Find whitelist entries."
"Find list entries."
return
def add(self, mailfrom, recipient, comment, permanent):
"Add entry to whitelist."
"Add entry to list."
# check if mailfrom and recipient are valid
if not self.valid_entry_regex.match(mailfrom):
raise RuntimeError("invalid from address")
@@ -63,12 +63,12 @@ class WhitelistBase:
raise RuntimeError("invalid recipient")
return
def delete(self, whitelist_id):
"Delete entry from whitelist."
def delete(self, list_id):
"Delete entry from list."
return
class WhitelistModel(peewee.Model):
class DatabaseListModel(peewee.Model):
mailfrom = peewee.CharField()
recipient = peewee.CharField()
created = peewee.DateTimeField(default=datetime.now)
@@ -84,9 +84,9 @@ class Meta:
)
class DatabaseWhitelist(WhitelistBase):
"Whitelist class to store whitelist in a database"
whitelist_type = "db"
class DatabaseList(ListBase):
"List class to store lists in a database"
list_type = "db"
_db_connections = {}
_db_tables = {}
@@ -96,8 +96,8 @@ class DatabaseWhitelist(WhitelistBase):
tablename = cfg["table"]
connection_string = cfg["connection"]
if connection_string in DatabaseWhitelist._db_connections.keys():
db = DatabaseWhitelist._db_connections[connection_string]
if connection_string in DatabaseList._db_connections.keys():
db = DatabaseList._db_connections[connection_string]
else:
try:
# connect to database
@@ -112,22 +112,22 @@ class DatabaseWhitelist(WhitelistBase):
raise RuntimeError(
f"unable to connect to database: {e}")
DatabaseWhitelist._db_connections[connection_string] = db
DatabaseList._db_connections[connection_string] = db
# generate model meta class
self.meta = Meta
self.meta.database = db
self.meta.table_name = tablename
self.model = type(
f"WhitelistModel_{self.cfg['name']}",
(WhitelistModel,),
f"DatabaseListModel_{self.cfg['name']}",
(DatabaseListModel,),
{"Meta": self.meta})
if connection_string not in DatabaseWhitelist._db_tables.keys():
DatabaseWhitelist._db_tables[connection_string] = []
if connection_string not in DatabaseList._db_tables.keys():
DatabaseList._db_tables[connection_string] = []
if tablename not in DatabaseWhitelist._db_tables[connection_string]:
DatabaseWhitelist._db_tables[connection_string].append(tablename)
if tablename not in DatabaseList._db_tables[connection_string]:
DatabaseList._db_tables[connection_string].append(tablename)
try:
db.create_tables([self.model])
except Exception as e:
@@ -139,7 +139,7 @@ class DatabaseWhitelist(WhitelistBase):
for arg in ("connection", "table"):
if arg in self.cfg:
cfg.append(f"{arg}={self.cfg[arg]}")
return "DatabaseWhitelist(" + ", ".join(cfg) + ")"
return "DatabaseList(" + ", ".join(cfg) + ")"
def _entry_to_dict(self, entry):
result = {}
@@ -164,14 +164,14 @@ class DatabaseWhitelist(WhitelistBase):
return value
def check(self, mailfrom, recipient, logger):
# check if mailfrom/recipient combination is whitelisted
# check if mailfrom/recipient combination is listed
super().check(mailfrom, recipient)
mailfrom = self.remove_batv(mailfrom)
recipient = self.remove_batv(recipient)
# generate list of possible mailfroms
logger.debug(
f"query database for whitelist entries from <{mailfrom}> "
f"query database for list entries from <{mailfrom}> "
f"to <{recipient}>")
mailfroms = [""]
if "@" in mailfrom and not mailfrom.startswith("@"):
@@ -196,7 +196,7 @@ class DatabaseWhitelist(WhitelistBase):
raise RuntimeError(f"unable to query database: {e}")
if not entries:
# no whitelist entry found
# no list entry found
return {}
if len(entries) > 1:
@@ -213,7 +213,7 @@ class DatabaseWhitelist(WhitelistBase):
return result
def find(self, mailfrom=None, recipients=None, older_than=None):
"Find whitelist entries."
"Find list entries."
super().find(mailfrom, recipients, older_than)
if isinstance(mailfrom, str):
@@ -244,7 +244,7 @@ class DatabaseWhitelist(WhitelistBase):
return entries
def add(self, mailfrom, recipient, comment, permanent):
"Add entry to whitelist."
"Add entry to list."
super().add(
mailfrom,
recipient,
@@ -263,16 +263,16 @@ class DatabaseWhitelist(WhitelistBase):
except Exception as e:
raise RuntimeError(f"unable to add entry to database: {e}")
def delete(self, whitelist_id):
"Delete entry from whitelist."
super().delete(whitelist_id)
def delete(self, list_id):
"Delete entry from list."
super().delete(list_id)
try:
query = self.model.delete().where(self.model.id == whitelist_id)
query = self.model.delete().where(self.model.id == list_id)
deleted = query.execute()
except Exception as e:
raise RuntimeError(
f"unable to delete entry from database: {e}")
if deleted == 0:
raise RuntimeError("invalid whitelist id")
raise RuntimeError("invalid list id")

View File

@@ -203,14 +203,14 @@ class EMailNotification(BaseNotification):
f"and its content")
element.extract()
# remove not whitelisted elements, but keep their content
# remove not allowed elements, but keep their content
for element in soup.find_all(True):
if element.name not in EMailNotification._good_tags:
logger.debug(
f"removing tag '{element.name}', keep its content")
element.replaceWithChildren()
# remove not whitelisted attributes
# remove not allowed attributes
for element in soup.find_all(True):
for attribute in list(element.attrs.keys()):
if attribute not in EMailNotification._good_attributes:

View File

@@ -428,13 +428,13 @@ class Quarantine:
"options": cfg["options"]["notify"].get_config()})
self._notification = Notify(notify_cfg, local_addrs, debug)
self._whitelist = None
if "whitelist" in cfg["options"]:
whitelist_cfg = cfg["options"]["whitelist"]
whitelist_cfg["name"] = cfg["name"]
whitelist_cfg["loglevel"] = cfg["loglevel"]
self._whitelist = Conditions(
whitelist_cfg,
self._allowlist = None
if "allowlist" in cfg["options"]:
allowlist_cfg = cfg["options"]["allowlist"]
allowlist_cfg["name"] = cfg["name"]
allowlist_cfg["loglevel"] = cfg["loglevel"]
self._allowlist = Conditions(
allowlist_cfg,
local_addrs=[],
debug=debug)
@@ -456,8 +456,8 @@ class Quarantine:
cfg.append(f"store={str(self._storage)}")
if self._notification is not None:
cfg.append(f"notify={str(self._notification)}")
if self._whitelist is not None:
cfg.append(f"whitelist={str(self._whitelist)}")
if self._allowlist is not None:
cfg.append(f"allowlist={str(self._allowlist)}")
for key in ["milter_action", "reject_reason"]:
if key not in self.cfg["options"]:
continue
@@ -481,10 +481,10 @@ class Quarantine:
return self._notification.get_notification()
@property
def whitelist(self):
if self._whitelist is None:
def allowlist(self):
if self._allowlist is None:
return None
return self._whitelist.get_whitelist()
return self._allowlist.get_allowlist()
@property
def milter_action(self):
@@ -554,14 +554,14 @@ class Quarantine:
self.logger, {"name": self.cfg["name"], "qid": milter.qid})
rcpts = milter.msginfo["rcpts"]
wl_rcpts = []
if self._whitelist:
wl_rcpts = self._whitelist.get_wl_rcpts(
if self._allowlist:
wl_rcpts = self._allowlist.get_wl_rcpts(
milter.msginfo["mailfrom"], rcpts, logger)
if wl_rcpts:
logger.info(f"whitelisted recipients: {wl_rcpts}")
logger.info(f"allowed recipients: {wl_rcpts}")
rcpts = [rcpt for rcpt in rcpts if rcpt not in wl_rcpts]
if not rcpts:
# all recipients whitelisted
# all recipients allowed
return
milter.msginfo["rcpts"] = rcpts.copy()