diff --git a/README.md b/README.md index c67aaa5..bc58d96 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ The basic idea is to instantiate one or multiple schedulers and map specific ino pyinotifyd has different schedulers to schedule tasks with an optional delay. The advantages of using a scheduler are consistent logging and the possibility to cancel delayed tasks. Furthermore, schedulers have the ability to differentiate between files and directories. ### TaskScheduler -Schedule a custom python method *job* with an optional *delay* in seconds. Skip scheduling of tasks for files and/or directories according to *files* and *dirs* arguments. If there already is a scheduled task, re-schedule it with *delay*. Use *logname* in log messages. All additional modules, functions and variables that are defined in the config file and are needed within the *job*, need to be passed as dictionary to the TaskManager through *global_vars*. +Schedule a custom python method *job* with an optional *delay* in seconds. Skip scheduling of tasks for files and/or directories according to *files* and *dirs* arguments. If there already is a scheduled task, re-schedule it with *delay*. Use *logname* in log messages. All additional modules, functions and variables that are defined in the config file and are needed within the *job*, need to be passed as dictionary to the TaskManager through *global_vars*. If you want to limit the scheduler to run only one job at a time, set *singlejob* to True. All arguments except for *job* are optional. ```python # Please note that pyinotifyd uses pythons asyncio for asynchronous task execution. @@ -71,7 +71,8 @@ task_sched = TaskScheduler( dirs=False, delay=0, logname="sched", - global_vars=globals()) + global_vars=globals(), + singlejob=False) ``` ### ShellScheduler diff --git a/pyinotifyd/misc/config.py.default b/pyinotifyd/misc/config.py.default index 510241c..21f704c 100644 --- a/pyinotifyd/misc/config.py.default +++ b/pyinotifyd/misc/config.py.default @@ -6,15 +6,16 @@ #import logging # #async def custom_job(event, task_id): -# asyncio.sleep(1) +# await asyncio.sleep(1) # logging.info(f"{task_id}: execute example task: {event}") # #task_sched = TaskScheduler( # job=custom_job, # files=True, # dirs=False, -# delay=10 -# global_vars=globals()) +# delay=10, +# global_vars=globals(), +# singlejob=False) ########################### @@ -25,7 +26,8 @@ # cmd="/usr/local/bin/task.sh {maskname} {pathname} {src_pathname}", # files=True, # dirs=False, -# delay=10) +# delay=10, +# singlejob=False) ################################# diff --git a/pyinotifyd/scheduler.py b/pyinotifyd/scheduler.py index 96c8509..a0d328d 100755 --- a/pyinotifyd/scheduler.py +++ b/pyinotifyd/scheduler.py @@ -52,7 +52,7 @@ class TaskScheduler: self.cancelable = cancelable def __init__(self, job, files=True, dirs=False, delay=0, logname="sched", - loop=None, global_vars={}): + loop=None, global_vars={}, singlejob=False): assert iscoroutinefunction(job), \ f"job: expected coroutine, got {type(job)}" assert isinstance(files, bool), \ @@ -71,6 +71,7 @@ class TaskScheduler: self._log = logging.getLogger((logname or __name__)) self._loop = (loop or asyncio.get_event_loop()) self._globals = global_vars + self._singlejob = singlejob self._tasks = {} self._pause = False @@ -105,6 +106,9 @@ class TaskScheduler: else: self._log.info("all remainig tasks completed") + def taskindex(self, event): + return "singlejob" if self._singlejob else event.pathname + async def _run_job(self, event, task_state, restart=False): logger = SchedulerLogger(self._log, { "event": event, @@ -112,7 +116,7 @@ class TaskScheduler: if self._delay > 0: task_state.task = self._loop.create_task( - asyncio.sleep(self._delay, loop=self._loop)) + asyncio.sleep(self._delay)) try: if restart: prefix = "re-" @@ -145,7 +149,8 @@ class TaskScheduler: else: logger.info("task finished") finally: - del self._tasks[event.pathname] + task_index = self.taskindex(event) + del self._tasks[task_index] async def process_event(self, event): if not ((not event.dir and self._files) or @@ -153,9 +158,13 @@ class TaskScheduler: return restart = False + task_index = self.taskindex(event) try: - task_state = self._tasks[event.pathname] - + task_state = self._tasks[task_index] + except KeyError: + task_state = TaskScheduler.TaskState() + self._tasks[task_index] = task_state + else: logger = SchedulerLogger(self._log, { "event": event, "id": task_state.id}) @@ -171,16 +180,13 @@ class TaskScheduler: logger.warning("skip event due to ongoing task") return - except KeyError: - task_state = TaskScheduler.TaskState() - self._tasks[event.pathname] = task_state - if not self._pause: await self._run_job(event, task_state, restart) async def process_cancel_event(self, event): try: - task_state = self._tasks[event.pathname] + task_index = self.taskindex(event) + task_state = self._tasks[task_index] except KeyError: return @@ -192,7 +198,8 @@ class TaskScheduler: task_state.task.cancel() logger.info("scheduled task cancelled") task_state.task = None - del self._tasks[event.pathname] + logger.info(f"{task_index}") + del self._tasks[task_index] else: logger.warning("skip event due to ongoing task") @@ -285,7 +292,8 @@ class FileManagerRule: class FileManagerScheduler(TaskScheduler): def __init__(self, rules, job=None, *args, **kwargs): - super().__init__(*args, **kwargs, job=self._manager_job) + super().__init__( + *args, **kwargs, job=self._manager_job, singlejob=False) if not isinstance(rules, list): rules = [rules]