From 0c18e6097e3df6433e4883b267973f794bde42a8 Mon Sep 17 00:00:00 2001
From: Thomas Oettli
Date: Sun, 8 Nov 2020 20:56:01 +0100
Subject: [PATCH] restructure source for better async operation

---
 pyinotifyd/__init__.py  |   8 +-
 pyinotifyd/scheduler.py | 220 ++++++++++++++++++++--------------------
 2 files changed, 113 insertions(+), 115 deletions(-)

diff --git a/pyinotifyd/__init__.py b/pyinotifyd/__init__.py
index 826a8d9..11168df 100755
--- a/pyinotifyd/__init__.py
+++ b/pyinotifyd/__init__.py
@@ -32,7 +32,7 @@ import sys

 from pyinotify import ProcessEvent
 from pyinotifyd._install import install, uninstall
-from pyinotifyd.scheduler import Task
+from pyinotifyd.scheduler import TaskScheduler

 __version__ = "0.0.2"

@@ -52,7 +52,7 @@ class _TaskList:

     def execute(self, event):
         for task in self._tasks:
-            task.start(event)
+            asyncio.create_task(task.start(event))


 class EventMap(ProcessEvent):
@@ -82,8 +82,8 @@ class EventMap(ProcessEvent):

         task_instances = []
         for task in tasks:
-            if not issubclass(type(task), Task):
-                task = Task(task)
+            if not issubclass(type(task), TaskScheduler):
+                task = TaskScheduler(task)

             task_instances.append(task)
         self._map[flag] = _TaskList(task_instances).execute
diff --git a/pyinotifyd/scheduler.py b/pyinotifyd/scheduler.py
index 2282f27..44d0102 100755
--- a/pyinotifyd/scheduler.py
+++ b/pyinotifyd/scheduler.py
@@ -13,9 +13,8 @@
 #

 __all__ = [
-    "Task",
-    "Cancel",
     "TaskScheduler",
+    "Cancel",
     "ShellScheduler",
     "FileManagerRule",
     "FileManagerScheduler"]
@@ -36,125 +35,125 @@ def _event_to_str(event):
     return f"maskname={event.maskname}, pathname={event.pathname}"


-class Task:
-    def __init__(self, task, logname="task"):
-        assert task is None or iscoroutinefunction(task), \
-            f"task: expected asynchronous method or None, " \
-            f"got {type(task)}"
-        logname = (logname or __name__)
+class TaskScheduler:

-        self._task = task
-        self._log = logging.getLogger(logname)
-
-    def start(self, event, *args, **kwargs):
-        assert self._task, "task not set"
-        task_id = str(uuid4())
-        task = asyncio.create_task(
-            self._task(
-                event, task_id, *args, **kwargs))
-        return (task_id, task)
-
-    def cancel(self, event):
-        pass
+    @dataclass
+    class TaskState:
+        id: str = ""
+        task: asyncio.Task = None
+        cancelable: bool = True


-class Cancel(Task):
-    def __init__(self, task):
-        assert issubclass(type(task), Task), \
-            f"task: expected {type(Task)}, got {type(task)}"
-        self._task = task
-
-    def start(self, event, *args, **kwargs):
-        self._task.cancel(event)
-
-
-@dataclass
-class _TaskState:
-    task_id: str = ""
-    task: asyncio.Task = None
-    waiting: bool = True
-
-
-class TaskScheduler(Task):
-    def __init__(self, task, files=True, dirs=False, delay=0,
-                 *args, **kwargs):
-        super().__init__(*args, **kwargs, task=self._schedule_task)
-
-        assert task is None or iscoroutinefunction(task), \
-            f"TaskScheduler: expected asynchronous method or None, " \
-            f"got {type(task)}"
+    def __init__(self, job, files=True, dirs=False, delay=0, logname="sched"):
+        assert iscoroutinefunction(job), \
+            f"job: expected coroutine, got {type(job)}"
         assert isinstance(files, bool), \
             f"files: expected {type(bool)}, got {type(files)}"
         assert isinstance(dirs, bool), \
             f"dirs: expected {type(bool)}, got {type(dirs)}"
+        assert isinstance(delay, int), \
+            f"delay: expected {type(int)}, got {type(delay)}"

-        self._delayed_task = task
+        self._job = job
         self._files = files
         self._dirs = dirs
         self._delay = delay
+        self._log = logging.getLogger((logname or __name__))
         self._tasks = {}

-    async def _schedule_task(self, event, task_id, task_state, restart):
-        if self._delay > 0:
-            if restart:
-                action = "re-schedule"
-            else:
-                action = "schedule"
+    def cancel(self, event):
+        try:
+            task_state = self._tasks[event.pathname]
+        except KeyError:
+            return
+        if task_state.cancelable:
+            task_state.task.cancel()
             self._log.info(
-                f"{action} task ({_event_to_str(event)}, "
-                f"task_id={task_state.task_id}, delay={self._delay})")
-            await asyncio.sleep(self._delay)
+                f"scheduled task cancelled ({_event_to_str(event)}, "
+                f"task_id={task_state.id})")
+            task_state.task = None
+            del self._tasks[event.pathname]
+        else:
+            self._log.warning(
+                f"skip ({_event_to_str(event)}) due to an ongoing task "
+                f"(task_id={task_state.id})")

-        task_state.waiting = False
-
-        self._log.info(
-            f"start task ({_event_to_str(event)}, task_id={task_id})")
-        await self._delayed_task(event, task_id)
-        self._log.info(
-            f"task finished ({_event_to_str(event)}, task_id={task_id})")
-        del self._tasks[event.pathname]
-
-    def start(self, event, *args, **kwargs):
+    async def start(self, event):
         if not ((not event.dir and self._files) or
                 (event.dir and self._dirs)):
             return

-        if event.pathname in self._tasks:
-            self.cancel(event, silent=True)
-            restart = True
-        else:
-            restart = False
-
-        task_state = _TaskState()
-        task_state.task_id, task_state.task = super().start(
-            event, task_state, restart, *args, **kwargs)
-        self._tasks[event.pathname] = task_state
-
-    def cancel(self, event, silent=False):
-        if event.pathname in self._tasks:
+        prefix = ""
+        try:
             task_state = self._tasks[event.pathname]
-
-            if task_state.waiting:
-                if not silent:
-                    self._log.info(
-                        f"cancel task ({_event_to_str(event)}, "
-                        f"task_id={task_state.task_id})")
+            if task_state.cancelable:
                 task_state.task.cancel()
-            del self._tasks[event.pathname]
+                prefix = "re"
+            else:
+                self._log.warning(
+                    f"skip ({_event_to_str(event)}) due to an ongoing task "
+                    f"(task_id={task_state.id})")
+                return
+
+        except KeyError:
+            task_state = TaskScheduler.TaskState(id=str(uuid4()))
+            self._tasks[event.pathname] = task_state
+
+        if self._delay > 0:
+            task_state.task = asyncio.create_task(
+                asyncio.sleep(self._delay))
+
+            try:
+                self._log.info(
+                    f"{prefix}schedule task ({_event_to_str(event)}, "
+                    f"task_id={task_state.id}, delay={self._delay})")
+                await task_state.task
+            except asyncio.CancelledError:
+                return
+
+        task_state.task = asyncio.create_task(
+            self._job(event, task_state.id))
+
+        self._log.info(
+            f"start task ({_event_to_str(event)}, task_id={task_state.id})")
+
+        try:
+            task_state.cancelable = False
+            await task_state.task
+        except asyncio.CancelledError:
+            self._log.warning(
+                f"ongoing task cancelled ({_event_to_str(event)}, "
+                f"task_id={task_state.id})")
+        else:
+            self._log.info(
+                f"task finished ({_event_to_str(event)}, "
+                f"task_id={task_state.id})")
+        finally:
+            del self._tasks[event.pathname]
+
+
+class Cancel(TaskScheduler):
+    def __init__(self, task):
+        assert issubclass(type(task), TaskScheduler), \
+            f"task: expected {type(TaskScheduler)}, got {type(task)}"
+        self._task = task
+
+    async def start(self, event):
+        self._task.cancel(event)


 class ShellScheduler(TaskScheduler):
-    def __init__(self, cmd, task=None, *args, **kwargs):
-        super().__init__(*args, **kwargs, task=self._shell_task)
+    def __init__(self, cmd, job=None, *args, **kwargs):
+        super().__init__(*args, **kwargs, job=self._shell_job)

         assert isinstance(cmd, str), \
             f"cmd: expected {type('')}, got {type(cmd)}"

         self._cmd = cmd

-    async def _shell_task(self, event, task_id, *args, **kwargs):
+    async def _shell_job(self, event, task_id):
         maskname = event.maskname.split("|", 1)[0]
         if hasattr(event, "src_pathname"):
             src_pathname = event.src_pathname
@@ -211,8 +210,8 @@ class FileManagerRule:


 class FileManagerScheduler(TaskScheduler):
-    def __init__(self, rules, task=None, *args, **kwargs):
-        super().__init__(*args, **kwargs, task=self._manager_task)
+    def __init__(self, rules, job=None, *args, **kwargs):
+        super().__init__(*args, **kwargs, job=self._manager_job)

         if not isinstance(rules, list):
             rules = [rules]
@@ -223,6 +222,22 @@ class FileManagerScheduler(TaskScheduler):

         self._rules = rules

+    def _get_rule_by_event(self, event):
+        rule = None
+        for r in self._rules:
+            if r.src_re.match(event.pathname):
+                rule = r
+                break
+
+        return rule
+
+    async def start(self, event):
+        if self._get_rule_by_event(event):
+            await super().start(event)
+        else:
+            self._log.debug(
+                f"no rule in ruleset matches path '{event.pathname}'")
+
     async def _chmod_and_chown(self, path, mode, chown, task_id):
         if mode is not None:
             self._log.debug(f"{task_id}: chmod {oct(mode)} '{path}'")
@@ -245,7 +260,7 @@ class FileManagerScheduler(TaskScheduler):
             else:
                 chown = (rule.user, rule.group)

-            if os.path.isidr(path):
+            if os.path.isdir(path):
                 mode = rule.dirmode
             else:
                 mode = rule.filemode
@@ -270,30 +285,13 @@ class FileManagerScheduler(TaskScheduler):
                 await self._chmod_and_chown(
                     p, rule.filemode, chown, task_id)

-    def _get_rule_by_event(self, event):
-        rule = None
-        for r in self._rules:
-            if r.src_re.match(event.pathname):
-                rule = r
-                break
-
-        return rule
-
-    def start(self, event):
-        if self._get_rule_by_event(event):
-            super().start(event)
-        else:
-            self._log.debug(
-                f"no rule in ruleset matches path '{event.pathname}'")
-
-    async def _manager_task(self, event, task_id, *args, **kwargs):
-        path = event.pathname
+    async def _manager_job(self, event, task_id):
         rule = self._get_rule_by_event(event)
-
         if not rule:
             return

         try:
+            path = event.pathname
             if rule.action in ["copy", "move"]:
                 dst = rule.src_re.sub(rule.dst_re, path)
                 if not dst:
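
Not part of the patch, just for orientation: a minimal sketch of how the reworked
scheduler might be driven on its own. FakeEvent, on_close and the watched path
are invented stand-ins for a pyinotify event and a job coroutine; start(),
cancel(), the delay handling and the (event, task_id) job signature are the ones
introduced above.

    import asyncio
    from dataclasses import dataclass

    from pyinotifyd.scheduler import TaskScheduler


    @dataclass
    class FakeEvent:
        # stand-in for a pyinotify event; only the attributes the
        # scheduler looks at are modelled here
        pathname: str
        maskname: str = "IN_CLOSE_WRITE"
        dir: bool = False


    async def on_close(event, task_id):
        # job coroutine: receives the event and the scheduler-assigned task id
        print(f"{task_id}: processing {event.pathname}")


    async def main():
        s = TaskScheduler(job=on_close, files=True, delay=2)
        event = FakeEvent("/tmp/example.txt")

        # start() now runs the whole schedule/delay/execute cycle itself,
        # so callers wrap it in a task (as _TaskList.execute does above)
        pending = asyncio.create_task(s.start(event))
        await asyncio.sleep(1)
        s.cancel(event)   # still inside the delay window, so the job never runs
        await pending

        # schedule again and let the delay elapse this time
        await s.start(event)


    asyncio.run(main())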