643 lines
20 KiB
Python
643 lines
20 KiB
Python
"""HTTP(S) server for DDNS updates."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import ipaddress
|
|
import json
|
|
import logging
|
|
import signal
|
|
import ssl
|
|
import threading
|
|
|
|
from . import (
|
|
now_utc,
|
|
datetime_str,
|
|
STATUS_GOOD,
|
|
STATUS_NOCHG,
|
|
STATUS_BADAUTH,
|
|
STATUS_NOHOST,
|
|
STATUS_DNSERR,
|
|
STATUS_ABUSE,
|
|
STATUS_BADIP,
|
|
)
|
|
from .cleanup import ExpiredRecordsCleanupThread, RateLimitCleanupThread
|
|
from .dns import detect_ip_type
|
|
from .logging import clear_txn_id, set_txn_id
|
|
from .models import (
|
|
DatabaseError,
|
|
DoesNotExist,
|
|
EncodingError,
|
|
get_hostname_for_user,
|
|
get_user
|
|
)
|
|
from argon2.exceptions import VerifyMismatchError
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
|
|
def extract_param(params, aliases):
|
|
"""Extract first matching param from query params."""
|
|
for alias in aliases:
|
|
val = params.get(alias, [None])[0]
|
|
if val is not None:
|
|
return val
|
|
return None
|
|
|
|
|
|
class ProxyHeaderError(Exception):
|
|
"""Raised when expected proxy header is missing."""
|
|
pass
|
|
|
|
|
|
def _parse_trusted_proxies(proxies):
|
|
"""Parse list of IPs/networks into ip_network objects."""
|
|
networks = []
|
|
for proxy in proxies:
|
|
try:
|
|
if "/" not in proxy:
|
|
addr = ipaddress.ip_address(proxy)
|
|
if addr.version == 4:
|
|
proxy = proxy + "/32"
|
|
else:
|
|
proxy = proxy + "/128"
|
|
networks.append(ipaddress.ip_network(proxy, strict=False))
|
|
except ValueError:
|
|
logging.warning(f"Invalid trusted proxy: {proxy}")
|
|
return networks
|
|
|
|
|
|
def _is_trusted_proxy(client_ip, trusted_networks):
|
|
"""Check if client IP is in trusted proxy networks."""
|
|
try:
|
|
addr = ipaddress.ip_address(client_ip)
|
|
for network in trusted_networks:
|
|
if addr in network:
|
|
return True
|
|
except ValueError:
|
|
pass
|
|
return False
|
|
|
|
|
|
class DDNSServer(ThreadingHTTPServer):
|
|
"""HTTP server with Application instance and thread pool."""
|
|
|
|
def __init__(self, app, address):
|
|
"""
|
|
Initialize server with application.
|
|
|
|
Args:
|
|
app: Application instance.
|
|
address: (host, port) tuple.
|
|
"""
|
|
self.app = app
|
|
self.proxy_header = app.config["daemon"].get("proxy_header", "")
|
|
self.trusted_networks = _parse_trusted_proxies(
|
|
app.config["daemon"].get("trusted_proxies", [])
|
|
)
|
|
self.pool_size = app.config["daemon"]["thread_pool_size"]
|
|
self.request_timeout = app.config["daemon"]["request_timeout"]
|
|
self.executor = ThreadPoolExecutor(max_workers=self.pool_size)
|
|
self.active_requests = 0
|
|
self.requests_lock = threading.Lock()
|
|
self.requests_done = threading.Condition(self.requests_lock)
|
|
super().__init__(address, DDNSRequestHandler)
|
|
|
|
def process_request(self, request, client_address):
|
|
"""Submit request to thread pool."""
|
|
with self.requests_lock:
|
|
self.active_requests += 1
|
|
request.settimeout(self.request_timeout)
|
|
self.executor.submit(self._handle_request_wrapper, request, client_address)
|
|
|
|
def _handle_request_wrapper(self, request, client_address):
|
|
"""Wrap request handling to track active requests."""
|
|
try:
|
|
self.process_request_thread(request, client_address)
|
|
finally:
|
|
with self.requests_lock:
|
|
self.active_requests -= 1
|
|
if self.active_requests == 0:
|
|
self.requests_done.notify_all()
|
|
|
|
def wait_for_requests(self, timeout=5):
|
|
"""Wait for active requests to complete."""
|
|
with self.requests_lock:
|
|
if self.active_requests > 0:
|
|
logging.info(f"Waiting for {self.active_requests} active request(s)")
|
|
self.requests_done.wait(timeout=timeout)
|
|
if self.active_requests > 0:
|
|
logging.warning(
|
|
f"Shutdown timeout, {self.active_requests} request(s) still active"
|
|
)
|
|
|
|
def server_close(self):
|
|
"""Shutdown thread pool and close server."""
|
|
self.executor.shutdown(wait=True)
|
|
super().server_close()
|
|
|
|
|
|
class DDNSError(Exception):
|
|
def __init__(self, message, status, **kwargs):
|
|
super().__init__(self, message)
|
|
self.message = message
|
|
self.status = status
|
|
self.kwargs = kwargs
|
|
|
|
def __str__(self):
|
|
if not self.kwargs:
|
|
return self.message
|
|
|
|
string = f"{self.message}:"
|
|
for key, value in self.kwargs.items():
|
|
string += f" {key}={value}"
|
|
|
|
return string
|
|
|
|
|
|
class DDNSClientError(DDNSError):
|
|
def __init__(self, message, code, status, **kwargs):
|
|
super().__init__(message, status, **kwargs)
|
|
self.code = code
|
|
|
|
|
|
class DDNSRequestHandler(BaseHTTPRequestHandler):
|
|
"""HTTP request handler for DDNS updates."""
|
|
|
|
@property
|
|
def app(self):
|
|
"""Get application instance from server."""
|
|
return self.server.app
|
|
|
|
def log_message(self, format_str, *args):
|
|
"""Override to use our logger."""
|
|
msg = f"{self.address_string()} - {format_str % args}"
|
|
if self.app.config["daemon"]["log_requests"]:
|
|
logging.info(msg)
|
|
else:
|
|
logging.debug(msg)
|
|
|
|
def send_response_body(self, code, body, content_type="text/plain"):
|
|
"""Send response with body."""
|
|
self.send_response(code)
|
|
self.send_header("Content-Type", content_type)
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body.encode())
|
|
|
|
def wants_json(self):
|
|
"""Check if client wants JSON response."""
|
|
accept = self.headers.get("Accept", "")
|
|
return "application/json" in accept
|
|
|
|
def respond(self, code, status, **kwargs):
|
|
"""Send response in appropriate format."""
|
|
if self.wants_json():
|
|
data = {"status": status, **kwargs}
|
|
self.send_response_body(code, json.dumps(data), "application/json")
|
|
else:
|
|
# DynDNS-compatible plain text
|
|
parts = [status]
|
|
if "ipv4" in kwargs and kwargs["ipv4"]:
|
|
parts.append(kwargs["ipv4"])
|
|
if "ipv6" in kwargs and kwargs["ipv6"]:
|
|
parts.append(kwargs["ipv6"])
|
|
self.send_response_body(code, " ".join(parts))
|
|
|
|
def get_client_ip(self):
|
|
"""Get client IP, considering configured proxy header if trusted."""
|
|
direct_ip = self.client_address[0]
|
|
|
|
proxy_header = self.server.proxy_header
|
|
if not proxy_header:
|
|
return direct_ip
|
|
|
|
if not _is_trusted_proxy(direct_ip, self.server.trusted_networks):
|
|
return direct_ip
|
|
|
|
forwarded = self.headers.get(proxy_header)
|
|
if not forwarded:
|
|
raise ProxyHeaderError(
|
|
f"Missing {proxy_header} header from trusted proxy"
|
|
)
|
|
|
|
return forwarded.split(",")[0].strip()
|
|
|
|
def parse_basic_auth(self):
|
|
"""Parse Basic Auth header."""
|
|
auth = self.headers.get("Authorization", "")
|
|
if not auth.startswith("Basic "):
|
|
return None, None
|
|
try:
|
|
decoded = base64.b64decode(auth[6:]).decode("utf-8")
|
|
if ":" in decoded:
|
|
username, password = decoded.split(":", 1)
|
|
return username, password
|
|
except Exception:
|
|
pass
|
|
return None, None
|
|
|
|
def do_GET(self):
|
|
"""Handle GET requests."""
|
|
set_txn_id()
|
|
try:
|
|
client_ip = self.get_client_ip()
|
|
except ProxyHeaderError as e:
|
|
logging.error(f"Proxy header error: {e}")
|
|
self.respond(400, "Bad Request")
|
|
return
|
|
|
|
try:
|
|
self._handle_get_request(client_ip)
|
|
except DDNSClientError as e:
|
|
if self.app.bad_limiter:
|
|
self.app.bad_limiter.record(client_ip)
|
|
logging.warning(e)
|
|
self.respond(e.code, e.status)
|
|
except DDNSError as e:
|
|
logging.error(e)
|
|
self.respond(500, e.status)
|
|
except DatabaseError as e:
|
|
logging.error(f"Database error: {e}")
|
|
self.respond(500, "Internal Server Error")
|
|
except Exception as e:
|
|
logging.exception(f"Uncaught exception: {e}")
|
|
self.respond(500, "Internal Server Error")
|
|
finally:
|
|
clear_txn_id()
|
|
|
|
def _handle_get_request(self, client_ip):
|
|
"""Handle GET request logic."""
|
|
# Parse URL
|
|
parsed = urlparse(self.path)
|
|
|
|
# Find matching endpoint
|
|
endpoint = self.app.config["_endpoint_map"].get(parsed.path)
|
|
if endpoint is None:
|
|
self.respond(404, "Not Found")
|
|
return
|
|
|
|
# Bad rate limit check
|
|
if self.app.bad_limiter:
|
|
blocked, retry_at = self.app.bad_limiter.is_blocked(client_ip)
|
|
if blocked:
|
|
raise DDNSClientError(
|
|
"Rate limited (bad requests)",
|
|
429,
|
|
STATUS_ABUSE,
|
|
client=client_ip,
|
|
retry_at=datetime_str(retry_at)
|
|
)
|
|
|
|
# Parse query parameters
|
|
params = parse_qs(parsed.query)
|
|
|
|
# Get credentials
|
|
username, password = self.parse_basic_auth()
|
|
if username is None:
|
|
username = extract_param(params, endpoint["params"]["username"])
|
|
password = extract_param(params, endpoint["params"]["password"])
|
|
|
|
if not username or not password:
|
|
raise DDNSClientError(
|
|
"Auth failed",
|
|
401,
|
|
STATUS_BADAUTH,
|
|
client_ip
|
|
)
|
|
|
|
# Get hostname parameter
|
|
hostname_param = extract_param(params, endpoint["params"]["hostname"])
|
|
if not hostname_param:
|
|
raise DDNSClientError(
|
|
"Missing hostname",
|
|
400,
|
|
STATUS_NOHOST,
|
|
client=client_ip,
|
|
username=username
|
|
)
|
|
|
|
# Validate credentials
|
|
user = self._authenticate(client_ip, username, password)
|
|
|
|
# Check hostname ownership
|
|
hostname = self._check_permissions(client_ip, user, hostname_param)
|
|
|
|
# Process myip parameter
|
|
ipv4 = None
|
|
myip = extract_param(params, endpoint["params"]["ipv4"])
|
|
if myip:
|
|
try:
|
|
rtype, myip = detect_ip_type(myip)
|
|
if rtype == "A":
|
|
ipv4 = myip
|
|
else:
|
|
ipv6 = myip
|
|
except ValueError:
|
|
raise DDNSClientError(
|
|
"Bad IP address",
|
|
400,
|
|
STATUS_BADIP,
|
|
client=client_ip,
|
|
username=username,
|
|
hostname=hostname.hostname,
|
|
zone=hostname.zone,
|
|
ip=myip
|
|
)
|
|
|
|
# Process myip6 parameter
|
|
ipv6 = None
|
|
myip6 = extract_param(params, endpoint["params"]["ipv6"])
|
|
if myip6:
|
|
try:
|
|
rtype, myip6 = detect_ip_type(myip6)
|
|
if rtype == "AAAA":
|
|
ipv6 = myip6
|
|
else:
|
|
raise ValueError
|
|
except ValueError:
|
|
raise DDNSClientError(
|
|
"Bad IP address",
|
|
400,
|
|
STATUS_BADIP,
|
|
client=client_ip,
|
|
username=username,
|
|
hostname=hostname.hostname,
|
|
zone=hostname.zone,
|
|
ipv6=myip6
|
|
)
|
|
|
|
# Auto-detect from client IP if no params
|
|
if ipv4 is None and ipv6 is None:
|
|
rtype, ip = detect_ip_type(client_ip)
|
|
if rtype == "A":
|
|
ipv4 = ip
|
|
else:
|
|
ipv6 = ip
|
|
|
|
# Process notify_change parameter
|
|
notify_change = extract_param(params, endpoint["params"]["notify_change"])
|
|
notify_change = notify_change.lower() in ["1", "y", "yes", "on", "true"] \
|
|
if notify_change else False
|
|
|
|
# Good rate limit check
|
|
if self.app.good_limiter:
|
|
blocked, retry_at = self.app.good_limiter.is_blocked(client_ip)
|
|
if blocked:
|
|
raise DDNSClientError(
|
|
"Rate limited (good requests)",
|
|
429,
|
|
STATUS_ABUSE,
|
|
client=client_ip,
|
|
username=username,
|
|
retry_at=datetime_str(retry_at)
|
|
)
|
|
|
|
# Record good request
|
|
if self.app.good_limiter:
|
|
self.app.good_limiter.record(client_ip)
|
|
|
|
# Process update request
|
|
self._process_ip_update(
|
|
client_ip,
|
|
user,
|
|
hostname,
|
|
ipv4,
|
|
ipv6,
|
|
notify_change
|
|
)
|
|
|
|
def _authenticate(self, client_ip, username, password):
|
|
try:
|
|
try:
|
|
user = get_user(username)
|
|
except DoesNotExist:
|
|
# User does not exist, Hash fake password to prevent time-based attacks
|
|
self.app.password_hasher.hash("FAKE-PASSWORD")
|
|
raise DoesNotExist
|
|
|
|
self.app.password_hasher.verify(user.password_hash, password)
|
|
except (DoesNotExist, VerifyMismatchError):
|
|
raise DDNSClientError(
|
|
"Auth failed",
|
|
401,
|
|
STATUS_BADAUTH,
|
|
client=client_ip,
|
|
username=username
|
|
)
|
|
|
|
return user
|
|
|
|
def _check_permissions(self, client_ip, user, hostname_param):
|
|
# Check hostname ownership
|
|
code = None
|
|
|
|
try:
|
|
hostname = get_hostname_for_user(hostname_param, user)
|
|
except DoesNotExist:
|
|
code = 403
|
|
except EncodingError:
|
|
code = 400
|
|
|
|
if code:
|
|
raise DDNSClientError(
|
|
"Access denied",
|
|
code,
|
|
STATUS_NOHOST,
|
|
client=client_ip,
|
|
username=user.username,
|
|
hostname=hostname_param
|
|
)
|
|
|
|
return hostname
|
|
|
|
def _process_ip_update(self, client_ip, user, hostname, ipv4, ipv6, notify_change):
|
|
"""Process IP update for hostname."""
|
|
now = now_utc()
|
|
|
|
ipv4_changed = False
|
|
ipv6_changed = False
|
|
if ipv4:
|
|
hostname.last_ipv4_update = now
|
|
if ipv4 != hostname.last_ipv4:
|
|
# Update DNS IPv4 record
|
|
try:
|
|
self.app.dns_service.update_record(
|
|
hostname.hostname,
|
|
hostname.zone,
|
|
ipv4,
|
|
hostname.dns_ttl
|
|
)
|
|
hostname.last_ipv4 = ipv4
|
|
ipv4_changed = True
|
|
except Exception as e:
|
|
hostname.save()
|
|
logging.error(f"DNS error: {e}")
|
|
raise DDNSError(
|
|
"Update failed",
|
|
STATUS_DNSERR,
|
|
client=client_ip,
|
|
hostname=hostname.hostname,
|
|
zone=hostname.zone,
|
|
ipv4=ipv4
|
|
)
|
|
|
|
if ipv6:
|
|
hostname.last_ipv6_update = now
|
|
if ipv6 != hostname.last_ipv6:
|
|
# Update DNS IPv6 record
|
|
try:
|
|
self.app.dns_service.update_record(
|
|
hostname.hostname,
|
|
hostname.zone,
|
|
ipv6,
|
|
hostname.dns_ttl
|
|
)
|
|
hostname.last_ipv6 = ipv6
|
|
ipv6_changed = True
|
|
except Exception as e:
|
|
hostname.save()
|
|
logging.error(f"DNS error: {e}")
|
|
raise DDNSError(
|
|
"Update failed",
|
|
STATUS_DNSERR,
|
|
client=client_ip,
|
|
hostname=hostname.hostname,
|
|
zone=hostname.zone,
|
|
ipv6=ipv6
|
|
)
|
|
|
|
# Update database
|
|
hostname.save()
|
|
|
|
changed_addrs = ""
|
|
if ipv4_changed:
|
|
changed_addrs += f" ipv4={ipv4}"
|
|
if ipv6_changed:
|
|
changed_addrs += f" ipv6={ipv6}"
|
|
|
|
if not ipv4_changed and not ipv6_changed:
|
|
logging.info(
|
|
f"No change: client={client_ip} hostname={hostname.hostname} "
|
|
f"zone={hostname.zone}{changed_addrs} notify_change={str(notify_change).lower()}"
|
|
)
|
|
self.respond(
|
|
200,
|
|
STATUS_NOCHG,
|
|
ipv4=hostname.last_ipv4,
|
|
ipv6=hostname.last_ipv6
|
|
)
|
|
return
|
|
|
|
logging.info(
|
|
f"Updated: client={client_ip} hostname={hostname.hostname} "
|
|
f"zone={hostname.zone}{changed_addrs} notify_change={str(notify_change).lower()}"
|
|
)
|
|
|
|
if notify_change:
|
|
try:
|
|
self.app.email_service.send_change_notification(
|
|
hostname.user.email,
|
|
hostname,
|
|
ipv4_changed,
|
|
ipv6_changed
|
|
)
|
|
except Exception as e:
|
|
logging.error(f"Sending change notification error: {e}")
|
|
|
|
self.respond(
|
|
200,
|
|
STATUS_GOOD,
|
|
ipv4=hostname.last_ipv4,
|
|
ipv6=hostname.last_ipv6
|
|
)
|
|
|
|
|
|
def run_daemon(app):
|
|
"""
|
|
Run the DDNS daemon.
|
|
|
|
Args:
|
|
app: Application instance with initialized services.
|
|
"""
|
|
config = app.config
|
|
|
|
# Setup server
|
|
host = config["daemon"]["host"]
|
|
port = config["daemon"]["port"]
|
|
|
|
server = DDNSServer(app, (host, port))
|
|
|
|
# Setup TLS if enabled
|
|
if config["daemon"]["ssl"]:
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
context.load_cert_chain(
|
|
config["daemon"]["ssl_cert_file"],
|
|
config["daemon"]["ssl_key_file"]
|
|
)
|
|
server.socket = context.wrap_socket(server.socket, server_side=True)
|
|
proto = "https"
|
|
else:
|
|
proto = "http"
|
|
|
|
# Start cleanup threads
|
|
ratelimit_cleanup_thread = RateLimitCleanupThread(app)
|
|
ratelimit_cleanup_thread.start()
|
|
|
|
expired_cleanup_thread = ExpiredRecordsCleanupThread(app)
|
|
expired_cleanup_thread.start()
|
|
|
|
# Setup signal handlers
|
|
def signal_handler(signum, frame):
|
|
logging.info(f"Signal received: {signum}, shutting down")
|
|
app.signal_shutdown()
|
|
|
|
def sighup_handler(signum, frame):
|
|
logging.info("SIGHUP received, reloading configuration")
|
|
try:
|
|
app.reload_config()
|
|
|
|
# Update server attributes
|
|
server.proxy_header = app.config["daemon"].get("proxy_header", "")
|
|
server.trusted_networks = _parse_trusted_proxies(
|
|
app.config["daemon"].get("trusted_proxies", [])
|
|
)
|
|
|
|
# Reload SSL if enabled
|
|
if app.config["daemon"]["ssl"]:
|
|
new_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
new_context.load_cert_chain(
|
|
app.config["daemon"]["ssl_cert_file"],
|
|
app.config["daemon"]["ssl_key_file"]
|
|
)
|
|
# Note: existing connections use old cert, new connections use new
|
|
server.socket = new_context.wrap_socket(
|
|
server.socket.detach(), server_side=True
|
|
)
|
|
except Exception as e:
|
|
logging.error(f"Config reload failed: {e}")
|
|
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGHUP, sighup_handler)
|
|
|
|
paths = ", ".join(ep["path"] for ep in config["endpoints"])
|
|
logging.info(f"Daemon started: {proto}://{host}:{port} endpoints=[{paths}]")
|
|
|
|
# Run server
|
|
server.timeout = 1.0
|
|
while not app.is_shutting_down():
|
|
server.handle_request()
|
|
|
|
# Graceful shutdown - wait for active requests
|
|
server.wait_for_requests(5)
|
|
|
|
# Cleanup
|
|
expired_cleanup_thread.stop()
|
|
ratelimit_cleanup_thread.stop()
|
|
expired_cleanup_thread.join(timeout=5)
|
|
ratelimit_cleanup_thread.join(timeout=5)
|
|
server.server_close()
|
|
logging.info("Daemon stopped")
|