Remove dependency on dns-manager and use dnspython directly instead

This commit is contained in:
2026-01-18 17:54:58 +01:00
parent 5570bab736
commit 6c8a1999eb
5 changed files with 323 additions and 87 deletions

View File

@@ -1,11 +1,17 @@
"""DNS operations using dns-manager library."""
"""DNS operations using RFC 2136 dynamic updates."""
import ipaddress
import logging
import os
import re
import stat
import dns.rdataset
import dns.name
import dns.query
import dns.rcode
import dns.rdatatype
from dnsmgr import DNSManager, name_from_text, rdata_from_text
import dns.tsigkeyring
import dns.update
def detect_ip_type(ip):
@@ -27,8 +33,83 @@ class DNSError(Exception):
pass
# TSIG algorithm name mapping (BIND short names -> dnspython FQDN form)
TSIG_ALGORITHMS = {
"hmac-md5": "hmac-md5.sig-alg.reg.int.",
"hmac-sha1": "hmac-sha1.",
"hmac-sha224": "hmac-sha224.",
"hmac-sha256": "hmac-sha256.",
"hmac-sha384": "hmac-sha384.",
"hmac-sha512": "hmac-sha512.",
}
def parse_bind_key_file(path):
"""
Parse BIND TSIG key file.
Format: key "keyname" { algorithm hmac-sha256; secret "base64..."; };
Args:
path: Path to key file, or None.
Returns:
Tuple of (keyring, algorithm) or (None, None) if path is None.
Raises:
DNSError: If parsing fails.
"""
if not path:
return None, None
try:
# Check file permissions
file_stat = os.stat(path)
if file_stat.st_mode & stat.S_IROTH:
logging.warning(f"TSIG key file is world-readable: {path}")
with open(path, "r") as f:
content = f.read()
# Extract key name
name_match = re.search(r'key\s+"([^"]+)"', content)
if not name_match:
raise DNSError(f"Invalid key file {path}: no key name found")
key_name = name_match.group(1)
# Ensure key name ends with dot for FQDN
if not key_name.endswith("."):
key_name = key_name + "."
# Extract algorithm
algo_match = re.search(r'algorithm\s+([^;]+);', content)
if not algo_match:
raise DNSError(f"Invalid key file {path}: no algorithm found")
algo_str = algo_match.group(1).strip().lower()
# Map to dnspython algorithm name
algorithm = TSIG_ALGORITHMS.get(algo_str)
if not algorithm:
raise DNSError(f"Unsupported TSIG algorithm in {path}: {algo_str}")
# Extract secret
secret_match = re.search(r'secret\s+"([^"]+)"', content)
if not secret_match:
raise DNSError(f"Invalid key file {path}: no secret found")
secret = secret_match.group(1)
keyring = dns.tsigkeyring.from_text({key_name: secret})
return keyring, dns.name.from_text(algorithm)
except DNSError:
raise
except Exception as e:
raise DNSError(f"Failed to parse key file {path}: {e}")
class DNSService:
"""DNS service for managing DNS records."""
"""DNS service for RFC 2136 dynamic updates."""
def __init__(self, config):
"""
@@ -41,75 +122,133 @@ class DNSService:
DNSError: If initialization fails.
"""
try:
config_file = config["dns_service"]["manager_config_file"]
self.manager = DNSManager(config_file)
logging.debug(f"DNS manager initialized: config={config_file}")
except Exception as e:
raise DNSError(f"Failed to initialize DNS manager: {e}")
dns_cfg = config["dns_service"]
self.server = dns_cfg["dns_server"]
self.port = dns_cfg.get("dns_port", 53)
self.timeout = dns_cfg.get("dns_timeout", 5)
def _get_zone(self, zone):
"""Get zone object by name."""
zones = self.manager.get_zones(zone)
if not zones:
raise DNSError(f"Zone not found: {zone}")
zone_obj = zones[0]
self.manager.get_zone_content(zone_obj)
return zone_obj
# Parse default TSIG key
default_key_file = dns_cfg.get("ddns_default_key_file")
self.default_keyring, self.default_algorithm = parse_bind_key_file(
default_key_file
)
# Parse per-zone TSIG keys
self.zone_keys = {}
zone_keys_cfg = dns_cfg.get("zone_keys", {})
for zone, key_path in zone_keys_cfg.items():
keyring, algorithm = parse_bind_key_file(key_path)
# Normalize zone name
if not zone.endswith("."):
zone = zone + "."
self.zone_keys[zone] = (keyring, algorithm)
if self.default_keyring or self.zone_keys:
logging.debug(
f"DNS service initialized: server={self.server}:{self.port} "
f"tsig=enabled zones={len(self.zone_keys)}"
)
else:
logging.debug(
f"DNS service initialized: server={self.server}:{self.port} "
f"tsig=disabled"
)
except DNSError:
raise
except Exception as e:
raise DNSError(f"Failed to initialize DNS service: {e}")
def _get_key_for_zone(self, zone):
"""
Get TSIG key for a zone.
Args:
zone: Zone name (will be normalized to FQDN).
Returns:
Tuple of (keyring, algorithm) or (None, None) for unauthenticated.
"""
# Normalize zone name
if not zone.endswith("."):
zone = zone + "."
# Check zone-specific key first
if zone in self.zone_keys:
return self.zone_keys[zone]
# Fall back to default key
return self.default_keyring, self.default_algorithm
def _make_update(self, zone):
"""
Create DNS update message for zone.
Args:
zone: Zone name.
Returns:
dns.update.Update message object.
"""
# Normalize zone name
if not zone.endswith("."):
zone = zone + "."
keyring, algorithm = self._get_key_for_zone(zone)
if keyring:
return dns.update.Update(
zone,
keyring=keyring,
keyalgorithm=algorithm
)
else:
return dns.update.Update(zone)
def _send_update(self, update):
"""
Send DNS update message to server.
Args:
update: dns.update.Update message object.
Raises:
DNSError: If update fails.
"""
try:
response = dns.query.tcp(
update,
self.server,
port=self.port,
timeout=self.timeout
)
rcode = response.rcode()
if rcode != dns.rcode.NOERROR:
raise DNSError(f"DNS update failed: {dns.rcode.to_text(rcode)}")
except dns.exception.Timeout:
raise DNSError(f"DNS update timeout: {self.server}:{self.port}")
except DNSError:
raise
except Exception as e:
raise DNSError(f"DNS update failed: {e}")
def _get_relative_name(self, hostname, zone):
"""Get hostname relative to zone."""
if hostname.endswith("." + zone):
return hostname[:-len(zone) - 1]
return hostname
def _delete_record(self, zone_obj, name, rdtype):
"""Delete record if present."""
deleted = False
zone_obj.filter_by_name(name, zone_obj.origin)
node = zone_obj.get_node(name)
if not node:
return deleted
for rdataset in zone_obj.get_node(name):
if rdataset.rdtype == rdtype:
self.manager.delete_zone_record(zone_obj, name, rdataset)
deleted = True
return deleted
def delete_record(self, hostname, zone, record_type):
"""
Delete DNS record(s) for the given hostname and record type.
Get hostname relative to zone.
Args:
hostname: Fully qualified hostname.
zone: DNS zone name.
record_type: Record type (A or AAAA).
zone: Zone name.
Returns:
True if record was deleted.
Raises:
DNSError: If delete fails.
Relative name for use in DNS update.
"""
try:
deleted = False
zone_obj = self._get_zone(zone)
name = name_from_text(hostname, zone_obj.origin)
rdtype = dns.rdatatype.from_text(record_type)
if self._delete_record(zone_obj, name, rdtype):
logging.debug(
f"DNS record deleted: hostname={hostname} "
f"zone={zone_obj.origin} type={record_type}"
)
return deleted
except Exception as e:
raise DNSError(f"Failed to delete DNS record for {hostname}: {e}")
# Strip zone suffix to get relative name
zone_suffix = "." + zone
if hostname.endswith(zone_suffix):
return hostname[:-len(zone_suffix)]
return hostname
def update_record(self, hostname, zone, ip, ttl):
"""
@@ -126,27 +265,51 @@ class DNSService:
"""
try:
record_type, normalized_ip = detect_ip_type(ip)
name = self._get_relative_name(hostname, zone)
zone_obj = self._get_zone(zone)
name = name_from_text(hostname, zone_obj.origin)
rdtype = dns.rdatatype.from_text(record_type)
update = self._make_update(zone)
update.replace(name, ttl, record_type, normalized_ip)
self._send_update(update)
# Delete existing record if present
self._delete_record(zone_obj, name, rdtype)
# Create new rdata
rdata = rdata_from_text(rdtype, normalized_ip, zone_obj.origin)
# Create rdataset with TTL
rdataset = dns.rdataset.Rdataset(rdata.rdclass, rdata.rdtype)
rdataset.update_ttl(ttl)
rdataset.add(rdata)
# Add the record
self.manager.add_zone_record(zone_obj, name, rdataset)
logging.debug(
f"DNS record updated: hostname={hostname} zone={zone_obj.origin} "
f"DNS record updated: hostname={hostname} zone={zone} "
f"type={record_type} ip={normalized_ip} ttl={ttl}"
)
except DNSError:
raise
except Exception as e:
raise DNSError(f"Failed to update DNS record for {hostname}: {e}")
def delete_record(self, hostname, zone, record_type):
"""
Delete DNS record(s) for the given hostname and record type.
Args:
hostname: Fully qualified hostname.
zone: DNS zone name.
record_type: Record type (A or AAAA).
Returns:
True (for compatibility).
Raises:
DNSError: If delete fails.
"""
try:
name = self._get_relative_name(hostname, zone)
update = self._make_update(zone)
update.delete(name, record_type)
self._send_update(update)
logging.debug(
f"DNS record deleted: hostname={hostname} zone={zone} "
f"type={record_type}"
)
return True
except DNSError:
raise
except Exception as e:
raise DNSError(f"Failed to delete DNS record for {hostname}: {e}")