abcd / api /admin.py
docs4you's picture
Upload 41 files
84121fd verified
from fastapi import APIRouter, HTTPException, Depends, Header
from pydantic import BaseModel
import os
from typing import Optional
router = APIRouter()
class AdminUpdateRequest(BaseModel):
m3u_url: Optional[str] = None
action: str # "update_playlist", "clear_cache", etc.
def verify_admin_token(authorization: Optional[str] = Header(None)) -> bool:
"""Verifica si el token es de administrador"""
if not authorization or not authorization.startswith("Bearer "):
return False
token = authorization.split(" ")[1]
admin_code = os.getenv("ADMIN_CODE", "admin123")
return token == admin_code
@router.post("/update")
async def admin_update(
request: AdminUpdateRequest,
is_admin: bool = Depends(verify_admin_token)
):
"""Endpoint de administraci贸n para actualizaciones"""
if not is_admin:
raise HTTPException(status_code=403, detail="Acceso denegado")
try:
if request.action == "clear_cache":
# Limpiar cach茅 de M3U
from m3u_parser import m3u_cache
m3u_cache["data"] = None
m3u_cache["timestamp"] = None
# Limpiar cach茅 de visualizadores
from viewers import viewers_cache, channel_viewers
viewers_cache.clear()
channel_viewers.clear()
return {"message": "Cach茅 limpiado correctamente"}
elif request.action == "update_playlist" and request.m3u_url:
# Actualizar URL de playlist (requiere reinicio)
return {
"message": "URL actualizada. Reinicia el servidor para aplicar cambios.",
"new_url": request.m3u_url
}
else:
raise HTTPException(status_code=400, detail="Acci贸n no v谩lida")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error en operaci贸n admin: {str(e)}")
@router.get("/status")
async def admin_status(is_admin: bool = Depends(verify_admin_token)):
"""Estado del sistema para administradores"""
if not is_admin:
raise HTTPException(status_code=403, detail="Acceso denegado")
from m3u_parser import m3u_cache
from viewers import viewers_cache, channel_viewers
return {
"cache_status": {
"m3u_cached": m3u_cache["data"] is not None,
"cache_timestamp": m3u_cache["timestamp"].isoformat() if m3u_cache["timestamp"] else None,
"channels_count": len(m3u_cache["data"]) if m3u_cache["data"] else 0
},
"viewers_status": {
"active_users": len(viewers_cache),
"active_channels": len(channel_viewers),
"total_viewers": sum(len(viewers) for viewers in channel_viewers.values())
},
"config": {
"m3u_url_configured": bool(os.getenv("MAIN_M3U_URL")),
"admin_code_configured": bool(os.getenv("ADMIN_CODE")),
"base_url": os.getenv("BASE_URL", "not_configured")
}
}