connect / routes /users.py
habulaj's picture
Update routes/users.py
c9b4649 verified
raw
history blame
20.2 kB
import os
import logging
import asyncio
import aiohttp
from pydantic import BaseModel
from fastapi import APIRouter, HTTPException, Header, Query
from functools import lru_cache
from typing import List, Dict, Any, Optional
router = APIRouter()
# Supabase configuration: the project URL is hard-coded, while the API key
# (SUPA_KEY) and the service key (SUPA_SERVICE_KEY) are read from the environment.
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
SUPABASE_KEY = os.getenv("SUPA_KEY")
SUPABASE_ROLE_KEY = os.getenv("SUPA_SERVICE_KEY")
class FeedDeleteRequest(BaseModel):
    """Request body accepted by the feed deletion endpoint."""

    feed_id: int  # ID of the feed to delete
# Fail fast at import time when either Supabase key is missing from the environment.
if not SUPABASE_KEY or not SUPABASE_ROLE_KEY:
    raise ValueError("❌ SUPA_KEY ou SUPA_SERVICE_KEY não foram definidos no ambiente!")
# Headers for requests authenticated with SUPABASE_KEY (used for the read queries below).
SUPABASE_HEADERS = {
    "apikey": SUPABASE_KEY,
    "Authorization": f"Bearer {SUPABASE_KEY}",
    "Content-Type": "application/json"
}
# Headers for requests authenticated with the service key
# (used by update_user for its PATCH and by delete_feed).
SUPABASE_ROLE_HEADERS = {
    "apikey": SUPABASE_ROLE_KEY,
    "Authorization": f"Bearer {SUPABASE_ROLE_KEY}",
    "Content-Type": "application/json"
}
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Cache to reduce repeated lookups of the same user
@lru_cache(maxsize=128)
def get_cached_admin_status(user_id: str) -> bool:
    """Return whether ``user_id`` is an admin, memoising the answer.

    Performs a blocking HTTP GET against the Supabase ``User`` table, so
    async callers should run it in a worker thread.

    Args:
        user_id: ID of the user to look up.

    Returns:
        True when the user's row has a truthy ``is_admin`` value. False when
        the request does not return 200, no row matches, or the flag is
        missing/falsy.

    Raises:
        requests.exceptions.RequestException: On transport errors, including
            the timeout below. Exceptions are not memoised by ``lru_cache``,
            so the next call retries.

    Note:
        Every returned value is memoised (up to 128 users), including a
        ``False`` caused by a non-200 response; it stays cached until evicted
        or until ``get_cached_admin_status.cache_clear()`` is called.
    """
    import requests

    user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}"
    # Without an explicit timeout, requests may wait on the server forever
    # and pin the worker thread this function runs in.
    response = requests.get(user_data_url, headers=SUPABASE_HEADERS, timeout=10)
    if response.status_code != 200:
        return False
    rows = response.json()  # decode the body once and reuse it
    if not rows:
        return False
    # Coerce so a null column yields False rather than None.
    return bool(rows[0].get("is_admin", False))
# Helper that looks up a user's permission flags
async def get_user_permissions(user_id: str) -> Dict[str, bool]:
    """Fetch the permission flags stored on a user's row.

    Args:
        user_id: ID of the user whose flags are requested.

    Returns:
        A dict with the keys ``is_admin``, ``view_users`` and
        ``manage_users``. All three are False when the request does not
        return 200 or no row matches.
    """
    flag_names = ("is_admin", "view_users", "manage_users")
    user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}&select=is_admin,view_users,manage_users"

    rows = None
    async with aiohttp.ClientSession() as session:
        async with session.get(user_data_url, headers=SUPABASE_HEADERS) as response:
            if response.status == 200:
                rows = await response.json()  # decode the body once

    if not rows:
        return {flag: False for flag in flag_names}

    user_info = rows[0]
    # bool() keeps the declared Dict[str, bool] contract even when a column is null.
    return {flag: bool(user_info.get(flag, False)) for flag in flag_names}
async def verify_admin_token(user_token: str) -> str:
    """Validate a user token and require the user to be an admin.

    The token is sent to the Supabase ``/auth/v1/user`` endpoint; the ID it
    returns is then checked with ``get_cached_admin_status`` in a worker
    thread (that helper does blocking I/O).

    Args:
        user_token: Bearer token of the calling user. May be None/empty when
            the request carried no ``User-key`` header.

    Returns:
        The ID of the authenticated admin user.

    Raises:
        HTTPException: 401 when the token is missing or rejected by Supabase,
            400 when the auth response carries no user ID, 403 when the user
            is not an admin.
    """
    # The endpoints declare the header with a default of None; reject that
    # here instead of sending "Bearer None" upstream.
    if not user_token:
        raise HTTPException(status_code=401, detail="Token inválido ou expirado")

    headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json"
    }
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
            if response.status != 200:
                raise HTTPException(status_code=401, detail="Token inválido ou expirado")
            user_data = await response.json()

    user_id = user_data.get("id")
    if not user_id:
        raise HTTPException(status_code=400, detail="ID do usuário não encontrado")

    is_admin = await asyncio.to_thread(get_cached_admin_status, user_id)
    if not is_admin:
        raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários")
    return user_id
# Helper that validates a token and returns the user's ID and permissions
async def verify_token_with_permissions(user_token: str, required_permission: Optional[str] = None) -> Dict[str, Any]:
    """Validate a user token and optionally require one permission flag.

    The token is sent to the Supabase ``/auth/v1/user`` endpoint; the ID it
    returns is then used to load the flags via ``get_user_permissions``.

    Args:
        user_token: Bearer token of the calling user. May be None/empty when
            the request carried no ``User-key`` header.
        required_permission: Name of a permission flag that must be truthy
            (for example ``"view_users"``). When None, no flag is enforced.
            Being an admin does not bypass this check.

    Returns:
        A dict with ``user_id``, ``is_admin`` and the full ``permissions``
        mapping.

    Raises:
        HTTPException: 401 when the token is missing or rejected by Supabase,
            400 when the auth response carries no user ID, 403 when the
            required permission is not granted.
    """
    # The endpoints declare the header with a default of None; reject that
    # here instead of sending "Bearer None" upstream.
    if not user_token:
        raise HTTPException(status_code=401, detail="Token inválido ou expirado")

    headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json"
    }
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
            if response.status != 200:
                raise HTTPException(status_code=401, detail="Token inválido ou expirado")
            user_data = await response.json()

    user_id = user_data.get("id")
    if not user_id:
        raise HTTPException(status_code=400, detail="ID do usuário não encontrado")

    # Load the user's permission flags
    permissions = await get_user_permissions(user_id)

    # Enforce the specific permission, when one was requested
    if required_permission and not permissions.get(required_permission, False):
        raise HTTPException(
            status_code=403,
            detail=f"Acesso negado: permissão '{required_permission}' necessária"
        )

    return {
        "user_id": user_id,
        "is_admin": permissions.get("is_admin", False),
        "permissions": permissions
    }
async def get_recent_users(limit: int = 50, search: Optional[str] = None, page: int = 0) -> Dict[str, Any]:
    """Return the most recently created non-admin users, one page at a time.

    Args:
        limit: Maximum number of users per page.
        search: Optional text; when given, only users whose name contains it
            (case-insensitive ``ilike``) are returned.
        page: Zero-based page index.

    Returns:
        ``{"users": [...], "has_next_page": bool}``. On any failure (non-200
        response or exception) the error is logged and an empty page is
        returned instead of raising.
    """
    import json

    empty_page = {"users": [], "has_next_page": False}
    try:
        # Ask for one row more than the page size: its presence tells us
        # whether another page exists without a separate count query.
        params = {
            "select": "id,name,avatar,role,blurhash",
            "is_admin": "eq.false",  # only non-admin users are listed
            "order": "created_at.desc",
            "limit": str(limit + 1),
            "offset": str(page * limit),
        }
        if search:
            # Passing the filter through `params` lets the HTTP client
            # percent-encode it, so characters such as '&', '=' or '#' in the
            # search text cannot add or override query parameters. No quote
            # doubling is applied: the value is not spliced into SQL here, and
            # doubling would break searches for names containing apostrophes.
            params["name"] = f"ilike.*{search}*"

        headers = SUPABASE_HEADERS.copy()
        headers["Accept"] = "application/json; charset=utf-8"

        async with aiohttp.ClientSession() as session:
            async with session.get(f"{SUPABASE_URL}/rest/v1/User", params=params, headers=headers) as response:
                if response.status != 200:
                    logger.error(f"❌ Erro ao obter usuários: {response.status}")
                    return empty_page
                text = await response.text(encoding='utf-8')

        users_data = json.loads(text)
        return {
            "users": users_data[:limit],
            "has_next_page": len(users_data) > limit
        }
    except Exception as e:
        # Deliberate best-effort: callers get an empty page rather than an error.
        logger.error(f"❌ Erro ao obter usuários: {str(e)}")
        return empty_page
@router.get("/admin/users")
async def get_recent_users_endpoint(
    user_token: str = Header(None, alias="User-key"),
    limit: int = Query(50, ge=1, le=100),
    page: int = Query(0, ge=0),
    search: Optional[str] = Query(None)
):
    """List the most recent users, with optional name search and pagination.

    Requires the caller's token (``User-key`` header) to carry the
    ``view_users`` permission.

    Returns:
        ``{"users": [...], "count": int, "has_next_page": bool}`` where
        ``count`` is the number of users on this page.

    Raises:
        HTTPException: Whatever ``verify_token_with_permissions`` raises
            (401/400/403), or 500 on an unexpected error.
    """
    try:
        # Only the permission check matters here; the returned user info is not used.
        await verify_token_with_permissions(user_token, "view_users")
        result = await get_recent_users(limit, search, page)
        users = result["users"]
        return {
            "users": users,
            "count": len(users),
            "has_next_page": result["has_next_page"]
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao obter usuários: {str(e)}")
        # Keep internal exception text in the log only; the client gets the
        # same generic message the other admin endpoints return.
        raise HTTPException(status_code=500, detail="Erro interno do servidor")
@router.post("/admin/update-user")
async def update_user(
    request: Dict[str, Any],
    user_token: str = Header(None, alias="User-key")
):
    """Update the ``name`` and/or ``fee`` of a specific user.

    Requires the caller's token (``User-key`` header) to carry the
    ``manage_users`` permission. The current row is read with the regular
    headers; the PATCH is sent with the service-key headers.

    Args:
        request: JSON body with ``user_id`` (required) and at least one of
            ``name`` or ``fee``. ``fee`` must be an integer.
        user_token: Bearer token of the calling user.

    Returns:
        A message plus the updated user fields and the ID of the caller, or
        a "no change needed" message when the supplied values already match
        the stored ones.

    Raises:
        HTTPException: 400 on invalid input, 404 when the user does not
            exist, the upstream status when Supabase rejects a request, 500
            on unexpected errors, plus whatever the permission check raises.
    """
    try:
        # Check that the caller may manage users
        user_info = await verify_token_with_permissions(user_token, "manage_users")
        admin_id = user_info["user_id"]

        # Validate the request parameters
        user_id = request.get("user_id")
        name = request.get("name")
        fee = request.get("fee")
        if not user_id:
            raise HTTPException(status_code=400, detail="ID do usuário é obrigatório")
        if name is None and fee is None:
            raise HTTPException(status_code=400, detail="Pelo menos um campo para atualização (name ou fee) deve ser fornecido")
        # Reject a malformed fee explicitly instead of silently ignoring it
        # and answering "no change needed". bool is excluded because it is a
        # subclass of int.
        if fee is not None and (isinstance(fee, bool) or not isinstance(fee, int)):
            raise HTTPException(status_code=400, detail="O campo fee deve ser um número inteiro")

        # user_id comes from the request body: pass it via `params` so the
        # HTTP client percent-encodes it and it cannot inject extra filters.
        user_url = f"{SUPABASE_URL}/rest/v1/User"
        id_filter = f"eq.{user_id}"

        async with aiohttp.ClientSession() as session:
            # Check that the user exists, using the regular headers
            async with session.get(
                user_url,
                params={"select": "name,fee", "id": id_filter},
                headers=SUPABASE_HEADERS
            ) as response:
                if response.status != 200:
                    logger.error(f"❌ Erro ao verificar usuário: {response.status}")
                    raise HTTPException(status_code=response.status, detail="Erro ao consultar usuário")
                user_data = await response.json()
            if not user_data:
                raise HTTPException(status_code=404, detail="Usuário não encontrado")
            current_user = user_data[0]

            # Only send the fields whose value actually changes
            update_fields = {}
            if name is not None and name != current_user.get("name"):
                update_fields["name"] = name
            if fee is not None and fee != current_user.get("fee"):
                update_fields["fee"] = fee
            if not update_fields:
                return {"message": "Nenhuma alteração necessária", "user_id": user_id}

            # Update with the service-key headers and ask for the updated row back
            headers_with_prefer = SUPABASE_ROLE_HEADERS.copy()
            headers_with_prefer["Prefer"] = "return=representation"
            async with session.patch(
                user_url,
                params={"id": id_filter},
                json=update_fields,
                headers=headers_with_prefer
            ) as update_response:
                if update_response.status != 200:
                    logger.error(f"❌ Erro ao atualizar usuário: {update_response.status} - {await update_response.text()}")
                    raise HTTPException(status_code=update_response.status, detail="Erro ao atualizar usuário")
                updated_user_data = await update_response.json()

        raw_user = updated_user_data[0] if updated_user_data else {}
        updated_user = {
            "name": raw_user.get("name"),
            "avatar": raw_user.get("avatar"),
            "role": raw_user.get("role"),
            "fee": raw_user.get("fee")
        }
        logger.info(f"✅ Usuário {user_id} atualizado com sucesso por {admin_id}: {update_fields}")
        return {
            "message": "Usuário atualizado com sucesso",
            "user": updated_user,
            "updated_by": admin_id
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao atualizar usuário: {str(e)}")
        raise HTTPException(status_code=500, detail="Erro interno do servidor")
@router.post("/admin/delete-feed")
async def delete_feed(
    request: FeedDeleteRequest,
    user_token: str = Header(None, alias="User-key")
):
    """Delete a feed, its related portfolios and their images in storage.

    Requires an admin token (``User-key`` header). Steps, in order: load the
    feed's portfolio IDs, collect the portfolios' image URLs, delete the
    portfolios, delete the images (best-effort: failures are only logged),
    then delete the feed itself. The steps are not transactional.

    Args:
        request: Body carrying the ``feed_id`` to delete.
        user_token: Bearer token of the calling admin.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 404 when the feed does not exist, the upstream status
            when Supabase rejects a lookup/delete, 500 on unexpected errors,
            plus whatever ``verify_admin_token`` raises.
    """
    import json
    from urllib.parse import urlparse

    try:
        await verify_admin_token(user_token)
        feed_id = request.feed_id
        headers = SUPABASE_ROLE_HEADERS.copy()
        headers["Accept"] = "application/json"

        async with aiohttp.ClientSession() as session:
            # Look the feed up by ID
            feed_query = f"{SUPABASE_URL}/rest/v1/Feeds?select=portfolios&limit=1&id=eq.{feed_id}"
            async with session.get(feed_query, headers=headers) as feed_response:
                if feed_response.status != 200:
                    raise HTTPException(status_code=feed_response.status, detail="Erro ao buscar feed")
                feed_data = await feed_response.json()
            if not feed_data:
                raise HTTPException(status_code=404, detail="Feed não encontrado")

            # `or []` covers a null column: .get() would return None, and
            # json.loads(None) would raise.
            portfolio_ids = feed_data[0].get("portfolios") or []
            if not isinstance(portfolio_ids, list):
                portfolio_ids = json.loads(portfolio_ids)  # in case it arrives as a JSON string

            image_urls = []
            if portfolio_ids:
                ids_str = ",".join(str(pid) for pid in portfolio_ids)

                # Collect the image URLs before the portfolio rows disappear
                portfolio_query = f"{SUPABASE_URL}/rest/v1/Portfolio?select=id,image_url&id=in.({ids_str})"
                async with session.get(portfolio_query, headers=headers) as portfolio_response:
                    if portfolio_response.status == 200:
                        portfolio_data = await portfolio_response.json()
                        # Skip rows without a usable URL (missing or null).
                        image_urls = [item["image_url"] for item in portfolio_data if item.get("image_url")]

                # Delete the portfolios
                delete_portfolios_url = f"{SUPABASE_URL}/rest/v1/Portfolio?id=in.({ids_str})"
                async with session.delete(delete_portfolios_url, headers=headers) as delete_response:
                    if delete_response.status != 204:
                        raise HTTPException(status_code=delete_response.status, detail="Erro ao deletar portfolios")

            async def delete_image_from_storage(image_url: str):
                """Best-effort removal of one image from Supabase Storage."""
                parts = urlparse(image_url).path.strip("/").split("/")
                if parts[:3] == ["storage", "v1", "object"]:
                    # Supabase object URL: /storage/v1/object/[<access>/]<bucket>/<key>.
                    # Drop the API prefix and the access segment, otherwise the
                    # bucket would be read as "v1".
                    # NOTE(review): the access segment names are assumed from the
                    # Supabase Storage URL scheme — confirm against stored URLs.
                    parts = parts[3:]
                    if parts and parts[0] in ("public", "sign", "authenticated"):
                        parts = parts[1:]
                    if len(parts) < 2:
                        return
                    bucket_id, key_parts = parts[0], parts[1:]
                else:
                    # Any other URL shape: keep the original positional rule.
                    if len(parts) < 3:
                        return
                    bucket_id, key_parts = parts[1], parts[2:]
                file_key = "/".join(key_parts)
                delete_url = f"{SUPABASE_URL}/storage/v1/object/{bucket_id}/{file_key}"
                try:
                    async with session.delete(delete_url, headers=headers) as delete_img_response:
                        if delete_img_response.status not in (200, 204):
                            logger.warning(f"❌ Falha ao deletar imagem: {file_key}")
                except aiohttp.ClientError:
                    # A transport error on one image must not abort the feed
                    # deletion, just like a non-2xx status does not.
                    logger.warning(f"❌ Falha ao deletar imagem: {file_key}")

            await asyncio.gather(*[delete_image_from_storage(url) for url in image_urls])

            # Delete the feed itself
            delete_feed_url = f"{SUPABASE_URL}/rest/v1/Feeds?id=eq.{feed_id}"
            async with session.delete(delete_feed_url, headers=headers) as delete_feed_response:
                if delete_feed_response.status != 204:
                    raise HTTPException(status_code=delete_feed_response.status, detail="Erro ao deletar feed")

        logger.info(f"✅ Feed {feed_id} e portfolios relacionados deletados com sucesso.")
        return {"message": f"Feed {feed_id} e portfolios deletados com sucesso."}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao deletar feed: {str(e)}")
        raise HTTPException(status_code=500, detail="Erro interno do servidor")
@router.get("/admin/user")
async def get_user_name(
    user_id: str = Query(..., description="ID do usuário"),
    user_token: str = Header(None, alias="User-key")
):
    """Return a user's basic info together with their feeds.

    Requires the caller's token (``User-key`` header) to carry the
    ``view_users`` permission. Each feed that has portfolios also gets a
    ``thumbnail`` built from its first portfolio (image URL, blurhash,
    width, height) when that portfolio can be loaded.

    Args:
        user_id: ID of the user to look up.
        user_token: Bearer token of the calling user.

    Returns:
        ``{"user": {...}, "feeds": [...], "feeds_count": int}`` with feeds
        ordered newest first.

    Raises:
        HTTPException: 404 when the user does not exist, the upstream status
            when the user or feeds query fails, 500 on unexpected errors,
            plus whatever the permission check raises.
    """
    try:
        # Check that the caller may view users
        await verify_token_with_permissions(user_token, "view_users")

        headers = SUPABASE_HEADERS.copy()
        headers["Accept"] = "application/json; charset=utf-8"
        # user_id comes from the query string: pass it via `params` so the
        # HTTP client percent-encodes it and it cannot inject extra parameters.
        id_filter = f"eq.{user_id}"

        async with aiohttp.ClientSession() as session:

            async def fetch_rows(table, params):
                """GET rows from one table; return (status, rows or None)."""
                async with session.get(f"{SUPABASE_URL}/rest/v1/{table}", params=params, headers=headers) as resp:
                    if resp.status != 200:
                        return resp.status, None
                    return resp.status, await resp.json()

            # Run the user and feeds queries concurrently
            (user_status, user_data), (feeds_status, feeds_data) = await asyncio.gather(
                fetch_rows("User", {"select": "name,avatar,role,fee", "id": id_filter}),
                fetch_rows("Feeds", {
                    "select": "id,portfolios,created_at,description,urls,user_id",
                    "user_id": id_filter,
                    "order": "created_at.desc",
                }),
            )
            if user_status != 200 or feeds_status != 200:
                status = user_status if user_status != 200 else feeds_status
                raise HTTPException(status_code=status, detail="Erro ao consultar o Supabase")
            if not user_data:
                raise HTTPException(status_code=404, detail="Usuário não encontrado")

            async def fetch_thumbnail(feed):
                """Return thumbnail info from the feed's first portfolio, or None."""
                portfolios = feed["portfolios"]
                if not portfolios:
                    return None
                status, rows = await fetch_rows("Portfolio", {
                    "select": "image_url,blurhash,width,height",
                    "id": f"eq.{portfolios[0]}",
                })
                if status != 200 or not rows:
                    return None
                first = rows[0]
                return {
                    "image_url": first["image_url"],
                    "blurhash": first["blurhash"],
                    "width": first["width"],
                    "height": first["height"]
                }

            # One lookup per feed, issued concurrently; gather keeps feed order.
            thumbnails = await asyncio.gather(*[fetch_thumbnail(feed) for feed in feeds_data])

        feeds_with_images = []
        for feed, thumbnail in zip(feeds_data, thumbnails):
            feed_info = {
                "id": feed["id"],
                "created_at": feed["created_at"],
                "description": feed["description"],
                "urls": feed["urls"],
                "portfolios": feed["portfolios"]
            }
            if thumbnail is not None:
                feed_info["thumbnail"] = thumbnail
            feeds_with_images.append(feed_info)

        # Build the full response
        return {
            "user": user_data[0],
            "feeds": feeds_with_images,
            "feeds_count": len(feeds_with_images)
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao buscar usuário e feeds: {str(e)}")
        raise HTTPException(status_code=500, detail="Erro interno do servidor")