"""HTTP(S) server for DDNS updates.""" from __future__ import annotations import base64 import ipaddress import json import logging import signal import ssl import threading from . import ( now_utc, datetime_str, STATUS_GOOD, STATUS_NOCHG, STATUS_BADAUTH, STATUS_NOHOST, STATUS_DNSERR, STATUS_ABUSE, STATUS_BADIP, ) from .cleanup import ExpiredRecordsCleanupThread, RateLimitCleanupThread from .dns import detect_ip_type from .logging import clear_txn_id, set_txn_id from .models import ( DatabaseError, DoesNotExist, EncodingError, get_hostname_for_user, get_user ) from argon2.exceptions import VerifyMismatchError from concurrent.futures import ThreadPoolExecutor from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from urllib.parse import parse_qs, urlparse def extract_param(params, aliases): """Extract first matching param from query params.""" for alias in aliases: val = params.get(alias, [None])[0] if val is not None: return val return None class ProxyHeaderError(Exception): """Raised when expected proxy header is missing.""" pass def _parse_trusted_proxies(proxies): """Parse list of IPs/networks into ip_network objects.""" networks = [] for proxy in proxies: try: if "/" not in proxy: addr = ipaddress.ip_address(proxy) if addr.version == 4: proxy = proxy + "/32" else: proxy = proxy + "/128" networks.append(ipaddress.ip_network(proxy, strict=False)) except ValueError: logging.warning(f"Invalid trusted proxy: {proxy}") return networks def _is_trusted_proxy(client_ip, trusted_networks): """Check if client IP is in trusted proxy networks.""" try: addr = ipaddress.ip_address(client_ip) for network in trusted_networks: if addr in network: return True except ValueError: pass return False class DDNSServer(ThreadingHTTPServer): """HTTP server with Application instance and thread pool.""" def __init__(self, app, address): """ Initialize server with application. Args: app: Application instance. address: (host, port) tuple. """ self.app = app self.proxy_header = app.config["daemon"].get("proxy_header", "") self.trusted_networks = _parse_trusted_proxies( app.config["daemon"].get("trusted_proxies", []) ) self.pool_size = app.config["daemon"]["thread_pool_size"] self.request_timeout = app.config["daemon"]["request_timeout"] self.executor = ThreadPoolExecutor(max_workers=self.pool_size) self.active_requests = 0 self.requests_lock = threading.Lock() self.requests_done = threading.Condition(self.requests_lock) super().__init__(address, DDNSRequestHandler) def process_request(self, request, client_address): """Submit request to thread pool.""" with self.requests_lock: self.active_requests += 1 request.settimeout(self.request_timeout) self.executor.submit(self._handle_request_wrapper, request, client_address) def _handle_request_wrapper(self, request, client_address): """Wrap request handling to track active requests.""" try: self.process_request_thread(request, client_address) finally: with self.requests_lock: self.active_requests -= 1 if self.active_requests == 0: self.requests_done.notify_all() def wait_for_requests(self, timeout=5): """Wait for active requests to complete.""" with self.requests_lock: if self.active_requests > 0: logging.info(f"Waiting for {self.active_requests} active request(s)") self.requests_done.wait(timeout=timeout) if self.active_requests > 0: logging.warning( f"Shutdown timeout, {self.active_requests} request(s) still active" ) def server_close(self): """Shutdown thread pool and close server.""" self.executor.shutdown(wait=True) super().server_close() class DDNSError(Exception): def __init__(self, message, status, **kwargs): super().__init__(self, message) self.message = message self.status = status self.kwargs = kwargs def __str__(self): if not self.kwargs: return self.message string = f"{self.message}:" for key, value in self.kwargs.items(): string += f" {key}={value}" return string class DDNSClientError(DDNSError): def __init__(self, message, code, status, **kwargs): super().__init__(message, status, **kwargs) self.code = code class DDNSRequestHandler(BaseHTTPRequestHandler): """HTTP request handler for DDNS updates.""" @property def app(self): """Get application instance from server.""" return self.server.app def log_message(self, format_str, *args): """Override to use our logger.""" msg = f"{self.address_string()} - {format_str % args}" if self.app.config["daemon"]["log_requests"]: logging.info(msg) else: logging.debug(msg) def send_response_body(self, code, body, content_type="text/plain"): """Send response with body.""" self.send_response(code) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body.encode()) def wants_json(self): """Check if client wants JSON response.""" accept = self.headers.get("Accept", "") return "application/json" in accept def respond(self, code, status, **kwargs): """Send response in appropriate format.""" if self.wants_json(): data = {"status": status, **kwargs} self.send_response_body(code, json.dumps(data), "application/json") else: # DynDNS-compatible plain text parts = [status] if "ipv4" in kwargs and kwargs["ipv4"]: parts.append(kwargs["ipv4"]) if "ipv6" in kwargs and kwargs["ipv6"]: parts.append(kwargs["ipv6"]) self.send_response_body(code, " ".join(parts)) def get_client_ip(self): """Get client IP, considering configured proxy header if trusted.""" direct_ip = self.client_address[0] proxy_header = self.server.proxy_header if not proxy_header: return direct_ip if not _is_trusted_proxy(direct_ip, self.server.trusted_networks): return direct_ip forwarded = self.headers.get(proxy_header) if not forwarded: raise ProxyHeaderError( f"Missing {proxy_header} header from trusted proxy" ) return forwarded.split(",")[0].strip() def parse_basic_auth(self): """Parse Basic Auth header.""" auth = self.headers.get("Authorization", "") if not auth.startswith("Basic "): return None, None try: decoded = base64.b64decode(auth[6:]).decode("utf-8") if ":" in decoded: username, password = decoded.split(":", 1) return username, password except Exception: pass return None, None def do_GET(self): """Handle GET requests.""" set_txn_id() try: client_ip = self.get_client_ip() except ProxyHeaderError as e: logging.error(f"Proxy header error: {e}") self.respond(400, "Bad Request") return try: self._handle_get_request(client_ip) except DDNSClientError as e: if self.app.bad_limiter: self.app.bad_limiter.record(client_ip) logging.warning(e) self.respond(e.code, e.status) except DDNSError as e: logging.error(e) self.respond(500, e.status) except DatabaseError as e: logging.error(f"Database error: {e}") self.respond(500, "Internal Server Error") except Exception as e: logging.exception(f"Uncaught exception: {e}") self.respond(500, "Internal Server Error") finally: clear_txn_id() def _handle_get_request(self, client_ip): """Handle GET request logic.""" # Parse URL parsed = urlparse(self.path) # Find matching endpoint endpoint = self.app.config["_endpoint_map"].get(parsed.path) if endpoint is None: self.respond(404, "Not Found") return # Bad rate limit check if self.app.bad_limiter: blocked, retry_at = self.app.bad_limiter.is_blocked(client_ip) if blocked: raise DDNSClientError( "Rate limited (bad requests)", 429, STATUS_ABUSE, client=client_ip, retry_at=datetime_str(retry_at) ) # Parse query parameters params = parse_qs(parsed.query) # Get credentials username, password = self.parse_basic_auth() if username is None: username = extract_param(params, endpoint["params"]["username"]) password = extract_param(params, endpoint["params"]["password"]) if not username or not password: raise DDNSClientError( "Auth failed", 401, STATUS_BADAUTH, client_ip ) # Get hostname parameter hostname_param = extract_param(params, endpoint["params"]["hostname"]) if not hostname_param: raise DDNSClientError( "Missing hostname", 400, STATUS_NOHOST, client=client_ip, username=username ) # Validate credentials user = self._authenticate(client_ip, username, password) # Check hostname ownership hostname = self._check_permissions(client_ip, user, hostname_param) # Process myip parameter ipv4 = None myip = extract_param(params, endpoint["params"]["ipv4"]) if myip: try: rtype, myip = detect_ip_type(myip) if rtype == "A": ipv4 = myip else: ipv6 = myip except ValueError: raise DDNSClientError( "Bad IP address", 400, STATUS_BADIP, client=client_ip, username=username, hostname=hostname.hostname, zone=hostname.zone, ip=myip ) # Process myip6 parameter ipv6 = None myip6 = extract_param(params, endpoint["params"]["ipv6"]) if myip6: try: rtype, myip6 = detect_ip_type(myip6) if rtype == "AAAA": ipv6 = myip6 else: raise ValueError except ValueError: raise DDNSClientError( "Bad IP address", 400, STATUS_BADIP, client=client_ip, username=username, hostname=hostname.hostname, zone=hostname.zone, ipv6=myip6 ) # Auto-detect from client IP if no params if ipv4 is None and ipv6 is None: rtype, ip = detect_ip_type(client_ip) if rtype == "A": ipv4 = ip else: ipv6 = ip # Process notify_change parameter notify_change = extract_param(params, endpoint["params"]["notify_change"]) notify_change = notify_change.lower() in ["1", "y", "yes", "on", "true"] \ if notify_change else False # Good rate limit check if self.app.good_limiter: blocked, retry_at = self.app.good_limiter.is_blocked(client_ip) if blocked: raise DDNSClientError( "Rate limited (good requests)", 429, STATUS_ABUSE, client=client_ip, username=username, retry_at=datetime_str(retry_at) ) # Record good request if self.app.good_limiter: self.app.good_limiter.record(client_ip) # Process update request self._process_ip_update( client_ip, user, hostname, ipv4, ipv6, notify_change ) def _authenticate(self, client_ip, username, password): try: try: user = get_user(username) except DoesNotExist: # User does not exist, Hash fake password to prevent time-based attacks self.app.password_hasher.hash("FAKE-PASSWORD") raise DoesNotExist self.app.password_hasher.verify(user.password_hash, password) except (DoesNotExist, VerifyMismatchError): raise DDNSClientError( "Auth failed", 401, STATUS_BADAUTH, client=client_ip, username=username ) return user def _check_permissions(self, client_ip, user, hostname_param): # Check hostname ownership code = None try: hostname = get_hostname_for_user(hostname_param, user) except DoesNotExist: code = 403 except EncodingError: code = 400 if code: raise DDNSClientError( "Access denied", code, STATUS_NOHOST, client=client_ip, username=user.username, hostname=hostname_param ) return hostname def _process_ip_update(self, client_ip, user, hostname, ipv4, ipv6, notify_change): """Process IP update for hostname.""" now = now_utc() ipv4_changed = False ipv6_changed = False if ipv4: hostname.last_ipv4_update = now if ipv4 != hostname.last_ipv4: # Update DNS IPv4 record try: self.app.dns_service.update_record( hostname.hostname, hostname.zone, ipv4, hostname.dns_ttl ) hostname.last_ipv4 = ipv4 ipv4_changed = True except Exception as e: hostname.save() logging.error(f"DNS error: {e}") raise DDNSError( "Update failed", STATUS_DNSERR, client=client_ip, hostname=hostname.hostname, zone=hostname.zone, ipv4=ipv4 ) if ipv6: hostname.last_ipv6_update = now if ipv6 != hostname.last_ipv6: # Update DNS IPv6 record try: self.app.dns_service.update_record( hostname.hostname, hostname.zone, ipv6, hostname.dns_ttl ) hostname.last_ipv6 = ipv6 ipv6_changed = True except Exception as e: hostname.save() logging.error(f"DNS error: {e}") raise DDNSError( "Update failed", STATUS_DNSERR, client=client_ip, hostname=hostname.hostname, zone=hostname.zone, ipv6=ipv6 ) # Update database hostname.save() changed_addrs = "" if ipv4_changed: changed_addrs += f" ipv4={ipv4}" if ipv6_changed: changed_addrs += f" ipv6={ipv6}" if not ipv4_changed and not ipv6_changed: logging.info( f"No change: client={client_ip} hostname={hostname.hostname} " f"zone={hostname.zone}{changed_addrs} notify_change={str(notify_change).lower()}" ) self.respond( 200, STATUS_NOCHG, ipv4=hostname.last_ipv4, ipv6=hostname.last_ipv6 ) return logging.info( f"Updated: client={client_ip} hostname={hostname.hostname} " f"zone={hostname.zone}{changed_addrs} notify_change={str(notify_change).lower()}" ) if notify_change: try: self.app.email_service.send_change_notification( hostname.user.email, hostname, ipv4_changed, ipv6_changed ) except Exception as e: logging.error(f"Sending change notification error: {e}") self.respond( 200, STATUS_GOOD, ipv4=hostname.last_ipv4, ipv6=hostname.last_ipv6 ) def run_daemon(app): """ Run the DDNS daemon. Args: app: Application instance with initialized services. """ config = app.config # Setup server host = config["daemon"]["host"] port = config["daemon"]["port"] server = DDNSServer(app, (host, port)) # Setup TLS if enabled if config["daemon"]["ssl"]: context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.load_cert_chain( config["daemon"]["ssl_cert_file"], config["daemon"]["ssl_key_file"] ) server.socket = context.wrap_socket(server.socket, server_side=True) proto = "https" else: proto = "http" # Start cleanup threads ratelimit_cleanup_thread = RateLimitCleanupThread(app) ratelimit_cleanup_thread.start() expired_cleanup_thread = ExpiredRecordsCleanupThread(app) expired_cleanup_thread.start() # Setup signal handlers def signal_handler(signum, frame): logging.info(f"Signal received: {signum}, shutting down") app.signal_shutdown() def sighup_handler(signum, frame): logging.info("SIGHUP received, reloading configuration") try: app.reload_config() # Update server attributes server.proxy_header = app.config["daemon"].get("proxy_header", "") server.trusted_networks = _parse_trusted_proxies( app.config["daemon"].get("trusted_proxies", []) ) # Reload SSL if enabled if app.config["daemon"]["ssl"]: new_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) new_context.load_cert_chain( app.config["daemon"]["ssl_cert_file"], app.config["daemon"]["ssl_key_file"] ) # Note: existing connections use old cert, new connections use new server.socket = new_context.wrap_socket( server.socket.detach(), server_side=True ) except Exception as e: logging.error(f"Config reload failed: {e}") signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGHUP, sighup_handler) paths = ", ".join(ep["path"] for ep in config["endpoints"]) logging.info(f"Daemon started: {proto}://{host}:{port} endpoints=[{paths}]") # Run server server.timeout = 1.0 while not app.is_shutting_down(): server.handle_request() # Graceful shutdown - wait for active requests server.wait_for_requests(5) # Cleanup expired_cleanup_thread.stop() ratelimit_cleanup_thread.stop() expired_cleanup_thread.join(timeout=5) ratelimit_cleanup_thread.join(timeout=5) server.server_close() logging.info("Daemon stopped")