Restructure source for better async operation

This commit is contained in:
2020-11-08 20:56:01 +01:00
parent 2a989cbfcc
commit 0c18e6097e
2 changed files with 113 additions and 115 deletions

View File

@@ -32,7 +32,7 @@ import sys
from pyinotify import ProcessEvent
from pyinotifyd._install import install, uninstall
from pyinotifyd.scheduler import Task
from pyinotifyd.scheduler import TaskScheduler
__version__ = "0.0.2"
@@ -52,7 +52,7 @@ class _TaskList:
def execute(self, event):
for task in self._tasks:
task.start(event)
asyncio.create_task(task.start(event))
class EventMap(ProcessEvent):
@@ -82,8 +82,8 @@ class EventMap(ProcessEvent):
task_instances = []
for task in tasks:
if not issubclass(type(task), Task):
task = Task(task)
if not issubclass(type(task), TaskScheduler):
task = TaskScheduler(task)
task_instances.append(task)
self._map[flag] = _TaskList(task_instances).execute

View File

@@ -13,9 +13,8 @@
#
__all__ = [
"Task",
"Cancel",
"TaskScheduler",
"Cancel",
"ShellScheduler",
"FileManagerRule",
"FileManagerScheduler"]
@@ -36,125 +35,125 @@ def _event_to_str(event):
return f"maskname={event.maskname}, pathname={event.pathname}"
class Task:
def __init__(self, task, logname="task"):
assert task is None or iscoroutinefunction(task), \
f"task: expected asynchronous method or None, " \
f"got {type(task)}"
logname = (logname or __name__)
class TaskScheduler:
self._task = task
self._log = logging.getLogger(logname)
def start(self, event, *args, **kwargs):
assert self._task, "task not set"
task_id = str(uuid4())
task = asyncio.create_task(
self._task(
event, task_id, *args, **kwargs))
return (task_id, task)
def cancel(self, event):
pass
class Cancel(Task):
def __init__(self, task):
assert issubclass(type(task), Task), \
f"task: expected {type(Task)}, got {type(task)}"
self._task = task
def start(self, event, *args, **kwargs):
self._task.cancel(event)
@dataclass
class _TaskState:
task_id: str = ""
@dataclass
class TaskState:
id: str = str(uuid4())
task: asyncio.Task = None
waiting: bool = True
cancelable: bool = True
class TaskScheduler(Task):
def __init__(self, task, files=True, dirs=False, delay=0,
*args, **kwargs):
super().__init__(*args, **kwargs, task=self._schedule_task)
assert task is None or iscoroutinefunction(task), \
f"TaskScheduler: expected asynchronous method or None, " \
f"got {type(task)}"
def __init__(self, job, files=True, dirs=False, delay=0, logname="sched"):
assert iscoroutinefunction(job), \
f"job: expected coroutine, got {type(job)}"
assert isinstance(files, bool), \
f"files: expected {type(bool)}, got {type(files)}"
assert isinstance(dirs, bool), \
f"dirs: expected {type(bool)}, got {type(dirs)}"
assert isinstance(delay, int), \
f"delay: expected {type(int)}, got {type(delay)}"
self._delayed_task = task
self._job = job
self._files = files
self._dirs = dirs
self._delay = delay
self._log = logging.getLogger((logname or __name__))
self._tasks = {}
async def _schedule_task(self, event, task_id, task_state, restart):
if self._delay > 0:
if restart:
action = "re-schedule"
else:
action = "schedule"
def cancel(self, event):
try:
task_state = self._tasks[event.pathname]
except KeyError:
return
if task_state.cancelable:
task_state.task.cancel()
self._log.info(
f"{action} task ({_event_to_str(event)}, "
f"task_id={task_state.task_id}, delay={self._delay})")
await asyncio.sleep(self._delay)
task_state.waiting = False
self._log.info(
f"start task ({_event_to_str(event)}, task_id={task_id})")
await self._delayed_task(event, task_id)
self._log.info(
f"task finished ({_event_to_str(event)}, task_id={task_id})")
f"scheduled task cancelled ({_event_to_str(event)}, "
f"task_id={task_state.id})")
task_state.task = None
del self._tasks[event.pathname]
else:
self.log.warning(
f"skip ({_event_to_str(event)}) due to an ongoing task "
f"(task_id={task_state.id})")
def start(self, event, *args, **kwargs):
async def start(self, event):
if not ((not event.dir and self._files) or
(event.dir and self._dirs)):
return
if event.pathname in self._tasks:
self.cancel(event, silent=True)
restart = True
prefix = ""
try:
task_state = self._tasks[event.pathname]
if task_state.cancelable:
task_state.task.cancel()
prefix = "re"
else:
restart = False
self.log.warning(
f"skip ({_event_to_str(event)}) due to an ongoing task "
f"(task_id={task_state.id})")
return
task_state = _TaskState()
task_state.task_id, task_state.task = super().start(
event, task_state, restart, *args, **kwargs)
except KeyError:
task_state = TaskScheduler.TaskState()
self._tasks[event.pathname] = task_state
def cancel(self, event, silent=False):
if event.pathname in self._tasks:
task_state = self._tasks[event.pathname]
if self._delay > 0:
task_state.task = asyncio.create_task(
asyncio.sleep(self._delay))
if task_state.waiting:
if not silent:
try:
self._log.info(
f"cancel task ({_event_to_str(event)}, "
f"task_id={task_state.task_id})")
task_state.task.cancel()
f"{prefix}schedule task ({_event_to_str(event)}, "
f"task_id={task_state.id}, delay={self._delay})")
await task_state.task
except asyncio.CancelledError:
return
task_state.task = asyncio.create_task(
self._job(event, task_state.id))
self._log.info(
f"start task ({_event_to_str(event)}, task_id={task_state.id})")
try:
task_state.cancelable = False
await task_state.task
except asyncio.CancelledError:
self._log.warning(
f"ongoing task cancelled ({_event_to_str(event)}, "
f"task_id={task_state.id})")
else:
self._log.info(
f"task finished ({_event_to_str(event)}, "
f"task_id={task_state.id})")
finally:
del self._tasks[event.pathname]
class Cancel(TaskScheduler):
def __init__(self, task):
assert issubclass(type(task), TaskScheduler), \
f"task: expected {type(TaskScheduler)}, got {type(task)}"
self._task = task
async def start(self, event):
self._task.cancel(event)
class ShellScheduler(TaskScheduler):
def __init__(self, cmd, task=None, *args, **kwargs):
super().__init__(*args, **kwargs, task=self._shell_task)
def __init__(self, cmd, job=None, *args, **kwargs):
super().__init__(*args, **kwargs, job=self._shell_job)
assert isinstance(cmd, str), \
f"cmd: expected {type('')}, got {type(cmd)}"
self._cmd = cmd
async def _shell_task(self, event, task_id, *args, **kwargs):
async def _shell_job(self, event, task_id):
maskname = event.maskname.split("|", 1)[0]
if hasattr(event, "src_pathname"):
src_pathname = event.src_pathname
@@ -211,8 +210,8 @@ class FileManagerRule:
class FileManagerScheduler(TaskScheduler):
def __init__(self, rules, task=None, *args, **kwargs):
super().__init__(*args, **kwargs, task=self._manager_task)
def __init__(self, rules, job=None, *args, **kwargs):
super().__init__(*args, **kwargs, job=self._manager_job)
if not isinstance(rules, list):
rules = [rules]
@@ -223,6 +222,22 @@ class FileManagerScheduler(TaskScheduler):
self._rules = rules
def _get_rule_by_event(self, event):
rule = None
for r in self._rules:
if r.src_re.match(event.pathname):
rule = r
break
return rule
async def start(self, event):
if self._get_rule_by_event(event):
await super().start(event)
else:
self._log.debug(
f"no rule in ruleset matches path '{event.pathname}'")
async def _chmod_and_chown(self, path, mode, chown, task_id):
if mode is not None:
self._log.debug(f"{task_id}: chmod {oct(mode)} '{path}'")
@@ -245,7 +260,7 @@ class FileManagerScheduler(TaskScheduler):
else:
chown = (rule.user, rule.group)
if os.path.isidr(path):
if os.path.isdir(path):
mode = rule.dirmode
else:
mode = rule.filemode
@@ -270,30 +285,13 @@ class FileManagerScheduler(TaskScheduler):
await self._chmod_and_chown(
p, rule.filemode, chown, task_id)
def _get_rule_by_event(self, event):
rule = None
for r in self._rules:
if r.src_re.match(event.pathname):
rule = r
break
return rule
def start(self, event):
if self._get_rule_by_event(event):
super().start(event)
else:
self._log.debug(
f"no rule in ruleset matches path '{event.pathname}'")
async def _manager_task(self, event, task_id, *args, **kwargs):
path = event.pathname
async def _manager_job(self, event, task_id):
rule = self._get_rule_by_event(event)
if not rule:
return
try:
path = event.pathname
if rule.action in ["copy", "move"]:
dst = rule.src_re.sub(rule.dst_re, path)
if not dst: