|
from fastapi import APIRouter, HTTPException, Depends, Header |
|
from pydantic import BaseModel |
|
from cryptography.fernet import Fernet |
|
import base64 |
|
import hashlib |
|
import os |
|
import secrets |
|
from typing import Optional |
|
|
|
# Router on which the authentication endpoints of this module are registered.
router = APIRouter()

# Secret read from the environment.
# NOTE(review): falls back to a hard-coded default when AES_SECRET is unset.
AES_SECRET = os.environ.get("AES_SECRET", "default-secret-key-change-this")

# Hash the secret with SHA-256 and url-safe base64-encode the digest to
# obtain the key handed to Fernet.
key = base64.urlsafe_b64encode(
    hashlib.sha256(AES_SECRET.encode("utf-8")).digest()
)
cipher_suite = Fernet(key)
|
|
|
# Request body accepted by the POST /auth endpoint.
class AuthRequest(BaseModel):
    # Access code submitted by the client.
    code: str
|
|
|
# Response body returned by the POST /auth endpoint.
class AuthResponse(BaseModel):
    # True when the submitted access code was accepted.
    success: bool
    # Newly generated user token; stays None when authentication fails.
    token: Optional[str] = None
    # Human-readable status message for the client.
    message: Optional[str] = None
|
|
|
def generate_user_token() -> str:
    """Generate a unique token for the user.

    Returns:
        A random URL-safe text string built from 32 bytes of randomness.
    """
    random_byte_count = 32
    fresh_token = secrets.token_urlsafe(random_byte_count)
    return fresh_token
|
|
|
# Accepted access codes, stored upper-case because the input is upper-cased
# before the lookup.
# NOTE(review): these codes are hard-coded in source; consider loading them
# from configuration instead.
_VALID_ACCESS_CODES = frozenset({"FAMILY2024", "DADDY123", "FRIENDS24"})


def validate_access_code(code: str) -> bool:
    """Check whether ``code`` is one of the accepted access codes.

    The comparison is case-insensitive: the input is upper-cased and looked
    up in a fixed set of codes. No encryption or decryption is involved.

    Args:
        code: Access code supplied by the client.

    Returns:
        True if the upper-cased code is accepted; False otherwise, including
        when ``code`` is not a string.
    """
    # Explicit guard instead of a catch-all ``except``: non-string input is
    # simply not a valid code, while unrelated errors are no longer hidden.
    if not isinstance(code, str):
        return False
    return code.upper() in _VALID_ACCESS_CODES
|
|
|
async def get_current_user(authorization: Optional[str] = Header(None)) -> str:
    """Extract the bearer token from the ``Authorization`` header.

    NOTE(review): this function only parses the header; it does not check the
    token against any record of issued tokens.

    Args:
        authorization: Raw ``Authorization`` header value, or None when the
            header is absent.

    Returns:
        The first space-delimited field following the ``Bearer`` scheme.

    Raises:
        HTTPException: 401 "Token requerido" when the header is missing or
            does not use the Bearer scheme; 401 "Token inválido" when the
            scheme is present but no token follows it.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="Token requerido")

    scheme, separator, credentials = authorization.partition(" ")
    # HTTP authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" are accepted as well as "Bearer".
    if not separator or scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Token requerido")

    # Tolerate extra spaces around the credentials and keep only the first
    # space-delimited field.
    token = credentials.strip(" ").split(" ", 1)[0]
    if not token:
        raise HTTPException(status_code=401, detail="Token inválido")

    return token
|
|
|
@router.post("/auth", response_model=AuthResponse)
async def authenticate(request: AuthRequest):
    """Authenticate a user with an access code.

    NOTE(review): the generated token is returned to the client but is not
    stored anywhere by this function.

    Args:
        request: Request body carrying the access code.

    Returns:
        AuthResponse with ``success=True`` and a freshly generated token when
        the code is accepted, or ``success=False`` with an error message and
        no token when it is rejected.

    Raises:
        HTTPException: 500 when an unexpected error occurs while handling the
            request; the original exception is chained as the cause.
    """
    try:
        if not validate_access_code(request.code):
            return AuthResponse(
                success=False,
                message="Código de acceso inválido",
            )

        return AuthResponse(
            success=True,
            token=generate_user_token(),
            message="Autenticación exitosa",
        )
    except Exception as exc:
        # Keep the original error as __cause__ so it is not lost when the
        # generic 500 response is produced.
        raise HTTPException(
            status_code=500,
            detail="Error interno del servidor",
        ) from exc
|
|
|
@router.post("/logout")
async def logout(current_user: str = Depends(get_current_user)):
    """Close the user's session.

    Args:
        current_user: Bearer token resolved by the ``get_current_user``
            dependency.

    Returns:
        A dict with a confirmation ``message``.
    """
    # Imported inside the handler rather than at module top — presumably to
    # avoid a circular import; TODO confirm.
    from viewers import clear_user_session

    clear_user_session(current_user)

    return {"message": "Sesión cerrada correctamente"}
|
|
|
@router.post("/ping")
async def ping(current_user: str = Depends(get_current_user)):
    """Confirm that the caller supplied a bearer token.

    Args:
        current_user: Bearer token resolved by the ``get_current_user``
            dependency.

    Returns:
        A dict with ``status`` set to "authenticated" and ``user`` set to the
        first eight characters of the token followed by "...".
    """
    # Only a short prefix of the token is echoed back.
    visible_prefix = current_user[:8]
    return dict(status="authenticated", user=visible_prefix + "...")