diff --git a/pyinotifyd/__init__.py b/pyinotifyd/__init__.py index 126c27d..529bae6 100755 --- a/pyinotifyd/__init__.py +++ b/pyinotifyd/__init__.py @@ -121,11 +121,7 @@ class DaemonInstance: pending = self._get_pending_tasks() if pending: if self._timeout: - try: - future = asyncio.shield(asyncio.gather(*pending)) - except asyncio.CancelledError: - pass - + future = asyncio.gather(*pending) self._log.info( f"wait {self._timeout} seconds for {len(pending)} " f"remaining task(s) to complete") @@ -133,22 +129,21 @@ class DaemonInstance: await asyncio.wait_for(future, self._timeout) pending = [] except asyncio.TimeoutError: + future.cancel() + future.exception() self._log.warning( - f"shutdown timeout exceeded") - - pending = [t for t in pending if not t.done()] - - if pending: + "shutdown timeout exceeded, remaining task(s) killed") + else: self._log.warning( f"cancel {len(pending)} remaining task(s)") for task in pending: task.cancel() - #try: - # await asyncio.gather(*pending) - #except asyncio.CancelledError: - # pass + try: + await asyncio.gather(*pending) + except asyncio.CancelledError: + pass asyncio.get_event_loop().stop() self._shutdown = False diff --git a/pyinotifyd/scheduler.py b/pyinotifyd/scheduler.py index b126632..b52092d 100755 --- a/pyinotifyd/scheduler.py +++ b/pyinotifyd/scheduler.py @@ -18,140 +18,116 @@ import os import re import shutil +from dataclasses import dataclass +from inspect import iscoroutinefunction from shlex import quote as shell_quote from uuid import uuid4 -class _Task: - def __init__(self, event, delay, task_id, task, callback=None, - logname="task"): - self._event = event - self._path = event.pathname - self._delay = delay - self._task_id = task_id - self._job = task - self._callback = callback - - self._task = None - self._log = logging.getLogger((logname or __name__)) - - async def _start(self): - try: - if self._delay > 0: - await asyncio.sleep(self._delay) - - if self._callback is not None: - self._callback(self._event) - - self._task = None - - self._log.info(f"execute task {self._task_id}") - await self._job(self._event, self._task_id) - self._log.info(f"task {self._task_id} finished") - except asyncio.CancelledError: - self._log.info(f"task {self._task_id} cancelled") - - def start(self): - if self._task is None: - self._task = asyncio.create_task(self._start()) - - def cancel(self): - if self._task is not None: - self._task.cancel() - self._task = None - - def restart(self): - self.cancel() - self.start() - - def task_id(self): - return self._task_id +def event_to_str(event): + return f"maskname={event.maskname}, pathname={event.pathname}" -class TaskScheduler: - def __init__(self, task, files, dirs, delay=0, logname="sched"): - assert callable(task), \ - f"task: expected callable, got {type(task)}" +class Task: + def __init__(self, task=None, logname="task"): + assert task is None or iscoroutinefunction(task), \ + f"task: expected asynchronous method or None, " \ + f"got {type(task)}" + logname = (logname or __name__) + self._task = task + self._log = logging.getLogger(logname) - assert isinstance(delay, int), \ - f"delay: expected {type(int)}, got {type(delay)}" - self._delay = delay - - assert isinstance(files, bool), \ - f"files: expected {type(bool)}, got {type(files)}" - self._files = files - - assert isinstance(dirs, bool), \ - f"dirs: expected {type(bool)}, got {type(dirs)}" - self._dirs = dirs - - self._tasks = {} - self._logname = (logname or __name__) - self._log = logging.getLogger(self._logname) - - def _task_started(self, event): - path = event.pathname - if path in self._tasks: - del self._tasks[path] - - def schedule(self, event): - self._log.debug(f"received {event}") - - if (not event.dir and not self._files) or \ - (event.dir and not self._dirs): - return - - path = event.pathname - maskname = event.maskname.split("|", 1)[0] - - if path in self._tasks: - task = self._tasks[path] - task_id = task.task_id() - self._log.info( - f"received event {maskname} on '{path}', " - f"re-schedule task {task_id} (delay={self._delay}s)") - task.restart() - else: - task_id = str(uuid4()) - self._log.info( - f"received event {maskname} on '{path}', " - f"schedule task {task_id} (delay={self._delay}s)") - task = _Task( - event, self._delay, task_id, self._task, - callback=self._task_started, logname=self._logname) - self._tasks[path] = task - task.start() - - def cancel(self, event): - self._log.debug(f"received {event}") - - path = event.pathname - maskname = event.maskname.split("|", 1)[0] - if path in self._tasks: - task = self._tasks[path] - task_id = task.task_id() - self._log.info( - f"received event {maskname} on '{path}', " - f"cancel scheduled task {task_id}") - task.cancel() - del self._tasks[path] + def start(self, event, *args, **kwargs): + assert self._task, "task not set" + task_id = str(uuid4()) + task = asyncio.create_task( + self._task( + event, task_id, *args, **kwargs)) + return (task_id, task) def log(self, event): - self._log.info(f"LOG: received {event}") + self._log.info(f"LOG: {event}") + + +@dataclass +class _TaskState: + task_id: str = "" + task: asyncio.Task = None + waiting: bool = False + + +class TaskScheduler(Task): + def __init__(self, task=None, files=True, dirs=False, delay=0, + *args, **kwargs): + super().__init__(*args, **kwargs, task=self._schedule_task) + + assert task is None or iscoroutinefunction(task), \ + f"TaskScheduler: expected asynchronous method or None, " \ + f"got {type(task)}" + assert isinstance(files, bool), \ + f"files: expected {type(bool)}, got {type(files)}" + assert isinstance(dirs, bool), \ + f"dirs: expected {type(bool)}, got {type(dirs)}" + + self._delayed_task = task + self._files = files + self._dirs = dirs + self._delay = delay + + self._tasks = {} + + async def _schedule_task(self, event, task_id, task_state): + if self._delay > 0: + task_state.waiting = True + self._log.debug( + f"schedule task ({event_to_str(event)}, " + f"task_id={task_id}, delay={self._delay})") + await asyncio.sleep(self._delay) + task_state.waiting = False + + self._log.debug( + f"start task ({event_to_str(event)}, task_id={task_id})") + await self._delayed_task(event, task_id) + self._log.debug( + f"task finished ({event_to_str(event)}, task_id={task_id})") + del self._tasks[event.pathname] + + def start(self, event, *args, **kwargs): + if not ((not event.dir and self._files) or + (event.dir and self._dirs)): + return + + self.cancel(event) + + task_state = _TaskState() + task_state.task_id, task_state.task = super().start( + event, task_state, *args, **kwargs) + self._tasks[event.pathname] = task_state + + def cancel(self, event): + if event.pathname in self._tasks: + task_state = self._tasks[event.pathname] + + if task_state.waiting: + self._log.debug( + f"cancel task ({event_to_str(event)}, " + f"task_id={task_state.task_id})") + task_state.task.cancel() + del self._tasks[event.pathname] class ShellScheduler(TaskScheduler): def __init__(self, cmd, task=None, *args, **kwargs): + super().__init__(*args, **kwargs, task=self._shell_task) + assert isinstance(cmd, str), \ f"cmd: expected {type('')}, got {type(cmd)}" + self._cmd = cmd - super().__init__(*args, task=self.task, **kwargs) - - async def task(self, event, task_id): + async def _shell_task(self, event, task_id, *args, **kwargs): maskname = event.maskname.split("|", 1)[0] - if hasattr(event, "src_pathname"): src_pathname = event.src_pathname else: @@ -178,54 +154,46 @@ class FileManagerRule: valid = f"{', '.join(FileManagerRule.valid_actions)}" assert action in self.valid_actions, \ f"action: expected [{valid}], got{action}" - self.action = action - - self.src_re = re.compile(src_re) - + assert isinstance(src_re, str), \ + f"src_re: expected {type('')}, got {type(src_re)}" assert isinstance(dst_re, str), \ f"dst_re: expected {type('')}, got {type(dst_re)}" - self.dst_re = dst_re - assert isinstance(auto_create, bool), \ f"auto_create: expected {type(bool)}, got {type(auto_create)}" - self.auto_create = auto_create - - if dirmode is not None: - assert isinstance(dirmode, int), \ - f"dirmode: expected {type(int)}, got {type(dirmode)}" - self.dirmode = dirmode - - if filemode is not None: - assert isinstance(filemode, int), \ - f"filemode: expected {type(int)}, got {type(filemode)}" - self.filemode = filemode - - if user is not None: - assert isinstance(user, str), \ - f"user: expected {type('')}, got {type(user)}" - self.user = user - - if group is not None: - assert isinstance(group, str), \ - f"group: expected {type('')}, got {type(group)}" - self.group = group - + assert dirmode is None or isinstance(dirmode, int), \ + f"dirmode: expected {type(int)}, got {type(dirmode)}" + assert filemode is None or isinstance(filemode, int), \ + f"filemode: expected {type(int)}, got {type(filemode)}" + assert user is None or isinstance(user, str), \ + f"user: expected {type('')}, got {type(user)}" + assert group is None or isinstance(group, str), \ + f"group: expected {type('')}, got {type(group)}" assert isinstance(rec, bool), \ f"rec: expected {type(bool)}, got {type(rec)}" + + self.action = action + self.src_re = re.compile(src_re) + self.dst_re = dst_re + self.auto_create = auto_create + self.dirmode = dirmode + self.filemode = filemode + self.user = user + self.group = group self.rec = rec class FileManagerScheduler(TaskScheduler): def __init__(self, rules, task=None, *args, **kwargs): + super().__init__(*args, **kwargs, task=self._manager_task) + if not isinstance(rules, list): rules = [rules] for rule in rules: assert isinstance(rule, FileManagerRule), \ f"rules: expected {type(FileManagerRule)}, got {type(rule)}" - self._rules = rules - super().__init__(*args, task=self.task, **kwargs) + self._rules = rules async def _chmod_and_chown(self, path, mode, chown, task_id): if mode is not None: @@ -283,14 +251,14 @@ class FileManagerScheduler(TaskScheduler): return rule - def schedule(self, event): + def start(self, event): if self._get_rule_by_event(event): - super().schedule(event) + super().start(event) else: self._log.debug( f"no rule in ruleset matches path '{event.pathname}'") - async def task(self, event, task_id): + async def _manager_task(self, event, task_id, *args, **kwargs): path = event.pathname rule = self._get_rule_by_event(event) diff --git a/pyinotifyd/watch.py b/pyinotifyd/watch.py index fc838b5..0042d3e 100755 --- a/pyinotifyd/watch.py +++ b/pyinotifyd/watch.py @@ -76,11 +76,10 @@ class _TaskList: class Watch: - def __init__(self, path, event_map, rec=False, auto_add=False): + def __init__(self, path, event_map, rec=False, auto_add=False, + logname="watch"): assert isinstance(path, str), \ f"path: expected {type('')}, got {type(path)}" - self._path = path - if isinstance(event_map, EventMap): self._event_map = event_map elif isinstance(event_map, dict): @@ -92,10 +91,11 @@ class Watch: assert isinstance(rec, bool), \ f"rec: expected {type(bool)}, got {type(rec)}" - self._rec = rec - assert isinstance(auto_add, bool), \ f"auto_add: expected {type(bool)}, got {type(auto_add)}" + + self._path = path + self._rec = rec self._auto_add = auto_add def path(self):