"""TTL cleanup functionality."""

import logging
import threading
from datetime import datetime, timedelta

from .models import Hostname, User


def _has_expired(last_ip, last_update, ttl, now):
    """
    Tell whether a stored address has outlived its TTL.

    Args:
        last_ip: Last recorded address, or a falsy value when none is stored.
        last_update: Timestamp of the last update for that address, or None.
        ttl: Expiry TTL in seconds.
        now: Reference time to compare against.

    Returns:
        True only when an address is stored, its update timestamp is known,
        and ``now`` is strictly later than ``last_update + ttl``.
    """
    # The query admits a row when *either* address family is complete, so the
    # other family may carry an address without a timestamp. Treat that as
    # "not expired" instead of failing on ``None + timedelta``.
    if not last_ip or last_update is None:
        return False
    return now > last_update + timedelta(seconds=ttl)


def cleanup_expired(app):
    """
    Clean up expired hostnames and return count of cleaned entries.

    For every hostname with a non-zero ``expiry_ttl`` and at least one address
    family that has both an address and an update timestamp, each family is
    checked independently. For an expired family the DNS record is deleted
    (when ``app.dns_service`` is set), the owner is notified (when
    ``app.email_service`` is set), and the stored address is cleared and saved.

    Args:
        app: Application instance with dns_service and email_service.

    Returns:
        Number of expired hostnames processed.
    """
    now = datetime.now()
    expired_count = 0

    candidates = Hostname.select().join(User).where(
        (Hostname.expiry_ttl != 0)
        & (
            (Hostname.last_ipv4.is_null(False) & Hostname.last_ipv4_update.is_null(False))
            | (Hostname.last_ipv6.is_null(False) & Hostname.last_ipv6_update.is_null(False))
        )
    )

    for hostname in candidates:
        ipv4_expired = _has_expired(
            hostname.last_ipv4, hostname.last_ipv4_update, hostname.expiry_ttl, now
        )
        ipv6_expired = _has_expired(
            hostname.last_ipv6, hostname.last_ipv6_update, hostname.expiry_ttl, now
        )

        if not ipv4_expired and not ipv6_expired:
            continue

        if app.dns_service:
            if ipv4_expired:
                logging.info(
                    f"Host expired: hostname={hostname.hostname} zone={hostname.zone} "
                    f"ip={hostname.last_ipv4}"
                )
                app.dns_service.delete_record(hostname.hostname, hostname.zone, "A")

            if ipv6_expired:
                logging.info(
                    f"Host expired: hostname={hostname.hostname} zone={hostname.zone} "
                    f"ip={hostname.last_ipv6}"
                )
                app.dns_service.delete_record(hostname.hostname, hostname.zone, "AAAA")

        if app.email_service:
            # Only report the address families that actually expired; the
            # other family's values are passed as None.
            last_ipv4 = last_ipv4_update = None
            if ipv4_expired:
                last_ipv4 = hostname.last_ipv4
                last_ipv4_update = hostname.last_ipv4_update

            last_ipv6 = last_ipv6_update = None
            if ipv6_expired:
                last_ipv6 = hostname.last_ipv6
                last_ipv6_update = hostname.last_ipv6_update

            app.email_service.send_expiry_notification(
                hostname.user.email,
                f"{hostname.hostname}.{hostname.zone}",
                last_ipv4,
                last_ipv4_update,
                last_ipv6,
                last_ipv6_update,
                hostname.expiry_ttl
            )

        # Clear the expired addresses. At least one family expired here
        # (guaranteed by the ``continue`` above), so the row is always saved.
        if ipv4_expired:
            hostname.last_ipv4 = None
        if ipv6_expired:
            hostname.last_ipv6 = None

        hostname.save()
        expired_count += 1

    return expired_count


class ExpiredRecordsCleanupThread(threading.Thread):
    """Background thread for periodic expired records cleanup."""

    def __init__(self, app):
        """
        Initialize expired records cleanup thread.

        Args:
            app: Application instance. Its
                ``config["dns_service"]["cleanup_interval"]`` value is used as
                the wait time (in seconds) between cleanup passes.
        """
        super().__init__(daemon=True)
        self.app = app
        self.interval = app.config["dns_service"]["cleanup_interval"]
        self.stop_event = threading.Event()

    def run(self):
        """Run the cleanup loop until stop() is called."""
        logging.info(f"Expired records cleanup thread started: interval={self.interval}s")

        # Event.wait() returns False on timeout and True once the event is
        # set, so the loop does one pass per interval and exits on stop().
        while not self.stop_event.wait(self.interval):
            try:
                count = cleanup_expired(self.app)
                if count > 0:
                    logging.info(f"Expired records cleanup completed: count={count}")
            except Exception as e:
                # Keep the thread alive; a failed pass is retried next interval.
                logging.error(f"Expired records cleanup error: {e}")

    def stop(self):
        """Signal the thread to stop."""
        self.stop_event.set()


class RateLimitCleanupThread(threading.Thread):
    """Background thread for periodic rate limiter cleanup."""

    def __init__(self, app):
        """
        Initialize rate limiter cleanup thread.

        Args:
            app: Application instance. Its
                ``config["rate_limit"]["cleanup_interval"]`` value is used as
                the wait time (in seconds) between cleanup passes.
        """
        super().__init__(daemon=True)
        self.app = app
        self.interval = app.config["rate_limit"]["cleanup_interval"]
        self.stop_event = threading.Event()

    def run(self):
        """Run the cleanup loop until stop() is called."""
        logging.info(f"Rate limit cleanup thread started: interval={self.interval}s")

        # Event.wait() returns False on timeout and True once the event is
        # set, so the loop does one pass per interval and exits on stop().
        while not self.stop_event.wait(self.interval):
            try:
                # Either limiter may be unset; only clean the ones present.
                if self.app.good_limiter:
                    self.app.good_limiter.cleanup()
                if self.app.bad_limiter:
                    self.app.bad_limiter.cleanup()
            except Exception as e:
                # Keep the thread alive; a failed pass is retried next interval.
                logging.error(f"Rate limit cleanup error: {e}")

    def stop(self):
        """Signal the thread to stop."""
        self.stop_event.set()