abcd / api /viewers.py
docs4you's picture
Upload 41 files
84121fd verified
raw
history blame
4.22 kB
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Dict, Optional
from datetime import datetime, timedelta
from auth import get_current_user
router = APIRouter()
# In-memory cache of active viewers, keyed by user token.
# Structure: {user_token: {"channel_url": str, "timestamp": datetime}}
# The timestamp is refreshed each time the user registers via POST /viewers.
viewers_cache: Dict[str, Dict] = {}
# Viewers grouped by channel; the size of each set is the viewer count.
# Structure: {channel_url: set(user_tokens)}
channel_viewers: Dict[str, set] = {}
class ViewerRequest(BaseModel):
    """Request body for registering the channel a user is watching."""

    # URL of the channel being watched; used as the key in channel_viewers.
    channel_url: str
class ViewerResponse(BaseModel):
    """Response body returned by POST /viewers."""

    # Number of users currently registered on the requested channel.
    viewers: int
    # True when the user was previously registered on a different channel.
    switched: bool
    # Optional human-readable note; set when a channel switch occurred.
    message: Optional[str] = None
def cleanup_old_viewers(max_age: timedelta = timedelta(minutes=5)) -> None:
    """Remove viewers whose last registration is older than ``max_age``.

    Expired users are dropped from ``viewers_cache`` and from the viewer set
    of the channel they were registered on; a channel left with no viewers
    is removed from ``channel_viewers``.

    Args:
        max_age: Maximum allowed age of a viewer's timestamp. Defaults to
            five minutes, so calling with no arguments keeps the original
            inactivity window.
    """
    cutoff_time = datetime.now() - max_age

    # Collect first: the cache must not be mutated while it is iterated.
    # Entries without a timestamp fall back to datetime.min and so expire.
    expired_users = [
        user_token
        for user_token, data in viewers_cache.items()
        if data.get("timestamp", datetime.min) < cutoff_time
    ]

    for user_token in expired_users:
        old_channel = viewers_cache.pop(user_token).get("channel_url")
        watchers = channel_viewers.get(old_channel) if old_channel else None
        if watchers is None:
            continue
        watchers.discard(user_token)
        if not watchers:
            # Keep channel_viewers free of empty sets.
            del channel_viewers[old_channel]
def clear_user_session(user_token: str):
    """Remove a single user's viewing session from the in-memory state.

    The user's entry is dropped from ``viewers_cache`` and the user is
    removed from the viewer set of the channel recorded in that entry. A
    channel left with no viewers is deleted from ``channel_viewers``.
    Unknown tokens are ignored.

    Args:
        user_token: Key identifying the user in ``viewers_cache``.
    """
    if user_token not in viewers_cache:
        return  # nothing is tracked for this user

    session = viewers_cache.pop(user_token)
    channel = session.get("channel_url")
    if not channel or channel not in channel_viewers:
        return

    audience = channel_viewers[channel]
    audience.discard(user_token)
    if not audience:
        del channel_viewers[channel]
@router.post("/viewers", response_model=ViewerResponse)
async def set_viewing(
    request: ViewerRequest,
    current_user: str = Depends(get_current_user),
):
    """Register that the authenticated user is watching a channel.

    A user is counted on one channel at a time: if they were registered on
    a different channel, they are removed from it first and the response
    reports the switch.

    Args:
        request: Body carrying the ``channel_url`` being watched.
        current_user: Value resolved by the ``get_current_user`` dependency;
            used as the user's key in ``viewers_cache``.

    Returns:
        A ``ViewerResponse`` with the channel's current viewer count, whether
        the user switched channels, and an optional message.
    """
    # Expire inactive viewers first so the returned count is current.
    cleanup_old_viewers()

    channel_url = request.channel_url
    switched = False
    message = None

    previous = viewers_cache.get(current_user)
    old_channel = previous.get("channel_url") if previous is not None else None
    if old_channel and old_channel != channel_url:
        # The user moved to another channel: report it and leave the old one.
        switched = True
        message = "Canal cambiado automáticamente"
        old_viewers = channel_viewers.get(old_channel)
        if old_viewers is not None:
            old_viewers.discard(current_user)
            if not old_viewers:
                # Keep channel_viewers free of empty sets.
                del channel_viewers[old_channel]

    # Record (or refresh) the user's session on the requested channel.
    viewers_cache[current_user] = {
        "channel_url": channel_url,
        "timestamp": datetime.now(),
    }
    channel_viewers.setdefault(channel_url, set()).add(current_user)

    return ViewerResponse(
        viewers=len(channel_viewers[channel_url]),
        switched=switched,
        message=message,
    )
@router.get("/viewers/{channel_id}")
async def get_viewer_count(
    channel_id: str,
    current_user: str = Depends(get_current_user),
):
    """Return the number of viewers of the channel matching ``channel_id``.

    Args:
        channel_id: Identifier taken from the request path; matched as a
            substring of the tracked channel URLs.
        current_user: Resolved by the ``get_current_user`` dependency; not
            used in the body, so its only effect is to run that dependency.

    Returns:
        ``{"viewers": n}`` where ``n`` is the viewer count of the first
        tracked channel whose URL contains ``channel_id``, or 0 if none does.
    """
    # Expire inactive viewers first so the returned count is current.
    cleanup_old_viewers()

    # Simplified lookup (a production system would use a database): the
    # first tracked channel whose URL contains channel_id wins. A suffix
    # match is a special case of containment, so no separate endswith()
    # check is needed.
    # NOTE(review): substring matching can select an unrelated channel whose
    # URL merely contains the id; confirm whether a stricter match is wanted.
    viewer_count = next(
        (
            len(viewers)
            for channel_url, viewers in channel_viewers.items()
            if channel_id in channel_url
        ),
        0,
    )
    return {"viewers": viewer_count}
@router.delete("/viewers")
async def stop_viewing(current_user: str = Depends(get_current_user)):
    """Stop tracking the authenticated user's current viewing session.

    Args:
        current_user: Value resolved by the ``get_current_user`` dependency;
            identifies the session to clear.

    Returns:
        A dict with a confirmation ``message``. The same message is returned
        whether or not the user had an active session.
    """
    clear_user_session(current_user)
    return {"message": "Visualización detenida"}