File size: 8,765 Bytes
5844082 a0c6b50 5844082 a0c6b50 5844082 c314dfe 5844082 c314dfe edd6ece c314dfe fcd7514 a0c6b50 5844082 a0c6b50 5844082 a0c6b50 c314dfe 5844082 c314dfe edd6ece c314dfe 5844082 a0c6b50 c314dfe 5844082 a54bb56 5844082 a54bb56 a0c6b50 c314dfe a0c6b50 5844082 c314dfe 5844082 c314dfe 5844082 c314dfe 5844082 a0c6b50 91cd81f 917d393 91cd81f 917d393 91cd81f 917d393 91cd81f 917d393 91cd81f 917d393 91cd81f 917d393 91cd81f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
import os
import logging
import asyncio
import aiohttp
from fastapi import APIRouter, HTTPException, Header, Query
from functools import lru_cache
from typing import List, Dict, Any, Optional
# Router on which this module's admin endpoints are registered.
router = APIRouter()

# Supabase configuration: fixed project URL, API key taken from the environment.
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
SUPABASE_KEY = os.environ.get("SUPA_KEY")

# Refuse to import the module when the key is missing or empty.
if not SUPABASE_KEY:
    raise ValueError("❌ SUPA_KEY não foi definido no ambiente!")

# Default headers for requests made to Supabase with the API key.
SUPABASE_HEADERS = {
    "apikey": SUPABASE_KEY,
    "Authorization": f"Bearer {SUPABASE_KEY}",
    "Content-Type": "application/json",
}

# Logging configuration.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Cache admin lookups to reduce repeated calls for the same user.
@lru_cache(maxsize=128)
def get_cached_admin_status(user_id: str) -> bool:
    """Return whether the given user is flagged as an administrator.

    Looks up ``user_id`` in the Supabase ``User`` table and reads the
    ``is_admin`` field of the first returned row. Results are memoized by
    ``lru_cache`` (up to 128 distinct ids) for the lifetime of the process.

    NOTE: every returned value is cached, including a ``False`` produced by a
    non-200 response, and a change to ``is_admin`` is not observed until the
    entry is evicted or ``get_cached_admin_status.cache_clear()`` is called.

    Args:
        user_id: Identifier of the user row to look up.

    Returns:
        True when a row is found and its ``is_admin`` value is truthy;
        False when the response status is not 200, no row matches, or the
        flag is missing or falsy.

    Raises:
        requests.RequestException: On connection failures or when the
            request exceeds the timeout. ``lru_cache`` does not store
            exceptions, so such a failure is retried on the next call.
    """
    import requests

    response = requests.get(
        f"{SUPABASE_URL}/rest/v1/User",
        # Passing the filter via ``params`` lets requests percent-encode it,
        # so the id can never add or override query parameters.
        params={"id": f"eq.{user_id}"},
        headers=SUPABASE_HEADERS,
        # Without a timeout a stalled connection would block the calling
        # thread indefinitely.
        timeout=10,
    )
    if response.status_code != 200:
        return False

    rows = response.json()  # parse the body only once
    if not rows:
        return False

    # Coerce so a null/absent column still honours the declared bool return.
    return bool(rows[0].get("is_admin", False))
async def verify_admin_token(user_token: str) -> str:
    """Check asynchronously that the token belongs to an administrator.

    The token is sent to the Supabase ``/auth/v1/user`` endpoint to resolve
    the user it belongs to; the resulting id is then checked with
    ``get_cached_admin_status`` (run in a worker thread because it performs
    blocking I/O).

    Args:
        user_token: Bearer token supplied by the caller. May be ``None`` or
            empty when the request carried no token header.

    Returns:
        The id of the authenticated administrator.

    Raises:
        HTTPException: 401 when the token is missing or Supabase does not
            answer 200 for it; 400 when the auth response has no ``id``;
            403 when the user is not an administrator.
    """
    # A missing header reaches here as None; reject it locally instead of
    # sending "Bearer None" upstream only to get the same 401 back.
    if not user_token:
        raise HTTPException(status_code=401, detail="Token inválido ou expirado")

    headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json",
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
            if response.status != 200:
                raise HTTPException(status_code=401, detail="Token inválido ou expirado")
            # The body must be read while the response is still open.
            user_data = await response.json()

    user_id = user_data.get("id")
    if not user_id:
        raise HTTPException(status_code=400, detail="ID do usuário não encontrado")

    is_admin = await asyncio.to_thread(get_cached_admin_status, user_id)
    if not is_admin:
        raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários")

    return user_id
async def get_recent_users(limit: int = 50, search: Optional[str] = None, page: int = 0) -> Dict[str, Any]:
    """Fetch the most recently created users, optionally filtered by name.

    Requests ``limit + 1`` rows from the Supabase ``User`` table ordered by
    ``created_at`` descending; the extra row is only used to detect whether
    another page exists and is never returned.

    Args:
        limit: Maximum number of users to return per page.
        search: Optional substring matched case-insensitively against
            ``name`` (PostgREST ``ilike`` with ``*`` wildcards on both sides).
        page: Zero-based page index; the row offset is ``page * limit``.

    Returns:
        A dict with ``"users"`` (at most ``limit`` rows) and
        ``"has_next_page"`` (bool). On a non-200 response or any exception
        the error is logged and an empty page is returned instead of raising.
    """
    import json

    try:
        # Build the query as a mapping so aiohttp percent-encodes every value.
        # Interpolating the raw search term into the URL let characters such
        # as "&", "#" or "%" break the request or inject extra parameters.
        params = {
            "select": "id,fee,name,avatar,role,blurhash",
            "order": "created_at.desc",
            "limit": str(limit + 1),
            "offset": str(page * limit),
        }
        if search:
            # No SQL-style quote doubling: the value is sent to PostgREST as a
            # filter value, so doubled apostrophes would be matched literally
            # and names containing "'" would never be found.
            params["name"] = f"ilike.*{search}*"

        headers = {**SUPABASE_HEADERS, "Accept": "application/json; charset=utf-8"}

        async with aiohttp.ClientSession() as session:
            async with session.get(f"{SUPABASE_URL}/rest/v1/User", params=params, headers=headers) as response:
                if response.status != 200:
                    logger.error(f"❌ Erro ao obter usuários: {response.status}")
                    return {"users": [], "has_next_page": False}
                # Decode explicitly as UTF-8 rather than trusting the
                # charset reported by the response.
                text = await response.text(encoding='utf-8')

        users_data = json.loads(text)
        return {
            "users": users_data[:limit],
            # More rows than requested means a further page exists.
            "has_next_page": len(users_data) > limit,
        }
    except Exception as e:
        # Deliberate best-effort: callers receive an empty page on failure.
        logger.error(f"❌ Erro ao obter usuários: {str(e)}")
        return {"users": [], "has_next_page": False}
@router.get("/admin/users")
async def get_recent_users_endpoint(
    user_token: Optional[str] = Header(None, alias="User-key"),
    limit: int = Query(50, ge=1, le=100),
    page: int = Query(0, ge=0),
    search: Optional[str] = Query(None)
):
    """
    Endpoint returning the platform's most recent users, with support for
    name search and pagination.

    The caller must send an administrator token in the ``User-key`` header.

    Returns a dict with ``users`` (the page of rows), ``count`` (number of
    rows in this page) and ``has_next_page``.

    Raises HTTPException with the status produced by ``verify_admin_token``
    when authorization fails, or 500 on any unexpected error.
    """
    try:
        # Called only for its authorization side effect; the id is not needed.
        await verify_admin_token(user_token)
        result = await get_recent_users(limit, search, page)
        users = result["users"]
        return {
            "users": users,
            "count": len(users),
            "has_next_page": result["has_next_page"]
        }
    except HTTPException:
        # Propagate auth/validation errors unchanged.
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao obter usuários: {str(e)}")
        # Keep internal error text out of the response; the details are in
        # the log line above. Matches the generic 500 used by /admin/user.
        raise HTTPException(status_code=500, detail="Erro interno do servidor") from e
async def _fetch_json(session: "aiohttp.ClientSession", url: str, params: Dict[str, str], headers: Dict[str, str]) -> tuple:
    """GET ``url`` and return ``(status, payload)``; payload is None unless the status is 200."""
    async with session.get(url, params=params, headers=headers) as response:
        if response.status != 200:
            return response.status, None
        return response.status, await response.json()


async def _build_feed_info(session: "aiohttp.ClientSession", headers: Dict[str, str], feed: Dict[str, Any]) -> Dict[str, Any]:
    """Copy the returned feed fields and attach the first portfolio's image as ``thumbnail`` when one is found."""
    feed_info = {
        "id": feed["id"],
        "created_at": feed["created_at"],
        "description": feed["description"],
        "urls": feed["urls"],
        "portfolios": feed["portfolios"]
    }

    portfolios = feed["portfolios"]
    if not portfolios:
        return feed_info

    # Only the first portfolio of the array is used for the thumbnail.
    status, rows = await _fetch_json(
        session,
        f"{SUPABASE_URL}/rest/v1/Portfolio",
        {"select": "image_url,blurhash,width,height", "id": f"eq.{portfolios[0]}"},
        headers,
    )
    # A failed or empty lookup simply leaves the feed without a thumbnail.
    if status == 200 and rows:
        first = rows[0]
        feed_info["thumbnail"] = {
            "image_url": first["image_url"],
            "blurhash": first["blurhash"],
            "width": first["width"],
            "height": first["height"]
        }
    return feed_info


@router.get("/admin/user")
async def get_user_name(
    user_id: str = Query(..., description="ID do usuário"),
    user_token: Optional[str] = Header(None, alias="User-key")
):
    """
    Endpoint returning information about a specific user by ID, including
    the user's feeds and one portfolio image for each feed.

    The caller must send an administrator token in the ``User-key`` header.

    Returns a dict with ``user`` (name, avatar, role), ``feeds`` (newest
    first, each optionally carrying a ``thumbnail``) and ``feeds_count``.

    Raises HTTPException with the status produced by ``verify_admin_token``
    when authorization fails, the upstream status when a Supabase query does
    not return 200, 404 when the user does not exist, or 500 on any
    unexpected error.
    """
    try:
        await verify_admin_token(user_token)

        headers = {**SUPABASE_HEADERS, "Accept": "application/json; charset=utf-8"}
        # Filters are passed through ``params`` so aiohttp percent-encodes the
        # caller-supplied id; interpolating it into the URL would let a value
        # such as "x&select=*" rewrite the query.
        user_filter = f"eq.{user_id}"

        async with aiohttp.ClientSession() as session:
            # Run both lookups concurrently; ``async with a, b`` would await
            # them one after the other.
            (user_status, user_data), (feeds_status, feeds_data) = await asyncio.gather(
                _fetch_json(
                    session,
                    f"{SUPABASE_URL}/rest/v1/User",
                    {"select": "name,avatar,role", "id": user_filter},
                    headers,
                ),
                _fetch_json(
                    session,
                    f"{SUPABASE_URL}/rest/v1/Feeds",
                    {
                        "select": "id,portfolios,created_at,description,urls,user_id",
                        "user_id": user_filter,
                        "order": "created_at.desc",
                    },
                    headers,
                ),
            )

            if user_status != 200 or feeds_status != 200:
                # Report the first failing upstream status.
                status = user_status if user_status != 200 else feeds_status
                raise HTTPException(status_code=status, detail="Erro ao consultar o Supabase")

            if not user_data:
                raise HTTPException(status_code=404, detail="Usuário não encontrado")

            # Resolve thumbnails concurrently instead of one request per feed
            # in sequence; gather preserves the order of ``feeds_data``.
            feeds_with_images = list(
                await asyncio.gather(
                    *(_build_feed_info(session, headers, feed) for feed in feeds_data)
                )
            )

        # Build the complete response.
        return {
            "user": user_data[0],
            "feeds": feeds_with_images,
            "feeds_count": len(feeds_with_images)
        }

    except HTTPException:
        # Propagate deliberate HTTP errors unchanged.
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao buscar usuário e feeds: {str(e)}")
        raise HTTPException(status_code=500, detail="Erro interno do servidor") from e