Shutdown daemon gracefully
This commit is contained in:
@@ -8,6 +8,7 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import signal
|
import signal
|
||||||
import ssl
|
import ssl
|
||||||
|
import threading
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||||
from urllib.parse import parse_qs, urlparse
|
from urllib.parse import parse_qs, urlparse
|
||||||
@@ -93,12 +94,36 @@ class DDNSServer(ThreadingHTTPServer):
|
|||||||
self.pool_size = app.config["daemon"]["thread_pool_size"]
|
self.pool_size = app.config["daemon"]["thread_pool_size"]
|
||||||
self.request_timeout = app.config["daemon"]["request_timeout"]
|
self.request_timeout = app.config["daemon"]["request_timeout"]
|
||||||
self.executor = ThreadPoolExecutor(max_workers=self.pool_size)
|
self.executor = ThreadPoolExecutor(max_workers=self.pool_size)
|
||||||
|
self.active_requests = 0
|
||||||
|
self.requests_lock = threading.Lock()
|
||||||
|
self.requests_done = threading.Condition(self.requests_lock)
|
||||||
super().__init__(address, DDNSRequestHandler)
|
super().__init__(address, DDNSRequestHandler)
|
||||||
|
|
||||||
def process_request(self, request, client_address):
|
def process_request(self, request, client_address):
|
||||||
"""Submit request to thread pool."""
|
"""Submit request to thread pool."""
|
||||||
|
with self.requests_lock:
|
||||||
|
self.active_requests += 1
|
||||||
request.settimeout(self.request_timeout)
|
request.settimeout(self.request_timeout)
|
||||||
self.executor.submit(self.process_request_thread, request, client_address)
|
self.executor.submit(self._handle_request_wrapper, request, client_address)
|
||||||
|
|
||||||
|
def _handle_request_wrapper(self, request, client_address):
|
||||||
|
"""Wrap request handling to track active requests."""
|
||||||
|
try:
|
||||||
|
self.process_request_thread(request, client_address)
|
||||||
|
finally:
|
||||||
|
with self.requests_lock:
|
||||||
|
self.active_requests -= 1
|
||||||
|
if self.active_requests == 0:
|
||||||
|
self.requests_done.notify_all()
|
||||||
|
|
||||||
|
def wait_for_requests(self, timeout=5):
|
||||||
|
"""Wait for active requests to complete."""
|
||||||
|
with self.requests_lock:
|
||||||
|
if self.active_requests > 0:
|
||||||
|
logging.info(f"Waiting for {self.active_requests} active request(s)")
|
||||||
|
self.requests_done.wait(timeout=timeout)
|
||||||
|
if self.active_requests > 0:
|
||||||
|
logging.warning(f"Shutdown timeout, {self.active_requests} request(s) still active")
|
||||||
|
|
||||||
def server_close(self):
|
def server_close(self):
|
||||||
"""Shutdown thread pool and close server."""
|
"""Shutdown thread pool and close server."""
|
||||||
@@ -503,6 +528,9 @@ def run_daemon(app):
|
|||||||
while not app.is_shutting_down():
|
while not app.is_shutting_down():
|
||||||
server.handle_request()
|
server.handle_request()
|
||||||
|
|
||||||
|
# Graceful shutdown - wait for active requests
|
||||||
|
server.wait_for_requests(5)
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
expired_cleanup_thread.stop()
|
expired_cleanup_thread.stop()
|
||||||
ratelimit_cleanup_thread.stop()
|
ratelimit_cleanup_thread.stop()
|
||||||
|
|||||||
Reference in New Issue
Block a user