add argument singlejob to TaskScheduler

This commit is contained in:
2022-09-08 13:44:42 +02:00
parent 0ac52b23d4
commit c22bd73759
3 changed files with 29 additions and 18 deletions

View File

@@ -49,7 +49,7 @@ The basic idea is to instantiate one or multiple schedulers and map specific ino
pyinotifyd has different schedulers to schedule tasks with an optional delay. The advantages of using a scheduler are consistent logging and the possibility to cancel delayed tasks. Furthermore, schedulers have the ability to differentiate between files and directories. pyinotifyd has different schedulers to schedule tasks with an optional delay. The advantages of using a scheduler are consistent logging and the possibility to cancel delayed tasks. Furthermore, schedulers have the ability to differentiate between files and directories.
### TaskScheduler ### TaskScheduler
Schedule a custom python method *job* with an optional *delay* in seconds. Skip scheduling of tasks for files and/or directories according to *files* and *dirs* arguments. If there already is a scheduled task, re-schedule it with *delay*. Use *logname* in log messages. All additional modules, functions and variables that are defined in the config file and are needed within the *job*, need to be passed as a dictionary to the TaskScheduler through *global_vars*. Schedule a custom python method *job* with an optional *delay* in seconds. Skip scheduling of tasks for files and/or directories according to *files* and *dirs* arguments. If there already is a scheduled task, re-schedule it with *delay*. Use *logname* in log messages. All additional modules, functions and variables that are defined in the config file and are needed within the *job*, need to be passed as a dictionary to the TaskScheduler through *global_vars*. If you want to limit the scheduler to run only one job at a time, set *singlejob* to True.
All arguments except for *job* are optional. All arguments except for *job* are optional.
```python ```python
# Please note that pyinotifyd uses pythons asyncio for asynchronous task execution. # Please note that pyinotifyd uses pythons asyncio for asynchronous task execution.
@@ -71,7 +71,8 @@ task_sched = TaskScheduler(
dirs=False, dirs=False,
delay=0, delay=0,
logname="sched", logname="sched",
global_vars=globals()) global_vars=globals(),
singlejob=False)
``` ```
### ShellScheduler ### ShellScheduler

View File

@@ -6,15 +6,16 @@
#import logging #import logging
# #
#async def custom_job(event, task_id): #async def custom_job(event, task_id):
# asyncio.sleep(1) # await asyncio.sleep(1)
# logging.info(f"{task_id}: execute example task: {event}") # logging.info(f"{task_id}: execute example task: {event}")
# #
#task_sched = TaskScheduler( #task_sched = TaskScheduler(
# job=custom_job, # job=custom_job,
# files=True, # files=True,
# dirs=False, # dirs=False,
# delay=10 # delay=10,
# global_vars=globals()) # global_vars=globals(),
# singlejob=False)
########################### ###########################
@@ -25,7 +26,8 @@
# cmd="/usr/local/bin/task.sh {maskname} {pathname} {src_pathname}", # cmd="/usr/local/bin/task.sh {maskname} {pathname} {src_pathname}",
# files=True, # files=True,
# dirs=False, # dirs=False,
# delay=10) # delay=10,
# singlejob=False)
################################# #################################

View File

@@ -52,7 +52,7 @@ class TaskScheduler:
self.cancelable = cancelable self.cancelable = cancelable
def __init__(self, job, files=True, dirs=False, delay=0, logname="sched", def __init__(self, job, files=True, dirs=False, delay=0, logname="sched",
loop=None, global_vars={}): loop=None, global_vars={}, singlejob=False):
assert iscoroutinefunction(job), \ assert iscoroutinefunction(job), \
f"job: expected coroutine, got {type(job)}" f"job: expected coroutine, got {type(job)}"
assert isinstance(files, bool), \ assert isinstance(files, bool), \
@@ -71,6 +71,7 @@ class TaskScheduler:
self._log = logging.getLogger((logname or __name__)) self._log = logging.getLogger((logname or __name__))
self._loop = (loop or asyncio.get_event_loop()) self._loop = (loop or asyncio.get_event_loop())
self._globals = global_vars self._globals = global_vars
self._singlejob = singlejob
self._tasks = {} self._tasks = {}
self._pause = False self._pause = False
@@ -105,6 +106,9 @@ class TaskScheduler:
else: else:
self._log.info("all remainig tasks completed") self._log.info("all remainig tasks completed")
def taskindex(self, event):
return "singlejob" if self._singlejob else event.pathname
async def _run_job(self, event, task_state, restart=False): async def _run_job(self, event, task_state, restart=False):
logger = SchedulerLogger(self._log, { logger = SchedulerLogger(self._log, {
"event": event, "event": event,
@@ -112,7 +116,7 @@ class TaskScheduler:
if self._delay > 0: if self._delay > 0:
task_state.task = self._loop.create_task( task_state.task = self._loop.create_task(
asyncio.sleep(self._delay, loop=self._loop)) asyncio.sleep(self._delay))
try: try:
if restart: if restart:
prefix = "re-" prefix = "re-"
@@ -145,7 +149,8 @@ class TaskScheduler:
else: else:
logger.info("task finished") logger.info("task finished")
finally: finally:
del self._tasks[event.pathname] task_index = self.taskindex(event)
del self._tasks[task_index]
async def process_event(self, event): async def process_event(self, event):
if not ((not event.dir and self._files) or if not ((not event.dir and self._files) or
@@ -153,9 +158,13 @@ class TaskScheduler:
return return
restart = False restart = False
task_index = self.taskindex(event)
try: try:
task_state = self._tasks[event.pathname] task_state = self._tasks[task_index]
except KeyError:
task_state = TaskScheduler.TaskState()
self._tasks[task_index] = task_state
else:
logger = SchedulerLogger(self._log, { logger = SchedulerLogger(self._log, {
"event": event, "event": event,
"id": task_state.id}) "id": task_state.id})
@@ -171,16 +180,13 @@ class TaskScheduler:
logger.warning("skip event due to ongoing task") logger.warning("skip event due to ongoing task")
return return
except KeyError:
task_state = TaskScheduler.TaskState()
self._tasks[event.pathname] = task_state
if not self._pause: if not self._pause:
await self._run_job(event, task_state, restart) await self._run_job(event, task_state, restart)
async def process_cancel_event(self, event): async def process_cancel_event(self, event):
try: try:
task_state = self._tasks[event.pathname] task_index = self.taskindex(event)
task_state = self._tasks[task_index]
except KeyError: except KeyError:
return return
@@ -192,7 +198,8 @@ class TaskScheduler:
task_state.task.cancel() task_state.task.cancel()
logger.info("scheduled task cancelled") logger.info("scheduled task cancelled")
task_state.task = None task_state.task = None
del self._tasks[event.pathname] logger.info(f"{task_index}")
del self._tasks[task_index]
else: else:
logger.warning("skip event due to ongoing task") logger.warning("skip event due to ongoing task")
@@ -285,7 +292,8 @@ class FileManagerRule:
class FileManagerScheduler(TaskScheduler): class FileManagerScheduler(TaskScheduler):
def __init__(self, rules, job=None, *args, **kwargs): def __init__(self, rules, job=None, *args, **kwargs):
super().__init__(*args, **kwargs, job=self._manager_job) super().__init__(
*args, **kwargs, job=self._manager_job, singlejob=False)
if not isinstance(rules, list): if not isinstance(rules, list):
rules = [rules] rules = [rules]