Files
ddns-service/src/ddns_service/config.py

215 lines
6.7 KiB
Python

"""Configuration loading and management."""
import os
try:
import tomllib
except ImportError:
import tomli as tomllib
# Default config paths (searched in order)
CONFIG_PATHS = [
"/etc/ddns-service/config.toml",
"./config.toml",
]
# Default endpoint parameter aliases
DEFAULT_ENDPOINT_PARAMS = {
"hostname": ["hostname", "host"],
"ipv4": ["myip", "ipv4", "ip4"],
"ipv6": ["myip6", "ipv6", "ip6"],
"username": ["username", "user"],
"password": ["password", "pass", "token"],
"notify_change": ["notify_change"],
}
VALID_PARAM_KEYS = frozenset(DEFAULT_ENDPOINT_PARAMS.keys())
class ConfigError(Exception):
"""Raised when configuration loading fails."""
pass
def normalize_endpoint(endpoint):
"""
Normalize and validate endpoint configuration.
Args:
endpoint: Raw endpoint dict from config.
Returns:
Normalized endpoint dict with validated params.
Raises:
ConfigError: If validation fails.
"""
if "path" not in endpoint:
raise ConfigError("Endpoint missing path")
path = endpoint["path"]
if not isinstance(path, str) or not path.startswith("/"):
raise ConfigError("Endpoint path must start with /")
# Start with defaults
params = {k: list(v) for k, v in DEFAULT_ENDPOINT_PARAMS.items()}
# Override with user-defined params
if "params" in endpoint:
user_params = endpoint["params"]
if not isinstance(user_params, dict):
raise ConfigError("Endpoint params must be a dict")
for key, aliases in user_params.items():
if key not in VALID_PARAM_KEYS:
raise ConfigError("Unknown endpoint param: " + key)
if not isinstance(aliases, list):
raise ConfigError("Param " + key + " aliases must be list")
if not aliases:
raise ConfigError("Param " + key + " must have at least one alias")
for alias in aliases:
if not isinstance(alias, str) or not alias:
raise ConfigError("Param " + key + " aliases must be non-empty strings")
params[key] = aliases
return {"path": path, "params": params}
def find_config_file(custom_path=None):
"""
Find and return the config file path.
Args:
custom_path: Optional custom path to config file.
Returns:
Path to config file.
Raises:
ConfigError: If config file not found.
"""
if custom_path:
if os.path.isfile(custom_path):
return custom_path
raise ConfigError(f"Config file not found: {custom_path}")
for path in CONFIG_PATHS:
if os.path.isfile(path):
return path
raise ConfigError(
f"Config file not found. Searched: {', '.join(CONFIG_PATHS)}"
)
def load_config(config_path):
"""
Load and validate configuration from TOML file.
Args:
config_path: Path to config file.
Returns:
Configuration dictionary with defaults applied.
Raises:
ConfigError: If loading fails.
"""
try:
with open(config_path, "rb") as f:
cfg = tomllib.load(f)
except Exception as e:
raise ConfigError(f"Failed to load config: {e}")
# Set defaults for missing sections
cfg.setdefault("daemon", {})
cfg["daemon"].setdefault("host", "localhost")
cfg["daemon"].setdefault("port", 8443)
cfg["daemon"].setdefault("log_level", "INFO")
cfg["daemon"].setdefault("log_target", "stdout")
cfg["daemon"].setdefault("syslog_socket", "/dev/log")
cfg["daemon"].setdefault("syslog_facility", "daemon")
cfg["daemon"].setdefault("log_file", "/var/log/ddns-service/ddns-service.log")
cfg["daemon"].setdefault("log_file_size", 52428800)
cfg["daemon"].setdefault("log_versions", 5)
cfg["daemon"].setdefault("log_requests", False)
cfg["daemon"].setdefault("ssl", False)
cfg["daemon"].setdefault("proxy_header", "")
cfg["daemon"].setdefault("trusted_proxies", [])
cfg.setdefault("database", {})
cfg["database"].setdefault("backend", "sqlite")
cfg.setdefault("dns_service", {})
cfg["dns_service"].setdefault("dns_server", "127.0.0.1")
cfg["dns_service"].setdefault("dns_port", 53)
cfg["dns_service"].setdefault("dns_timeout", 5)
cfg["dns_service"].setdefault("cleanup_interval", 60)
if cfg["dns_service"]["cleanup_interval"] < 1:
cfg["dns_service"]["cleanup_interval"] = 1
# Validate dns_server
if not cfg["dns_service"]["dns_server"]:
raise ConfigError("dns_service.dns_server cannot be empty")
# Validate ddns_default_key_file if provided
default_key = cfg["dns_service"].get("ddns_default_key_file")
if default_key and not os.path.isfile(default_key):
raise ConfigError(f"ddns_default_key_file not found: {default_key}")
# Validate zone_keys if provided
zone_keys = cfg["dns_service"].get("zone_keys", {})
if not isinstance(zone_keys, dict):
raise ConfigError("dns_service.zone_keys must be a table")
for zone, key_path in zone_keys.items():
if not isinstance(key_path, str):
raise ConfigError(f"dns_service.zone_keys.{zone} must be a string path")
if not os.path.isfile(key_path):
raise ConfigError(f"Zone key file not found for {zone}: {key_path}")
cfg.setdefault("defaults", {})
cfg["defaults"].setdefault("dns_ttl", 60)
cfg["defaults"].setdefault("expiry_ttl", 3600)
cfg.setdefault("email", {})
cfg["email"].setdefault("enabled", False)
cfg["email"].setdefault("smtp_port", 25)
cfg.setdefault("rate_limit", {})
cfg["rate_limit"].setdefault("enabled", True)
cfg["rate_limit"].setdefault("good_window_seconds", 60)
cfg["rate_limit"].setdefault("good_max_requests", 5)
cfg["rate_limit"].setdefault("bad_window_seconds", 60)
cfg["rate_limit"].setdefault("bad_max_requests", 3)
cfg["rate_limit"].setdefault("cleanup_interval", 60)
if cfg["rate_limit"]["cleanup_interval"] < 1:
cfg["rate_limit"]["cleanup_interval"] = 1
# Process endpoints
if "endpoints" not in cfg:
# Create default endpoint
cfg["endpoints"] = [{
"path": "/update",
"params": {k: list(v) for k, v in DEFAULT_ENDPOINT_PARAMS.items()},
}]
else:
normalized = []
paths_seen = set()
for ep in cfg["endpoints"]:
ep = normalize_endpoint(ep)
if ep["path"] in paths_seen:
raise ConfigError("Duplicate endpoint path: " + ep["path"])
paths_seen.add(ep["path"])
normalized.append(ep)
cfg["endpoints"] = normalized
# Build path->endpoint lookup
cfg["_endpoint_map"] = {ep["path"]: ep for ep in cfg["endpoints"]}
return cfg