This commit is contained in:
2026-01-22 00:36:48 +01:00
parent cb476c0a1a
commit f297a8d740
6 changed files with 86 additions and 160 deletions

View File

@@ -44,13 +44,15 @@ def cleanup_expired(app):
if app.dns_service: if app.dns_service:
if ipv4_expired: if ipv4_expired:
logging.info( logging.info(
f"Host expired: hostname={hostname.hostname} zone={hostname.zone} ip={hostname.last_ipv4}" f"Host expired: hostname={hostname.hostname} zone={hostname.zone} "
f"ip={hostname.last_ipv4}"
) )
app.dns_service.delete_record(hostname.hostname, hostname.zone, "A") app.dns_service.delete_record(hostname.hostname, hostname.zone, "A")
if ipv6_expired: if ipv6_expired:
logging.info( logging.info(
f"Host expired: hostname={hostname.hostname} zone={hostname.zone} ip={hostname.last_ipv6}" f"Host expired: hostname={hostname.hostname} zone={hostname.zone} "
f"ip={hostname.last_ipv6}"
) )
app.dns_service.delete_record(hostname.hostname, hostname.zone, "AAAA") app.dns_service.delete_record(hostname.hostname, hostname.zone, "AAAA")

View File

@@ -5,14 +5,13 @@ import logging
from .cleanup import cleanup_expired from .cleanup import cleanup_expired
from .models import ( from .models import (
create_tables,
DoesNotExist, DoesNotExist,
get_hostname, get_hostname,
get_user, get_user,
Hostname, Hostname,
User, User,
) )
from .validation import encode_hostname, encode_zone, ValidationError from .dns import encode_dnsname, EncodingError
def cmd_user_list(args, app): def cmd_user_list(args, app):
@@ -170,9 +169,9 @@ def cmd_hostname_add(args, app):
# Validate and encode hostname/zone # Validate and encode hostname/zone
try: try:
hostname_str = encode_hostname(args.hostname) hostname_str = encode_dnsname(args.hostname)
zone = encode_zone(args.zone) zone = encode_dnsname(args.zone)
except ValidationError as e: except EncodingError as e:
print(f"Error: {e}") print(f"Error: {e}")
return 1 return 1
@@ -214,9 +213,9 @@ def cmd_hostname_delete(args, app):
"""Delete a hostname.""" """Delete a hostname."""
# Validate and encode hostname and zone # Validate and encode hostname and zone
try: try:
hostname_str = encode_hostname(args.hostname) hostname_str = encode_dnsname(args.hostname)
zone = encode_zone(args.zone) zone = encode_dnsname(args.zone)
except ValidationError as e: except EncodingError as e:
print(f"Error: {e}") print(f"Error: {e}")
return 1 return 1
@@ -260,9 +259,9 @@ def cmd_hostname_modify(args, app):
"""Modify hostname settings.""" """Modify hostname settings."""
# Validate and encode hostname and zone # Validate and encode hostname and zone
try: try:
hostname_str = encode_hostname(args.hostname) hostname_str = encode_dnsname(args.hostname)
zone = encode_zone(args.zone) zone = encode_dnsname(args.zone)
except ValidationError as e: except EncodingError as e:
print(f"Error: {e}") print(f"Error: {e}")
return 1 return 1

View File

@@ -14,6 +14,75 @@ import dns.tsigkeyring
import dns.update import dns.update
# Valid hostname label pattern (after punycode encoding)
LABEL_PATTERN = re.compile(
r'^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$', re.IGNORECASE
)
class EncodingError(Exception):
"""Raised when hostname encoding fails."""
pass
def encode_dnsname(hostname):
"""
Encode hostname to ASCII using punycode (IDNA).
Args:
hostname: Hostname string, possibly with unicode characters.
Returns:
ASCII-encoded hostname.
Raises:
EncodingError: If hostname is invalid.
"""
hostname = hostname.lower().strip()
if not hostname:
raise EncodingError("Hostname cannot be empty")
# Remove trailing dot if present
if hostname.endswith('.'):
hostname = hostname[:-1]
if len(hostname) > 253:
raise EncodingError("Hostname too long (max 253 characters)")
try:
# Encode each label using IDNA
labels = hostname.split('.')
encoded_labels = []
for label in labels:
if not label:
raise EncodingError("Empty label in hostname")
# Encode to punycode if needed
try:
encoded = label.encode('idna').decode('ascii')
except UnicodeError as e:
raise EncodingError(f"Invalid label '{label}': {e}")
if len(encoded) > 63:
raise EncodingError(
f"Label '{label}' too long (max 63 characters)"
)
if not LABEL_PATTERN.match(encoded):
raise EncodingError(f"Invalid label format: '{label}'")
encoded_labels.append(encoded)
return '.'.join(encoded_labels)
except EncodingError:
raise
except Exception as e:
raise EncodingError(f"Invalid hostname '{hostname}': {e}")
def detect_ip_type(ip): def detect_ip_type(ip):
try: try:
addr = ipaddress.ip_address(ip) addr = ipaddress.ip_address(ip)

View File

@@ -134,8 +134,3 @@ def setup_logging(
def disable_logging(): def disable_logging():
"""Disable all logging (for CLI quiet mode).""" """Disable all logging (for CLI quiet mode)."""
logging.disable(logging.CRITICAL) logging.disable(logging.CRITICAL)
def enable_logging():
"""Re-enable logging."""
logging.disable(logging.NOTSET)

View File

@@ -15,10 +15,9 @@ from urllib.parse import parse_qs, urlparse
import argon2 import argon2
from .cleanup import ExpiredRecordsCleanupThread, RateLimitCleanupThread from .cleanup import ExpiredRecordsCleanupThread, RateLimitCleanupThread
from .dns import detect_ip_type
from .logging import clear_txn_id, set_txn_id from .logging import clear_txn_id, set_txn_id
from .models import DoesNotExist, get_hostname_for_user, get_user from .models import DoesNotExist, get_hostname_for_user, get_user
from .validation import encode_hostname, ValidationError from .dns import detect_ip_type, encode_dnsname, EncodingError
def extract_param(params, aliases): def extract_param(params, aliases):
@@ -227,8 +226,8 @@ class DDNSRequestHandler(BaseHTTPRequestHandler):
# Validate and encode hostname # Validate and encode hostname
try: try:
hostname_param = encode_hostname(hostname_param) hostname_param = encode_dnsname(hostname_param)
except ValidationError: except EncodingError:
logging.warning( logging.warning(
f"Invalid hostname: client={client_ip}, " f"Invalid hostname: client={client_ip}, "
f"hostname={hostname_param}") f"hostname={hostname_param}")

View File

@@ -1,138 +0,0 @@
"""Hostname and zone validation with punycode support."""
import re
# Valid hostname label pattern (after punycode encoding)
LABEL_PATTERN = re.compile(r'^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$', re.IGNORECASE)
class ValidationError(Exception):
"""Raised when validation fails."""
pass
def encode_hostname(hostname):
"""
Encode hostname to ASCII using punycode (IDNA).
Args:
hostname: Hostname string, possibly with unicode characters.
Returns:
ASCII-encoded hostname.
Raises:
ValidationError: If hostname is invalid.
"""
hostname = hostname.lower().strip()
if not hostname:
raise ValidationError("Hostname cannot be empty")
# Remove trailing dot if present
if hostname.endswith('.'):
hostname = hostname[:-1]
if len(hostname) > 253:
raise ValidationError("Hostname too long (max 253 characters)")
try:
# Encode each label using IDNA
labels = hostname.split('.')
encoded_labels = []
for label in labels:
if not label:
raise ValidationError("Empty label in hostname")
# Encode to punycode if needed
try:
encoded = label.encode('idna').decode('ascii')
except UnicodeError as e:
raise ValidationError(f"Invalid label '{label}': {e}")
if len(encoded) > 63:
raise ValidationError(
f"Label '{label}' too long (max 63 characters)"
)
if not LABEL_PATTERN.match(encoded):
raise ValidationError(f"Invalid label format: '{label}'")
encoded_labels.append(encoded)
return '.'.join(encoded_labels)
except ValidationError:
raise
except Exception as e:
raise ValidationError(f"Invalid hostname '{hostname}': {e}")
def encode_zone(zone):
"""
Encode zone name to ASCII using punycode (IDNA).
Args:
zone: Zone name string, possibly with unicode characters.
Returns:
ASCII-encoded zone name.
Raises:
ValidationError: If zone is invalid.
"""
if not zone:
raise ValidationError("Zone cannot be empty")
# Zone validation is same as hostname
return encode_hostname(zone)
def validate_hostname_in_zone(hostname, zone):
"""
Validate and encode hostname and zone, ensuring hostname is in zone.
Args:
hostname: Hostname string.
zone: Zone string.
Returns:
Tuple of (encoded_hostname, encoded_zone).
Raises:
ValidationError: If validation fails.
"""
encoded_hostname = encode_hostname(hostname)
encoded_zone = encode_zone(zone)
# Check hostname ends with zone
if not (encoded_hostname == encoded_zone or
encoded_hostname.endswith('.' + encoded_zone)):
raise ValidationError(
f"Hostname '{hostname}' is not in zone '{zone}'"
)
return encoded_hostname, encoded_zone
def get_relative_name(hostname, zone):
"""
Get the relative name (hostname without zone suffix).
Args:
hostname: Encoded hostname.
zone: Encoded zone.
Returns:
Relative name (e.g., 'mypc' from 'mypc.dyn.example.com').
"""
if hostname == zone:
return '@'
suffix = '.' + zone
if hostname.endswith(suffix):
return hostname[:-len(suffix)]
return hostname