initial commit
This commit is contained in:
340
pyinotifyd.py
Executable file
340
pyinotifyd.py
Executable file
@@ -0,0 +1,340 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# pyinotifyd is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# pyinotifyd is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with pyinotifyd. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import logging.handlers
|
||||
import pyinotify
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
from shlex import quote as shell_quote
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
class Task:
|
||||
def __init__(self, event, delay, task_id, job, callback=None,
|
||||
logname="Task"):
|
||||
self._event = event
|
||||
self._path = event.pathname
|
||||
self._delay = delay
|
||||
self.task_id = task_id
|
||||
self._task = None
|
||||
self._job = job
|
||||
self._callback = callback
|
||||
self._log = logging.getLogger(logname)
|
||||
|
||||
async def _start(self):
|
||||
if self._delay > 0:
|
||||
await asyncio.sleep(self._delay)
|
||||
|
||||
if self._callback is not None:
|
||||
self._callback(self._event)
|
||||
self._task = None
|
||||
self._log.info(f"execute task {self.task_id}")
|
||||
await asyncio.shield(self._job(self._event, self.task_id))
|
||||
self._log.info(f"task {self.task_id} finished")
|
||||
|
||||
def start(self):
|
||||
if self._task is None:
|
||||
self._task = asyncio.create_task(self._start())
|
||||
|
||||
def cancel(self):
|
||||
if self._task is not None:
|
||||
self._task.cancel()
|
||||
self._task = None
|
||||
|
||||
def restart(self):
|
||||
self.cancel()
|
||||
self.start()
|
||||
|
||||
|
||||
class TaskScheduler:
|
||||
def __init__(self, job, delay=0, files=True, directories=False,
|
||||
logname="TaskScheduler"):
|
||||
self._delay = delay
|
||||
self._files = files
|
||||
self._directories = directories
|
||||
self._job = job
|
||||
self._tasks = {}
|
||||
self._log = logging.getLogger(logname)
|
||||
|
||||
def _task_started(self, event):
|
||||
path = event.pathname
|
||||
if path in self._tasks:
|
||||
del self._tasks[path]
|
||||
|
||||
def schedule(self, event):
|
||||
self._log.debug(f"received {event}")
|
||||
path = event.pathname
|
||||
maskname = event.maskname.split("|", 1)[0]
|
||||
if (not event.dir and not self._files) or \
|
||||
(event.dir and not self._directories):
|
||||
return
|
||||
|
||||
if path in self._tasks:
|
||||
task = self._tasks[path]
|
||||
self._log.info(f"received event {maskname} on '{path}', "
|
||||
f"re-schedule task {task.task_id} "
|
||||
f"(delay={self._delay}s)")
|
||||
task.restart()
|
||||
else:
|
||||
task_id = str(uuid4())
|
||||
self._log.info(
|
||||
f"received event {maskname} on '{path}', "
|
||||
f"schedule task {task_id} (delay={self._delay}s)")
|
||||
task = Task(
|
||||
event, self._delay, task_id=task_id, job=self._job,
|
||||
callback=self._task_started)
|
||||
self._tasks[path] = task
|
||||
task.start()
|
||||
|
||||
def cancel(self, event):
|
||||
self._log.debug(f"received {event}")
|
||||
path = event.pathname
|
||||
maskname = event.maskname.split("|", 1)[0]
|
||||
if path in self._tasks:
|
||||
task = self._tasks[path]
|
||||
self._log.info(f"received event {maskname} on '{path}', "
|
||||
f"cancel scheduled task {task.task_id}")
|
||||
task.cancel()
|
||||
del self._tasks[path]
|
||||
|
||||
|
||||
class ShellScheduler(TaskScheduler):
|
||||
def __init__(self, cmd, job=None, logname="ShellScheduler",
|
||||
*args, **kwargs):
|
||||
self._cmd = cmd
|
||||
super().__init__(*args, job=self.job, logname=logname, **kwargs)
|
||||
|
||||
async def job(self, event, task_id):
|
||||
maskname = event.maskname.split("|", 1)[0]
|
||||
cmd = self._cmd
|
||||
cmd = cmd.replace("{maskname}", shell_quote(maskname))
|
||||
cmd = cmd.replace("{pathname}", shell_quote(event.pathname))
|
||||
if hasattr(event, "src_pathname"):
|
||||
cmd = cmd.replace(
|
||||
"{src_pathname}", shell_quote(event.src_pathname))
|
||||
|
||||
self._log.info(f"{task_id}: execute shell command: {cmd}")
|
||||
proc = await asyncio.create_subprocess_shell(cmd)
|
||||
await proc.communicate()
|
||||
|
||||
|
||||
class FileManager:
|
||||
def __init__(self, rules, auto_create=False, rec=False,
|
||||
logname="FileManager"):
|
||||
self._rules = []
|
||||
if not isinstance(rules, list):
|
||||
rules = [rules]
|
||||
|
||||
for rule in rules:
|
||||
if rule["action"] in ["copy", "move"]:
|
||||
self._rules.append((rule["action"],
|
||||
re.compile(rule["src_re"]),
|
||||
rule["dst_re"]))
|
||||
elif rule["action"] == "delete":
|
||||
self._rules.append(
|
||||
(rule["action"], re.compile(rule["src_re"])))
|
||||
else:
|
||||
raise ValueError(f"invalid action type: {rule['action']}")
|
||||
|
||||
self._auto_create = auto_create
|
||||
self._rec = rec
|
||||
self._log = logging.getLogger(logname)
|
||||
|
||||
async def job(self, event, task_id):
|
||||
path = event.pathname
|
||||
match = None
|
||||
for rule in self._rules:
|
||||
src_re = rule[1]
|
||||
match = src_re.match(path)
|
||||
if match:
|
||||
break
|
||||
|
||||
if match is not None:
|
||||
action = rule[0]
|
||||
try:
|
||||
if action in ["copy", "move"]:
|
||||
dst_re = rule[2]
|
||||
dest = src_re.sub(dst_re, path)
|
||||
dest_dir = os.path.dirname(dest)
|
||||
if not os.path.isdir(dest_dir) and self._auto_create:
|
||||
self._log.info(
|
||||
f"{task_id}: create directory '{dest_dir}'")
|
||||
os.makedirs(dest_dir)
|
||||
elif os.path.exists(dest):
|
||||
raise RuntimeError(
|
||||
f"unable to move file from '{path} to '{dest}', "
|
||||
f"destination path exists already")
|
||||
|
||||
self._log.info(
|
||||
f"{task_id}: {action} '{path}' to '{dest}'")
|
||||
if action == "copy":
|
||||
if os.path.isdir(path):
|
||||
shutil.copytree(path, dest)
|
||||
else:
|
||||
shutil.copy2(path, dest)
|
||||
else:
|
||||
os.rename(path, dest)
|
||||
|
||||
elif action == "delete":
|
||||
self._log.info(
|
||||
f"{task_id}: {action} '{path}'")
|
||||
if os.path.isdir(path):
|
||||
if self._rec:
|
||||
shutil.rmtree(path)
|
||||
else:
|
||||
shutil.rmdir(path)
|
||||
else:
|
||||
os.remove(path)
|
||||
|
||||
except RuntimeError as e:
|
||||
self._log.error(f"{task_id}: {e}")
|
||||
except Exception as e:
|
||||
self._log.exception(f"{task_id}: {e}")
|
||||
|
||||
else:
|
||||
self._log.warning(f"{task_id}: no rule matches path '{path}'")
|
||||
|
||||
|
||||
class ExecList:
|
||||
def __init__(self):
|
||||
self._list = []
|
||||
|
||||
def add(self, func):
|
||||
self._list.append(func)
|
||||
|
||||
def remove(self, func):
|
||||
self._list.remove(func)
|
||||
|
||||
def run(self, event):
|
||||
for func in self._list:
|
||||
func(event)
|
||||
|
||||
|
||||
def add_mask(new_mask, current_mask=False):
|
||||
if not current_mask:
|
||||
return new_mask
|
||||
else:
|
||||
return current_mask | new_mask
|
||||
|
||||
|
||||
async def shutdown(timeout=30):
|
||||
pending = [ t for t in asyncio.all_tasks() if t is not asyncio.current_task() ]
|
||||
if len(pending) > 0:
|
||||
logging.info(
|
||||
f"graceful shutdown, waiting {timeout}s "
|
||||
f"for remaining tasks to complete")
|
||||
try:
|
||||
future = asyncio.gather(*pending)
|
||||
await asyncio.wait_for(future, timeout)
|
||||
except asyncio.TimeoutError:
|
||||
logging.warning(
|
||||
"forcefully terminate remaining tasks")
|
||||
future.cancel()
|
||||
future.exception()
|
||||
|
||||
logging.info("shutdown")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="pyinotifyd",
|
||||
formatter_class=lambda prog: argparse.HelpFormatter(
|
||||
prog, max_help_position=45, width=140))
|
||||
|
||||
parser.add_argument(
|
||||
"-c",
|
||||
"--config",
|
||||
help="path to config file (defaults to /etc/pyinotifyd/config.py)",
|
||||
default="/etc/pyinotifyd/config.py")
|
||||
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--debug",
|
||||
help="Log debugging messages.",
|
||||
action="store_true")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
with open(args.config, "r") as c:
|
||||
exec(c.read(), globals())
|
||||
|
||||
console = logging.StreamHandler()
|
||||
formatter = logging.Formatter(
|
||||
"%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
||||
console.setFormatter(formatter)
|
||||
|
||||
if args.debug:
|
||||
loglevel = logging.DEBUG
|
||||
elif "LOGLEVEL" in globals():
|
||||
loglevel = LOGLEVEL
|
||||
else:
|
||||
loglevel = logging.INFO
|
||||
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(loglevel)
|
||||
root_logger.addHandler(console)
|
||||
|
||||
watchable_flags = pyinotify.EventsCodes.OP_FLAGS
|
||||
watchable_flags.update(pyinotify.EventsCodes.EVENT_FLAGS)
|
||||
|
||||
wm = pyinotify.WatchManager()
|
||||
loop = asyncio.get_event_loop()
|
||||
notifiers = []
|
||||
for watch in WATCHES:
|
||||
mask = False
|
||||
handler = pyinotify.ProcessEvent()
|
||||
for flag, values in watch["event_map"].items():
|
||||
if flag not in watchable_flags or values is None:
|
||||
continue
|
||||
|
||||
if not isinstance(values, list):
|
||||
values = [values]
|
||||
|
||||
mask = add_mask(pyinotify.EventsCodes.ALL_FLAGS[flag], mask)
|
||||
exec_list = ExecList()
|
||||
for value in values:
|
||||
assert callable(value), \
|
||||
f"event_map['{flag}']: expected callable, " \
|
||||
f"got {type(value)}"
|
||||
exec_list.add(value)
|
||||
|
||||
setattr(handler, f"process_{flag}", exec_list.run)
|
||||
|
||||
wm.add_watch(
|
||||
watch["path"], mask, rec=watch["rec"], auto_add=watch["auto_add"],
|
||||
do_glob=True)
|
||||
notifiers.append(pyinotify.AsyncioNotifier(
|
||||
wm, loop, default_proc_fun=handler))
|
||||
|
||||
try:
|
||||
loop.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
for notifier in notifiers:
|
||||
notifier.stop()
|
||||
|
||||
loop.run_until_complete(shutdown())
|
||||
loop.close()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user