improve error handling in FileManagerScheduler
This commit is contained in:
@@ -117,6 +117,16 @@ class DaemonInstance:
|
|||||||
self._shutdown = True
|
self._shutdown = True
|
||||||
self._log.info(f"got signal {signame}, shutdown")
|
self._log.info(f"got signal {signame}, shutdown")
|
||||||
self.stop()
|
self.stop()
|
||||||
|
|
||||||
|
pending = self._get_pending_tasks()
|
||||||
|
for task in pending:
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
try:
|
||||||
|
await asyncio.gather(*pending)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
pending = self._get_pending_tasks()
|
pending = self._get_pending_tasks()
|
||||||
if pending:
|
if pending:
|
||||||
tasks_done = False
|
tasks_done = False
|
||||||
@@ -133,14 +143,8 @@ class DaemonInstance:
|
|||||||
future.exception()
|
future.exception()
|
||||||
|
|
||||||
if not tasks_done:
|
if not tasks_done:
|
||||||
self._log.warning("terminate remaining task(s)")
|
self._log.warning(
|
||||||
for task in pending:
|
f"terminate {len(pending)} remaining task(s)")
|
||||||
task.cancel()
|
|
||||||
|
|
||||||
try:
|
|
||||||
await asyncio.gather(*pending)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
asyncio.get_event_loop().stop()
|
asyncio.get_event_loop().stop()
|
||||||
self._shutdown = False
|
self._shutdown = False
|
||||||
|
|||||||
@@ -302,12 +302,12 @@ class FileManagerScheduler(TaskScheduler):
|
|||||||
dst = rule.src_re.sub(rule.dst_re, path)
|
dst = rule.src_re.sub(rule.dst_re, path)
|
||||||
if not dst:
|
if not dst:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"{task_id}: unable to {rule.action} '{path}', "
|
f"unable to {rule.action} '{path}', "
|
||||||
f"resulting destination path is empty")
|
f"resulting destination path is empty")
|
||||||
|
|
||||||
if os.path.exists(dst):
|
if os.path.exists(dst):
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"{task_id}: unable to move file from '{path} "
|
f"unable to move file from '{path} "
|
||||||
f"to '{dst}', dstination path exists already")
|
f"to '{dst}', dstination path exists already")
|
||||||
|
|
||||||
dst_dir = os.path.dirname(dst)
|
dst_dir = os.path.dirname(dst)
|
||||||
@@ -321,35 +321,47 @@ class FileManagerScheduler(TaskScheduler):
|
|||||||
first_subdir = parent
|
first_subdir = parent
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
os.makedirs(dst_dir)
|
|
||||||
await asyncio.shield(
|
try:
|
||||||
self._set_mode_and_owner(first_subdir, rule, task_id))
|
os.makedirs(dst_dir)
|
||||||
|
await asyncio.shield(
|
||||||
|
self._set_mode_and_owner(
|
||||||
|
first_subdir, rule, task_id))
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(e)
|
||||||
|
|
||||||
self._log.info(
|
self._log.info(
|
||||||
f"{task_id}: {rule.action} '{path}' to '{dst}'")
|
f"{task_id}: {rule.action} '{path}' to '{dst}'")
|
||||||
if rule.action == "copy":
|
|
||||||
if os.path.isdir(path):
|
try:
|
||||||
shutil.copytree(path, dst)
|
if rule.action == "copy":
|
||||||
|
if os.path.isdir(path):
|
||||||
|
shutil.copytree(path, dst)
|
||||||
|
else:
|
||||||
|
shutil.copy2(path, dst)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
shutil.copy2(path, dst)
|
os.rename(path, dst)
|
||||||
|
|
||||||
else:
|
await asyncio.shield(
|
||||||
os.rename(path, dst)
|
self._set_mode_and_owner(dst, rule, task_id))
|
||||||
|
except Exception as e:
|
||||||
await asyncio.shield(
|
raise RuntimeError(e)
|
||||||
self._set_mode_and_owner(dst, rule, task_id))
|
|
||||||
|
|
||||||
elif rule.action == "delete":
|
elif rule.action == "delete":
|
||||||
self._log.info(
|
self._log.info(
|
||||||
f"{task_id}: {rule.action} '{path}'")
|
f"{task_id}: {rule.action} '{path}'")
|
||||||
if os.path.isdir(path):
|
try:
|
||||||
if rule.rec:
|
if os.path.isdir(path):
|
||||||
shutil.rmtree(path)
|
if rule.rec:
|
||||||
else:
|
shutil.rmtree(path)
|
||||||
shutil.rmdir(path)
|
else:
|
||||||
|
shutil.rmdir(path)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
os.remove(path)
|
os.remove(path)
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(e)
|
||||||
|
|
||||||
except RuntimeError as e:
|
except RuntimeError as e:
|
||||||
self._log.error(f"{task_id}: {e}")
|
self._log.error(f"{task_id}: {e}")
|
||||||
|
|||||||
Reference in New Issue
Block a user