From f297a8d740bf95a89afb58a7605985d7c0d75238 Mon Sep 17 00:00:00 2001 From: Thomas Oettli Date: Thu, 22 Jan 2026 00:36:48 +0100 Subject: [PATCH] Cleanup --- src/ddns_service/cleanup.py | 6 +- src/ddns_service/cli.py | 21 +++-- src/ddns_service/dns.py | 69 +++++++++++++++++ src/ddns_service/logging.py | 5 -- src/ddns_service/server.py | 7 +- src/ddns_service/validation.py | 138 --------------------------------- 6 files changed, 86 insertions(+), 160 deletions(-) delete mode 100644 src/ddns_service/validation.py diff --git a/src/ddns_service/cleanup.py b/src/ddns_service/cleanup.py index d268492..cf21205 100644 --- a/src/ddns_service/cleanup.py +++ b/src/ddns_service/cleanup.py @@ -44,13 +44,15 @@ def cleanup_expired(app): if app.dns_service: if ipv4_expired: logging.info( - f"Host expired: hostname={hostname.hostname} zone={hostname.zone} ip={hostname.last_ipv4}" + f"Host expired: hostname={hostname.hostname} zone={hostname.zone} " + f"ip={hostname.last_ipv4}" ) app.dns_service.delete_record(hostname.hostname, hostname.zone, "A") if ipv6_expired: logging.info( - f"Host expired: hostname={hostname.hostname} zone={hostname.zone} ip={hostname.last_ipv6}" + f"Host expired: hostname={hostname.hostname} zone={hostname.zone} " + f"ip={hostname.last_ipv6}" ) app.dns_service.delete_record(hostname.hostname, hostname.zone, "AAAA") diff --git a/src/ddns_service/cli.py b/src/ddns_service/cli.py index 05b1645..67112b1 100644 --- a/src/ddns_service/cli.py +++ b/src/ddns_service/cli.py @@ -5,14 +5,13 @@ import logging from .cleanup import cleanup_expired from .models import ( - create_tables, DoesNotExist, get_hostname, get_user, Hostname, User, ) -from .validation import encode_hostname, encode_zone, ValidationError +from .dns import encode_dnsname, EncodingError def cmd_user_list(args, app): @@ -170,9 +169,9 @@ def cmd_hostname_add(args, app): # Validate and encode hostname/zone try: - hostname_str = encode_hostname(args.hostname) - zone = encode_zone(args.zone) - except ValidationError as e: + hostname_str = encode_dnsname(args.hostname) + zone = encode_dnsname(args.zone) + except EncodingError as e: print(f"Error: {e}") return 1 @@ -214,9 +213,9 @@ def cmd_hostname_delete(args, app): """Delete a hostname.""" # Validate and encode hostname and zone try: - hostname_str = encode_hostname(args.hostname) - zone = encode_zone(args.zone) - except ValidationError as e: + hostname_str = encode_dnsname(args.hostname) + zone = encode_dnsname(args.zone) + except EncodingError as e: print(f"Error: {e}") return 1 @@ -260,9 +259,9 @@ def cmd_hostname_modify(args, app): """Modify hostname settings.""" # Validate and encode hostname and zone try: - hostname_str = encode_hostname(args.hostname) - zone = encode_zone(args.zone) - except ValidationError as e: + hostname_str = encode_dnsname(args.hostname) + zone = encode_dnsname(args.zone) + except EncodingError as e: print(f"Error: {e}") return 1 diff --git a/src/ddns_service/dns.py b/src/ddns_service/dns.py index ef45371..46d6cb9 100644 --- a/src/ddns_service/dns.py +++ b/src/ddns_service/dns.py @@ -14,6 +14,75 @@ import dns.tsigkeyring import dns.update +# Valid hostname label pattern (after punycode encoding) +LABEL_PATTERN = re.compile( + r'^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$', re.IGNORECASE +) + + +class EncodingError(Exception): + """Raised when hostname encoding fails.""" + pass + + +def encode_dnsname(hostname): + """ + Encode hostname to ASCII using punycode (IDNA). + + Args: + hostname: Hostname string, possibly with unicode characters. + + Returns: + ASCII-encoded hostname. + + Raises: + EncodingError: If hostname is invalid. + """ + hostname = hostname.lower().strip() + + if not hostname: + raise EncodingError("Hostname cannot be empty") + + # Remove trailing dot if present + if hostname.endswith('.'): + hostname = hostname[:-1] + + if len(hostname) > 253: + raise EncodingError("Hostname too long (max 253 characters)") + + try: + # Encode each label using IDNA + labels = hostname.split('.') + encoded_labels = [] + + for label in labels: + if not label: + raise EncodingError("Empty label in hostname") + + # Encode to punycode if needed + try: + encoded = label.encode('idna').decode('ascii') + except UnicodeError as e: + raise EncodingError(f"Invalid label '{label}': {e}") + + if len(encoded) > 63: + raise EncodingError( + f"Label '{label}' too long (max 63 characters)" + ) + + if not LABEL_PATTERN.match(encoded): + raise EncodingError(f"Invalid label format: '{label}'") + + encoded_labels.append(encoded) + + return '.'.join(encoded_labels) + + except EncodingError: + raise + except Exception as e: + raise EncodingError(f"Invalid hostname '{hostname}': {e}") + + def detect_ip_type(ip): try: addr = ipaddress.ip_address(ip) diff --git a/src/ddns_service/logging.py b/src/ddns_service/logging.py index 678e822..3394e03 100644 --- a/src/ddns_service/logging.py +++ b/src/ddns_service/logging.py @@ -134,8 +134,3 @@ def setup_logging( def disable_logging(): """Disable all logging (for CLI quiet mode).""" logging.disable(logging.CRITICAL) - - -def enable_logging(): - """Re-enable logging.""" - logging.disable(logging.NOTSET) diff --git a/src/ddns_service/server.py b/src/ddns_service/server.py index 37d2d56..15c2b17 100644 --- a/src/ddns_service/server.py +++ b/src/ddns_service/server.py @@ -15,10 +15,9 @@ from urllib.parse import parse_qs, urlparse import argon2 from .cleanup import ExpiredRecordsCleanupThread, RateLimitCleanupThread -from .dns import detect_ip_type from .logging import clear_txn_id, set_txn_id from .models import DoesNotExist, get_hostname_for_user, get_user -from .validation import encode_hostname, ValidationError +from .dns import detect_ip_type, encode_dnsname, EncodingError def extract_param(params, aliases): @@ -227,8 +226,8 @@ class DDNSRequestHandler(BaseHTTPRequestHandler): # Validate and encode hostname try: - hostname_param = encode_hostname(hostname_param) - except ValidationError: + hostname_param = encode_dnsname(hostname_param) + except EncodingError: logging.warning( f"Invalid hostname: client={client_ip}, " f"hostname={hostname_param}") diff --git a/src/ddns_service/validation.py b/src/ddns_service/validation.py deleted file mode 100644 index 2048ead..0000000 --- a/src/ddns_service/validation.py +++ /dev/null @@ -1,138 +0,0 @@ -"""Hostname and zone validation with punycode support.""" - -import re - - -# Valid hostname label pattern (after punycode encoding) -LABEL_PATTERN = re.compile(r'^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$', re.IGNORECASE) - - -class ValidationError(Exception): - """Raised when validation fails.""" - pass - - -def encode_hostname(hostname): - """ - Encode hostname to ASCII using punycode (IDNA). - - Args: - hostname: Hostname string, possibly with unicode characters. - - Returns: - ASCII-encoded hostname. - - Raises: - ValidationError: If hostname is invalid. - """ - hostname = hostname.lower().strip() - - if not hostname: - raise ValidationError("Hostname cannot be empty") - - # Remove trailing dot if present - if hostname.endswith('.'): - hostname = hostname[:-1] - - if len(hostname) > 253: - raise ValidationError("Hostname too long (max 253 characters)") - - try: - # Encode each label using IDNA - labels = hostname.split('.') - encoded_labels = [] - - for label in labels: - if not label: - raise ValidationError("Empty label in hostname") - - # Encode to punycode if needed - try: - encoded = label.encode('idna').decode('ascii') - except UnicodeError as e: - raise ValidationError(f"Invalid label '{label}': {e}") - - if len(encoded) > 63: - raise ValidationError( - f"Label '{label}' too long (max 63 characters)" - ) - - if not LABEL_PATTERN.match(encoded): - raise ValidationError(f"Invalid label format: '{label}'") - - encoded_labels.append(encoded) - - return '.'.join(encoded_labels) - - except ValidationError: - raise - except Exception as e: - raise ValidationError(f"Invalid hostname '{hostname}': {e}") - - -def encode_zone(zone): - """ - Encode zone name to ASCII using punycode (IDNA). - - Args: - zone: Zone name string, possibly with unicode characters. - - Returns: - ASCII-encoded zone name. - - Raises: - ValidationError: If zone is invalid. - """ - if not zone: - raise ValidationError("Zone cannot be empty") - - # Zone validation is same as hostname - return encode_hostname(zone) - - -def validate_hostname_in_zone(hostname, zone): - """ - Validate and encode hostname and zone, ensuring hostname is in zone. - - Args: - hostname: Hostname string. - zone: Zone string. - - Returns: - Tuple of (encoded_hostname, encoded_zone). - - Raises: - ValidationError: If validation fails. - """ - encoded_hostname = encode_hostname(hostname) - encoded_zone = encode_zone(zone) - - # Check hostname ends with zone - if not (encoded_hostname == encoded_zone or - encoded_hostname.endswith('.' + encoded_zone)): - raise ValidationError( - f"Hostname '{hostname}' is not in zone '{zone}'" - ) - - return encoded_hostname, encoded_zone - - -def get_relative_name(hostname, zone): - """ - Get the relative name (hostname without zone suffix). - - Args: - hostname: Encoded hostname. - zone: Encoded zone. - - Returns: - Relative name (e.g., 'mypc' from 'mypc.dyn.example.com'). - """ - if hostname == zone: - return '@' - - suffix = '.' + zone - if hostname.endswith(suffix): - return hostname[:-len(suffix)] - - return hostname