fix shutdown routine
This commit is contained in:
@@ -37,6 +37,7 @@ class Pyinotifyd:
|
|||||||
self._loop = asyncio.get_event_loop()
|
self._loop = asyncio.get_event_loop()
|
||||||
self._notifiers = []
|
self._notifiers = []
|
||||||
self._wm = pyinotify.WatchManager()
|
self._wm = pyinotify.WatchManager()
|
||||||
|
self._shutdown = False
|
||||||
|
|
||||||
def set_watches(self, watches):
|
def set_watches(self, watches):
|
||||||
if not isinstance(watches, list):
|
if not isinstance(watches, list):
|
||||||
@@ -63,7 +64,8 @@ class Pyinotifyd:
|
|||||||
|
|
||||||
self._log.info("starting")
|
self._log.info("starting")
|
||||||
if len(self._watches) == 0:
|
if len(self._watches) == 0:
|
||||||
self._log.warning("no watches configured, the daemon will not do anything")
|
self._log.warning(
|
||||||
|
"no watches configured, the daemon will not do anything")
|
||||||
for watch in self._watches:
|
for watch in self._watches:
|
||||||
self._log.info(f"start watching '{watch.path}' for inotify events")
|
self._log.info(f"start watching '{watch.path}' for inotify events")
|
||||||
self._notifiers.append(watch.event_notifier(self._wm, loop))
|
self._notifiers.append(watch.event_notifier(self._wm, loop))
|
||||||
@@ -74,27 +76,45 @@ class Pyinotifyd:
|
|||||||
notifier.stop()
|
notifier.stop()
|
||||||
|
|
||||||
self._notifiers = []
|
self._notifiers = []
|
||||||
return self._shutdown_timeout
|
|
||||||
|
|
||||||
|
def _get_pending_tasks(self):
|
||||||
|
return [t for t in asyncio.all_tasks()
|
||||||
|
if t is not asyncio.current_task()]
|
||||||
|
|
||||||
async def _shutdown(signame, daemon, log):
|
async def shutdown(self, signame):
|
||||||
log.info(f"got signal {signame}, graceful shutdown")
|
if self._shutdown:
|
||||||
timeout = daemon.stop()
|
self._log.info(
|
||||||
pending = [t for t in asyncio.all_tasks()
|
f"got signal {signame}, but shutdown already in progress")
|
||||||
if t is not asyncio.current_task()]
|
return
|
||||||
if len(pending) > 0:
|
|
||||||
log.info(
|
|
||||||
f"waiting {timeout}s for remaining tasks to complete")
|
|
||||||
try:
|
|
||||||
future = asyncio.gather(*pending)
|
|
||||||
await asyncio.wait_for(future, timeout)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
log.warning("forcefully terminate remaining tasks")
|
|
||||||
future.cancel()
|
|
||||||
future.exception()
|
|
||||||
|
|
||||||
log.info("shutdown complete")
|
self._shutdown = True
|
||||||
asyncio.get_event_loop().stop()
|
self._log.info(f"got signal {signame}, shutdown")
|
||||||
|
self.stop()
|
||||||
|
pending = self._get_pending_tasks()
|
||||||
|
if pending:
|
||||||
|
if self._shutdown_timeout > 0:
|
||||||
|
self._log.info(
|
||||||
|
f"waiting {self._shutdown_timeout}s "
|
||||||
|
f"for remaining tasks to complete")
|
||||||
|
try:
|
||||||
|
future = asyncio.gather(*pending)
|
||||||
|
await asyncio.wait_for(future, self._shutdown_timeout)
|
||||||
|
pending = None
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pending = self._get_pending_tasks()
|
||||||
|
|
||||||
|
if pending:
|
||||||
|
self._log.warning("forcefully terminate remaining tasks")
|
||||||
|
future = asyncio.gather(*pending)
|
||||||
|
future.cancel()
|
||||||
|
future.exception()
|
||||||
|
|
||||||
|
asyncio.get_event_loop().stop()
|
||||||
|
self._shutdown = False
|
||||||
|
self._log.info("shutdown complete")
|
||||||
|
|
||||||
|
async def _reload(self, signame):
|
||||||
|
self._log.info(f"got signal {signame}, reload config")
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@@ -145,7 +165,7 @@ def main():
|
|||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
if args.version:
|
if args.version:
|
||||||
print(f"{myname} ({version})")
|
print(f"{myname} ({__version__})")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
if args.list:
|
if args.list:
|
||||||
@@ -196,13 +216,17 @@ def main():
|
|||||||
f"%(asctime)s - {myname}/%(name)s - %(levelname)s - %(message)s")
|
f"%(asctime)s - {myname}/%(name)s - %(levelname)s - %(message)s")
|
||||||
ch.setFormatter(formatter)
|
ch.setFormatter(formatter)
|
||||||
|
|
||||||
log = logging.getLogger(myname)
|
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
for signame in ["SIGINT", "SIGTERM"]:
|
for signame in ["SIGINT", "SIGTERM"]:
|
||||||
loop.add_signal_handler(
|
loop.add_signal_handler(
|
||||||
getattr(signal, signame),
|
getattr(signal, signame),
|
||||||
lambda: asyncio.ensure_future(
|
lambda: asyncio.ensure_future(
|
||||||
_shutdown(signame, daemon, log)))
|
daemon.shutdown(signame)))
|
||||||
|
|
||||||
|
loop.add_signal_handler(
|
||||||
|
getattr(signal, "SIGHUP"),
|
||||||
|
lambda: asyncio.ensure_future(
|
||||||
|
daemon.reload(signame)))
|
||||||
|
|
||||||
daemon.start()
|
daemon.start()
|
||||||
loop.run_forever()
|
loop.run_forever()
|
||||||
|
|||||||
@@ -100,8 +100,7 @@ class Watch:
|
|||||||
|
|
||||||
def event_notifier(self, wm, loop=asyncio.get_event_loop()):
|
def event_notifier(self, wm, loop=asyncio.get_event_loop()):
|
||||||
handler = pyinotify.ProcessEvent()
|
handler = pyinotify.ProcessEvent()
|
||||||
wm.add_watch(
|
wm.add_watch(self.path, pyinotify.ALL_EVENTS, rec=self.rec,
|
||||||
self.path, pyinotify.ALL_EVENTS, rec=self.rec, auto_add=self.auto_add,
|
auto_add=self.auto_add, do_glob=True)
|
||||||
do_glob=True)
|
|
||||||
|
|
||||||
return pyinotify.AsyncioNotifier(wm, loop, default_proc_fun=handler)
|
return pyinotify.AsyncioNotifier(wm, loop, default_proc_fun=handler)
|
||||||
|
|||||||
Reference in New Issue
Block a user