from fastapi import APIRouter, HTTPException, Depends, Header from pydantic import BaseModel import os from typing import Optional router = APIRouter() class AdminUpdateRequest(BaseModel): m3u_url: Optional[str] = None action: str # "update_playlist", "clear_cache", etc. def verify_admin_token(authorization: Optional[str] = Header(None)) -> bool: """Verifica si el token es de administrador""" if not authorization or not authorization.startswith("Bearer "): return False token = authorization.split(" ")[1] admin_code = os.getenv("ADMIN_CODE", "admin123") return token == admin_code @router.post("/update") async def admin_update( request: AdminUpdateRequest, is_admin: bool = Depends(verify_admin_token) ): """Endpoint de administración para actualizaciones""" if not is_admin: raise HTTPException(status_code=403, detail="Acceso denegado") try: if request.action == "clear_cache": # Limpiar caché de M3U from m3u_parser import m3u_cache m3u_cache["data"] = None m3u_cache["timestamp"] = None # Limpiar caché de visualizadores from viewers import viewers_cache, channel_viewers viewers_cache.clear() channel_viewers.clear() return {"message": "Caché limpiado correctamente"} elif request.action == "update_playlist" and request.m3u_url: # Actualizar URL de playlist (requiere reinicio) return { "message": "URL actualizada. Reinicia el servidor para aplicar cambios.", "new_url": request.m3u_url } else: raise HTTPException(status_code=400, detail="Acción no válida") except Exception as e: raise HTTPException(status_code=500, detail=f"Error en operación admin: {str(e)}") @router.get("/status") async def admin_status(is_admin: bool = Depends(verify_admin_token)): """Estado del sistema para administradores""" if not is_admin: raise HTTPException(status_code=403, detail="Acceso denegado") from m3u_parser import m3u_cache from viewers import viewers_cache, channel_viewers return { "cache_status": { "m3u_cached": m3u_cache["data"] is not None, "cache_timestamp": m3u_cache["timestamp"].isoformat() if m3u_cache["timestamp"] else None, "channels_count": len(m3u_cache["data"]) if m3u_cache["data"] else 0 }, "viewers_status": { "active_users": len(viewers_cache), "active_channels": len(channel_viewers), "total_viewers": sum(len(viewers) for viewers in channel_viewers.values()) }, "config": { "m3u_url_configured": bool(os.getenv("MAIN_M3U_URL")), "admin_code_configured": bool(os.getenv("ADMIN_CODE")), "base_url": os.getenv("BASE_URL", "not_configured") } }