# abcd/api/auth.py
# Repository page residue: docs4you's picture - Upload 41 files - 84121fd (verified)
from fastapi import APIRouter, HTTPException, Depends, Header
from pydantic import BaseModel
from cryptography.fernet import Fernet
import base64
import hashlib
import os
import secrets
from typing import Optional
# Router that collects the authentication endpoints defined in this module.
router = APIRouter()

# AES configuration: the secret comes from the AES_SECRET environment
# variable, with a placeholder fallback when the variable is unset.
# NOTE(review): the fallback is a fixed, publicly visible string - confirm
# that every deployment sets AES_SECRET.
AES_SECRET = os.environ.get("AES_SECRET", "default-secret-key-change-this")

# Derive the Fernet key from the secret: SHA-256 yields 32 bytes, which are
# then url-safe base64-encoded as Fernet expects.
key = base64.urlsafe_b64encode(
    hashlib.sha256(AES_SECRET.encode("utf-8")).digest()
)
cipher_suite = Fernet(key)
class AuthRequest(BaseModel):
    # Request body accepted by the POST /auth endpoint.

    # Access code submitted by the client.
    code: str
class AuthResponse(BaseModel):
    # Response body returned by the POST /auth endpoint.

    # True when the submitted access code was accepted.
    success: bool
    # Token issued to the client; left as None when no token is issued.
    token: Optional[str] = None
    # Human-readable status message; optional.
    message: Optional[str] = None
def generate_user_token() -> str:
    """Return a fresh, random, URL-safe token for a user.

    The token is built from 32 random bytes produced by the ``secrets``
    module.
    """
    entropy_bytes = 32
    token = secrets.token_urlsafe(entropy_bytes)
    return token
def validate_access_code(code: str) -> bool:
    """Return True when *code* matches one of the accepted access codes.

    Matching is case-insensitive (the input is upper-cased first). Each
    accepted code is compared with ``secrets.compare_digest`` so that the
    time taken does not depend on how many leading characters of a guess
    are correct.

    NOTE(review): the accepted codes are hard-coded in plain text and no
    encryption is involved in this check - consider moving them to
    configuration.

    Args:
        code: Access code supplied by the client.

    Returns:
        True if the code is accepted; False otherwise, including when
        *code* is not a string.
    """
    valid_codes = (
        "FAMILY2024",
        "DADDY123",
        "FRIENDS24",
    )

    # A non-string can never match; reject it explicitly instead of relying
    # on a blanket ``except Exception`` that would also hide real bugs.
    if not isinstance(code, str):
        return False

    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and
    # "surrogatepass" keeps the encoding step from raising on any str.
    candidate = code.upper().encode("utf-8", "surrogatepass")

    matched = False
    for valid in valid_codes:
        # Deliberately no short-circuit: every accepted code is always checked.
        matched |= secrets.compare_digest(candidate, valid.encode("utf-8"))
    return matched
async def get_current_user(authorization: Optional[str] = Header(None)) -> str:
    """Extract the bearer token from the ``Authorization`` header.

    NOTE(review): the token is only checked for presence; nothing in this
    function verifies that it was actually issued by the server - confirm
    whether that validation happens elsewhere.

    Args:
        authorization: Raw ``Authorization`` header value, or None when the
            header is absent.

    Returns:
        The first space-delimited segment following the ``"Bearer "`` prefix.

    Raises:
        HTTPException: 401 when the header is missing, does not start with
            ``"Bearer "``, or carries an empty token.
    """
    scheme_prefix = "Bearer "
    # A 401 response should tell the client which auth scheme is expected.
    challenge = {"WWW-Authenticate": "Bearer"}

    if not authorization or not authorization.startswith(scheme_prefix):
        raise HTTPException(
            status_code=401,
            detail="Token requerido",
            headers=challenge,
        )

    # Keep only the segment between the prefix and the next space, if any.
    token = authorization[len(scheme_prefix):].partition(" ")[0]
    if not token:
        raise HTTPException(
            status_code=401,
            detail="Token inválido",
            headers=challenge,
        )
    return token
@router.post("/auth", response_model=AuthResponse)
async def authenticate(request: AuthRequest):
    """Authenticate a client by access code and issue a token.

    NOTE(review): the generated token is returned to the client but is not
    recorded anywhere in this function - confirm how later requests are
    expected to verify it.

    Args:
        request: Request body carrying the access code.

    Returns:
        An ``AuthResponse`` with ``success=True`` and a new token when the
        code is accepted, or ``success=False`` with an error message when it
        is rejected.

    Raises:
        HTTPException: 500 if an unexpected error occurs while processing.
    """
    try:
        if not validate_access_code(request.code):
            return AuthResponse(
                success=False,
                message="Código de acceso inválido",
            )
        return AuthResponse(
            success=True,
            token=generate_user_token(),
            message="Autenticación exitosa",
        )
    except Exception as exc:
        # Chain the original error so it stays visible in server tracebacks
        # while the client only sees a generic message.
        raise HTTPException(
            status_code=500,
            detail="Error interno del servidor",
        ) from exc
@router.post("/logout")
async def logout(current_user: str = Depends(get_current_user)):
    """Close the caller's session.

    Args:
        current_user: Bearer token resolved by ``get_current_user``.

    Returns:
        A dict with a confirmation ``message``.
    """
    # Imported at call time rather than at module import time.
    # NOTE(review): presumably this avoids a circular import with
    # ``viewers`` - confirm before moving it to the top of the file.
    from viewers import clear_user_session

    # Clear any cached state held for this user.
    clear_user_session(current_user)
    return {"message": "Sesión cerrada correctamente"}
@router.post("/ping")
async def ping(current_user: str = Depends(get_current_user)):
    """Confirm that the caller presented a bearer token.

    Returns a status dict whose ``user`` field is the first eight characters
    of the token followed by an ellipsis, so the full token is never echoed.
    """
    token_preview = f"{current_user[:8]}..."
    return {"status": "authenticated", "user": token_preview}