improve logging
This commit is contained in:
@@ -58,7 +58,8 @@ class EventMap(ProcessEvent):
|
||||
**pyinotify.EventsCodes.OP_FLAGS,
|
||||
**pyinotify.EventsCodes.EVENT_FLAGS}
|
||||
|
||||
def my_init(self, event_map=None, default_sched=None, loop=None):
|
||||
def my_init(self, event_map=None, default_sched=None, loop=None,
|
||||
logname="eventmap"):
|
||||
self._map = {}
|
||||
self._loop = (loop or asyncio.get_event_loop())
|
||||
|
||||
@@ -72,6 +73,8 @@ class EventMap(ProcessEvent):
|
||||
for flag, schedulers in event_map.items():
|
||||
self.set_scheduler(flag, schedulers)
|
||||
|
||||
self._log = logging.getLogger((logname or __name__))
|
||||
|
||||
def set_scheduler(self, flag, schedulers):
|
||||
assert flag in EventMap.flags, \
|
||||
f"event_map: invalid flag: {flag}"
|
||||
@@ -94,7 +97,15 @@ class EventMap(ProcessEvent):
|
||||
del self._map[flag]
|
||||
|
||||
def process_default(self, event):
|
||||
logging.debug(f"received {event}")
|
||||
msg = "received event"
|
||||
for attr in ["dir", "mask", "maskname", "pathname", "src_pathname", "wd"]:
|
||||
value = getattr(event, attr, None)
|
||||
if attr == "mask":
|
||||
value = hex(value)
|
||||
if value:
|
||||
msg += f", {attr}={value}"
|
||||
|
||||
self._log.debug(msg)
|
||||
maskname = event.maskname.split("|")[0]
|
||||
if maskname in self._map:
|
||||
self._map[maskname].process_event(event)
|
||||
|
||||
@@ -31,8 +31,16 @@ from shlex import quote as shell_quote
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
def _event_to_str(event):
|
||||
return f"maskname={event.maskname}, pathname={event.pathname}"
|
||||
class SchedulerLogger(logging.LoggerAdapter):
|
||||
def process(self, msg, **kwargs):
|
||||
if "event" in kwargs:
|
||||
event = kwargs["event"]
|
||||
msg = f"{msg}, mask={event.maskname}, path={event.pathname}"
|
||||
if "id" in kwargs:
|
||||
task_id = kwargs["id"]
|
||||
msg = f"{msg}, task_id={task_id}"
|
||||
|
||||
return msg, kwargs
|
||||
|
||||
|
||||
class TaskScheduler:
|
||||
@@ -99,37 +107,38 @@ class TaskScheduler:
|
||||
if self._delay > 0:
|
||||
task_state.task = self._loop.create_task(
|
||||
asyncio.sleep(self._delay, loop=self._loop))
|
||||
|
||||
logger = SchedulerLogger(self._log, {
|
||||
"event": event,
|
||||
"id": task_state.id,
|
||||
"delay": self._delay})
|
||||
try:
|
||||
if restart:
|
||||
prefix = "re-"
|
||||
else:
|
||||
prefix = ""
|
||||
|
||||
self._log.info(
|
||||
f"{prefix}schedule task ({_event_to_str(event)}, "
|
||||
f"task_id={task_state.id}, delay={self._delay})")
|
||||
logger.info(f"{prefix}schedule task")
|
||||
|
||||
await task_state.task
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
|
||||
logger = SchedulerLogger(self._log, {
|
||||
"event": event,
|
||||
"id": task_state.id})
|
||||
|
||||
logger.info("start task")
|
||||
|
||||
task_state.task = self._loop.create_task(
|
||||
self._job(event, task_state.id))
|
||||
|
||||
self._log.info(
|
||||
f"start task ({_event_to_str(event)}, task_id={task_state.id})")
|
||||
|
||||
try:
|
||||
task_state.cancelable = False
|
||||
await task_state.task
|
||||
except asyncio.CancelledError:
|
||||
self._log.warning(
|
||||
f"ongoing task cancelled ({_event_to_str(event)}, "
|
||||
f"task_id={task_state.id})")
|
||||
logger.warning("ongoing task cancelled")
|
||||
else:
|
||||
self._log.info(
|
||||
f"task finished ({_event_to_str(event)}, "
|
||||
f"task_id={task_state.id})")
|
||||
self._log.info("task finished")
|
||||
finally:
|
||||
del self._tasks[event.pathname]
|
||||
|
||||
@@ -141,19 +150,20 @@ class TaskScheduler:
|
||||
restart = False
|
||||
try:
|
||||
task_state = self._tasks[event.pathname]
|
||||
|
||||
logger = SchedulerLogger(self._log, {
|
||||
"event": event,
|
||||
"id": task_state.id})
|
||||
|
||||
if task_state.cancelable:
|
||||
task_state.task.cancel()
|
||||
if not self._pause:
|
||||
restart = True
|
||||
else:
|
||||
self._log.info(
|
||||
f"scheduled task cancelled ({_event_to_str(event)}, "
|
||||
f"task_id={task_state.id})")
|
||||
logger.info("scheduled task cancelled")
|
||||
|
||||
else:
|
||||
self.log.warning(
|
||||
f"skip ({_event_to_str(event)}) due to an ongoing task "
|
||||
f"(task_id={task_state.id})")
|
||||
logger.warning("skip event due to ongoing task")
|
||||
return
|
||||
|
||||
except KeyError:
|
||||
@@ -169,17 +179,17 @@ class TaskScheduler:
|
||||
except KeyError:
|
||||
return
|
||||
|
||||
logger = SchedulerLogger(self._log, {
|
||||
"event": event,
|
||||
"id": task_state.id})
|
||||
|
||||
if task_state.cancelable:
|
||||
task_state.task.cancel()
|
||||
self._log.info(
|
||||
f"scheduled task cancelled ({_event_to_str(event)}, "
|
||||
f"task_id={task_state.id})")
|
||||
logger.info("scheduled task cancelled")
|
||||
task_state.task = None
|
||||
del self._tasks[event.pathname]
|
||||
else:
|
||||
self.log.warning(
|
||||
f"skip ({_event_to_str(event)}) due to an ongoing task "
|
||||
f"(task_id={task_state.id})")
|
||||
logger.warning("skip event due to ongoing task")
|
||||
|
||||
|
||||
class Cancel:
|
||||
@@ -216,12 +226,16 @@ class ShellScheduler(TaskScheduler):
|
||||
"{pathname}", shell_quote(event.pathname)).replace(
|
||||
"{src_pathname}", shell_quote(src_pathname))
|
||||
|
||||
self._log.info(f"{task_id}: execute shell command: {cmd}")
|
||||
logger = SchedulerLogger(self._log, {
|
||||
"event": event,
|
||||
"id": task_id})
|
||||
|
||||
logger.info(f"execute shell command, cmd={cmd}")
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_shell(cmd, loop=self._loop)
|
||||
await proc.communicate()
|
||||
except Exception as e:
|
||||
self._log.error(f"{task_id}: {e}")
|
||||
logger.error(e)
|
||||
|
||||
|
||||
class FileManagerRule:
|
||||
@@ -294,12 +308,14 @@ class FileManagerScheduler(TaskScheduler):
|
||||
if self._get_rule_by_event(event):
|
||||
await super().process_event(event)
|
||||
else:
|
||||
self._log.debug(
|
||||
f"no rule in ruleset matches path '{event.pathname}'")
|
||||
logger = SchedulerLogger(self._log, {"event": event})
|
||||
logger.debug("no rule in ruleset matches")
|
||||
|
||||
async def _chmod_and_chown(self, path, mode, chown, logger=None):
|
||||
logger = (logger or self._log)
|
||||
|
||||
async def _chmod_and_chown(self, path, mode, chown, task_id):
|
||||
if mode is not None:
|
||||
self._log.debug(f"{task_id}: chmod {oct(mode)} '{path}'")
|
||||
logger.debug(f"chmod {oct(mode)}")
|
||||
os.chmod(path, mode)
|
||||
|
||||
if chown is not None:
|
||||
@@ -310,10 +326,12 @@ class FileManagerScheduler(TaskScheduler):
|
||||
if chown[1] is not None:
|
||||
changes = f"{changes}:{chown[1]}"
|
||||
|
||||
self._log.debug(f"{task_id}: chown {changes} '{path}'")
|
||||
logger.debug(f"chown {changes}")
|
||||
shutil.chown(path, *chown)
|
||||
|
||||
async def _set_mode_and_owner(self, path, rule, task_id):
|
||||
async def _set_mode_and_owner(self, path, rule, logger=None):
|
||||
logger = (logger or self._log)
|
||||
|
||||
if (rule.user is rule.group is None):
|
||||
chown = None
|
||||
else:
|
||||
@@ -324,7 +342,7 @@ class FileManagerScheduler(TaskScheduler):
|
||||
else:
|
||||
mode = rule.filemode
|
||||
|
||||
await self._chmod_and_chown(path, mode, chown, task_id)
|
||||
await self._chmod_and_chown(path, mode, chown, logger)
|
||||
|
||||
if not os.path.isdir(path):
|
||||
return
|
||||
@@ -337,18 +355,20 @@ class FileManagerScheduler(TaskScheduler):
|
||||
if work_on_dirs:
|
||||
for p in [os.path.join(root, d) for d in dirs]:
|
||||
await self._chmod_and_chown(
|
||||
p, rule.dirmode, chown, task_id)
|
||||
p, rule.dirmode, chown, logger)
|
||||
|
||||
if work_on_files:
|
||||
for p in [os.path.join(root, f) for f in files]:
|
||||
await self._chmod_and_chown(
|
||||
p, rule.filemode, chown, task_id)
|
||||
p, rule.filemode, chown, logger)
|
||||
|
||||
async def _manager_job(self, event, task_id):
|
||||
rule = self._get_rule_by_event(event)
|
||||
if not rule:
|
||||
return
|
||||
|
||||
logger = SchedulerLogger(self._log, {"id": task_id})
|
||||
|
||||
try:
|
||||
path = event.pathname
|
||||
if rule.action in ["copy", "move"]:
|
||||
@@ -365,8 +385,7 @@ class FileManagerScheduler(TaskScheduler):
|
||||
|
||||
dst_dir = os.path.dirname(dst)
|
||||
if not os.path.isdir(dst_dir) and rule.auto_create:
|
||||
self._log.info(
|
||||
f"{task_id}: create directory '{dst_dir}'")
|
||||
logger.info(f"create directory '{dst_dir}'")
|
||||
first_subdir = dst_dir
|
||||
while not os.path.isdir(first_subdir):
|
||||
parent = os.path.dirname(first_subdir)
|
||||
@@ -378,12 +397,11 @@ class FileManagerScheduler(TaskScheduler):
|
||||
try:
|
||||
os.makedirs(dst_dir)
|
||||
await self._set_mode_and_owner(
|
||||
first_subdir, rule, task_id)
|
||||
first_subdir, rule, logger)
|
||||
except Exception as e:
|
||||
raise RuntimeError(e)
|
||||
|
||||
self._log.info(
|
||||
f"{task_id}: {rule.action} '{path}' to '{dst}'")
|
||||
logger.info(f"{rule.action} '{path}' to '{dst}'")
|
||||
|
||||
try:
|
||||
if rule.action == "copy":
|
||||
@@ -395,13 +413,12 @@ class FileManagerScheduler(TaskScheduler):
|
||||
else:
|
||||
os.rename(path, dst)
|
||||
|
||||
await self._set_mode_and_owner(dst, rule, task_id)
|
||||
await self._set_mode_and_owner(dst, rule, logger)
|
||||
except Exception as e:
|
||||
raise RuntimeError(e)
|
||||
|
||||
elif rule.action == "delete":
|
||||
self._log.info(
|
||||
f"{task_id}: {rule.action} '{path}'")
|
||||
logger.info(f"{rule.action} '{path}'")
|
||||
try:
|
||||
if os.path.isdir(path):
|
||||
if rule.rec:
|
||||
@@ -415,7 +432,7 @@ class FileManagerScheduler(TaskScheduler):
|
||||
raise RuntimeError(e)
|
||||
|
||||
except RuntimeError as e:
|
||||
self._log.error(f"{task_id}: {e}")
|
||||
logger.error(e)
|
||||
|
||||
except Exception as e:
|
||||
self._log.exception(f"{task_id}: {e}")
|
||||
logger.exception(e)
|
||||
|
||||
Reference in New Issue
Block a user