add class Rule to configure FileManager
This commit is contained in:
BIN
.config.py.swp
BIN
.config.py.swp
Binary file not shown.
@@ -244,45 +244,56 @@ class Watch:
|
||||
return pyinotify.AsyncioNotifier(wm, loop, default_proc_fun=handler)
|
||||
|
||||
|
||||
class Rule:
|
||||
valid_actions = ["copy", "move", "delete"]
|
||||
|
||||
def __init__(self, action, src_re, dst_re="", auto_create=False,
|
||||
rec=False):
|
||||
|
||||
assert action in self.valid_actions, \
|
||||
f"action: expected [{Rule.valid_actions.join(', ')}], got{action}"
|
||||
self.action = action
|
||||
self.src_re = re.compile(src_re)
|
||||
assert isinstance(dst_re, str), \
|
||||
f"dst_re: expected {type('')}, got {type(dst_re)}"
|
||||
self.dst_re = dst_re
|
||||
assert isinstance(auto_create, bool), \
|
||||
f"auto_create: expected {type(bool)}, got {type(auto_create)}"
|
||||
self.auto_create = auto_create
|
||||
assert isinstance(rec, bool), \
|
||||
f"rec: expected {type(bool)}, got {type(rec)}"
|
||||
self.rec = rec
|
||||
|
||||
|
||||
class FileManager:
|
||||
def __init__(self, rules, auto_create=True, rec=False,
|
||||
logname="FileManager"):
|
||||
self._rules = []
|
||||
def __init__(self, rules, logname="FileManager"):
|
||||
if not isinstance(rules, list):
|
||||
rules = [rules]
|
||||
|
||||
for rule in rules:
|
||||
if rule["action"] in ["copy", "move"]:
|
||||
self._rules.append((rule["action"],
|
||||
re.compile(rule["src_re"]),
|
||||
rule["dst_re"]))
|
||||
elif rule["action"] == "delete":
|
||||
self._rules.append(
|
||||
(rule["action"], re.compile(rule["src_re"])))
|
||||
else:
|
||||
raise ValueError(f"invalid action type: {rule['action']}")
|
||||
assert isinstance(rule, Rule), \
|
||||
f"rules: expected {type(Rule)}, got {type(rule)}"
|
||||
|
||||
self._auto_create = auto_create
|
||||
self._rec = rec
|
||||
self._rules = rules
|
||||
self._log = logging.getLogger(logname)
|
||||
|
||||
def add_rule(self, *args, **kwargs):
|
||||
self._rules.append(Rule(*args, **kwargs))
|
||||
|
||||
async def task(self, event, task_id):
|
||||
path = event.pathname
|
||||
match = None
|
||||
for rule in self._rules:
|
||||
src_re = rule[1]
|
||||
match = src_re.match(path)
|
||||
match = rule.src_re.match(path)
|
||||
if match:
|
||||
break
|
||||
|
||||
if match is not None:
|
||||
action = rule[0]
|
||||
try:
|
||||
if action in ["copy", "move"]:
|
||||
dst_re = rule[2]
|
||||
dest = src_re.sub(dst_re, path)
|
||||
if rule.action in ["copy", "move"]:
|
||||
dest = src_re.sub(rule.dst_re, path)
|
||||
dest_dir = os.path.dirname(dest)
|
||||
if not os.path.isdir(dest_dir) and self._auto_create:
|
||||
if not os.path.isdir(dest_dir) and rule.auto_create:
|
||||
self._log.info(
|
||||
f"{task_id}: create directory '{dest_dir}'")
|
||||
os.makedirs(dest_dir)
|
||||
@@ -301,11 +312,11 @@ class FileManager:
|
||||
else:
|
||||
os.rename(path, dest)
|
||||
|
||||
elif action == "delete":
|
||||
elif rule.action == "delete":
|
||||
self._log.info(
|
||||
f"{task_id}: {action} '{path}'")
|
||||
f"{task_id}: delete '{path}'")
|
||||
if os.path.isdir(path):
|
||||
if self._rec:
|
||||
if rule.rec:
|
||||
shutil.rmtree(path)
|
||||
else:
|
||||
shutil.rmdir(path)
|
||||
|
||||
Reference in New Issue
Block a user