fix shutdown and add reload
This commit is contained in:
@@ -28,6 +28,18 @@ from pyinotifyd._install import install, uninstall
|
|||||||
__version__ = "0.0.2"
|
__version__ = "0.0.2"
|
||||||
|
|
||||||
|
|
||||||
|
def get_pyinotifyd_from_config(name, config_file):
|
||||||
|
config = {}
|
||||||
|
exec(f"from {name}.scheduler import *", config)
|
||||||
|
with open(config_file, "r") as c:
|
||||||
|
exec(c.read(), globals(), config)
|
||||||
|
daemon = config[f"{name}"]
|
||||||
|
assert isinstance(daemon, Pyinotifyd), \
|
||||||
|
f"{name}: expected {type(Pyinotifyd)}, " \
|
||||||
|
f"got {type(daemon)}"
|
||||||
|
return daemon
|
||||||
|
|
||||||
|
|
||||||
class Pyinotifyd:
|
class Pyinotifyd:
|
||||||
def __init__(self, watches=[], shutdown_timeout=30, logname="daemon"):
|
def __init__(self, watches=[], shutdown_timeout=30, logname="daemon"):
|
||||||
self.set_watches(watches)
|
self.set_watches(watches)
|
||||||
@@ -37,7 +49,6 @@ class Pyinotifyd:
|
|||||||
self._loop = asyncio.get_event_loop()
|
self._loop = asyncio.get_event_loop()
|
||||||
self._notifiers = []
|
self._notifiers = []
|
||||||
self._wm = pyinotify.WatchManager()
|
self._wm = pyinotify.WatchManager()
|
||||||
self._shutdown = False
|
|
||||||
|
|
||||||
def set_watches(self, watches):
|
def set_watches(self, watches):
|
||||||
if not isinstance(watches, list):
|
if not isinstance(watches, list):
|
||||||
@@ -67,15 +78,31 @@ class Pyinotifyd:
|
|||||||
self._log.warning(
|
self._log.warning(
|
||||||
"no watches configured, the daemon will not do anything")
|
"no watches configured, the daemon will not do anything")
|
||||||
for watch in self._watches:
|
for watch in self._watches:
|
||||||
self._log.info(f"start watching '{watch.path}' for inotify events")
|
self._log.info(
|
||||||
|
f"start listening for inotify events on '{watch.path()}'")
|
||||||
self._notifiers.append(watch.event_notifier(self._wm, loop))
|
self._notifiers.append(watch.event_notifier(self._wm, loop))
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._log.info("stop watching for inotify events")
|
self._log.info("stop listening for inotify events")
|
||||||
for notifier in self._notifiers:
|
for notifier in self._notifiers:
|
||||||
notifier.stop()
|
notifier.stop()
|
||||||
|
|
||||||
self._notifiers = []
|
self._notifiers = []
|
||||||
|
return self._shutdown_timeout
|
||||||
|
|
||||||
|
|
||||||
|
class DaemonInstance:
|
||||||
|
def __init__(self, instance, logname="daemon"):
|
||||||
|
self._instance = instance
|
||||||
|
self._shutdown = False
|
||||||
|
self._log = logging.getLogger(logname)
|
||||||
|
self._timeout = None
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self._instance.start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._timeout = self._instance.stop()
|
||||||
|
|
||||||
def _get_pending_tasks(self):
|
def _get_pending_tasks(self):
|
||||||
return [t for t in asyncio.all_tasks()
|
return [t for t in asyncio.all_tasks()
|
||||||
@@ -92,29 +119,48 @@ class Pyinotifyd:
|
|||||||
self.stop()
|
self.stop()
|
||||||
pending = self._get_pending_tasks()
|
pending = self._get_pending_tasks()
|
||||||
if pending:
|
if pending:
|
||||||
if self._shutdown_timeout > 0:
|
tasks_done = False
|
||||||
|
if self._timeout:
|
||||||
self._log.info(
|
self._log.info(
|
||||||
f"waiting {self._shutdown_timeout}s "
|
f"wait {self._timeout} seconds for {len(pending)} "
|
||||||
f"for remaining tasks to complete")
|
f"remaining task(s) to complete")
|
||||||
try:
|
try:
|
||||||
future = asyncio.gather(*pending)
|
future = asyncio.gather(*pending)
|
||||||
await asyncio.wait_for(future, self._shutdown_timeout)
|
await asyncio.wait_for(future, self._timeout)
|
||||||
pending = None
|
tasks_done = True
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
pending = self._get_pending_tasks()
|
|
||||||
|
|
||||||
if pending:
|
|
||||||
self._log.warning("forcefully terminate remaining tasks")
|
|
||||||
future = asyncio.gather(*pending)
|
|
||||||
future.cancel()
|
future.cancel()
|
||||||
future.exception()
|
future.exception()
|
||||||
|
|
||||||
|
if not tasks_done:
|
||||||
|
self._log.warning(f"terminate remaining task(s)")
|
||||||
|
for task in pending:
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
try:
|
||||||
|
await asyncio.gather(*pending)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
asyncio.get_event_loop().stop()
|
asyncio.get_event_loop().stop()
|
||||||
self._shutdown = False
|
self._shutdown = False
|
||||||
self._log.info("shutdown complete")
|
self._log.info("shutdown complete")
|
||||||
|
|
||||||
async def _reload(self, signame):
|
async def reload(self, signame, name, config):
|
||||||
|
if self._shutdown:
|
||||||
|
self._log.info(
|
||||||
|
f"got signal {signame}, but shutdown already in progress")
|
||||||
|
return
|
||||||
|
|
||||||
self._log.info(f"got signal {signame}, reload config")
|
self._log.info(f"got signal {signame}, reload config")
|
||||||
|
try:
|
||||||
|
instance = get_pyinotifyd_from_config(name, config)
|
||||||
|
except Exception as e:
|
||||||
|
logging.exception(f"unable to reload config '{config}': {e}")
|
||||||
|
else:
|
||||||
|
self.stop()
|
||||||
|
self._instance = instance
|
||||||
|
self.start()
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@@ -193,14 +239,8 @@ def main():
|
|||||||
sys.exit(uninstall(myname))
|
sys.exit(uninstall(myname))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
config = {}
|
pyinotifyd = get_pyinotifyd_from_config(myname, args.config)
|
||||||
exec(f"from {myname}.scheduler import *", config)
|
daemon = DaemonInstance(pyinotifyd)
|
||||||
with open(args.config, "r") as c:
|
|
||||||
exec(c.read(), globals(), config)
|
|
||||||
daemon = config[f"{myname}"]
|
|
||||||
assert isinstance(daemon, Pyinotifyd), \
|
|
||||||
f"{myname}: expected {type(Pyinotifyd)}, " \
|
|
||||||
f"got {type(daemon)}"
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.exception(f"config file '{args.config}': {e}")
|
logging.exception(f"config file '{args.config}': {e}")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
@@ -225,7 +265,7 @@ def main():
|
|||||||
loop.add_signal_handler(
|
loop.add_signal_handler(
|
||||||
getattr(signal, "SIGHUP"),
|
getattr(signal, "SIGHUP"),
|
||||||
lambda: asyncio.ensure_future(
|
lambda: asyncio.ensure_future(
|
||||||
daemon.reload(signame)))
|
daemon.reload(signame, myname, args.config)))
|
||||||
|
|
||||||
daemon.start()
|
daemon.start()
|
||||||
loop.run_forever()
|
loop.run_forever()
|
||||||
|
|||||||
@@ -37,7 +37,10 @@ class _Task:
|
|||||||
|
|
||||||
async def _start(self):
|
async def _start(self):
|
||||||
if self._delay > 0:
|
if self._delay > 0:
|
||||||
|
try:
|
||||||
await asyncio.sleep(self._delay)
|
await asyncio.sleep(self._delay)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
return
|
||||||
|
|
||||||
if self._callback is not None:
|
if self._callback is not None:
|
||||||
self._callback(self._event)
|
self._callback(self._event)
|
||||||
|
|||||||
@@ -79,12 +79,12 @@ class Watch:
|
|||||||
def __init__(self, path, event_map, rec=False, auto_add=False):
|
def __init__(self, path, event_map, rec=False, auto_add=False):
|
||||||
assert isinstance(path, str), \
|
assert isinstance(path, str), \
|
||||||
f"path: expected {type('')}, got {type(path)}"
|
f"path: expected {type('')}, got {type(path)}"
|
||||||
self.path = path
|
self._path = path
|
||||||
|
|
||||||
if isinstance(event_map, EventMap):
|
if isinstance(event_map, EventMap):
|
||||||
self.event_map = event_map
|
self._event_map = event_map
|
||||||
elif isinstance(event_map, dict):
|
elif isinstance(event_map, dict):
|
||||||
self.event_map = EventMap(event_map)
|
self._event_map = EventMap(event_map)
|
||||||
else:
|
else:
|
||||||
raise AssertionError(
|
raise AssertionError(
|
||||||
f"event_map: expected {type(EventMap)} or {type(dict)}, "
|
f"event_map: expected {type(EventMap)} or {type(dict)}, "
|
||||||
@@ -92,15 +92,21 @@ class Watch:
|
|||||||
|
|
||||||
assert isinstance(rec, bool), \
|
assert isinstance(rec, bool), \
|
||||||
f"rec: expected {type(bool)}, got {type(rec)}"
|
f"rec: expected {type(bool)}, got {type(rec)}"
|
||||||
self.rec = rec
|
self._rec = rec
|
||||||
|
|
||||||
assert isinstance(auto_add, bool), \
|
assert isinstance(auto_add, bool), \
|
||||||
f"auto_add: expected {type(bool)}, got {type(auto_add)}"
|
f"auto_add: expected {type(bool)}, got {type(auto_add)}"
|
||||||
self.auto_add = auto_add
|
self._auto_add = auto_add
|
||||||
|
|
||||||
|
def path(self):
|
||||||
|
return self._path
|
||||||
|
|
||||||
def event_notifier(self, wm, loop=asyncio.get_event_loop()):
|
def event_notifier(self, wm, loop=asyncio.get_event_loop()):
|
||||||
handler = pyinotify.ProcessEvent()
|
handler = pyinotify.ProcessEvent()
|
||||||
wm.add_watch(self.path, pyinotify.ALL_EVENTS, rec=self.rec,
|
for flag, values in self._event_map.items():
|
||||||
auto_add=self.auto_add, do_glob=True)
|
setattr(handler, f"process_{flag}", _TaskList(values).execute)
|
||||||
|
|
||||||
|
wm.add_watch(self._path, pyinotify.ALL_EVENTS, rec=self._rec,
|
||||||
|
auto_add=self._auto_add, do_glob=True)
|
||||||
|
|
||||||
return pyinotify.AsyncioNotifier(wm, loop, default_proc_fun=handler)
|
return pyinotify.AsyncioNotifier(wm, loop, default_proc_fun=handler)
|
||||||
|
|||||||
Reference in New Issue
Block a user