File size: 8,765 Bytes
5844082 a0c6b50 5844082 a0c6b50 5844082 c314dfe 5844082 c314dfe edd6ece c314dfe fcd7514 a0c6b50 5844082 a0c6b50 5844082 a0c6b50 c314dfe 5844082 c314dfe edd6ece c314dfe 5844082 a0c6b50 c314dfe 5844082 a54bb56 5844082 a54bb56 a0c6b50 c314dfe a0c6b50 5844082 c314dfe 5844082 c314dfe 5844082 c314dfe 5844082 a0c6b50 91cd81f 917d393 91cd81f 917d393 91cd81f 917d393 91cd81f 917d393 91cd81f 917d393 91cd81f 917d393 91cd81f |
|
import os
import logging
import asyncio
import aiohttp
from fastapi import APIRouter, HTTPException, Header, Query
from functools import lru_cache
from typing import List, Dict, Any, Optional
# Router on which this module's admin endpoints are registered.
router = APIRouter()

# Supabase configuration: the project URL is fixed in code, while the API key
# must be supplied through the SUPA_KEY environment variable.
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
SUPABASE_KEY = os.environ.get("SUPA_KEY")

# Fail at import time when the key is missing or empty.
if not SUPABASE_KEY:
    raise ValueError("❌ SUPA_KEY não foi definido no ambiente!")

# Default headers for requests made to Supabase with the configured key.
SUPABASE_HEADERS = {
    "apikey": SUPABASE_KEY,
    "Authorization": f"Bearer {SUPABASE_KEY}",
    "Content-Type": "application/json"
}

# Logging configuration: INFO level on the root logger, plus a module logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Cache to avoid repeating the same admin lookup for every request.
@lru_cache(maxsize=128)
def get_cached_admin_status(user_id: str) -> bool:
    """Return whether the given user is flagged as an administrator.

    Performs a blocking HTTP GET against the Supabase ``User`` table and reads
    the ``is_admin`` field of the first returned row. Results are memoized by
    ``functools.lru_cache`` (up to 128 distinct user IDs).

    NOTE(review): cached entries never expire, so a change to ``is_admin`` (or
    a ``False`` produced by a non-200 response) is kept until the entry is
    evicted or ``get_cached_admin_status.cache_clear()`` is called.

    Args:
        user_id: ID of the user to look up.

    Returns:
        ``True`` when the row exists and its ``is_admin`` value is truthy,
        ``False`` when the request does not return 200, returns no rows, or
        the flag is absent/falsy.

    Raises:
        requests.RequestException: On network failure or timeout. Exceptions
            are not memoized by ``lru_cache``.
    """
    import requests

    response = requests.get(
        f"{SUPABASE_URL}/rest/v1/User",
        # Let requests build the query string so the ID is URL-encoded.
        params={"id": f"eq.{user_id}"},
        headers=SUPABASE_HEADERS,
        # Without a timeout a stalled connection would block the worker thread
        # indefinitely.
        timeout=10,
    )
    if response.status_code != 200:
        return False

    rows = response.json()  # parse the body once
    if not rows:
        return False
    return bool(rows[0].get("is_admin", False))
async def verify_admin_token(user_token: str) -> str:
    """Check that a user token belongs to an administrator.

    The token is sent to the Supabase ``/auth/v1/user`` endpoint; the ``id``
    field of the response is then passed to ``get_cached_admin_status``, which
    runs in a worker thread because it performs a blocking HTTP request.

    Args:
        user_token: Bearer token supplied by the caller. May be ``None``/empty
            when the request carried no token.

    Returns:
        The ID of the authenticated administrator.

    Raises:
        HTTPException: 401 when the token is missing or the auth endpoint does
            not answer with status 200; 400 when the auth response has no
            ``id``; 403 when the user is not an administrator.
    """
    if not user_token:
        # The endpoints declare this header with a default of None. Reject it
        # here instead of sending "Bearer None" upstream; the response is the
        # same one the caller received before for an unusable token.
        raise HTTPException(status_code=401, detail="Token inválido ou expirado")

    headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json"
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
            if response.status != 200:
                raise HTTPException(status_code=401, detail="Token inválido ou expirado")
            user_data = await response.json()

    user_id = user_data.get("id")
    if not user_id:
        raise HTTPException(status_code=400, detail="ID do usuário não encontrado")

    # The admin lookup uses a synchronous HTTP client, so keep it off the event loop.
    is_admin = await asyncio.to_thread(get_cached_admin_status, user_id)
    if not is_admin:
        raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários")

    return user_id
async def get_recent_users(limit: int = 50, search: Optional[str] = None, page: int = 0) -> Dict[str, Any]:
    """Fetch the most recently created users, optionally filtered by name.

    One row more than ``limit`` is requested so the presence of a following
    page can be detected without a separate count query.

    Args:
        limit: Maximum number of users to return.
        search: Optional text; when given, only users whose ``name`` matches
            ``ilike.*<search>*`` are returned.
        page: Zero-based page index; the row offset is ``page * limit``.

    Returns:
        A dict with ``"users"`` (at most ``limit`` rows) and ``"has_next_page"``.
        On a non-200 response or any exception the error is logged and
        ``{"users": [], "has_next_page": False}`` is returned instead.
    """
    import json

    empty_result = {"users": [], "has_next_page": False}
    try:
        params = {
            "select": "id,fee,name,avatar,role,blurhash",
            "order": "created_at.desc",
            "limit": str(limit + 1),
            "offset": str(page * limit),
        }
        if search:
            # The search text is untrusted. Passing it through `params` makes
            # the HTTP client percent-encode it, so characters such as "&" or
            # "#" cannot add or truncate query parameters. No SQL-style quote
            # doubling is applied: the value is a REST filter, not SQL text.
            params["name"] = f"ilike.*{search}*"

        headers = SUPABASE_HEADERS.copy()
        headers["Accept"] = "application/json; charset=utf-8"

        async with aiohttp.ClientSession() as session:
            async with session.get(f"{SUPABASE_URL}/rest/v1/User", params=params, headers=headers) as response:
                if response.status != 200:
                    logger.error(f"❌ Erro ao obter usuários: {response.status}")
                    return empty_result
                # Decode explicitly as UTF-8 before parsing the JSON body.
                text = await response.text(encoding='utf-8')

        users_data = json.loads(text)
        return {
            "users": users_data[:limit],
            # The extra row, if present, signals that another page exists.
            "has_next_page": len(users_data) > limit
        }
    except Exception as e:
        # Deliberate best-effort: callers receive an empty page rather than an error.
        logger.error(f"❌ Erro ao obter usuários: {str(e)}")
        return empty_result
@router.get("/admin/users")
async def get_recent_users_endpoint(
    user_token: str = Header(None, alias="User-key"),
    limit: int = Query(50, ge=1, le=100),
    page: int = Query(0, ge=0),
    search: Optional[str] = Query(None)
):
    """
    List the most recent users of the platform, with optional name search
    and pagination. The caller must present an administrator token in the
    ``User-key`` header.

    Returns a dict with ``users``, ``count`` (number of users in this page)
    and ``has_next_page``.

    Raises:
        HTTPException: whatever ``verify_admin_token`` raises (re-raised
            unchanged), or 500 for any other failure.
    """
    try:
        # Only the authorization side effect is needed; the admin ID is unused.
        await verify_admin_token(user_token)
        result = await get_recent_users(limit, search, page)
        users = result["users"]
        return {
            "users": users,
            "count": len(users),
            "has_next_page": result["has_next_page"]
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao obter usuários: {str(e)}")
        # Keep internal exception text in the log only; the client gets the
        # same generic message the /admin/user endpoint uses.
        raise HTTPException(status_code=500, detail="Erro interno do servidor") from e
async def _fetch_rows(session, url, params, headers):
    """GET ``url`` and return ``(status, rows)``; ``rows`` is None unless the status is 200."""
    async with session.get(url, params=params, headers=headers) as response:
        if response.status != 200:
            return response.status, None
        return response.status, await response.json()


async def _fetch_feed_thumbnail(session, feed, headers):
    """Return image metadata for the feed's first portfolio, or None when unavailable."""
    portfolios = feed["portfolios"]
    if not portfolios:
        return None

    # Only the first portfolio of the feed is used as its thumbnail.
    status, rows = await _fetch_rows(
        session,
        f"{SUPABASE_URL}/rest/v1/Portfolio",
        {"select": "image_url,blurhash,width,height", "id": f"eq.{portfolios[0]}"},
        headers,
    )
    if status != 200 or not rows:
        return None

    row = rows[0]
    return {
        "image_url": row["image_url"],
        "blurhash": row["blurhash"],
        "width": row["width"],
        "height": row["height"]
    }


@router.get("/admin/user")
async def get_user_name(
    user_id: str = Query(..., description="ID do usuário"),
    user_token: str = Header(None, alias="User-key")
):
    """
    Return basic information about one user, looked up by ID, together with
    the user's feeds (newest first) and, for each feed that has portfolios,
    a ``thumbnail`` built from its first portfolio. The caller must present
    an administrator token in the ``User-key`` header.

    Returns a dict with ``user``, ``feeds`` and ``feeds_count``.

    Raises:
        HTTPException: whatever ``verify_admin_token`` raises; the upstream
            status code when the user or feeds query does not return 200;
            404 when no user row is found; 500 for any other failure.
    """
    try:
        await verify_admin_token(user_token)

        headers = SUPABASE_HEADERS.copy()
        headers["Accept"] = "application/json; charset=utf-8"

        # `user_id` comes straight from the query string, so it is passed via
        # `params` to be percent-encoded rather than spliced into the URL.
        user_params = {"select": "name,avatar,role", "id": f"eq.{user_id}"}
        feeds_params = {
            "select": "id,portfolios,created_at,description,urls,user_id",
            "user_id": f"eq.{user_id}",
            "order": "created_at.desc",
        }

        async with aiohttp.ClientSession() as session:
            # Run both lookups concurrently.
            (user_status, user_data), (feeds_status, feeds_data) = await asyncio.gather(
                _fetch_rows(session, f"{SUPABASE_URL}/rest/v1/User", user_params, headers),
                _fetch_rows(session, f"{SUPABASE_URL}/rest/v1/Feeds", feeds_params, headers),
            )

            if user_status != 200 or feeds_status != 200:
                # Report the user query's status first, otherwise the feeds query's.
                status = user_status if user_status != 200 else feeds_status
                raise HTTPException(status_code=status, detail="Erro ao consultar o Supabase")

            if not user_data:
                raise HTTPException(status_code=404, detail="Usuário não encontrado")

            # Fetch all thumbnails concurrently instead of one request at a
            # time; gather() keeps results in the same order as `feeds_data`.
            thumbnails = await asyncio.gather(
                *(_fetch_feed_thumbnail(session, feed, headers) for feed in feeds_data)
            )

        feeds_with_images = []
        for feed, thumbnail in zip(feeds_data, thumbnails):
            feed_info = {
                "id": feed["id"],
                "created_at": feed["created_at"],
                "description": feed["description"],
                "urls": feed["urls"],
                "portfolios": feed["portfolios"]
            }
            # The "thumbnail" key is present only when an image was found.
            if thumbnail is not None:
                feed_info["thumbnail"] = thumbnail
            feeds_with_images.append(feed_info)

        return {
            "user": user_data[0],
            "feeds": feeds_with_images,
            "feeds_count": len(feeds_with_images)
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao buscar usuário e feeds: {str(e)}")
        raise HTTPException(status_code=500, detail="Erro interno do servidor")