from fastapi import APIRouter, HTTPException, Depends, Header from pydantic import BaseModel from cryptography.fernet import Fernet import base64 import hashlib import os import secrets from typing import Optional router = APIRouter() # Configuración AES AES_SECRET = os.getenv("AES_SECRET", "default-secret-key-change-this") # Generar clave Fernet desde el secreto key = base64.urlsafe_b64encode(hashlib.sha256(AES_SECRET.encode()).digest()) cipher_suite = Fernet(key) class AuthRequest(BaseModel): code: str class AuthResponse(BaseModel): success: bool token: Optional[str] = None message: Optional[str] = None def generate_user_token() -> str: """Genera un token único para el usuario""" return secrets.token_urlsafe(32) def validate_access_code(code: str) -> bool: """Valida el código de acceso usando AES""" try: # Lista de códigos válidos (en producción, estos estarían cifrados) valid_codes = [ "FAMILY2024", "DADDY123", "FRIENDS24" ] # Validación simple (en producción usar cifrado AES) return code.upper() in valid_codes except Exception: return False async def get_current_user(authorization: Optional[str] = Header(None)) -> str: """Extrae y valida el token del usuario""" if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="Token requerido") token = authorization.split(" ")[1] if not token: raise HTTPException(status_code=401, detail="Token inválido") return token @router.post("/auth", response_model=AuthResponse) async def authenticate(request: AuthRequest): """Autentica un usuario con código de acceso""" try: if validate_access_code(request.code): user_token = generate_user_token() return AuthResponse( success=True, token=user_token, message="Autenticación exitosa" ) else: return AuthResponse( success=False, message="Código de acceso inválido" ) except Exception as e: raise HTTPException(status_code=500, detail="Error interno del servidor") @router.post("/logout") async def logout(current_user: str = Depends(get_current_user)): """Cierra la sesión del usuario""" # Limpiar caché de usuario si existe from viewers import clear_user_session clear_user_session(current_user) return {"message": "Sesión cerrada correctamente"} @router.post("/ping") async def ping(current_user: str = Depends(get_current_user)): """Verifica que el usuario esté autenticado""" return {"status": "authenticated", "user": current_user[:8] + "..."}