correctly handle loop argument where needed

This commit is contained in:
2020-11-09 01:48:03 +01:00
parent 532b2f80ff
commit e2d3a16125
2 changed files with 29 additions and 20 deletions

View File

@@ -38,15 +38,16 @@ __version__ = "0.0.2"
class _SchedulerList: class _SchedulerList:
def __init__(self, schedulers=[]): def __init__(self, schedulers=[], loop=None):
if not isinstance(schedulers, list): if not isinstance(schedulers, list):
schedulers = [schedulers] schedulers = [schedulers]
self._schedulers = schedulers self._schedulers = schedulers
self._loop = (loop or asyncio.get_event_loop())
def process_event(self, event): def process_event(self, event):
for scheduler in self._schedulers: for scheduler in self._schedulers:
asyncio.create_task(scheduler.process_event(event)) self._loop.create_task(scheduler.process_event(event))
def schedulers(self): def schedulers(self):
return self._schedulers return self._schedulers
@@ -57,8 +58,9 @@ class EventMap(ProcessEvent):
**pyinotify.EventsCodes.OP_FLAGS, **pyinotify.EventsCodes.OP_FLAGS,
**pyinotify.EventsCodes.EVENT_FLAGS} **pyinotify.EventsCodes.EVENT_FLAGS}
def my_init(self, event_map=None, default_sched=None): def my_init(self, event_map=None, default_sched=None, loop=None):
self._map = {} self._map = {}
self._loop = (loop or asyncio.get_event_loop())
if default_sched is not None: if default_sched is not None:
for flag in EventMap.flags: for flag in EventMap.flags:
@@ -83,9 +85,10 @@ class EventMap(ProcessEvent):
isinstance(scheduler, Cancel): isinstance(scheduler, Cancel):
instances.append(scheduler) instances.append(scheduler)
else: else:
instances.append(TaskScheduler(scheduler)) instances.append(
TaskScheduler(scheduler, loop=self._loop))
self._map[flag] = _SchedulerList(instances) self._map[flag] = _SchedulerList(instances, loop=self._loop)
elif flag in self._map: elif flag in self._map:
del self._map[flag] del self._map[flag]
@@ -107,7 +110,7 @@ class EventMap(ProcessEvent):
class Watch: class Watch:
def __init__(self, path, event_map=None, default_sched=None, rec=False, def __init__(self, path, event_map=None, default_sched=None, rec=False,
auto_add=False, logname="watch"): auto_add=False, logname="watch", loop=None):
assert isinstance(path, str), \ assert isinstance(path, str), \
f"path: expected {type('')}, got {type(path)}" f"path: expected {type('')}, got {type(path)}"
@@ -122,6 +125,7 @@ class Watch:
assert isinstance(auto_add, bool), \ assert isinstance(auto_add, bool), \
f"auto_add: expected {type(bool)}, got {type(auto_add)}" f"auto_add: expected {type(bool)}, got {type(auto_add)}"
logname = (logname or __name__) logname = (logname or __name__)
self._loop = loop
self._path = path self._path = path
self._rec = rec self._rec = rec
@@ -137,7 +141,8 @@ class Watch:
def event_map(self): def event_map(self):
return self._event_map return self._event_map
def start(self, loop=asyncio.get_event_loop()): def start(self, loop=None):
loop = (loop or self._loop)
self._watch_manager.add_watch(self._path, pyinotify.ALL_EVENTS, self._watch_manager.add_watch(self._path, pyinotify.ALL_EVENTS,
rec=self._rec, auto_add=self._auto_add, rec=self._rec, auto_add=self._auto_add,
do_glob=True) do_glob=True)
@@ -154,12 +159,14 @@ class Watch:
class Pyinotifyd: class Pyinotifyd:
name = "pyinotifyd" name = "pyinotifyd"
def __init__(self, watches=[], shutdown_timeout=30, logname="daemon"): def __init__(self, watches=[], shutdown_timeout=30, logname="daemon",
loop=None):
self.set_watches(watches) self.set_watches(watches)
self.set_shutdown_timeout(shutdown_timeout) self.set_shutdown_timeout(shutdown_timeout)
logname = (logname or __name__) logname = (logname or __name__)
self._loop = (loop or asyncio.get_event_loop())
self._log = logging.getLogger(logname) self._log = logging.getLogger(logname)
self._loop = asyncio.get_event_loop()
@staticmethod @staticmethod
def from_cfg_file(config_file): def from_cfg_file(config_file):
@@ -203,8 +210,7 @@ class Pyinotifyd:
return list(set(schedulers)) return list(set(schedulers))
def start(self, loop=None): def start(self, loop=None):
if not loop: loop = (loop or self._loop)
loop = self._loop
if len(self._watches) == 0: if len(self._watches) == 0:
self._log.warning( self._log.warning(
@@ -396,12 +402,12 @@ def main():
for signame in ["SIGINT", "SIGTERM"]: for signame in ["SIGINT", "SIGTERM"]:
loop.add_signal_handler( loop.add_signal_handler(
getattr(signal, signame), getattr(signal, signame),
lambda: asyncio.ensure_future( lambda: loop.create_task(
daemon.shutdown(signame))) daemon.shutdown(signame)))
loop.add_signal_handler( loop.add_signal_handler(
getattr(signal, "SIGHUP"), getattr(signal, "SIGHUP"),
lambda: asyncio.ensure_future( lambda: loop.create_task(
daemon.reload("SIGHUP", args.config, args.debug))) daemon.reload("SIGHUP", args.config, args.debug)))
daemon.start() daemon.start()

View File

@@ -43,7 +43,8 @@ class TaskScheduler:
task: asyncio.Task = None task: asyncio.Task = None
cancelable: bool = True cancelable: bool = True
def __init__(self, job, files=True, dirs=False, delay=0, logname="sched"): def __init__(self, job, files=True, dirs=False, delay=0, logname="sched",
loop=None):
assert iscoroutinefunction(job), \ assert iscoroutinefunction(job), \
f"job: expected coroutine, got {type(job)}" f"job: expected coroutine, got {type(job)}"
assert isinstance(files, bool), \ assert isinstance(files, bool), \
@@ -58,6 +59,7 @@ class TaskScheduler:
self._dirs = dirs self._dirs = dirs
self._delay = delay self._delay = delay
self._log = logging.getLogger((logname or __name__)) self._log = logging.getLogger((logname or __name__))
self._loop = (loop or asyncio.get_event_loop())
self._tasks = {} self._tasks = {}
self._pause = False self._pause = False
@@ -78,7 +80,8 @@ class TaskScheduler:
self._log.info( self._log.info(
f"wait {timeout} seconds for {len(pending)} " f"wait {timeout} seconds for {len(pending)} "
f"remaining task(s) to complete") f"remaining task(s) to complete")
done, pending = await asyncio.wait([*pending], timeout=timeout) done, pending = await asyncio.wait([*pending], timeout=timeout,
loop=self._loop)
if pending: if pending:
self._log.warning( self._log.warning(
f"shutdown timeout exceeded, " f"shutdown timeout exceeded, "
@@ -86,7 +89,7 @@ class TaskScheduler:
for task in pending: for task in pending:
task.cancel() task.cancel()
try: try:
await asyncio.gather(*pending) await asyncio.gather(*pending, loop=self._loop)
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
else: else:
@@ -94,8 +97,8 @@ class TaskScheduler:
async def _run_job(self, event, task_state, restart=False): async def _run_job(self, event, task_state, restart=False):
if self._delay > 0: if self._delay > 0:
task_state.task = asyncio.create_task( task_state.task = self._loop.create_task(
asyncio.sleep(self._delay)) asyncio.sleep(self._delay, loop=self._loop))
try: try:
if restart: if restart:
@@ -110,7 +113,7 @@ class TaskScheduler:
except asyncio.CancelledError: except asyncio.CancelledError:
return return
task_state.task = asyncio.create_task( task_state.task = self._loop.create_task(
self._job(event, task_state.id)) self._job(event, task_state.id))
self._log.info( self._log.info(
@@ -215,7 +218,7 @@ class ShellScheduler(TaskScheduler):
self._log.info(f"{task_id}: execute shell command: {cmd}") self._log.info(f"{task_id}: execute shell command: {cmd}")
try: try:
proc = await asyncio.create_subprocess_shell(cmd) proc = await asyncio.create_subprocess_shell(cmd, loop=self._loop)
await proc.communicate() await proc.communicate()
except Exception as e: except Exception as e:
self._log.error(f"{task_id}: {e}") self._log.error(f"{task_id}: {e}")