change config structure

This commit is contained in:
2024-01-03 19:54:34 +01:00
parent 5d21b81530
commit 4da1a0e9b3
7 changed files with 194 additions and 99 deletions

View File

@@ -21,28 +21,31 @@ import logging.handlers
import sys
import time
from pyquarantine.config import get_milter_config, ActionConfig, ListConfig
from pyquarantine.action import Action
from pyquarantine.config import get_milter_config, ActionConfig, StorageConfig, NotificationConfig, ListConfig
from pyquarantine.storage import Quarantine
from pyquarantine.list import DatabaseList
from pyquarantine import __version__ as version
def _get_quarantines(cfg):
def _get_quarantines(milter_cfg):
quarantines = []
for rule in cfg["rules"]:
for rule in milter_cfg["rules"]:
for action in rule["actions"]:
if action["type"] == "quarantine":
quarantines.append(action)
return quarantines
def _get_quarantine(cfg, name, debug):
def _get_quarantine(milter_cfg, name, debug):
try:
quarantine = next(
(q for q in _get_quarantines(cfg) if q["name"] == name))
(q for q in _get_quarantines(milter_cfg) if q["name"] == name))
except StopIteration:
raise RuntimeError(f"invalid quarantine '{name}'")
return Quarantine(ActionConfig(quarantine, cfg["lists"]), [], debug)
cfg = ActionConfig(quarantine, milter_cfg)
return Quarantine(cfg, [], debug)
def _get_notification(cfg, name, debug):
@@ -55,7 +58,7 @@ def _get_notification(cfg, name, debug):
def _get_list(cfg, name, debug):
try:
list_cfg = ListConfig(cfg["lists"][name], {})
list_cfg = cfg["lists"][name]
except KeyError:
raise RuntimeError(f"list '{name}' is not configured")
@@ -107,7 +110,7 @@ def print_table(columns, rows):
print(row_format.format(*row))
def list_quarantines(cfg, args):
def llist(cfg, args):
quarantines = _get_quarantines(cfg)
if args.batch:
print("\n".join([q["name"] for q in quarantines]))
@@ -115,32 +118,33 @@ def list_quarantines(cfg, args):
qlist = []
for q in quarantines:
qcfg = q["options"]
storage_type = qcfg["store"]["type"]
if "notify" in cfg:
notification_type = qcfg["notify"]["type"]
if "notify" in qcfg:
notification = cfg["notifications"][qcfg["notify"]]["name"]
else:
notification_type = "NONE"
notification = "NONE"
if "lists" in qcfg:
lists_type = qcfg["lists"]
if "allowlist" in qcfg:
allowlist = qcfg["allowlist"]
else:
lists_type = "NONE"
allowlist = "NONE"
if "milter_action" in qcfg:
milter_action = qcfg["milter_action"]
else:
milter_action = "NONE"
storage_name = cfg["storages"][qcfg["store"]]["name"]
qlist.append({
"name": q["name"],
"storage": storage_type,
"notification": notification_type,
"lists": lists_type,
"storage": storage_name,
"notification": notification,
"lists": allowlist,
"action": milter_action})
print_table(
[("Name", "name"),
[("Quarantine", "name"),
("Storage", "storage"),
("Notification", "notification"),
("Allowlist", "lists"),
@@ -148,6 +152,48 @@ def list_quarantines(cfg, args):
qlist
)
if "storages" in cfg:
storages = []
for name, options in cfg["storages"].items():
storages.append({
"name": name,
"type": options["type"]})
print("\n")
print_table(
[("Storage", "name"),
("Type", "type")],
storages
)
if "notifications" in cfg:
notifications = []
for name, options in cfg["notifications"].items():
notifications.append({
"name": name,
"type": options["type"]})
print("\n")
print_table(
[("Notification", "name"),
("Type", "type")],
notifications
)
if "lists" in cfg:
lst_list = []
for name, options in cfg["lists"].items():
lst_list.append({
"name": name,
"type": options["type"]})
print("\n")
print_table(
[("List", "name"),
("Type", "type")],
lst_list
)
def list_quarantine_emails(cfg, args):
storage = _get_quarantine(cfg, args.quarantine, args.debug).storage
@@ -364,7 +410,7 @@ def main():
"-b", "--batch",
help="Print results using only quarantine names, each on a new line.",
action="store_true")
list_parser.set_defaults(func=list_quarantines)
list_parser.set_defaults(func=llist)
# quarantine command group
quar_parser = subparsers.add_parser(
@@ -638,7 +684,7 @@ def main():
try:
logger.debug("read milter configuration")
cfg = get_milter_config(args.config, raw=True)
cfg = get_milter_config(args.config, rec=False)
if "rules" not in cfg or not cfg["rules"]:
raise RuntimeError("no rules configured")