big source code cleanup

This commit is contained in:
2020-11-07 03:03:32 +01:00
parent fdaf2cee53
commit 380045d6bf
3 changed files with 130 additions and 167 deletions

View File

@@ -121,11 +121,7 @@ class DaemonInstance:
pending = self._get_pending_tasks()
if pending:
if self._timeout:
try:
future = asyncio.shield(asyncio.gather(*pending))
except asyncio.CancelledError:
pass
future = asyncio.gather(*pending)
self._log.info(
f"wait {self._timeout} seconds for {len(pending)} "
f"remaining task(s) to complete")
@@ -133,22 +129,21 @@ class DaemonInstance:
await asyncio.wait_for(future, self._timeout)
pending = []
except asyncio.TimeoutError:
future.cancel()
future.exception()
self._log.warning(
f"shutdown timeout exceeded")
pending = [t for t in pending if not t.done()]
if pending:
"shutdown timeout exceeded, remaining task(s) killed")
else:
self._log.warning(
f"cancel {len(pending)} remaining task(s)")
for task in pending:
task.cancel()
#try:
# await asyncio.gather(*pending)
#except asyncio.CancelledError:
# pass
try:
await asyncio.gather(*pending)
except asyncio.CancelledError:
pass
asyncio.get_event_loop().stop()
self._shutdown = False

View File

@@ -18,140 +18,116 @@ import os
import re
import shutil
from dataclasses import dataclass
from inspect import iscoroutinefunction
from shlex import quote as shell_quote
from uuid import uuid4
class _Task:
def __init__(self, event, delay, task_id, task, callback=None,
logname="task"):
self._event = event
self._path = event.pathname
self._delay = delay
self._task_id = task_id
self._job = task
self._callback = callback
self._task = None
self._log = logging.getLogger((logname or __name__))
async def _start(self):
try:
if self._delay > 0:
await asyncio.sleep(self._delay)
if self._callback is not None:
self._callback(self._event)
self._task = None
self._log.info(f"execute task {self._task_id}")
await self._job(self._event, self._task_id)
self._log.info(f"task {self._task_id} finished")
except asyncio.CancelledError:
self._log.info(f"task {self._task_id} cancelled")
def start(self):
if self._task is None:
self._task = asyncio.create_task(self._start())
def cancel(self):
if self._task is not None:
self._task.cancel()
self._task = None
def restart(self):
self.cancel()
self.start()
def task_id(self):
return self._task_id
def event_to_str(event):
return f"maskname={event.maskname}, pathname={event.pathname}"
class TaskScheduler:
def __init__(self, task, files, dirs, delay=0, logname="sched"):
assert callable(task), \
f"task: expected callable, got {type(task)}"
class Task:
def __init__(self, task=None, logname="task"):
assert task is None or iscoroutinefunction(task), \
f"task: expected asynchronous method or None, " \
f"got {type(task)}"
logname = (logname or __name__)
self._task = task
self._log = logging.getLogger(logname)
assert isinstance(delay, int), \
f"delay: expected {type(int)}, got {type(delay)}"
self._delay = delay
assert isinstance(files, bool), \
f"files: expected {type(bool)}, got {type(files)}"
self._files = files
assert isinstance(dirs, bool), \
f"dirs: expected {type(bool)}, got {type(dirs)}"
self._dirs = dirs
self._tasks = {}
self._logname = (logname or __name__)
self._log = logging.getLogger(self._logname)
def _task_started(self, event):
path = event.pathname
if path in self._tasks:
del self._tasks[path]
def schedule(self, event):
self._log.debug(f"received {event}")
if (not event.dir and not self._files) or \
(event.dir and not self._dirs):
return
path = event.pathname
maskname = event.maskname.split("|", 1)[0]
if path in self._tasks:
task = self._tasks[path]
task_id = task.task_id()
self._log.info(
f"received event {maskname} on '{path}', "
f"re-schedule task {task_id} (delay={self._delay}s)")
task.restart()
else:
def start(self, event, *args, **kwargs):
assert self._task, "task not set"
task_id = str(uuid4())
self._log.info(
f"received event {maskname} on '{path}', "
f"schedule task {task_id} (delay={self._delay}s)")
task = _Task(
event, self._delay, task_id, self._task,
callback=self._task_started, logname=self._logname)
self._tasks[path] = task
task.start()
def cancel(self, event):
self._log.debug(f"received {event}")
path = event.pathname
maskname = event.maskname.split("|", 1)[0]
if path in self._tasks:
task = self._tasks[path]
task_id = task.task_id()
self._log.info(
f"received event {maskname} on '{path}', "
f"cancel scheduled task {task_id}")
task.cancel()
del self._tasks[path]
task = asyncio.create_task(
self._task(
event, task_id, *args, **kwargs))
return (task_id, task)
def log(self, event):
self._log.info(f"LOG: received {event}")
self._log.info(f"LOG: {event}")
@dataclass
class _TaskState:
task_id: str = ""
task: asyncio.Task = None
waiting: bool = False
class TaskScheduler(Task):
def __init__(self, task=None, files=True, dirs=False, delay=0,
*args, **kwargs):
super().__init__(*args, **kwargs, task=self._schedule_task)
assert task is None or iscoroutinefunction(task), \
f"TaskScheduler: expected asynchronous method or None, " \
f"got {type(task)}"
assert isinstance(files, bool), \
f"files: expected {type(bool)}, got {type(files)}"
assert isinstance(dirs, bool), \
f"dirs: expected {type(bool)}, got {type(dirs)}"
self._delayed_task = task
self._files = files
self._dirs = dirs
self._delay = delay
self._tasks = {}
async def _schedule_task(self, event, task_id, task_state):
if self._delay > 0:
task_state.waiting = True
self._log.debug(
f"schedule task ({event_to_str(event)}, "
f"task_id={task_id}, delay={self._delay})")
await asyncio.sleep(self._delay)
task_state.waiting = False
self._log.debug(
f"start task ({event_to_str(event)}, task_id={task_id})")
await self._delayed_task(event, task_id)
self._log.debug(
f"task finished ({event_to_str(event)}, task_id={task_id})")
del self._tasks[event.pathname]
def start(self, event, *args, **kwargs):
if not ((not event.dir and self._files) or
(event.dir and self._dirs)):
return
self.cancel(event)
task_state = _TaskState()
task_state.task_id, task_state.task = super().start(
event, task_state, *args, **kwargs)
self._tasks[event.pathname] = task_state
def cancel(self, event):
if event.pathname in self._tasks:
task_state = self._tasks[event.pathname]
if task_state.waiting:
self._log.debug(
f"cancel task ({event_to_str(event)}, "
f"task_id={task_state.task_id})")
task_state.task.cancel()
del self._tasks[event.pathname]
class ShellScheduler(TaskScheduler):
def __init__(self, cmd, task=None, *args, **kwargs):
super().__init__(*args, **kwargs, task=self._shell_task)
assert isinstance(cmd, str), \
f"cmd: expected {type('')}, got {type(cmd)}"
self._cmd = cmd
super().__init__(*args, task=self.task, **kwargs)
async def task(self, event, task_id):
async def _shell_task(self, event, task_id, *args, **kwargs):
maskname = event.maskname.split("|", 1)[0]
if hasattr(event, "src_pathname"):
src_pathname = event.src_pathname
else:
@@ -178,54 +154,46 @@ class FileManagerRule:
valid = f"{', '.join(FileManagerRule.valid_actions)}"
assert action in self.valid_actions, \
f"action: expected [{valid}], got{action}"
self.action = action
self.src_re = re.compile(src_re)
assert isinstance(src_re, str), \
f"src_re: expected {type('')}, got {type(src_re)}"
assert isinstance(dst_re, str), \
f"dst_re: expected {type('')}, got {type(dst_re)}"
self.dst_re = dst_re
assert isinstance(auto_create, bool), \
f"auto_create: expected {type(bool)}, got {type(auto_create)}"
self.auto_create = auto_create
if dirmode is not None:
assert isinstance(dirmode, int), \
assert dirmode is None or isinstance(dirmode, int), \
f"dirmode: expected {type(int)}, got {type(dirmode)}"
self.dirmode = dirmode
if filemode is not None:
assert isinstance(filemode, int), \
assert filemode is None or isinstance(filemode, int), \
f"filemode: expected {type(int)}, got {type(filemode)}"
self.filemode = filemode
if user is not None:
assert isinstance(user, str), \
assert user is None or isinstance(user, str), \
f"user: expected {type('')}, got {type(user)}"
self.user = user
if group is not None:
assert isinstance(group, str), \
assert group is None or isinstance(group, str), \
f"group: expected {type('')}, got {type(group)}"
self.group = group
assert isinstance(rec, bool), \
f"rec: expected {type(bool)}, got {type(rec)}"
self.action = action
self.src_re = re.compile(src_re)
self.dst_re = dst_re
self.auto_create = auto_create
self.dirmode = dirmode
self.filemode = filemode
self.user = user
self.group = group
self.rec = rec
class FileManagerScheduler(TaskScheduler):
def __init__(self, rules, task=None, *args, **kwargs):
super().__init__(*args, **kwargs, task=self._manager_task)
if not isinstance(rules, list):
rules = [rules]
for rule in rules:
assert isinstance(rule, FileManagerRule), \
f"rules: expected {type(FileManagerRule)}, got {type(rule)}"
self._rules = rules
super().__init__(*args, task=self.task, **kwargs)
self._rules = rules
async def _chmod_and_chown(self, path, mode, chown, task_id):
if mode is not None:
@@ -283,14 +251,14 @@ class FileManagerScheduler(TaskScheduler):
return rule
def schedule(self, event):
def start(self, event):
if self._get_rule_by_event(event):
super().schedule(event)
super().start(event)
else:
self._log.debug(
f"no rule in ruleset matches path '{event.pathname}'")
async def task(self, event, task_id):
async def _manager_task(self, event, task_id, *args, **kwargs):
path = event.pathname
rule = self._get_rule_by_event(event)

View File

@@ -76,11 +76,10 @@ class _TaskList:
class Watch:
def __init__(self, path, event_map, rec=False, auto_add=False):
def __init__(self, path, event_map, rec=False, auto_add=False,
logname="watch"):
assert isinstance(path, str), \
f"path: expected {type('')}, got {type(path)}"
self._path = path
if isinstance(event_map, EventMap):
self._event_map = event_map
elif isinstance(event_map, dict):
@@ -92,10 +91,11 @@ class Watch:
assert isinstance(rec, bool), \
f"rec: expected {type(bool)}, got {type(rec)}"
self._rec = rec
assert isinstance(auto_add, bool), \
f"auto_add: expected {type(bool)}, got {type(auto_add)}"
self._path = path
self._rec = rec
self._auto_add = auto_add
def path(self):