Improved CLI, quarantine- and error handling

This commit is contained in:
2019-03-13 09:27:58 +01:00
parent da6f08ee25
commit 88cea0e127
10 changed files with 508 additions and 301 deletions

View File

@@ -23,33 +23,37 @@ import time
import pyquarantine
def _get_quarantine_obj(config, quarantine):
if quarantine not in config.keys():
try:
quarantine_obj = next((q["quarantine_obj"] for q in config if q["name"] == quarantine))
except StopIteration:
raise RuntimeError("invalid quarantine '{}'".format(quarantine))
return config[quarantine]["quarantine_obj"]
return quarantine_obj
def _get_whitelist_obj(config, quarantine):
if quarantine not in config.keys():
try:
whitelist_obj = next((q["whitelist_obj"] for q in config if q["name"] == quarantine))
except StopIteration:
raise RuntimeError("invalid quarantine '{}'".format(quarantine))
return config[quarantine]["whitelist_obj"]
return whitelist_obj
def print_table(headers, keys, data):
if len(data) == 0:
def print_table(columns, rows):
if not rows:
return
# calculate length of each column
column_lengths = []
column_formats = []
for idx, header in enumerate(headers):
length = len(header)
key = keys[idx]
value_length=len((max(data.items(), key=lambda (k, v): len(v[key])))[1][key])
if value_length > length: length = value_length
# iterate columns to display
for header, key in columns:
# get the length of the header string
lengths = [len(header)]
# get the length of the longest value
lengths.append(len(str(max(rows, key=lambda x: len(str(x[key])))[key])))
# use the the longer one
length = max(lengths)
column_lengths.append(length)
column_formats.append("{{:<{}}}".format(length))
@@ -63,29 +67,26 @@ def print_table(headers, keys, data):
separator = "-+-".join(separators)
# print header and separator
print(row_format.format(*headers))
print(row_format.format(*[column[0] for column in columns]))
print(separator)
# print data
for key, value in data.items():
keys = [ entry[1] for entry in columns ]
# print rows
for entry in rows:
row = []
for entry in keys:
row.append(value[entry])
for key in keys:
row.append(entry[key])
print(row_format.format(*row))
def list_quarantines(config, args):
if args.batch:
print("\n".join(config.keys()))
print("\n".join([ quarantine["name"] for quarantine in config ]))
else:
print_table(
["Name", "Quarantine", "Notification", "Action"],
["name", "quarantine_type", "notification_type", "action"],
config
[("Name", "name"), ("Quarantine", "quarantine_type"), ("Notification", "notification_type"), ("Action", "action")],
config
)
return 0
def list_quarantine_emails(config, args):
@@ -104,15 +105,15 @@ def list_quarantine_emails(config, args):
emails[quarantine_id]["date_str"] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(metadata["date"]))
if args.batch:
# batch mode, print quarantine IDs, each on a new line
print("\n".join(emails.keys()))
else:
if len(emails) == 0: logger.info("quarantine '{}' is empty".format(args.quarantine))
print_table(
["Quarantine-ID", "From", "Recipient(s)", "Date"],
["quarantine_id", "from", "recipient_str", "date_str"],
emails
)
return
if not emails: logger.info("quarantine '{}' is empty".format(args.quarantine))
print_table(
[("Quarantine-ID", "quarantine_id"), ("From", "from"), ("Recipient(s)", "recipient_str"), ("Date", "date_str")],
emails.values()
)
def list_whitelist(config, args):
@@ -125,22 +126,23 @@ def list_whitelist(config, args):
# find whitelist entries
entries = whitelist.find(mailfrom=args.mailfrom, recipients=args.recipients, older_than=args.older_than)
if len(entries) == 0:
if not entries:
logger.info("whitelist of quarantine '{}' is empty".format(args.quarantine))
else:
# transform some values to strings
for entry_id, entry in entries.items():
entries[entry_id]["id"] = str(entry["id"])
entries[entry_id]["created_str"] = entry["created"].strftime('%Y-%m-%d %H:%M:%S')
entries[entry_id]["last_used_str"] = entry["last_used"].strftime('%Y-%m-%d %H:%M:%S')
entries[entry_id]["permanent_str"] = str(entry["permanent"])
return
print_table(
["ID", "From", "To", "Created", "Last used", "Comment", "Permanent"],
["id", "mailfrom", "recipient", "created_str", "last_used_str", "comment", "permanent_str"],
entries
)
# transform some values to strings
for entry_id, entry in entries.items():
entries[entry_id]["permanent_str"] = str(entry["permanent"])
entries[entry_id]["created_str"] = entry["created"].strftime('%Y-%m-%d %H:%M:%S')
entries[entry_id]["last_used_str"] = entry["last_used"].strftime('%Y-%m-%d %H:%M:%S')
print_table(
[
("ID", "id"), ("From", "mailfrom"), ("To", "recipient"), ("Created", "created_str"),
("Last used", "last_used_str"), ("Comment", "comment"), ("Permanent", "permanent_str")
],
entries.values()
)
def add_whitelist_entry(config, args):
@@ -153,58 +155,67 @@ def add_whitelist_entry(config, args):
# check existing entries
entries = whitelist.check(args.mailfrom, args.recipient)
if len(entries) > 0:
if entries:
# check if the exact entry exists already
for entry in entries.values():
if entry["mailfrom"] == args.mailfrom and entry["recipient"] == args.recipient:
raise RuntimeError("an entry with this from/to combination already exists")
if not args.force:
# the entry is already covered by others
for entry_id, entry in entries.items():
entries[entry_id]["permanent_str"] = str(entry["permanent"])
entries[entry_id]["created_str"] = entry["created"].strftime('%Y-%m-%d %H:%M:%S')
entries[entry_id]["last_used_str"] = entry["last_used"].strftime('%Y-%m-%d %H:%M:%S')
entries[entry_id]["permanent_str"] = str(entry["permanent"])
print_table(
["From", "To", "Created", "Last used", "Comment", "Permanent"],
["mailfrom", "recipient", "created_str", "last_used_str", "comment", "permanent_str"],
entries
[
("ID", "id"), ("From", "mailfrom"), ("To", "recipient"), ("Created", "created_str"),
("Last used", "last_used_str"), ("Comment", "comment"), ("Permanent", "permanent_str")
],
entries.values()
)
print("")
raise RuntimeError("from/to combination is already covered by the entries above, use --force to override.")
# add entry to whitelist
whitelist.add(args.mailfrom, args.recipient, args.comment, args.permanent)
logger.info("successfully added whitelist entry")
def delete_whitelist_entry(config, args):
logger = logging.getLogger(__name__)
whitelist = _get_whitelist_obj(config, args.quarantine)
if whitelist == None:
raise RuntimeError("whitelist type is set to None, unable to delete entries")
whitelist.delete(args.whitelist_id)
logger.info("successfully deleted whitelist entry")
def release_email(config, args):
logger = logging.getLogger(__name__)
quarantine = _get_quarantine_obj(config, args.quarantine)
if quarantine == None:
raise RuntimeError("quarantine type is set to None, unable to release e-mail")
quarantine.release(args.quarantine_id, args.recipient)
logger.info("successfully released e-mail [quarantine-id: {}] to '{}' from quarantine '{}'".format(args.quarantine_id, args.recipient, args.quarantine))
def delete_email(config, args):
logger = logging.getLogger(__name__)
quarantine = _get_quarantine_obj(config, args.quarantine)
if quarantine == None:
raise RuntimeError("quarantine type is set to None, unable to delete e-mail")
quarantine.delete(args.quarantine_id, args.recipient)
logger.info("successfully deleted e-mail [quarantine-id: {}] to '{}' from quarantine '{}'".format(args.quarantine_id, args.recipient, args.quarantine))
quarantine.delete(args.quarantine_id, args.recipient)
if args.recipient:
logger.info("successfully deleted e-mail [quarantine-id: {}] to '{}' from quarantine '{}'".format(args.quarantine_id, args.recipient, args.quarantine))
else:
logger.info("successfully deleted e-mail [quarantine-id: {}] from quarantine '{}'".format(args.quarantine_id, args.quarantine))
class StdErrFilter(logging.Filter):
@@ -212,13 +223,11 @@ class StdErrFilter(logging.Filter):
return rec.levelno in (logging.ERROR, logging.WARNING)
class StdOutFilter(logging.Filter):
def filter(self, rec):
return rec.levelno in (logging.DEBUG, logging.INFO)
def main():
"PyQuarantine command-line interface."
# parse command line
@@ -229,6 +238,7 @@ def main():
parser.add_argument("-d", "--debug", help="Log debugging messages.", action="store_true")
parser.set_defaults(syslog=False)
subparsers = parser.add_subparsers()
# list command
list_parser = subparsers.add_parser("list", help="List available quarantines.", formatter_class=formatter_class)
list_parser.add_argument("-b", "--batch", help="Print results using only quarantine names, each on a new line.", action="store_true")
@@ -236,10 +246,10 @@ def main():
# quarantine command group
quarantine_parser = subparsers.add_parser("quarantine", description="Manage quarantines.", help="Manage quarantines.", formatter_class=formatter_class)
quarantine_parser.add_argument("quarantine", metavar="QUARANTINE", help="Quarantine name.")
quarantine_subparsers = quarantine_parser.add_subparsers()
# quarantine list command
quarantine_list_parser = quarantine_subparsers.add_parser("list", description="List e-mails in quarantines.", help="List e-mails in quarantine.", formatter_class=formatter_class)
quarantine_list_parser.add_argument("quarantine", metavar="QUARANTINE", help="Quarantine name.")
quarantine_list_parser.add_argument("-f", "--from", dest="mailfrom", help="Filter e-mails by from address.", default=None, nargs="+")
quarantine_list_parser.add_argument("-t", "--to", dest="recipients", help="Filter e-mails by recipient address.", default=None, nargs="+")
quarantine_list_parser.add_argument("-o", "--older-than", dest="older_than", help="Filter e-mails by age (days).", default=None, type=float)
@@ -247,7 +257,6 @@ def main():
quarantine_list_parser.set_defaults(func=list_quarantine_emails)
# quarantine release command
quarantine_release_parser = quarantine_subparsers.add_parser("release", description="Release e-mail from quarantine.", help="Release e-mail from quarantine.", formatter_class=formatter_class)
quarantine_release_parser.add_argument("quarantine", metavar="QUARANTINE", help="Quarantine name.")
quarantine_release_parser.add_argument("quarantine_id", metavar="ID", help="Quarantine ID.")
quarantine_release_parser.add_argument("-n", "--disable-syslog", dest="syslog", help="Disable syslog messages.", action="store_false")
quarantine_release_parser_group = quarantine_release_parser.add_mutually_exclusive_group(required=True)
@@ -256,7 +265,6 @@ def main():
quarantine_release_parser.set_defaults(func=release_email)
# quarantine delete command
quarantine_delete_parser = quarantine_subparsers.add_parser("delete", description="Delete e-mail from quarantine.", help="Delete e-mail from quarantine.", formatter_class=formatter_class)
quarantine_delete_parser.add_argument("quarantine", metavar="QUARANTINE", help="Quarantine name.")
quarantine_delete_parser.add_argument("quarantine_id", metavar="ID", help="Quarantine ID.")
quarantine_delete_parser.add_argument("-n", "--disable-syslog", dest="syslog", help="Disable syslog messages.", action="store_false")
quarantine_delete_parser_group = quarantine_delete_parser.add_mutually_exclusive_group(required=True)
@@ -266,26 +274,24 @@ def main():
# whitelist command group
whitelist_parser = subparsers.add_parser("whitelist", description="Manage whitelists.", help="Manage whitelists.", formatter_class=formatter_class)
whitelist_parser.add_argument("quarantine", metavar="QUARANTINE", help="Quarantine name.")
whitelist_subparsers = whitelist_parser.add_subparsers()
# whitelist list command
whitelist_list_parser = whitelist_subparsers.add_parser("list", description="List whitelist entries.", help="List whitelist entries.", formatter_class=formatter_class)
whitelist_list_parser.add_argument("quarantine", metavar="QUARANTINE", help="Quarantine name.")
whitelist_list_parser.add_argument("-f", "--from", dest="mailfrom", help="Filter entries by from address.", default=None, nargs="+")
whitelist_list_parser.add_argument("-t", "--to", dest="recipients", help="Filter entries by recipient address.", default=None, nargs="+")
whitelist_list_parser.add_argument("-o", "--older-than", dest="older_than", help="Filter e-mails by last used date (days).", default=None, type=float)
whitelist_list_parser.set_defaults(func=list_whitelist)
# whitelist add command
whitelist_add_parser = whitelist_subparsers.add_parser("add", description="Add whitelist entry.", help="Add whitelist entry.", formatter_class=formatter_class)
whitelist_add_parser.add_argument("quarantine", metavar="QUARANTINE", help="Quarantine name.")
whitelist_add_parser.add_argument("-f", "--from", dest="mailfrom", help="From address.", required=True)
whitelist_add_parser.add_argument("-t", "--to", dest="recipient", help="Recipient address.", required=True)
whitelist_add_parser.add_argument("-c", "--comment", help="Comment.", default="added by CLI", required=False)
whitelist_add_parser.add_argument("-c", "--comment", help="Comment.", default="added by CLI")
whitelist_add_parser.add_argument("-p", "--permanent", help="Add a permanent entry.", action="store_true")
whitelist_add_parser.add_argument("--force", help="Force adding an entry, even if already covered by another entry.", action="store_true")
whitelist_add_parser.set_defaults(func=add_whitelist_entry)
# whitelist delete command
whitelist_delete_parser = whitelist_subparsers.add_parser("delete", description="Delete whitelist entry.", help="Delete whitelist entry.", formatter_class=formatter_class)
whitelist_delete_parser.add_argument("quarantine", metavar="QUARANTINE", help="Quarantine name.")
whitelist_delete_parser.add_argument("whitelist_id", metavar="ID", help="Whitelist ID.")
whitelist_delete_parser.set_defaults(func=delete_whitelist_entry)
@@ -317,7 +323,7 @@ def main():
# try to generate milter configs
try:
config = pyquarantine.generate_milter_config(config_files=args.config, configtest=True)
global_config, config = pyquarantine.generate_milter_config(config_files=args.config, configtest=True)
except RuntimeError as e:
logger.error(e)
sys.exit(255)
@@ -341,6 +347,5 @@ def main():
sys.exit(1)
if __name__ == "__main__":
main()