# pyinotifyd is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pyinotifyd is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pyinotifyd. If not, see <https://www.gnu.org/licenses/>.
#

__all__ = [
    "TaskScheduler",
    "Cancel",
    "ShellScheduler",
    "FileManagerRule",
    "FileManagerScheduler"]

import asyncio
import logging
import os
import re
import shutil

from dataclasses import dataclass, field
from inspect import iscoroutinefunction
from shlex import quote as shell_quote
from uuid import uuid4


def _event_to_str(event):
    return f"maskname={event.maskname}, pathname={event.pathname}"


class TaskScheduler:
    """Schedule a coroutine job per pathname, optionally after a delay."""

    @dataclass
    class TaskState:
        # default_factory gives every TaskState its own id; a plain default
        # of str(uuid4()) would be evaluated once and shared by all instances
        id: str = field(default_factory=lambda: str(uuid4()))
        task: asyncio.Task = None
        cancelable: bool = True

    def __init__(self, job, files=True, dirs=False, delay=0, logname="sched",
                 loop=None):
        assert iscoroutinefunction(job), \
            f"job: expected coroutine, got {type(job)}"
        assert isinstance(files, bool), \
            f"files: expected {bool}, got {type(files)}"
        assert isinstance(dirs, bool), \
            f"dirs: expected {bool}, got {type(dirs)}"
        assert isinstance(delay, int), \
            f"delay: expected {int}, got {type(delay)}"

        self._job = job
        self._files = files
        self._dirs = dirs
        self._delay = delay
        self._log = logging.getLogger(logname or __name__)
        self._loop = (loop or asyncio.get_event_loop())
        self._tasks = {}
        self._pause = False

    async def pause(self):
        self._log.info("pause scheduler")
        self._pause = True

    async def shutdown(self, timeout=None):
        self._pause = True
        pending = [t.task for t in self._tasks.values()]
        if pending:
            if timeout is None:
                self._log.info(
                    f"wait for {len(pending)} "
                    f"remaining task(s) to complete")
            else:
                self._log.info(
                    f"wait {timeout} seconds for {len(pending)} "
                    f"remaining task(s) to complete")

            done, pending = await asyncio.wait(
                [*pending], timeout=timeout)
            if pending:
                self._log.warning(
                    f"shutdown timeout exceeded, "
                    f"cancel {len(pending)} remaining task(s)")
                for task in pending:
                    task.cancel()

                try:
                    await asyncio.gather(*pending)
                except asyncio.CancelledError:
                    pass
            else:
                self._log.info("all remaining tasks completed")

    async def _run_job(self, event, task_state, restart=False):
        if self._delay > 0:
            # the delay is awaited as a cancelable task, so a later event on
            # the same pathname can re-schedule or cancel the pending job
            task_state.task = self._loop.create_task(
                asyncio.sleep(self._delay))
            try:
                if restart:
                    prefix = "re-"
                else:
                    prefix = ""

                self._log.info(
                    f"{prefix}schedule task ({_event_to_str(event)}, "
                    f"task_id={task_state.id}, delay={self._delay})")
                await task_state.task
            except asyncio.CancelledError:
                return

        task_state.task = self._loop.create_task(
            self._job(event, task_state.id))
        self._log.info(
            f"start task ({_event_to_str(event)}, task_id={task_state.id})")
        try:
            task_state.cancelable = False
            await task_state.task
        except asyncio.CancelledError:
            self._log.warning(
                f"ongoing task cancelled ({_event_to_str(event)}, "
                f"task_id={task_state.id})")
        else:
            self._log.info(
                f"task finished ({_event_to_str(event)}, "
                f"task_id={task_state.id})")
        finally:
            del self._tasks[event.pathname]
    async def process_event(self, event):
        if not ((not event.dir and self._files) or
                (event.dir and self._dirs)):
            return

        restart = False
        try:
            task_state = self._tasks[event.pathname]
            if task_state.cancelable:
                task_state.task.cancel()
                if not self._pause:
                    restart = True
                else:
                    self._log.info(
                        f"scheduled task cancelled ({_event_to_str(event)}, "
                        f"task_id={task_state.id})")
            else:
                self._log.warning(
                    f"skip ({_event_to_str(event)}) due to an ongoing task "
                    f"(task_id={task_state.id})")
                return
        except KeyError:
            task_state = TaskScheduler.TaskState()
            self._tasks[event.pathname] = task_state

        if not self._pause:
            await self._run_job(event, task_state, restart)

    async def process_cancel_event(self, event):
        try:
            task_state = self._tasks[event.pathname]
        except KeyError:
            return

        if task_state.cancelable:
            task_state.task.cancel()
            self._log.info(
                f"scheduled task cancelled ({_event_to_str(event)}, "
                f"task_id={task_state.id})")
            task_state.task = None
            del self._tasks[event.pathname]
        else:
            self._log.warning(
                f"skip ({_event_to_str(event)}) due to an ongoing task "
                f"(task_id={task_state.id})")


class Cancel:
    """Forward events to another scheduler's process_cancel_event so that
    matching events cancel its pending (still cancelable) tasks."""

    def __init__(self, task, *args, **kwargs):
        assert issubclass(type(task), TaskScheduler), \
            f"task: expected {TaskScheduler}, got {type(task)}"
        setattr(self, "process_event", task.process_cancel_event)

    async def shutdown(self, timeout=None):
        pass

    async def pause(self):
        pass


class ShellScheduler(TaskScheduler):
    """Scheduler that executes a shell command for each event. The command
    may contain the placeholders {maskname}, {pathname} and {src_pathname},
    which are shell-quoted before substitution."""

    def __init__(self, cmd, job=None, *args, **kwargs):
        super().__init__(*args, **kwargs, job=self._shell_job)
        assert isinstance(cmd, str), \
            f"cmd: expected {type('')}, got {type(cmd)}"
        self._cmd = cmd

    async def _shell_job(self, event, task_id):
        maskname = event.maskname.split("|", 1)[0]
        if hasattr(event, "src_pathname"):
            src_pathname = event.src_pathname
        else:
            src_pathname = ""

        cmd = self._cmd.replace(
            "{maskname}", shell_quote(maskname)).replace(
            "{pathname}", shell_quote(event.pathname)).replace(
            "{src_pathname}", shell_quote(src_pathname))
        self._log.info(f"{task_id}: execute shell command: {cmd}")
        try:
            proc = await asyncio.create_subprocess_shell(cmd)
            await proc.communicate()
        except Exception as e:
            self._log.error(f"{task_id}: {e}")


class FileManagerRule:
    """A copy/move/delete rule applied to pathnames matching src_re."""

    valid_actions = ["copy", "move", "delete"]

    def __init__(self, action, src_re, dst_re="", auto_create=False,
                 overwrite=False, dirmode=None, filemode=None, user=None,
                 group=None, rec=False):
        valid = f"{', '.join(FileManagerRule.valid_actions)}"
        assert action in self.valid_actions, \
            f"action: expected [{valid}], got {action}"
        assert isinstance(src_re, str), \
            f"src_re: expected {type('')}, got {type(src_re)}"
        assert isinstance(dst_re, str), \
            f"dst_re: expected {type('')}, got {type(dst_re)}"
        assert isinstance(auto_create, bool), \
            f"auto_create: expected {bool}, got {type(auto_create)}"
        assert isinstance(overwrite, bool), \
            f"overwrite: expected {bool}, got {type(overwrite)}"
        assert dirmode is None or isinstance(dirmode, int), \
            f"dirmode: expected {int}, got {type(dirmode)}"
        assert filemode is None or isinstance(filemode, int), \
            f"filemode: expected {int}, got {type(filemode)}"
        assert user is None or isinstance(user, str), \
            f"user: expected {type('')}, got {type(user)}"
        assert group is None or isinstance(group, str), \
            f"group: expected {type('')}, got {type(group)}"
        assert isinstance(rec, bool), \
            f"rec: expected {bool}, got {type(rec)}"

        self.action = action
        self.src_re = re.compile(src_re)
        self.dst_re = dst_re
        self.auto_create = auto_create
        self.overwrite = overwrite
        self.dirmode = dirmode
        self.filemode = filemode
        self.user = user
        self.group = group
        self.rec = rec


class FileManagerScheduler(TaskScheduler):
    """Scheduler that copies, moves or deletes files and directories
    according to the first FileManagerRule matching the event pathname."""

    def __init__(self, rules, job=None, *args, **kwargs):
        super().__init__(*args, **kwargs, job=self._manager_job)

        if not isinstance(rules, list):
            rules = [rules]

        for rule in rules:
            assert isinstance(rule, FileManagerRule), \
                f"rules: expected {FileManagerRule}, got {type(rule)}"

        self._rules = rules

    def _get_rule_by_event(self, event):
        # return the first rule whose src_re matches the event pathname
        rule = None
        for r in self._rules:
            if r.src_re.match(event.pathname):
                rule = r
                break

        return rule

    async def process_event(self, event):
        if not ((not event.dir and self._files) or
                (event.dir and self._dirs)):
            return

        if self._get_rule_by_event(event):
            await super().process_event(event)
        else:
            self._log.debug(
                f"no rule in ruleset matches path '{event.pathname}'")

    async def _chmod_and_chown(self, path, mode, chown, task_id):
        if mode is not None:
            self._log.debug(f"{task_id}: chmod {oct(mode)} '{path}'")
            os.chmod(path, mode)

        if chown is not None:
            changes = ""
            if chown[0] is not None:
                changes = chown[0]

            if chown[1] is not None:
                changes = f"{changes}:{chown[1]}"

            self._log.debug(f"{task_id}: chown {changes} '{path}'")
            shutil.chown(path, *chown)

    async def _set_mode_and_owner(self, path, rule, task_id):
        if (rule.user is rule.group is None):
            chown = None
        else:
            chown = (rule.user, rule.group)

        if os.path.isdir(path):
            mode = rule.dirmode
        else:
            mode = rule.filemode

        await self._chmod_and_chown(path, mode, chown, task_id)
        if not os.path.isdir(path):
            return

        # apply dirmode/filemode and ownership recursively below path
        work_on_dirs = not (rule.dirmode is chown is None)
        work_on_files = not (rule.filemode is chown is None)
        if work_on_dirs or work_on_files:
            for root, dirs, files in os.walk(path):
                if work_on_dirs:
                    for p in [os.path.join(root, d) for d in dirs]:
                        await self._chmod_and_chown(
                            p, rule.dirmode, chown, task_id)

                if work_on_files:
                    for p in [os.path.join(root, f) for f in files]:
                        await self._chmod_and_chown(
                            p, rule.filemode, chown, task_id)

    async def _manager_job(self, event, task_id):
        rule = self._get_rule_by_event(event)
        if not rule:
            return

        try:
            path = event.pathname
            if rule.action in ["copy", "move"]:
                dst = rule.src_re.sub(rule.dst_re, path)
                if not dst:
                    raise RuntimeError(
                        f"unable to {rule.action} '{path}', "
                        f"resulting destination path is empty")

                if os.path.exists(dst) and not rule.overwrite:
                    raise RuntimeError(
                        f"unable to {rule.action} file from '{path}' "
                        f"to '{dst}', destination path exists already")

                dst_dir = os.path.dirname(dst)
                if not os.path.isdir(dst_dir) and rule.auto_create:
                    self._log.info(
                        f"{task_id}: create directory '{dst_dir}'")
                    # find the topmost directory that has to be created, so
                    # mode and ownership can be set on the whole new subtree
                    first_subdir = dst_dir
                    while not os.path.isdir(first_subdir):
                        parent = os.path.dirname(first_subdir)
                        if not os.path.isdir(parent):
                            first_subdir = parent
                        else:
                            break

                    try:
                        os.makedirs(dst_dir)
                        await self._set_mode_and_owner(
                            first_subdir, rule, task_id)
                    except Exception as e:
                        raise RuntimeError(e)

                self._log.info(
                    f"{task_id}: {rule.action} '{path}' to '{dst}'")
                try:
                    if rule.action == "copy":
                        if os.path.isdir(path):
                            shutil.copytree(path, dst)
                        else:
                            shutil.copy2(path, dst)
                    else:
                        os.rename(path, dst)

                    await self._set_mode_and_owner(dst, rule, task_id)
                except Exception as e:
                    raise RuntimeError(e)

            elif rule.action == "delete":
                self._log.info(
                    f"{task_id}: {rule.action} '{path}'")
                try:
                    if os.path.isdir(path):
                        if rule.rec:
                            shutil.rmtree(path)
                        else:
                            # shutil has no rmdir; os.rmdir removes an
                            # empty directory non-recursively
                            os.rmdir(path)
                    else:
                        os.remove(path)
                except Exception as e:
                    raise RuntimeError(e)

        except RuntimeError as e:
            self._log.error(f"{task_id}: {e}")
        except Exception as e:
            self._log.exception(f"{task_id}: {e}")
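
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not wired into pyinotifyd): it
# connects a TaskScheduler to a trivial coroutine job and feeds it a
# SimpleNamespace standing in for a pyinotify event, i.e. an object exposing
# the attributes used above (maskname, pathname, dir). The job name and the
# example pathname are hypothetical.
if __name__ == "__main__":
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)

    async def _demo_job(event, task_id):
        # hypothetical job: just log the event it was scheduled for
        logging.info(f"{task_id}: handled {event.pathname}")

    async def _demo():
        scheduler = TaskScheduler(job=_demo_job, files=True, dirs=False,
                                  delay=0)
        event = SimpleNamespace(maskname="IN_CLOSE_WRITE",
                                pathname="/tmp/example.txt", dir=False)
        await scheduler.process_event(event)
        await scheduler.shutdown()

    asyncio.run(_demo())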