From 5d21b815309cc29f19a67e737c19fe49aa22f322 Mon Sep 17 00:00:00 2001 From: Thomas Oettli Date: Tue, 12 Dec 2023 19:20:22 +0100 Subject: [PATCH] change CLI to handle global lists --- pyquarantine/cli.py | 146 +++++++++++++++++++++-------------------- pyquarantine/config.py | 3 +- 2 files changed, 77 insertions(+), 72 deletions(-) diff --git a/pyquarantine/cli.py b/pyquarantine/cli.py index 1fde908..f0a94fe 100644 --- a/pyquarantine/cli.py +++ b/pyquarantine/cli.py @@ -23,6 +23,7 @@ import time from pyquarantine.config import get_milter_config, ActionConfig, ListConfig from pyquarantine.storage import Quarantine +from pyquarantine.list import DatabaseList from pyquarantine import __version__ as version @@ -41,8 +42,6 @@ def _get_quarantine(cfg, name, debug): (q for q in _get_quarantines(cfg) if q["name"] == name)) except StopIteration: raise RuntimeError(f"invalid quarantine '{name}'") - for name, lst in cfg["lists"].items(): - cfg["lists"][name] = ListConfig(lst, {}) return Quarantine(ActionConfig(quarantine, cfg["lists"]), [], debug) @@ -54,12 +53,17 @@ def _get_notification(cfg, name, debug): return notification -def _get_allowlist(cfg, name, debug): - allowlist = _get_quarantine(cfg, name, debug).allowlist - if not allowlist: - raise RuntimeError( - "allowlist type is set to NONE") - return allowlist +def _get_list(cfg, name, debug): + try: + list_cfg = ListConfig(cfg["lists"][name], {}) + except KeyError: + raise RuntimeError(f"list '{name}' is not configured") + + if list_cfg["type"] == "db": + list_cfg["loglevel"] = cfg["loglevel"] + return DatabaseList(list_cfg, debug) + else: + raise RuntimeError("invalid lists type") def print_table(columns, rows): @@ -110,21 +114,21 @@ def list_quarantines(cfg, args): else: qlist = [] for q in quarantines: - cfg = q["options"] - storage_type = cfg["store"]["type"] + qcfg = q["options"] + storage_type = qcfg["store"]["type"] if "notify" in cfg: - notification_type = cfg["notify"]["type"] + notification_type = qcfg["notify"]["type"] else: notification_type = "NONE" - if "allowlist" in cfg: - allowlist_type = cfg["allowlist"]["type"] + if "lists" in qcfg: + lists_type = qcfg["lists"] else: - allowlist_type = "NONE" + lists_type = "NONE" - if "milter_action" in cfg: - milter_action = cfg["milter_action"] + if "milter_action" in qcfg: + milter_action = qcfg["milter_action"] else: milter_action = "NONE" @@ -132,14 +136,14 @@ def list_quarantines(cfg, args): "name": q["name"], "storage": storage_type, "notification": notification_type, - "allowlist": allowlist_type, + "lists": lists_type, "action": milter_action}) print_table( [("Name", "name"), ("Storage", "storage"), ("Notification", "notification"), - ("Allowlist", "allowlist"), + ("Allowlist", "lists"), ("Action", "action")], qlist ) @@ -193,16 +197,16 @@ def list_quarantine_emails(cfg, args): ) -def list_allowlist(cfg, args): - allowlist = _get_allowlist(cfg, args.quarantine, args.debug) +def list_list(cfg, args): + lists = _get_list(cfg, args.list, args.debug) - # find allowlist entries - entries = allowlist.find( + # find lists entries + entries = lists.find( mailfrom=args.mailfrom, recipients=args.recipients, older_than=args.older_than) if not entries: - print(f"allowlist of quarantine '{args.quarantine}' is empty") + print("list is empty") return # transform some values to strings @@ -223,12 +227,12 @@ def list_allowlist(cfg, args): ) -def add_allowlist_entry(cfg, args): +def add_list_entry(cfg, args): logger = logging.getLogger(__name__) - allowlist = _get_allowlist(cfg, args.quarantine, args.debug) + lists = _get_list(cfg, args.list, args.debug) # check existing entries - entries = allowlist.check(args.mailfrom, args.recipient, logger) + entries = lists.check(args.mailfrom, args.recipient, logger) if entries: # check if the exact entry exists already for entry in entries.values(): @@ -258,15 +262,15 @@ def add_allowlist_entry(cfg, args): "from/to combination is already covered by the entries above, " "use --force to override.") - # add entry to allowlist - allowlist.add(args.mailfrom, args.recipient, args.comment, args.permanent) - print("allowlist entry added successfully") + # add entry to lists + lists.add(args.mailfrom, args.recipient, args.comment, args.permanent) + print("list entry added successfully") -def delete_allowlist_entry(cfg, args): - allowlist = _get_allowlist(cfg, args.quarantine, args.debug) - allowlist.delete(args.allowlist_id) - print("allowlist entry deleted successfully") +def delete_list_entry(cfg, args): + lists = _get_list(cfg, args.list, args.debug) + lists.delete(args.lists_id) + print("list entry deleted successfully") def notify(cfg, args): @@ -524,86 +528,86 @@ def main(): help="Quarantine ID.") quar_metadata_parser.set_defaults(func=metadata) - # allowlist command group - allowlist_parser = subparsers.add_parser( - "allowlist", - description="Manage allowlists.", - help="Manage allowlists.", + # lists command group + lists_parser = subparsers.add_parser( + "lists", + description="Manage lists.", + help="Manage lists.", formatter_class=formatter_class) - allowlist_parser.add_argument( - "quarantine", - metavar="QUARANTINE", - help="Quarantine name.") - allowlist_subparsers = allowlist_parser.add_subparsers( - dest="command", - title="Allowlist commands") - allowlist_subparsers.required = True - # allowlist list command - allowlist_list_parser = allowlist_subparsers.add_parser( + lists_parser.add_argument( "list", - description="List allowlist entries.", - help="List allowlist entries.", + metavar="LIST", + help="List name.") + lists_subparsers = lists_parser.add_subparsers( + dest="command", + title="Lists commands.") + lists_subparsers.required = True + # lists list command + lists_list_parser = lists_subparsers.add_parser( + "list", + description="List list entries.", + help="List list entries.", formatter_class=formatter_class) - allowlist_list_parser.add_argument( + lists_list_parser.add_argument( "-f", "--from", dest="mailfrom", help="Filter entries by from address.", default=None, nargs="+") - allowlist_list_parser.add_argument( + lists_list_parser.add_argument( "-t", "--to", dest="recipients", help="Filter entries by recipient address.", default=None, nargs="+") - allowlist_list_parser.add_argument( + lists_list_parser.add_argument( "-o", "--older-than", dest="older_than", help="Filter emails by last used date (days).", default=None, type=float) - allowlist_list_parser.set_defaults(func=list_allowlist) - # allowlist add command - allowlist_add_parser = allowlist_subparsers.add_parser( + lists_list_parser.set_defaults(func=list_list) + # lists add command + lists_add_parser = lists_subparsers.add_parser( "add", - description="Add allowlist entry.", - help="Add allowlist entry.", + description="Add list entry.", + help="Add list entry.", formatter_class=formatter_class) - allowlist_add_parser.add_argument( + lists_add_parser.add_argument( "-f", "--from", dest="mailfrom", help="From address.", required=True) - allowlist_add_parser.add_argument( + lists_add_parser.add_argument( "-t", "--to", dest="recipient", help="Recipient address.", required=True) - allowlist_add_parser.add_argument( + lists_add_parser.add_argument( "-c", "--comment", help="Comment.", default="added by CLI") - allowlist_add_parser.add_argument( + lists_add_parser.add_argument( "-p", "--permanent", help="Add a permanent entry.", action="store_true") - allowlist_add_parser.add_argument( + lists_add_parser.add_argument( "--force", help="Force adding an entry, " "even if already covered by another entry.", action="store_true") - allowlist_add_parser.set_defaults(func=add_allowlist_entry) - # allowlist delete command - allowlist_delete_parser = allowlist_subparsers.add_parser( + lists_add_parser.set_defaults(func=add_list_entry) + # lists delete command + lists_delete_parser = lists_subparsers.add_parser( "delete", - description="Delete allowlist entry.", - help="Delete allowlist entry.", + description="Delete list entry.", + help="Delete list entry.", formatter_class=formatter_class) - allowlist_delete_parser.add_argument( - "allowlist_id", + lists_delete_parser.add_argument( + "lists_id", metavar="ID", help="List ID.") - allowlist_delete_parser.set_defaults(func=delete_allowlist_entry) + lists_delete_parser.set_defaults(func=delete_list_entry) args = parser.parse_args() diff --git a/pyquarantine/config.py b/pyquarantine/config.py index 3e6b571..123815f 100644 --- a/pyquarantine/config.py +++ b/pyquarantine/config.py @@ -299,7 +299,8 @@ class ActionConfig(BaseConfig): return if "conditions" in self: self["conditions"] = ConditionsConfig(self["conditions"], lists) - self["action"] = self.ACTION_TYPES[self["type"]](self["options"], lists) + self["action"] = self.ACTION_TYPES[self["type"]]( + self["options"], lists) class RuleConfig(BaseConfig):