temporary remove shielding

This commit is contained in:
2020-11-06 22:06:56 +01:00
parent 5d07e08618
commit fdaf2cee53
2 changed files with 40 additions and 42 deletions

View File

@@ -119,32 +119,36 @@ class DaemonInstance:
self.stop() self.stop()
pending = self._get_pending_tasks() pending = self._get_pending_tasks()
for task in pending: if pending:
task.cancel() if self._timeout:
try: try:
await asyncio.gather(*pending) future = asyncio.shield(asyncio.gather(*pending))
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
pending = self._get_pending_tasks()
if pending:
tasks_done = False
if self._timeout:
self._log.info( self._log.info(
f"wait {self._timeout} seconds for {len(pending)} " f"wait {self._timeout} seconds for {len(pending)} "
f"remaining task(s) to complete") f"remaining task(s) to complete")
try: try:
future = asyncio.gather(*pending)
await asyncio.wait_for(future, self._timeout) await asyncio.wait_for(future, self._timeout)
tasks_done = True pending = []
except asyncio.TimeoutError: except asyncio.TimeoutError:
future.cancel()
future.exception()
if not tasks_done:
self._log.warning( self._log.warning(
f"terminate {len(pending)} remaining task(s)") f"shutdown timeout exceeded")
pending = [t for t in pending if not t.done()]
if pending:
self._log.warning(
f"cancel {len(pending)} remaining task(s)")
for task in pending:
task.cancel()
#try:
# await asyncio.gather(*pending)
#except asyncio.CancelledError:
# pass
asyncio.get_event_loop().stop() asyncio.get_event_loop().stop()
self._shutdown = False self._shutdown = False

View File

@@ -36,12 +36,9 @@ class _Task:
self._log = logging.getLogger((logname or __name__)) self._log = logging.getLogger((logname or __name__))
async def _start(self): async def _start(self):
if self._delay > 0:
try: try:
if self._delay > 0:
await asyncio.sleep(self._delay) await asyncio.sleep(self._delay)
except asyncio.CancelledError:
self._log.info(f"task {self._task_id} cancelled")
return
if self._callback is not None: if self._callback is not None:
self._callback(self._event) self._callback(self._event)
@@ -49,8 +46,10 @@ class _Task:
self._task = None self._task = None
self._log.info(f"execute task {self._task_id}") self._log.info(f"execute task {self._task_id}")
await asyncio.shield(self._job(self._event, self._task_id)) await self._job(self._event, self._task_id)
self._log.info(f"task {self._task_id} finished") self._log.info(f"task {self._task_id} finished")
except asyncio.CancelledError:
self._log.info(f"task {self._task_id} cancelled")
def start(self): def start(self):
if self._task is None: if self._task is None:
@@ -164,8 +163,8 @@ class ShellScheduler(TaskScheduler):
self._log.info(f"{task_id}: execute shell command: {cmd}") self._log.info(f"{task_id}: execute shell command: {cmd}")
try: try:
proc = await asyncio.shield(asyncio.create_subprocess_shell(cmd)) proc = await asyncio.create_subprocess_shell(cmd)
await asyncio.shield(proc.communicate()) await proc.communicate()
except Exception as e: except Exception as e:
self._log.error(f"{task_id}: {e}") self._log.error(f"{task_id}: {e}")
@@ -255,8 +254,7 @@ class FileManagerScheduler(TaskScheduler):
else: else:
mode = rule.filemode mode = rule.filemode
await asyncio.shield( await self._chmod_and_chown(path, mode, chown, task_id)
self._chmod_and_chown(path, mode, chown, task_id))
if not os.path.isdir(path): if not os.path.isdir(path):
return return
@@ -268,15 +266,13 @@ class FileManagerScheduler(TaskScheduler):
for root, dirs, files in os.walk(path): for root, dirs, files in os.walk(path):
if work_on_dirs: if work_on_dirs:
for p in [os.path.join(root, d) for d in dirs]: for p in [os.path.join(root, d) for d in dirs]:
await asyncio.shield( await self._chmod_and_chown(
self._chmod_and_chown( p, rule.dirmode, chown, task_id)
p, rule.dirmode, chown, task_id))
if work_on_files: if work_on_files:
for p in [os.path.join(root, f) for f in files]: for p in [os.path.join(root, f) for f in files]:
await asyncio.shield( await self._chmod_and_chown(
self._chmod_and_chown( p, rule.filemode, chown, task_id)
p, rule.filemode, chown, task_id))
def _get_rule_by_event(self, event): def _get_rule_by_event(self, event):
rule = None rule = None
@@ -328,9 +324,8 @@ class FileManagerScheduler(TaskScheduler):
try: try:
os.makedirs(dst_dir) os.makedirs(dst_dir)
await asyncio.shield( await self._set_mode_and_owner(
self._set_mode_and_owner( first_subdir, rule, task_id)
first_subdir, rule, task_id))
except Exception as e: except Exception as e:
raise RuntimeError(e) raise RuntimeError(e)
@@ -347,8 +342,7 @@ class FileManagerScheduler(TaskScheduler):
else: else:
os.rename(path, dst) os.rename(path, dst)
await asyncio.shield( await self._set_mode_and_owner(dst, rule, task_id)
self._set_mode_and_owner(dst, rule, task_id))
except Exception as e: except Exception as e:
raise RuntimeError(e) raise RuntimeError(e)