From fdaf2cee533f7dd3aa411cde2f1ce6255f4e5d9a Mon Sep 17 00:00:00 2001 From: Thomas Oettli Date: Fri, 6 Nov 2020 22:06:56 +0100 Subject: [PATCH] temporary remove shielding --- pyinotifyd/__init__.py | 36 ++++++++++++++++++-------------- pyinotifyd/scheduler.py | 46 ++++++++++++++++++----------------------- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/pyinotifyd/__init__.py b/pyinotifyd/__init__.py index 1778c30..126c27d 100755 --- a/pyinotifyd/__init__.py +++ b/pyinotifyd/__init__.py @@ -118,33 +118,37 @@ class DaemonInstance: self._log.info(f"got signal {signame}, shutdown") self.stop() - pending = self._get_pending_tasks() - for task in pending: - task.cancel() - - try: - await asyncio.gather(*pending) - except asyncio.CancelledError: - pass - pending = self._get_pending_tasks() if pending: - tasks_done = False if self._timeout: + try: + future = asyncio.shield(asyncio.gather(*pending)) + except asyncio.CancelledError: + pass + self._log.info( f"wait {self._timeout} seconds for {len(pending)} " f"remaining task(s) to complete") try: - future = asyncio.gather(*pending) await asyncio.wait_for(future, self._timeout) - tasks_done = True + pending = [] except asyncio.TimeoutError: - future.cancel() - future.exception() + self._log.warning( + f"shutdown timeout exceeded") - if not tasks_done: + pending = [t for t in pending if not t.done()] + + if pending: self._log.warning( - f"terminate {len(pending)} remaining task(s)") + f"cancel {len(pending)} remaining task(s)") + + for task in pending: + task.cancel() + + #try: + # await asyncio.gather(*pending) + #except asyncio.CancelledError: + # pass asyncio.get_event_loop().stop() self._shutdown = False diff --git a/pyinotifyd/scheduler.py b/pyinotifyd/scheduler.py index 4c90e58..b126632 100755 --- a/pyinotifyd/scheduler.py +++ b/pyinotifyd/scheduler.py @@ -36,21 +36,20 @@ class _Task: self._log = logging.getLogger((logname or __name__)) async def _start(self): - if self._delay > 0: - try: + try: + if self._delay > 0: await asyncio.sleep(self._delay) - except asyncio.CancelledError: - self._log.info(f"task {self._task_id} cancelled") - return - if self._callback is not None: - self._callback(self._event) + if self._callback is not None: + self._callback(self._event) - self._task = None + self._task = None - self._log.info(f"execute task {self._task_id}") - await asyncio.shield(self._job(self._event, self._task_id)) - self._log.info(f"task {self._task_id} finished") + self._log.info(f"execute task {self._task_id}") + await self._job(self._event, self._task_id) + self._log.info(f"task {self._task_id} finished") + except asyncio.CancelledError: + self._log.info(f"task {self._task_id} cancelled") def start(self): if self._task is None: @@ -164,8 +163,8 @@ class ShellScheduler(TaskScheduler): self._log.info(f"{task_id}: execute shell command: {cmd}") try: - proc = await asyncio.shield(asyncio.create_subprocess_shell(cmd)) - await asyncio.shield(proc.communicate()) + proc = await asyncio.create_subprocess_shell(cmd) + await proc.communicate() except Exception as e: self._log.error(f"{task_id}: {e}") @@ -255,8 +254,7 @@ class FileManagerScheduler(TaskScheduler): else: mode = rule.filemode - await asyncio.shield( - self._chmod_and_chown(path, mode, chown, task_id)) + await self._chmod_and_chown(path, mode, chown, task_id) if not os.path.isdir(path): return @@ -268,15 +266,13 @@ class FileManagerScheduler(TaskScheduler): for root, dirs, files in os.walk(path): if work_on_dirs: for p in [os.path.join(root, d) for d in dirs]: - await asyncio.shield( - self._chmod_and_chown( - p, rule.dirmode, chown, task_id)) + await self._chmod_and_chown( + p, rule.dirmode, chown, task_id) if work_on_files: for p in [os.path.join(root, f) for f in files]: - await asyncio.shield( - self._chmod_and_chown( - p, rule.filemode, chown, task_id)) + await self._chmod_and_chown( + p, rule.filemode, chown, task_id) def _get_rule_by_event(self, event): rule = None @@ -328,9 +324,8 @@ class FileManagerScheduler(TaskScheduler): try: os.makedirs(dst_dir) - await asyncio.shield( - self._set_mode_and_owner( - first_subdir, rule, task_id)) + await self._set_mode_and_owner( + first_subdir, rule, task_id) except Exception as e: raise RuntimeError(e) @@ -347,8 +342,7 @@ class FileManagerScheduler(TaskScheduler): else: os.rename(path, dst) - await asyncio.shield( - self._set_mode_and_owner(dst, rule, task_id)) + await self._set_mode_and_owner(dst, rule, task_id) except Exception as e: raise RuntimeError(e)