diff --git a/pyinotifyd/__init__.py b/pyinotifyd/__init__.py index 11168df..302d249 100755 --- a/pyinotifyd/__init__.py +++ b/pyinotifyd/__init__.py @@ -32,27 +32,24 @@ import sys from pyinotify import ProcessEvent from pyinotifyd._install import install, uninstall -from pyinotifyd.scheduler import TaskScheduler +from pyinotifyd.scheduler import TaskScheduler, Cancel __version__ = "0.0.2" -class _TaskList: - def __init__(self, tasks=[]): - if not isinstance(tasks, list): - tasks = [tasks] +class _SchedulerList: + def __init__(self, schedulers=[]): + if not isinstance(schedulers, list): + schedulers = [schedulers] - self._tasks = tasks + self._schedulers = schedulers - def add(self, task): - self._tasks.append(task) + def process_event(self, event): + for scheduler in self._schedulers: + asyncio.create_task(scheduler.process_event(event)) - def remove(self, task): - self._tasks.remove(task) - - def execute(self, event): - for task in self._tasks: - asyncio.create_task(task.start(event)) + def schedulers(self): + return self._schedulers class EventMap(ProcessEvent): @@ -60,33 +57,35 @@ class EventMap(ProcessEvent): **pyinotify.EventsCodes.OP_FLAGS, **pyinotify.EventsCodes.EVENT_FLAGS} - def my_init(self, event_map=None, default_task=None): + def my_init(self, event_map=None, default_sched=None): self._map = {} - if default_task is not None: + if default_sched is not None: for flag in EventMap.flags: - self.set(flag, default_task) + self.set(flag, default_sched) if event_map is not None: assert isinstance(event_map, dict), \ f"event_map: expected {type(dict)}, got {type(event_map)}" - for flag, tasks in event_map.items(): - self.set_task(flag, tasks) + for flag, schedulers in event_map.items(): + self.set_scheduler(flag, schedulers) - def set_task(self, flag, tasks): + def set_scheduler(self, flag, schedulers): assert flag in EventMap.flags, \ f"event_map: invalid flag: {flag}" - if tasks is not None: - if not isinstance(tasks, list): - tasks = [tasks] + if schedulers is not None: + if not isinstance(schedulers, list): + schedulers = [schedulers] - task_instances = [] - for task in tasks: - if not issubclass(type(task), TaskScheduler): - task = TaskScheduler(task) + instances = [] + for scheduler in schedulers: + if issubclass(type(scheduler), TaskScheduler) or \ + isinstance(scheduler, Cancel): + instances.append(scheduler) + else: + instances.append(TaskScheduler(scheduler)) - task_instances.append(task) - self._map[flag] = _TaskList(task_instances).execute + self._map[flag] = _SchedulerList(instances) elif flag in self._map: del self._map[flag] @@ -95,11 +94,19 @@ class EventMap(ProcessEvent): logging.debug(f"received {event}") maskname = event.maskname.split("|")[0] if maskname in self._map: - self._map[maskname](event) + self._map[maskname].process_event(event) + + def schedulers(self): + schedulers = [] + for scheduler_list in self._map.values(): + schedulers.extend( + scheduler_list.schedulers()) + + return list(set(schedulers)) class Watch: - def __init__(self, path, event_map=None, default_task=None, rec=False, + def __init__(self, path, event_map=None, default_sched=None, rec=False, auto_add=False, logname="watch"): assert isinstance(path, str), \ f"path: expected {type('')}, got {type(path)}" @@ -108,7 +115,7 @@ class Watch: self._event_map = event_map else: self._event_map = EventMap( - event_map=event_map, default_task=default_task) + event_map=event_map, default_sched=default_sched) assert isinstance(rec, bool), \ f"rec: expected {type(bool)}, got {type(rec)}" @@ -127,6 +134,9 @@ class Watch: def path(self): return self._path + def event_map(self): + return self._event_map + def start(self, loop=asyncio.get_event_loop()): self._watch_manager.add_watch(self._path, pyinotify.ALL_EVENTS, rec=self._rec, auto_add=self._auto_add, @@ -186,6 +196,12 @@ class Pyinotifyd: f"got {type(timeout)}" self._shutdown_timeout = timeout + def schedulers(self): + schedulers = [] + for w in self._watches: + schedulers.extend(w.event_map().schedulers()) + return list(set(schedulers)) + def start(self, loop=None): if not loop: loop = self._loop @@ -199,14 +215,22 @@ class Pyinotifyd: f"start listening for inotify events on '{watch.path()}'") watch.start(loop) - def stop(self): + def pause(self): + for scheduler in self.schedulers(): + scheduler.pause() + + async def shutdown(self): + schedulers = self.schedulers() + + tasks = [s.shutdown(self._shutdown_timeout) for s in set(schedulers)] + if tasks: + await asyncio.gather(*tasks) + for watch in self._watches: - self._log.info( + self._log.debug( f"stop listening for inotify events on '{watch.path()}'") watch.stop() - return self._shutdown_timeout - class DaemonInstance: def __init__(self, instance, logname="daemon"): @@ -217,49 +241,30 @@ class DaemonInstance: def start(self): self._instance.start() - def stop(self): - return self._instance.stop() - - def _get_pending_tasks(self): - return [t for t in asyncio.all_tasks() - if t is not asyncio.current_task()] - async def shutdown(self, signame): if self._shutdown: self._log.warning( f"got signal {signame}, but shutdown already in progress") return - self._shutdown = True self._log.info(f"got signal {signame}, shutdown") - timeout = self.stop() + self._shutdown = True - pending = self._get_pending_tasks() - if pending: - if timeout: - future = asyncio.gather(*pending) - self._log.info( - f"wait {timeout} seconds for {len(pending)} " - f"remaining task(s) to complete") - try: - await asyncio.wait_for(future, timeout) - pending = [] - except asyncio.TimeoutError: - future.cancel() - future.exception() - self._log.warning( - "shutdown timeout exceeded, remaining task(s) killed") - else: - self._log.warning( - f"cancel {len(pending)} remaining task(s)") + try: + await self._instance.shutdown() - for task in pending: - task.cancel() + pending = [t for t in asyncio.all_tasks() + if t is not asyncio.current_task()] - try: - await asyncio.gather(*pending) - except asyncio.CancelledError: - pass + for task in pending: + task.cancel() + + try: + await asyncio.gather(*pending) + except asyncio.CancelledError: + pass + except Exception as e: + self._log.exception(f"error during shutdown: {e}") asyncio.get_event_loop().stop() self._shutdown = False @@ -280,23 +285,28 @@ class DaemonInstance: else: if debug: logging.getLogger().setLevel(logging.DEBUG) - self.stop() + + old_instance = self._instance + + old_instance.pause() + instance.start() + asyncio.create_task(old_instance.shutdown()) + self._instance = instance - self.start() def main(): - myname = Pyinotifyd.name + name = Pyinotifyd.name parser = argparse.ArgumentParser( - description=myname, + description=name, formatter_class=lambda prog: argparse.HelpFormatter( prog, max_help_position=45, width=140)) parser.add_argument( "-c", "--config", - help=f"path to config file (default: /etc/{myname}/config.py)", - default=f"/etc/{myname}/config.py") + help=f"path to config file (default: /etc/{name}/config.py)", + default=f"/etc/{name}/config.py") parser.add_argument( "-d", "--debug", @@ -333,7 +343,7 @@ def main(): args = parser.parse_args() if args.version: - print(f"{myname} ({__version__})") + print(f"{name} ({__version__})") sys.exit(0) if args.list: @@ -355,10 +365,10 @@ def main(): root_logger.addHandler(ch) if args.install: - sys.exit(install(myname)) + sys.exit(install(name)) if args.uninstall: - sys.exit(uninstall(myname)) + sys.exit(uninstall(name)) try: pyinotifyd = Pyinotifyd.from_cfg_file(args.config) @@ -379,7 +389,7 @@ def main(): root_logger.setLevel(loglevel) formatter = logging.Formatter( - f"%(asctime)s - {myname}/%(name)s - %(levelname)s - %(message)s") + f"%(asctime)s - {name}/%(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) loop = asyncio.get_event_loop() @@ -392,7 +402,7 @@ def main(): loop.add_signal_handler( getattr(signal, "SIGHUP"), lambda: asyncio.ensure_future( - daemon.reload("SIGHUP", myname, args.config, args.debug))) + daemon.reload("SIGHUP", args.config, args.debug))) daemon.start() loop.run_forever() diff --git a/pyinotifyd/scheduler.py b/pyinotifyd/scheduler.py index 44d0102..f1c678b 100755 --- a/pyinotifyd/scheduler.py +++ b/pyinotifyd/scheduler.py @@ -43,7 +43,6 @@ class TaskScheduler: task: asyncio.Task = None cancelable: bool = True - def __init__(self, job, files=True, dirs=False, delay=0, logname="sched"): assert iscoroutinefunction(job), \ f"job: expected coroutine, got {type(job)}" @@ -61,51 +60,49 @@ class TaskScheduler: self._log = logging.getLogger((logname or __name__)) self._tasks = {} + self._pause = False - def cancel(self, event): - try: - task_state = self._tasks[event.pathname] - except KeyError: - return + async def pause(self): + self._log.info("pause scheduler") + self._pause = True - if task_state.cancelable: - task_state.task.cancel() - self._log.info( - f"scheduled task cancelled ({_event_to_str(event)}, " - f"task_id={task_state.id})") - task_state.task = None - del self._tasks[event.pathname] - else: - self.log.warning( - f"skip ({_event_to_str(event)}) due to an ongoing task " - f"(task_id={task_state.id})") - - async def start(self, event): - if not ((not event.dir and self._files) or - (event.dir and self._dirs)): - return - - prefix = "" - try: - task_state = self._tasks[event.pathname] - if task_state.cancelable: - task_state.task.cancel() - prefix = "re" + async def shutdown(self, timeout=None): + self._pause = True + pending = [t.task for t in self._tasks.values()] + if pending: + if timeout is None: + self._log.info( + f"wait for {len(pending)} " + f"remaining task(s) to complete") else: - self.log.warning( - f"skip ({_event_to_str(event)}) due to an ongoing task " - f"(task_id={task_state.id})") - return - - except KeyError: - task_state = TaskScheduler.TaskState() - self._tasks[event.pathname] = task_state + self._log.info( + f"wait {timeout} seconds for {len(pending)} " + f"remaining task(s) to complete") + done, pending = await asyncio.wait([*pending], timeout=timeout) + if pending: + self._log.warning( + f"shutdown timeout exceeded, " + f"cancel {len(pending)} remaining task(s)") + for task in pending: + task.cancel() + try: + await asyncio.gather(*pending) + except asyncio.CancelledError: + pass + else: + self._log.info("all remainig tasks completed") + async def _run_job(self, event, task_state, restart=False): if self._delay > 0: task_state.task = asyncio.create_task( asyncio.sleep(self._delay)) try: + if restart: + prefix = "re-" + else: + prefix = "" + self._log.info( f"{prefix}schedule task ({_event_to_str(event)}, " f"task_id={task_state.id}, delay={self._delay})") @@ -133,15 +130,67 @@ class TaskScheduler: finally: del self._tasks[event.pathname] + async def process_event(self, event): + if not ((not event.dir and self._files) or + (event.dir and self._dirs)): + return -class Cancel(TaskScheduler): - def __init__(self, task): + restart = False + try: + task_state = self._tasks[event.pathname] + if task_state.cancelable: + task_state.task.cancel() + if not self._pause: + restart = True + else: + self._log.info( + f"scheduled task cancelled ({_event_to_str(event)}, " + f"task_id={task_state.id})") + + else: + self.log.warning( + f"skip ({_event_to_str(event)}) due to an ongoing task " + f"(task_id={task_state.id})") + return + + except KeyError: + task_state = TaskScheduler.TaskState() + self._tasks[event.pathname] = task_state + + if not self._pause: + await self._run_job(event, task_state, restart) + + async def process_cancel_event(self, event): + try: + task_state = self._tasks[event.pathname] + except KeyError: + return + + if task_state.cancelable: + task_state.task.cancel() + self._log.info( + f"scheduled task cancelled ({_event_to_str(event)}, " + f"task_id={task_state.id})") + task_state.task = None + del self._tasks[event.pathname] + else: + self.log.warning( + f"skip ({_event_to_str(event)}) due to an ongoing task " + f"(task_id={task_state.id})") + + +class Cancel: + def __init__(self, task, *args, **kwargs): assert issubclass(type(task), TaskScheduler), \ f"task: expected {type(TaskScheduler)}, got {type(task)}" - self._task = task - async def start(self, event): - self._task.cancel(event) + setattr(self, "process_event", task.process_cancel_event) + + async def shutdown(self, timeout=None): + pass + + async def pause(self): + pass class ShellScheduler(TaskScheduler): @@ -231,9 +280,9 @@ class FileManagerScheduler(TaskScheduler): return rule - async def start(self, event): + async def process_event(self, event): if self._get_rule_by_event(event): - await super().start(event) + await super().process_event(event) else: self._log.debug( f"no rule in ruleset matches path '{event.pathname}'")