Files
ddns-service/src/ddns_service/cli.py

326 lines
8.8 KiB
Python

"""CLI commands for user and hostname management."""
import getpass
from . import datetime_str
from .cleanup import cleanup_expired
from .dns import encode_dnsname
from .models import (
DatabaseError,
DoesNotExist,
EncodingError,
get_hostname,
get_user,
Hostname,
User,
)
def cmd_user_list(args, app):
    """Print a table of all users.

    Each row shows the username, email, number of hostnames owned and the
    formatted creation time.  Neither ``args`` nor ``app`` is read by this
    function.

    Returns:
        0 in every case (including when there are no users).
    """
    all_users = User.select()
    if not all_users:
        print("No users found.")
        return 0

    header = f"\n{'Username':<20} {'Email':<30} {'Hostnames':<10} {'Created'}"
    print(header)
    print("-" * 86)

    for account in all_users:
        # count() is issued once per listed user.
        owned = Hostname.select().where(Hostname.user == account)
        n_hostnames = owned.count()
        created = datetime_str(account.created_at)
        row = (
            f"{account.username:<20} {account.email:<30} "
            f"{n_hostnames:<10} {created}"
        )
        print(row)

    return 0
def cmd_user_add(args, app):
    """Create a new user after prompting twice for a password.

    Reads ``args.username`` and ``args.email`` and hashes the entered
    password with ``app.password_hasher.hash``.

    Returns:
        0 when the user was created; 1 when the username already exists,
        the two password entries differ, the password is shorter than
        8 characters, or the database reports an error.
    """
    username = args.username
    email = args.email

    try:
        # Check if user exists
        if User.select().where(User.username == username).exists():
            print(f"Error: User '{username}' already exists.")
            return 1

        # Get password
        password = getpass.getpass("Password: ")
        password_confirm = getpass.getpass("Confirm password: ")
        if password != password_confirm:
            print("Error: Passwords do not match.")
            return 1
        if len(password) < 8:
            print("Error: Password must be at least 8 characters.")
            return 1

        # Hash password and create user
        password_hash = app.password_hasher.hash(password)
        User.create(username=username, email=email, password_hash=password_hash)
    except DatabaseError as e:
        # Report database failures the same way the hostname commands do
        # instead of letting them escape as a traceback.
        # NOTE(review): presumably this also covers a uniqueness violation
        # if the username is inserted between the check and the create --
        # confirm against the User model's constraints.
        print(f"Database error: {e}")
        return 1

    print(f"User '{username}' created.")
    return 0
def cmd_user_delete(args, app):
    """Delete the user named ``args.username`` if it owns no hostnames.

    ``app`` is not read by this function.

    Returns:
        0 when the user was deleted; 1 when the user is not found, still
        has at least one hostname, or the database reports an error.
    """
    username = args.username

    try:
        try:
            user = get_user(username)
        except DoesNotExist:
            print(f"Error: User '{username}' not found.")
            return 1

        # Refuse to delete while hostnames still reference this user.
        hostname_count = Hostname.select().where(Hostname.user == user).count()
        if hostname_count > 0:
            print(
                f"Error: User has {hostname_count} hostname(s). "
                "Delete them first."
            )
            return 1

        user.delete_instance()
    except DatabaseError as e:
        # Report database failures the same way the hostname commands do
        # instead of letting them escape as a traceback.
        print(f"Database error: {e}")
        return 1

    print(f"User '{username}' deleted.")
    return 0
def cmd_user_passwd(args, app):
    """Set a new password for the user named ``args.username``.

    Prompts twice for the password and stores the result of
    ``app.password_hasher.hash`` in ``user.password_hash``.

    Returns:
        0 when the password was saved; 1 when the user is not found, the
        two entries differ, the password is shorter than 8 characters, or
        the database reports an error.
    """
    username = args.username

    try:
        try:
            user = get_user(username)
        except DoesNotExist:
            print(f"Error: User '{username}' not found.")
            return 1

        password = getpass.getpass("New password: ")
        password_confirm = getpass.getpass("Confirm password: ")
        if password != password_confirm:
            print("Error: Passwords do not match.")
            return 1
        if len(password) < 8:
            print("Error: Password must be at least 8 characters.")
            return 1

        user.password_hash = app.password_hasher.hash(password)
        user.save()
    except DatabaseError as e:
        # Report database failures the same way the hostname commands do
        # instead of letting them escape as a traceback.
        print(f"Database error: {e}")
        return 1

    print(f"Password updated for '{username}'.")
    return 0
def cmd_user_email(args, app):
    """Replace the email of the user named ``args.username`` with ``args.email``.

    ``app`` is not read by this function.

    Returns:
        0 when the email was saved; 1 when the user is not found or the
        database reports an error.
    """
    username = args.username
    email = args.email

    try:
        try:
            user = get_user(username)
        except DoesNotExist:
            print(f"Error: User '{username}' not found.")
            return 1

        user.email = email
        user.save()
    except DatabaseError as e:
        # Report database failures the same way the hostname commands do
        # instead of letting them escape as a traceback.
        print(f"Database error: {e}")
        return 1

    print(f"Email updated for '{username}'.")
    return 0
def cmd_hostname_list(args, app):
    """Print a table of hostnames, optionally limited to one user.

    When ``args.user`` is truthy only that user's hostnames are listed.
    ``app`` is not read by this function.

    Returns:
        0 after printing the table (or "No hostnames found."); 1 when
        ``args.user`` names a user that does not exist.
    """
    # Select the User columns together with Hostname so that h.user below is
    # populated from the join instead of being fetched again for every row.
    query = Hostname.select(Hostname, User).join(User)

    if args.user:
        try:
            user = get_user(args.user)
        except DoesNotExist:
            print(f"Error: User '{args.user}' not found.")
            return 1
        query = query.where(Hostname.user == user)

    hostnames = list(query)
    if not hostnames:
        print("No hostnames found.")
        return 0

    print(
        f"\n{'Hostname':<35} {'User':<15} {'Zone':<20} "
        f"{'DNS-TTL':<8} {'Exp-TTL':<8} {'Last-Update IPv4':<25} {'Last-Update IPv6'}"
    )
    print("-" * 140)
    for h in hostnames:
        last_ipv4_update = datetime_str(h.last_ipv4_update)
        last_ipv6_update = datetime_str(h.last_ipv6_update)
        print(
            f"{h.hostname:<35} {h.user.username:<15} {h.zone:<20} "
            f"{h.dns_ttl:<8} {h.expiry_ttl:<8} {last_ipv4_update:<25} {last_ipv6_update}"
        )
    return 0
def cmd_hostname_add(args, app):
    """Register ``args.hostname`` in ``args.zone`` for ``args.username``.

    TTL values come from ``args.dns_ttl`` / ``args.expiry_ttl`` when they
    are not None, otherwise from ``app.config["defaults"]``.

    Returns:
        0 when the hostname was created; 1 when the user is not found, the
        hostname/zone pair already exists, the name cannot be encoded, or
        the database reports an error.
    """
    username = args.username
    try:
        # Resolve the owning user first.
        try:
            owner = get_user(username)
        except DoesNotExist:
            print(f"Error: User '{username}' not found.")
            return 1

        # A successful lookup means the hostname+zone pair is already taken.
        try:
            existing = get_hostname(args.hostname, args.zone)
        except EncodingError as e:
            print(f"Error: {e}")
            return 1
        except DoesNotExist:
            pass
        else:
            print(f"Error: Hostname '{existing.hostname}' in zone '{existing.zone}' exists.")
            return 1

        # Explicit arguments win; config defaults fill in the gaps.
        settings = app.config
        dns_ttl = args.dns_ttl if args.dns_ttl is not None else settings["defaults"]["dns_ttl"]
        expiry_ttl = (
            args.expiry_ttl
            if args.expiry_ttl is not None
            else settings["defaults"]["expiry_ttl"]
        )

        created = Hostname.create(
            user=owner,
            hostname=args.hostname,
            zone=args.zone,
            dns_ttl=dns_ttl,
            expiry_ttl=expiry_ttl,
        )
        message = (
            f"Hostname '{created.hostname}' in zone '{created.zone}' added "
            f"for user '{username}'."
        )
        print(message)
    except DatabaseError as e:
        print(f"Database error: {e}")
        return 1
    return 0
def cmd_hostname_delete(args, app):
    """Delete the hostname ``args.hostname`` in ``args.zone``.

    If the stored entry has a ``last_ipv4`` or ``last_ipv6`` value, the
    matching "A" / "AAAA" record is removed through
    ``app.dns_service.delete_record`` before the database row is deleted.
    ``app.init_dns()`` is called first when ``app.dns_service`` is None.

    Returns:
        0 when the hostname was deleted; 1 when it is not found, the name
        cannot be encoded, DNS initialisation or a record deletion fails,
        or the database reports an error.
    """
    try:
        try:
            entry = get_hostname(args.hostname, args.zone)
        except DoesNotExist:
            missing_name = encode_dnsname(args.hostname)
            missing_zone = encode_dnsname(args.zone)
            print(f"Error: Hostname '{missing_name}' in zone '{missing_zone}' not found.")
            return 1
        except EncodingError as e:
            print(f"Error: {e}")
            return 1

        # Only touch DNS when at least one address has been recorded.
        if entry.last_ipv4 or entry.last_ipv6:
            if app.dns_service is None:
                try:
                    app.init_dns()
                except Exception as e:
                    print(f"DNS init failed: {e}")
                    return 1

            # "A" is handled before "AAAA"; the first failure aborts the
            # command and leaves the database row in place.
            for record_type, address_attr in (("A", "last_ipv4"), ("AAAA", "last_ipv6")):
                if not getattr(entry, address_attr):
                    continue
                try:
                    app.dns_service.delete_record(
                        entry.hostname, entry.zone, record_type
                    )
                except Exception as e:
                    print(f"DNS delete failed: type={record_type} error={e}")
                    return 1

        entry.delete_instance()
        print(f"Hostname '{entry.hostname}' in zone '{entry.zone}' deleted.")
    except DatabaseError as e:
        print(f"Database error: {e}")
        return 1
    return 0
def cmd_hostname_modify(args, app):
    """Update the TTL settings of ``args.hostname`` in ``args.zone``.

    ``args.dns_ttl`` and ``args.expiry_ttl`` replace the stored values when
    they are not None; otherwise the stored value is kept.  ``app`` is not
    read by this function.

    Returns:
        0 when the entry was saved; 1 when the hostname is not found, the
        name cannot be encoded, or the database reports an error.
    """
    try:
        try:
            entry = get_hostname(args.hostname, args.zone)
        except DoesNotExist:
            missing_name = encode_dnsname(args.hostname)
            missing_zone = encode_dnsname(args.zone)
            print(f"Error: Hostname '{missing_name}' in zone '{missing_zone}' not found.")
            return 1
        except EncodingError as e:
            print(f"Error: {e}")
            return 1

        # Fall back to the stored value for any TTL not given on the CLI.
        if args.dns_ttl is not None:
            dns_ttl = args.dns_ttl
        else:
            dns_ttl = entry.dns_ttl
        if args.expiry_ttl is not None:
            expiry_ttl = args.expiry_ttl
        else:
            expiry_ttl = entry.expiry_ttl

        entry.dns_ttl = dns_ttl
        entry.expiry_ttl = expiry_ttl
        entry.save()

        summary = (
            f"Hostname '{entry.hostname}' in zone '{entry.zone}' updated: "
            f"dns_ttl={dns_ttl}, expiry_ttl={expiry_ttl}"
        )
        print(summary)
    except DatabaseError as e:
        print(f"Database error: {e}")
        return 1
    return 0
def cmd_cleanup(args, app):
    """Run the expired-hostname cleanup once.

    Calls ``app.init_dns()`` / ``app.init_email()`` when the corresponding
    ``app.dns_service`` / ``app.email_service`` attribute is None, then
    runs ``cleanup_expired(app)`` and prints how many hostnames it reported.
    ``args`` is not read by this function.

    Returns:
        0 when the cleanup completed; 1 when initialising either service
        fails or the cleanup raises.
    """
    # Initialize services if not already
    if app.dns_service is None:
        try:
            app.init_dns()
        except Exception as e:
            print(f"DNS init failed: {e}")
            return 1

    if app.email_service is None:
        # Guarded like the DNS initialisation so a failure here produces a
        # message and exit code 1 rather than a traceback.
        try:
            app.init_email()
        except Exception as e:
            print(f"Email init failed: {e}")
            return 1

    try:
        count = cleanup_expired(app)
    except DatabaseError as e:
        print(f"Database error: {e}")
        return 1
    except Exception as e:
        print(f"Error: {e}")
        return 1

    print(f"Cleanup complete: {count} expired hostname(s) processed.")
    return 0