Rename project to ddns-service

This commit is contained in:
2026-01-18 14:32:41 +01:00
parent d0ac96bad8
commit 27fd8ab438
18 changed files with 61 additions and 61 deletions

306
src/ddns_service/cli.py Normal file
View File

@@ -0,0 +1,306 @@
"""CLI commands for user and hostname management."""
import getpass
import logging
from .cleanup import cleanup_expired
from .models import (
create_tables,
DoesNotExist,
get_hostname,
get_user,
Hostname,
User,
)
from .validation import encode_hostname, encode_zone, ValidationError
def cmd_init_db(args, app):
"""Initialize database tables."""
create_tables()
print("Database tables created.")
return 0
def cmd_user_list(args, app):
"""List all users."""
users = User.select()
if not users:
print("No users found.")
return 0
print(f"\n{'Username':<20} {'Email':<30} {'Hostnames':<10} {'Created'}")
print("-" * 82)
for user in users:
hostname_count = Hostname.select().where(
Hostname.user == user
).count()
created_at = user.created_at.strftime("%Y-%m-%d %H:%M:%S")
print(
f"{user.username:<20} {user.email:<30} "
f"{hostname_count:<10} {created_at}"
)
return 0
def cmd_user_add(args, app):
"""Add a new user."""
username = args.username
email = args.email
# Check if user exists
if User.select().where(User.username == username).exists():
print(f"Error: User '{username}' already exists.")
return 1
# Get password
password = getpass.getpass("Password: ")
password_confirm = getpass.getpass("Confirm password: ")
if password != password_confirm:
print("Error: Passwords do not match.")
return 1
if len(password) < 8:
print("Error: Password must be at least 8 characters.")
return 1
# Hash password and create user
password_hash = app.password_hasher.hash(password)
User.create(username=username, email=email, password_hash=password_hash)
print(f"User '{username}' created.")
return 0
def cmd_user_delete(args, app):
"""Delete a user."""
username = args.username
try:
user = get_user(username)
except DoesNotExist:
print(f"Error: User '{username}' not found.")
return 1
# Check for hostnames
hostname_count = Hostname.select().where(Hostname.user == user).count()
if hostname_count > 0:
print(f"Error: User has {hostname_count} hostname(s). Delete them first.")
return 1
user.delete_instance()
print(f"User '{username}' deleted.")
return 0
def cmd_user_passwd(args, app):
"""Change user password."""
username = args.username
try:
user = get_user(username)
except DoesNotExist:
print(f"Error: User '{username}' not found.")
return 1
password = getpass.getpass("New password: ")
password_confirm = getpass.getpass("Confirm password: ")
if password != password_confirm:
print("Error: Passwords do not match.")
return 1
if len(password) < 8:
print("Error: Password must be at least 8 characters.")
return 1
user.password_hash = app.password_hasher.hash(password)
user.save()
print(f"Password updated for '{username}'.")
return 0
def cmd_user_email(args, app):
"""Update user email."""
username = args.username
email = args.email
try:
user = get_user(username)
except DoesNotExist:
print(f"Error: User '{username}' not found.")
return 1
user.email = email
user.save()
print(f"Email updated for '{username}'.")
return 0
def cmd_hostname_list(args, app):
"""List hostnames."""
query = Hostname.select().join(User)
if args.user:
try:
user = get_user(args.user)
query = query.where(Hostname.user == user)
except DoesNotExist:
print(f"Error: User '{args.user}' not found.")
return 1
hostnames = list(query)
if not hostnames:
print("No hostnames found.")
return 0
print(
f"{'Hostname':<35} {'User':<15} {'Zone':<20} "
f"{'DNS-TTL':<8} {'Exp-TTL':<8} {'Last-Update IPv4':<21} {'Last-Update IPv6'}"
)
print("-" * 132)
for h in hostnames:
last_ipv4_update = h.last_ipv4_update.strftime("%Y-%m-%d %H:%M:%S") if h.last_ipv4_update else "Never"
last_ipv6_update = h.last_ipv6_update.strftime("%Y-%m-%d %H:%M:%S") if h.last_ipv6_update else "Never"
print(
f"{h.hostname:<35} {h.user.username:<15} {h.zone:<20} "
f"{h.dns_ttl:<8} {h.expiry_ttl:<8} {last_ipv4_update:<21} {last_ipv6_update}"
)
return 0
def cmd_hostname_add(args, app):
"""Add a hostname."""
username = args.username
config = app.config
# Validate and encode hostname/zone
try:
hostname_str = encode_hostname(args.hostname)
zone = encode_zone(args.zone)
except ValidationError as e:
print(f"Error: {e}")
return 1
# Get TTLs from args or config defaults
dns_ttl = args.dns_ttl
if dns_ttl is None:
dns_ttl = config["defaults"]["dns_ttl"]
expiry_ttl = args.expiry_ttl
if expiry_ttl is None:
expiry_ttl = config["defaults"]["expiry_ttl"]
# Get user
try:
user = get_user(username)
except DoesNotExist:
print(f"Error: User '{username}' not found.")
return 1
# Check if hostname exists
if Hostname.select().where(Hostname.hostname == hostname_str).exists():
print(f"Error: Hostname '{hostname_str}' already exists.")
return 1
# Create hostname
Hostname.create(
user=user,
hostname=hostname_str,
zone=zone,
dns_ttl=dns_ttl,
expiry_ttl=expiry_ttl
)
print(f"Hostname '{hostname_str}' added for user '{username}'.")
return 0
def cmd_hostname_delete(args, app):
"""Delete a hostname."""
# Validate and encode hostname
try:
hostname_str = encode_hostname(args.hostname)
except ValidationError as e:
print(f"Error: {e}")
return 1
try:
hostname = get_hostname(hostname_str)
except DoesNotExist:
print(f"Error: Hostname '{hostname_str}' not found.")
return 1
# Delete DNS records if active
if hostname.last_ipv4 or hostname.last_ipv6:
# Initialize DNS service if not already
if app.dns_service is None:
try:
app.init_dns()
except Exception as e:
logging.warning(f"DNS init failed: {e}")
if app.dns_service:
if hostname.last_ipv4:
try:
app.dns_service.delete_record(
hostname.hostname, hostname.zone, "A"
)
except Exception as e:
logging.warning(f"DNS delete failed: type=A error={e}")
if hostname.last_ipv6:
try:
app.dns_service.delete_record(
hostname.hostname, hostname.zone, "AAAA"
)
except Exception as e:
logging.warning(f"DNS delete failed: type=AAAA error={e}")
hostname.delete_instance()
print(f"Hostname '{hostname_str}' deleted.")
return 0
def cmd_hostname_modify(args, app):
"""Modify hostname settings."""
# Validate and encode hostname
try:
hostname_str = encode_hostname(args.hostname)
except ValidationError as e:
print(f"Error: {e}")
return 1
try:
hostname = get_hostname(hostname_str)
except DoesNotExist:
print(f"Error: Hostname '{hostname_str}' not found.")
return 1
# Get new TTLs
dns_ttl = args.dns_ttl if args.dns_ttl is not None else hostname.dns_ttl
expiry_ttl = args.expiry_ttl if args.expiry_ttl is not None else hostname.expiry_ttl
hostname.dns_ttl = dns_ttl
hostname.expiry_ttl = expiry_ttl
hostname.save()
print(
f"Hostname '{hostname_str}' updated: "
f"dns_ttl={dns_ttl}, expiry_ttl={expiry_ttl}"
)
return 0
def cmd_cleanup(args, app):
"""Run cleanup manually."""
# Initialize services if not already
if app.dns_service is None:
try:
app.init_dns()
except Exception as e:
logging.warning(f"DNS init failed: {e}")
if app.email_service is None:
app.init_email()
count = cleanup_expired(app)
print(f"Cleanup complete: {count} expired hostname(s) processed.")
return 0