"""Configuration loading and management."""

import os

try:
    import tomllib
except ImportError:
    import tomli as tomllib

# Default config paths (searched in order)
CONFIG_PATHS = [
    "/etc/ddns-service/config.toml",
    "./config.toml",
]

# Default endpoint parameter aliases
DEFAULT_ENDPOINT_PARAMS = {
    "hostname": ["hostname", "host"],
    "ipv4": ["myip", "ipv4", "ip4"],
    "ipv6": ["myip6", "ipv6", "ip6"],
    "username": ["username", "user"],
    "password": ["password", "pass", "token"],
}

VALID_PARAM_KEYS = frozenset(DEFAULT_ENDPOINT_PARAMS.keys())


class ConfigError(Exception):
    """Raised when configuration loading fails."""


def _default_endpoint_params():
    """Return a fresh copy of the default aliases that is safe to mutate."""
    return {key: list(aliases) for key, aliases in DEFAULT_ENDPOINT_PARAMS.items()}


def normalize_endpoint(endpoint):
    """
    Normalize and validate endpoint configuration.

    Args:
        endpoint: Raw endpoint dict from config.

    Returns:
        Normalized endpoint dict with validated params. The result holds
        only "path" and "params"; "params" starts from the default aliases
        and each user-supplied key replaces the default list for that key.

    Raises:
        ConfigError: If validation fails.
    """
    # Without this guard a string endpoint would be substring-tested by the
    # "in" check below and any other type would raise TypeError.
    if not isinstance(endpoint, dict):
        raise ConfigError("Endpoint must be a table")

    if "path" not in endpoint:
        raise ConfigError("Endpoint missing path")

    path = endpoint["path"]
    if not isinstance(path, str) or not path.startswith("/"):
        raise ConfigError("Endpoint path must start with /")

    # Start with defaults
    params = _default_endpoint_params()

    # Override with user-defined params
    if "params" in endpoint:
        user_params = endpoint["params"]
        if not isinstance(user_params, dict):
            raise ConfigError("Endpoint params must be a dict")

        for key, aliases in user_params.items():
            if key not in VALID_PARAM_KEYS:
                raise ConfigError(f"Unknown endpoint param: {key}")
            if not isinstance(aliases, list):
                raise ConfigError(f"Param {key} aliases must be list")
            if not aliases:
                raise ConfigError(f"Param {key} must have at least one alias")
            for alias in aliases:
                if not isinstance(alias, str) or not alias:
                    raise ConfigError(
                        f"Param {key} aliases must be non-empty strings"
                    )
            # Copy so the normalized endpoint never shares a list with the
            # caller's input.
            params[key] = list(aliases)

    return {"path": path, "params": params}


def find_config_file(custom_path=None):
    """
    Find and return the config file path.

    Args:
        custom_path: Optional custom path to config file. When given (and
            truthy) only this path is checked; CONFIG_PATHS is not searched.

    Returns:
        Path to config file.

    Raises:
        ConfigError: If config file not found.
    """
    if custom_path:
        if os.path.isfile(custom_path):
            return custom_path
        raise ConfigError(f"Config file not found: {custom_path}")

    for path in CONFIG_PATHS:
        if os.path.isfile(path):
            return path

    raise ConfigError(
        f"Config file not found. Searched: {', '.join(CONFIG_PATHS)}"
    )


def _default_sections():
    """Return per-section default values, rebuilt on every call.

    Rebuilding keeps mutable defaults (such as the trusted_proxies list)
    from being shared between separately loaded configurations.
    """
    return {
        "daemon": {
            "host": "localhost",
            "port": 8443,
            "log_level": "INFO",
            "log_target": "stdout",
            "syslog_socket": "/dev/log",
            "syslog_facility": "daemon",
            "log_file": "/var/log/ddns-service/ddns-service.log",
            "log_file_size": 52428800,
            "log_versions": 5,
            "log_requests": False,
            "ssl": False,
            "proxy_header": "",
            "trusted_proxies": [],
        },
        "database": {
            "backend": "sqlite",
        },
        "dns_service": {
            "dns_server": "127.0.0.1",
            "dns_port": 53,
            "dns_timeout": 5,
            "cleanup_interval": 60,
        },
        "defaults": {
            "dns_ttl": 60,
            "expiry_ttl": 3600,
        },
        "email": {
            "enabled": False,
            "smtp_port": 25,
        },
        "rate_limit": {
            "enabled": True,
            "good_window_seconds": 60,
            "good_max_requests": 5,
            "bad_window_seconds": 60,
            "bad_max_requests": 3,
            "cleanup_interval": 60,
        },
    }


def _apply_section_defaults(cfg):
    """Create missing sections in cfg and fill in missing keys in place."""
    for section, defaults in _default_sections().items():
        table = cfg.setdefault(section, {})
        # A scalar where a table is expected (e.g. `daemon = "x"`) would
        # otherwise fail with AttributeError on .setdefault below.
        if not isinstance(table, dict):
            raise ConfigError(f"{section} must be a table")
        for key, value in defaults.items():
            table.setdefault(key, value)


def _clamp_cleanup_interval(cfg, section):
    """Raise cfg[section]["cleanup_interval"] to at least 1."""
    value = cfg[section]["cleanup_interval"]
    # Non-numeric TOML values (strings, arrays, dates) cannot be compared
    # with an int and would raise TypeError.
    if not isinstance(value, (int, float)):
        raise ConfigError(f"{section}.cleanup_interval must be a number")
    if value < 1:
        cfg[section]["cleanup_interval"] = 1


def _validate_dns_service(dns):
    """Validate the dns_service table: server address and key file paths."""
    if not dns["dns_server"]:
        raise ConfigError("dns_service.dns_server cannot be empty")

    # Validate ddns_default_key_file if provided
    default_key = dns.get("ddns_default_key_file")
    if default_key:
        if not isinstance(default_key, str):
            raise ConfigError(
                "dns_service.ddns_default_key_file must be a string path"
            )
        if not os.path.isfile(default_key):
            raise ConfigError(f"ddns_default_key_file not found: {default_key}")

    # Validate zone_keys if provided
    zone_keys = dns.get("zone_keys", {})
    if not isinstance(zone_keys, dict):
        raise ConfigError("dns_service.zone_keys must be a table")
    for zone, key_path in zone_keys.items():
        if not isinstance(key_path, str):
            raise ConfigError(
                f"dns_service.zone_keys.{zone} must be a string path"
            )
        if not os.path.isfile(key_path):
            raise ConfigError(f"Zone key file not found for {zone}: {key_path}")


def _build_endpoints(cfg):
    """Return the normalized endpoint list, or the default /update endpoint."""
    if "endpoints" not in cfg:
        # Create default endpoint
        return [{"path": "/update", "params": _default_endpoint_params()}]

    raw_endpoints = cfg["endpoints"]
    # `[endpoints]` (a table) instead of `[[endpoints]]` (an array of
    # tables) would otherwise be iterated as a sequence of string keys.
    if not isinstance(raw_endpoints, list):
        raise ConfigError("endpoints must be a list of tables")

    normalized = []
    paths_seen = set()
    for raw in raw_endpoints:
        endpoint = normalize_endpoint(raw)
        if endpoint["path"] in paths_seen:
            raise ConfigError("Duplicate endpoint path: " + endpoint["path"])
        paths_seen.add(endpoint["path"])
        normalized.append(endpoint)
    return normalized


def load_config(config_path):
    """
    Load and validate configuration from TOML file.

    Args:
        config_path: Path to config file.

    Returns:
        Configuration dictionary with defaults applied. In addition to the
        parsed sections it contains "endpoints" (normalized list) and
        "_endpoint_map" (path -> endpoint lookup).

    Raises:
        ConfigError: If loading or validation fails.
    """
    try:
        with open(config_path, "rb") as f:
            cfg = tomllib.load(f)
    except Exception as e:
        # Deliberately broad: any read or parse failure is reported as
        # ConfigError; the original exception is kept as __cause__.
        raise ConfigError(f"Failed to load config: {e}") from e

    # Set defaults for missing sections
    _apply_section_defaults(cfg)

    _clamp_cleanup_interval(cfg, "dns_service")
    _validate_dns_service(cfg["dns_service"])
    _clamp_cleanup_interval(cfg, "rate_limit")

    # Process endpoints
    cfg["endpoints"] = _build_endpoints(cfg)

    # Build path->endpoint lookup
    cfg["_endpoint_map"] = {ep["path"]: ep for ep in cfg["endpoints"]}

    return cfg