lazy load quarantine objects in cli
This commit is contained in:
@@ -25,24 +25,24 @@ from pyquarantine.storage import Quarantine
|
||||
from pyquarantine import __version__ as version
|
||||
|
||||
|
||||
def _get_quarantine(quarantines, name):
|
||||
def _get_quarantine(quarantines, name, debug):
|
||||
try:
|
||||
quarantine = next((q for q in quarantines if q.name == name))
|
||||
quarantine = next((q for q in quarantines if q["name"] == name))
|
||||
except StopIteration:
|
||||
raise RuntimeError(f"invalid quarantine '{name}'")
|
||||
return quarantine
|
||||
return Quarantine(quarantine, [], debug)
|
||||
|
||||
|
||||
def _get_notification(quarantines, name):
|
||||
notification = _get_quarantine(quarantines, name).notification
|
||||
def _get_notification(quarantines, name, debug):
|
||||
notification = _get_quarantine(quarantines, name, debug).notification
|
||||
if not notification:
|
||||
raise RuntimeError(
|
||||
"notification type is set to NONE")
|
||||
return notification
|
||||
|
||||
|
||||
def _get_whitelist(quarantines, name):
|
||||
whitelist = _get_quarantine(quarantines, name).whitelist
|
||||
def _get_whitelist(quarantines, name, debug):
|
||||
whitelist = _get_quarantine(quarantines, name, debug).whitelist
|
||||
if not whitelist:
|
||||
raise RuntimeError(
|
||||
"whitelist type is set to NONE")
|
||||
@@ -96,24 +96,26 @@ def list_quarantines(quarantines, args):
|
||||
else:
|
||||
qlist = []
|
||||
for q in quarantines:
|
||||
storage_type = q.storage.type
|
||||
cfg = q["args"]
|
||||
storage_type = cfg["store"]["type"]
|
||||
|
||||
if q.notification:
|
||||
notification_type = q.notification.type
|
||||
if "notify" in cfg:
|
||||
notification_type = cfg["notify"]["type"]
|
||||
else:
|
||||
notification_type = "NONE"
|
||||
|
||||
if q.whitelist:
|
||||
whitelist_type = q.whitelist.type
|
||||
if "whitelist" in cfg:
|
||||
whitelist_type = cfg["whitelist"]["whitelist"]["type"]
|
||||
else:
|
||||
whitelist_type = "NONE"
|
||||
|
||||
qlist.append({
|
||||
"name": q.name,
|
||||
"name": q["name"],
|
||||
"storage": storage_type,
|
||||
"notification": notification_type,
|
||||
"whitelist": whitelist_type,
|
||||
"action": q.milter_action})
|
||||
"action": q["args"]["milter_action"]})
|
||||
|
||||
print_table(
|
||||
[("Name", "name"),
|
||||
("Storage", "storage"),
|
||||
@@ -126,7 +128,7 @@ def list_quarantines(quarantines, args):
|
||||
|
||||
def list_quarantine_emails(quarantines, args):
|
||||
logger = logging.getLogger(__name__)
|
||||
storage = _get_quarantine(quarantines, args.quarantine).storage
|
||||
storage = _get_quarantine(quarantines, args.quarantine, args.debug).storage
|
||||
|
||||
# find emails and transform some metadata values to strings
|
||||
rows = []
|
||||
@@ -173,7 +175,7 @@ def list_quarantine_emails(quarantines, args):
|
||||
|
||||
def list_whitelist(quarantines, args):
|
||||
logger = logging.getLogger(__name__)
|
||||
whitelist = _get_whitelist(quarantines, args.quarantine)
|
||||
whitelist = _get_whitelist(quarantines, args.quarantine, args.debug)
|
||||
|
||||
# find whitelist entries
|
||||
entries = whitelist.find(
|
||||
@@ -205,7 +207,7 @@ def list_whitelist(quarantines, args):
|
||||
|
||||
def add_whitelist_entry(quarantines, args):
|
||||
logger = logging.getLogger(__name__)
|
||||
whitelist = _get_whitelist(quarantines, args.quarantine)
|
||||
whitelist = _get_whitelist(quarantines, args.quarantine, args.debug)
|
||||
|
||||
# check existing entries
|
||||
entries = whitelist.check(args.mailfrom, args.recipient, logger)
|
||||
@@ -245,34 +247,34 @@ def add_whitelist_entry(quarantines, args):
|
||||
|
||||
def delete_whitelist_entry(quarantines, args):
|
||||
logger = logging.getLogger(__name__)
|
||||
whitelist = _get_whitelist(quarantines, args.quarantine)
|
||||
whitelist = _get_whitelist(quarantines, args.quarantine, args.debug)
|
||||
whitelist.delete(args.whitelist_id)
|
||||
logger.info("whitelist entry deleted successfully")
|
||||
|
||||
|
||||
def notify(quarantines, args):
|
||||
logger = logging.getLogger(__name__)
|
||||
quarantine = _get_quarantine(quarantines, args.quarantine)
|
||||
quarantine = _get_quarantine(quarantines, args.quarantine, args.debug)
|
||||
quarantine.notify(args.quarantine_id, args.recipient)
|
||||
logger.info("notification sent successfully")
|
||||
|
||||
|
||||
def release(quarantines, args):
|
||||
logger = logging.getLogger(__name__)
|
||||
quarantine = _get_quarantine(quarantines, args.quarantine)
|
||||
quarantine = _get_quarantine(quarantines, args.quarantine, args.debug)
|
||||
quarantine.release(args.quarantine_id, args.recipient)
|
||||
logger.info("quarantined email released successfully")
|
||||
|
||||
|
||||
def delete(quarantines, args):
|
||||
logger = logging.getLogger(__name__)
|
||||
storage = _get_quarantine(quarantines, args.quarantine).storage
|
||||
storage = _get_quarantine(quarantines, args.quarantine, args.debug).storage
|
||||
storage.delete(args.quarantine_id, args.recipient)
|
||||
logger.info("quarantined email deleted successfully")
|
||||
|
||||
|
||||
def get(quarantines, args):
|
||||
storage = _get_quarantine(quarantines, args.quarantine).storage
|
||||
storage = _get_quarantine(quarantines, args.quarantine, args.debug).storage
|
||||
_, msg = storage.get_mail(args.quarantine_id)
|
||||
print(msg.as_string())
|
||||
|
||||
@@ -581,8 +583,7 @@ def main():
|
||||
for rule in cfg["rules"]:
|
||||
for action in rule["actions"]:
|
||||
if action["type"] == "quarantine":
|
||||
quarantines.append(
|
||||
Quarantine(action, [], args.debug))
|
||||
quarantines.append(action)
|
||||
|
||||
if args.syslog:
|
||||
# setup syslog
|
||||
|
||||
Reference in New Issue
Block a user