Add DNS query function
This commit is contained in:
@@ -10,6 +10,7 @@ import dns.name
|
||||
import dns.query
|
||||
import dns.rcode
|
||||
import dns.rdatatype
|
||||
import dns.resolver
|
||||
import dns.tsigkeyring
|
||||
import dns.update
|
||||
|
||||
@@ -353,6 +354,38 @@ class DNSService:
|
||||
return hostname[:-len(zone_suffix)]
|
||||
return hostname
|
||||
|
||||
def query_record(self, hostname, zone, record_type):
|
||||
"""
|
||||
Check if DNS record exists.
|
||||
|
||||
Args:
|
||||
hostname: Hostname (without zone suffix).
|
||||
zone: DNS zone name.
|
||||
record_type: Record type string (A or AAAA).
|
||||
|
||||
Returns:
|
||||
IP address string if record exists, None otherwise.
|
||||
"""
|
||||
fqdn = f"{self._get_relative_name(hostname, zone)}.{zone}"
|
||||
if not fqdn.endswith("."):
|
||||
fqdn += "."
|
||||
try:
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = [self.server]
|
||||
resolver.port = self.port
|
||||
resolver.lifetime = self.timeout
|
||||
answers = resolver.resolve(fqdn, record_type)
|
||||
return str(answers[0]) if answers else None
|
||||
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer,
|
||||
dns.resolver.NoNameservers):
|
||||
return None
|
||||
except Exception as e:
|
||||
logging.warning(
|
||||
f"DNS query failed: hostname={hostname} zone={zone} "
|
||||
f"type={record_type}: {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
def update_record(self, hostname, zone, ip, ttl):
|
||||
"""
|
||||
Update a DNS record for the given hostname.
|
||||
|
||||
Reference in New Issue
Block a user