"""HTTP(S) server for DDNS updates."""

from __future__ import annotations

import base64
import ipaddress
import json
import logging
import signal
import ssl
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import parse_qs, urlparse

import argon2

from .cleanup import ExpiredRecordsCleanupThread, RateLimitCleanupThread
from .dns import detect_ip_type
from .logging import clear_txn_id, set_txn_id
from .models import DoesNotExist, get_hostname_for_user, get_user
from .validation import encode_hostname, ValidationError


def extract_param(params, aliases):
    """Return the first value found under any of ``aliases``.

    Args:
        params: Mapping of parameter name to a list of values, in the shape
            produced by ``urllib.parse.parse_qs``.
        aliases: Parameter names to try, in priority order.

    Returns:
        The first value of the first alias that has one, or ``None`` when no
        alias is present.
    """
    for alias in aliases:
        values = params.get(alias)
        # An empty list is treated like a missing alias instead of raising
        # IndexError.
        if values and values[0] is not None:
            return values[0]
    return None


class ProxyHeaderError(Exception):
    """Raised when expected proxy header is missing."""


def _parse_trusted_proxies(proxies):
    """Parse a list of IPs/networks into ``ip_network`` objects.

    Bare addresses become single-host networks (``/32`` or ``/128``).
    Entries that cannot be parsed are logged and skipped.

    Args:
        proxies: Iterable of address or CIDR strings.

    Returns:
        List of ``ipaddress.IPv4Network`` / ``ipaddress.IPv6Network``.
    """
    networks = []
    for proxy in proxies:
        try:
            # ip_network() already maps a bare address to a host network, so
            # no manual "/32" or "/128" suffix is needed. str() keeps a
            # non-string config entry from raising TypeError.
            networks.append(
                ipaddress.ip_network(str(proxy).strip(), strict=False)
            )
        except ValueError:
            logging.warning(f"Invalid trusted proxy: {proxy}")
    return networks


def _is_trusted_proxy(client_ip, trusted_networks):
    """Check whether ``client_ip`` belongs to any trusted proxy network.

    An IPv4-mapped IPv6 address (``::ffff:a.b.c.d``, which is how an IPv4
    peer appears on a dual-stack listening socket) is also matched against
    IPv4 networks through its embedded IPv4 address.

    Args:
        client_ip: Peer address as a string.
        trusted_networks: Networks returned by ``_parse_trusted_proxies``.

    Returns:
        ``True`` if the address is inside a trusted network, ``False``
        otherwise (including when ``client_ip`` is not a valid IP address).
    """
    try:
        addr = ipaddress.ip_address(client_ip)
    except ValueError:
        return False

    candidates = [addr]
    # Only IPv6Address has ``ipv4_mapped``; it is None for ordinary IPv6.
    mapped = getattr(addr, "ipv4_mapped", None)
    if mapped is not None:
        candidates.append(mapped)

    return any(
        candidate in network
        for network in trusted_networks
        for candidate in candidates
    )


class DDNSServer(ThreadingHTTPServer):
    """HTTP server with Application instance."""

    def __init__(self, app, address):
        """
        Initialize server with application.

        Args:
            app: Application instance.
            address: (host, port) tuple.
        """
        self.app = app
        daemon_config = app.config["daemon"]
        self.proxy_header = daemon_config.get("proxy_header", "")
        self.trusted_networks = _parse_trusted_proxies(
            daemon_config.get("trusted_proxies", [])
        )
        super().__init__(address, DDNSRequestHandler)


class DDNSRequestHandler(BaseHTTPRequestHandler):
    """HTTP request handler for DDNS updates."""

    @property
    def app(self):
        """Get application instance from server."""
        return self.server.app

    def log_message(self, format_str, *args):
        """Route request log lines through ``logging``.

        Logged at INFO when ``daemon.log_requests`` is enabled, otherwise at
        DEBUG.
        """
        msg = f"{self.address_string()} - {format_str % args}"
        if self.app.config["daemon"]["log_requests"]:
            logging.info(msg)
        else:
            logging.debug(msg)

    def send_response_body(self, code, body, content_type="text/plain"):
        """Send a complete response with status, headers and body.

        Args:
            code: HTTP status code.
            body: Response body as text.
            content_type: Value for the ``Content-Type`` header.
        """
        # Content-Length counts bytes, so measure the encoded payload rather
        # than the str (the two differ for non-ASCII text).
        payload = body.encode()
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def wants_json(self):
        """Check if client wants JSON response."""
        accept = self.headers.get("Accept", "")
        return "application/json" in accept

    def respond(self, code, status, **kwargs):
        """Send response in appropriate format.

        JSON clients receive ``{"status": status, **kwargs}``. Everyone else
        receives DynDNS-compatible plain text: the status followed by the
        non-empty ``ipv4`` / ``ipv6`` values, separated by spaces.
        """
        if self.wants_json():
            data = {"status": status, **kwargs}
            self.send_response_body(code, json.dumps(data), "application/json")
            return

        # DynDNS-compatible plain text
        parts = [status]
        parts.extend(kwargs[key] for key in ("ipv4", "ipv6") if kwargs.get(key))
        self.send_response_body(code, " ".join(parts))

    def get_client_ip(self):
        """Get client IP, considering configured proxy header if trusted.

        Returns:
            The socket peer address, unless a proxy header is configured and
            the peer is a trusted proxy; then the first comma-separated entry
            of that header.

        Raises:
            ProxyHeaderError: A trusted proxy did not send the configured
                header.
        """
        direct_ip = self.client_address[0]
        proxy_header = self.server.proxy_header

        if not proxy_header:
            return direct_ip
        if not _is_trusted_proxy(direct_ip, self.server.trusted_networks):
            return direct_ip

        forwarded = self.headers.get(proxy_header)
        if not forwarded:
            raise ProxyHeaderError(
                f"Missing {proxy_header} header from trusted proxy"
            )

        # NOTE(review): the leftmost entry is used. For an appended list
        # such as X-Forwarded-For that entry is client-supplied unless the
        # proxy overwrites the header - confirm the proxy configuration.
        return forwarded.split(",")[0].strip()

    def parse_basic_auth(self):
        """Parse the ``Authorization: Basic`` header.

        Returns:
            ``(username, password)`` on success, ``(None, None)`` when the
            header is absent, uses another scheme, or cannot be decoded.
        """
        auth = self.headers.get("Authorization", "")
        scheme, _, encoded = auth.partition(" ")
        # Authentication scheme names are case-insensitive (RFC 9110).
        if not encoded or scheme.lower() != "basic":
            return None, None

        try:
            decoded = base64.b64decode(encoded).decode("utf-8")
        except ValueError:
            # Covers binascii.Error and UnicodeDecodeError (both subclass
            # ValueError): malformed credentials count as "no credentials".
            return None, None

        username, sep, password = decoded.partition(":")
        if not sep:
            return None, None
        return username, password

    def do_GET(self):
        """Handle GET requests inside a per-request transaction id."""
        set_txn_id()
        try:
            self._handle_get_request()
        finally:
            clear_txn_id()

    def _handle_get_request(self):
        """Authenticate, authorize and apply one update request.

        Order of checks: client IP resolution, bad-request rate limit,
        endpoint lookup, credentials, hostname parameter, hostname encoding,
        hostname ownership, good-request rate limit. Only then is the IP
        update run and its result sent.
        """
        try:
            client_ip = self.get_client_ip()
        except ProxyHeaderError as e:
            logging.error(f"Proxy header error: {e}")
            self.send_response_body(400, "Bad Request")
            return

        limiter = self.app.rate_limiter

        # Bad rate limit check
        if limiter:
            blocked, retry = limiter.is_blocked_bad(client_ip)
            if blocked:
                logging.warning(
                    f"Rate limited (bad requests): client={client_ip}, "
                    f"retry_after={retry}")
                self.respond(429, "abuse")
                return

        # Parse URL and find matching endpoint
        parsed = urlparse(self.path)
        endpoint = self.app.config["_endpoint_map"].get(parsed.path)
        if endpoint is None:
            self.send_response_body(404, "Not Found")
            return

        # Parse query parameters
        params = parse_qs(parsed.query)
        aliases = endpoint["params"]

        # Get credentials: Basic Auth wins, query parameters are the fallback
        username, password = self.parse_basic_auth()
        if username is None:
            username = extract_param(params, aliases["username"])
            password = extract_param(params, aliases["password"])

        if not username or not password:
            logging.warning(f"Auth failed: client={client_ip} user=anonymous")
            self._handle_bad_request(client_ip, 401, "badauth")
            return

        # Validate credentials
        try:
            user = get_user(username)
            self.app.password_hasher.verify(user.password_hash, password)
        except (DoesNotExist, argon2.exceptions.VerifyMismatchError):
            logging.warning(f"Auth failed: client={client_ip} user={username}")
            self._handle_bad_request(client_ip, 401, "badauth")
            return

        # Get hostname parameter
        hostname_param = extract_param(params, aliases["hostname"])
        if not hostname_param:
            logging.warning(
                f"Missing hostname: client={client_ip} user={username}")
            self._handle_bad_request(client_ip, 400, "nohost")
            return

        # Validate and encode hostname
        try:
            hostname_param = encode_hostname(hostname_param)
        except ValidationError:
            logging.warning(
                f"Invalid hostname: client={client_ip}, "
                f"hostname={hostname_param}")
            self._handle_bad_request(client_ip, 400, "nohost")
            return

        # Check hostname ownership
        try:
            hostname = get_hostname_for_user(hostname_param, user)
        except DoesNotExist:
            logging.warning(
                f"Access denied: client={client_ip} user={username} "
                f"hostname={hostname_param}"
            )
            self._handle_bad_request(client_ip, 403, "nohost")
            return

        # Good rate limit check, then record the good request
        if limiter:
            blocked, retry = limiter.is_blocked_good(client_ip)
            if blocked:
                logging.warning(
                    f"Rate limited: client={client_ip}, retry_after={retry}")
                self.respond(429, "abuse", retry_after=retry)
                return
            limiter.record_good(client_ip)

        # Determine IPs to update
        result = self._process_ip_update(hostname, params, endpoint, client_ip)
        if not result:
            return
        # Result is (code, status) or (code, status, extra_fields).
        code, status, *extra = result
        if extra:
            self.respond(code, status, **extra[0])
        else:
            self.respond(code, status)

    def _handle_bad_request(self, client_ip, code, status):
        """Handle bad request and record in rate limiter."""
        if self.app.rate_limiter:
            self.app.rate_limiter.record_bad(client_ip)
        self.respond(code, status)

    def _process_ip_update(self, hostname, params, endpoint, client_ip):
        """Process IP update for hostname.

        The IPv4 parameter may carry either address family. The IPv6
        parameter must be IPv6 and overrides an IPv6 value taken from the
        IPv4 parameter. With neither given, the client address is used.

        Args:
            hostname: Hostname record returned by ``get_hostname_for_user``.
            params: Parsed query parameters.
            endpoint: Endpoint configuration holding the parameter aliases.
            client_ip: Resolved client address.

        Returns:
            ``(code, status)`` on failure, or
            ``(200, "good" | "nochg", {"ipv4": ..., "ipv6": ...})``.
        """
        myip = extract_param(params, endpoint["params"]["ipv4"])
        myip6 = extract_param(params, endpoint["params"]["ipv6"])

        ipv4 = None
        ipv6 = None

        # Process myip parameter
        if myip:
            try:
                rtype, myip = detect_ip_type(myip)
            except ValueError:
                return (400, "badip")
            if rtype == "A":
                ipv4 = myip
            else:
                ipv6 = myip

        # Process myip6 parameter
        if myip6:
            try:
                rtype, myip6 = detect_ip_type(myip6)
            except ValueError:
                return (400, "badip")
            if rtype != "AAAA":
                return (400, "badip")
            ipv6 = myip6

        # Auto-detect from client IP if no params
        if ipv4 is None and ipv6 is None:
            try:
                rtype, ip = detect_ip_type(client_ip)
            except ValueError:
                return (400, "badip")
            if rtype == "A":
                ipv4 = ip
            else:
                ipv6 = ip

        now = datetime.now()
        changed = False

        if ipv4:
            hostname.last_ipv4_update = now
            if ipv4 != hostname.last_ipv4:
                # Update DNS IPv4 record
                try:
                    self.app.dns_service.update_record(
                        hostname.hostname,
                        hostname.zone,
                        ipv4,
                        hostname.dns_ttl
                    )
                    hostname.last_ipv4 = ipv4
                    changed = True
                except Exception as e:
                    # Persist what was set so far before reporting failure.
                    hostname.save()
                    logging.error(
                        f"DNS update failed: client={client_ip} hostname={hostname.hostname} "
                        f"zone={hostname.zone} ipv4={ipv4} error={e}"
                    )
                    return (500, "dnserr")

        if ipv6:
            hostname.last_ipv6_update = now
            if ipv6 != hostname.last_ipv6:
                # Update DNS IPv6 record
                try:
                    self.app.dns_service.update_record(
                        hostname.hostname,
                        hostname.zone,
                        ipv6,
                        hostname.dns_ttl
                    )
                    hostname.last_ipv6 = ipv6
                    changed = True
                except Exception as e:
                    # Persist what was set so far before reporting failure.
                    hostname.save()
                    logging.error(
                        f"DNS update failed: client={client_ip} hostname={hostname.hostname} "
                        f"zone={hostname.zone} ipv6={ipv6} error={e}"
                    )
                    return (500, "dnserr")

        # Update database
        hostname.save()

        # Joined rather than concatenated so an IPv6-only update does not
        # leave a stray leading space in the log line.
        addr_parts = []
        if ipv4:
            addr_parts.append(f"ipv4={ipv4}")
        if ipv6:
            addr_parts.append(f"ipv6={ipv6}")
        changed_addrs = " ".join(addr_parts)

        if changed:
            status = "good"
            logging.info(
                f"Updated: client={client_ip} hostname={hostname.hostname} "
                f"zone={hostname.zone} {changed_addrs}"
            )
        else:
            status = "nochg"
            logging.info(
                f"No change: client={client_ip} hostname={hostname.hostname} "
                f"zone={hostname.zone} {changed_addrs}"
            )

        return (
            200,
            status,
            {"ipv4": hostname.last_ipv4, "ipv6": hostname.last_ipv6}
        )


def run_daemon(app):
    """
    Run the DDNS daemon.

    Binds the server, optionally wraps it in TLS, starts the cleanup
    threads, installs SIGTERM/SIGINT handlers and serves requests until the
    application signals shutdown. The listening socket and the cleanup
    threads are released even if setup or the serve loop raises.

    Args:
        app: Application instance with initialized services.
    """
    config = app.config
    daemon_config = config["daemon"]

    # Setup server
    host = daemon_config["host"]
    port = daemon_config["port"]
    server = DDNSServer(app, (host, port))

    cleanup_threads = []
    try:
        # Setup TLS if enabled
        if daemon_config["ssl"]:
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            context.load_cert_chain(
                daemon_config["ssl_cert_file"],
                daemon_config["ssl_key_file"]
            )
            server.socket = context.wrap_socket(server.socket, server_side=True)
            proto = "https"
        else:
            proto = "http"

        try:
            # Start cleanup threads
            for thread_cls in (ExpiredRecordsCleanupThread, RateLimitCleanupThread):
                thread = thread_cls(app)
                thread.start()
                cleanup_threads.append(thread)

            # Setup signal handlers
            def signal_handler(signum, frame):
                logging.info(f"Signal received: {signum}, shutting down")
                app.signal_shutdown()

            signal.signal(signal.SIGTERM, signal_handler)
            signal.signal(signal.SIGINT, signal_handler)

            paths = ", ".join(ep["path"] for ep in config["endpoints"])
            logging.info(
                f"Daemon started: {proto}://{host}:{port} endpoints=[{paths}]")

            # Run server. The timeout makes handle_request() return
            # periodically so the shutdown flag is re-checked when idle.
            server.timeout = 1.0
            while not app.is_shutting_down():
                server.handle_request()
        finally:
            # Cleanup: ask every thread to stop first, then wait for them.
            for thread in cleanup_threads:
                thread.stop()
            for thread in cleanup_threads:
                thread.join(timeout=5)
    finally:
        server.server_close()

    logging.info("Daemon stopped")