From e2d3a16125262d44e7095a9cbb069151fc582379 Mon Sep 17 00:00:00 2001 From: Thomas Oettli Date: Mon, 9 Nov 2020 01:48:03 +0100 Subject: [PATCH] correctly handle loop argument where needed --- pyinotifyd/__init__.py | 32 +++++++++++++++++++------------- pyinotifyd/scheduler.py | 17 ++++++++++------- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/pyinotifyd/__init__.py b/pyinotifyd/__init__.py index 302d249..8a1df4f 100755 --- a/pyinotifyd/__init__.py +++ b/pyinotifyd/__init__.py @@ -38,15 +38,16 @@ __version__ = "0.0.2" class _SchedulerList: - def __init__(self, schedulers=[]): + def __init__(self, schedulers=[], loop=None): if not isinstance(schedulers, list): schedulers = [schedulers] self._schedulers = schedulers + self._loop = (loop or asyncio.get_event_loop()) def process_event(self, event): for scheduler in self._schedulers: - asyncio.create_task(scheduler.process_event(event)) + self._loop.create_task(scheduler.process_event(event)) def schedulers(self): return self._schedulers @@ -57,8 +58,9 @@ class EventMap(ProcessEvent): **pyinotify.EventsCodes.OP_FLAGS, **pyinotify.EventsCodes.EVENT_FLAGS} - def my_init(self, event_map=None, default_sched=None): + def my_init(self, event_map=None, default_sched=None, loop=None): self._map = {} + self._loop = (loop or asyncio.get_event_loop()) if default_sched is not None: for flag in EventMap.flags: @@ -83,9 +85,10 @@ class EventMap(ProcessEvent): isinstance(scheduler, Cancel): instances.append(scheduler) else: - instances.append(TaskScheduler(scheduler)) + instances.append( + TaskScheduler(scheduler, loop=self._loop)) - self._map[flag] = _SchedulerList(instances) + self._map[flag] = _SchedulerList(instances, loop=self._loop) elif flag in self._map: del self._map[flag] @@ -107,7 +110,7 @@ class EventMap(ProcessEvent): class Watch: def __init__(self, path, event_map=None, default_sched=None, rec=False, - auto_add=False, logname="watch"): + auto_add=False, logname="watch", loop=None): assert isinstance(path, str), \ f"path: expected {type('')}, got {type(path)}" @@ -122,6 +125,7 @@ class Watch: assert isinstance(auto_add, bool), \ f"auto_add: expected {type(bool)}, got {type(auto_add)}" logname = (logname or __name__) + self._loop = loop self._path = path self._rec = rec @@ -137,7 +141,8 @@ class Watch: def event_map(self): return self._event_map - def start(self, loop=asyncio.get_event_loop()): + def start(self, loop=None): + loop = (loop or self._loop) self._watch_manager.add_watch(self._path, pyinotify.ALL_EVENTS, rec=self._rec, auto_add=self._auto_add, do_glob=True) @@ -154,12 +159,14 @@ class Watch: class Pyinotifyd: name = "pyinotifyd" - def __init__(self, watches=[], shutdown_timeout=30, logname="daemon"): + def __init__(self, watches=[], shutdown_timeout=30, logname="daemon", + loop=None): self.set_watches(watches) self.set_shutdown_timeout(shutdown_timeout) logname = (logname or __name__) + self._loop = (loop or asyncio.get_event_loop()) + self._log = logging.getLogger(logname) - self._loop = asyncio.get_event_loop() @staticmethod def from_cfg_file(config_file): @@ -203,8 +210,7 @@ class Pyinotifyd: return list(set(schedulers)) def start(self, loop=None): - if not loop: - loop = self._loop + loop = (loop or self._loop) if len(self._watches) == 0: self._log.warning( @@ -396,12 +402,12 @@ def main(): for signame in ["SIGINT", "SIGTERM"]: loop.add_signal_handler( getattr(signal, signame), - lambda: asyncio.ensure_future( + lambda: loop.create_task( daemon.shutdown(signame))) loop.add_signal_handler( getattr(signal, "SIGHUP"), - lambda: asyncio.ensure_future( + lambda: loop.create_task( daemon.reload("SIGHUP", args.config, args.debug))) daemon.start() diff --git a/pyinotifyd/scheduler.py b/pyinotifyd/scheduler.py index f1c678b..22e0a7a 100755 --- a/pyinotifyd/scheduler.py +++ b/pyinotifyd/scheduler.py @@ -43,7 +43,8 @@ class TaskScheduler: task: asyncio.Task = None cancelable: bool = True - def __init__(self, job, files=True, dirs=False, delay=0, logname="sched"): + def __init__(self, job, files=True, dirs=False, delay=0, logname="sched", + loop=None): assert iscoroutinefunction(job), \ f"job: expected coroutine, got {type(job)}" assert isinstance(files, bool), \ @@ -58,6 +59,7 @@ class TaskScheduler: self._dirs = dirs self._delay = delay self._log = logging.getLogger((logname or __name__)) + self._loop = (loop or asyncio.get_event_loop()) self._tasks = {} self._pause = False @@ -78,7 +80,8 @@ class TaskScheduler: self._log.info( f"wait {timeout} seconds for {len(pending)} " f"remaining task(s) to complete") - done, pending = await asyncio.wait([*pending], timeout=timeout) + done, pending = await asyncio.wait([*pending], timeout=timeout, + loop=self._loop) if pending: self._log.warning( f"shutdown timeout exceeded, " @@ -86,7 +89,7 @@ class TaskScheduler: for task in pending: task.cancel() try: - await asyncio.gather(*pending) + await asyncio.gather(*pending, loop=self._loop) except asyncio.CancelledError: pass else: @@ -94,8 +97,8 @@ class TaskScheduler: async def _run_job(self, event, task_state, restart=False): if self._delay > 0: - task_state.task = asyncio.create_task( - asyncio.sleep(self._delay)) + task_state.task = self._loop.create_task( + asyncio.sleep(self._delay, loop=self._loop)) try: if restart: @@ -110,7 +113,7 @@ class TaskScheduler: except asyncio.CancelledError: return - task_state.task = asyncio.create_task( + task_state.task = self._loop.create_task( self._job(event, task_state.id)) self._log.info( @@ -215,7 +218,7 @@ class ShellScheduler(TaskScheduler): self._log.info(f"{task_id}: execute shell command: {cmd}") try: - proc = await asyncio.create_subprocess_shell(cmd) + proc = await asyncio.create_subprocess_shell(cmd, loop=self._loop) await proc.communicate() except Exception as e: self._log.error(f"{task_id}: {e}")