"""Admin API routes backed by Supabase (PostgREST, Auth and Storage).

Exposes endpoints to list and update users, inspect a user's feeds and delete
a feed together with its portfolios and stored images. Every endpoint
authenticates the caller through the ``User-key`` header.
"""

import os
import logging
import asyncio
import json
from functools import lru_cache
from typing import List, Dict, Any, Optional, Tuple
from urllib.parse import quote, urlparse

import aiohttp
from pydantic import BaseModel
from fastapi import APIRouter, HTTPException, Header, Query

router = APIRouter()

# Supabase configuration
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
SUPABASE_KEY = os.getenv("SUPA_KEY")
SUPABASE_ROLE_KEY = os.getenv("SUPA_SERVICE_KEY")


class FeedDeleteRequest(BaseModel):
    """Request body of ``POST /admin/delete-feed``."""

    # Primary key of the feed to delete.
    feed_id: int


if not SUPABASE_KEY or not SUPABASE_ROLE_KEY:
    raise ValueError("❌ SUPA_KEY ou SUPA_SERVICE_KEY não foram definidos no ambiente!")

# Headers for requests made with the regular API key.
SUPABASE_HEADERS = {
    "apikey": SUPABASE_KEY,
    "Authorization": f"Bearer {SUPABASE_KEY}",
    "Content-Type": "application/json"
}

# Headers for requests made with the service key (used below to bypass RLS).
SUPABASE_ROLE_HEADERS = {
    "apikey": SUPABASE_ROLE_KEY,
    "Authorization": f"Bearer {SUPABASE_ROLE_KEY}",
    "Content-Type": "application/json"
}

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Upper bound (seconds) for the blocking admin lookup so a stalled upstream
# cannot pin a worker thread forever.
_REQUEST_TIMEOUT_SECONDS = 10

# Maximum number of ids placed in a single ``id=in.(...)`` thumbnail query,
# to keep the request URL at a reasonable length.
_THUMBNAIL_BATCH_SIZE = 100

# Leading path segments of a Supabase Storage object URL, and the access-mode
# segment that may follow them before the bucket name.
_STORAGE_OBJECT_PREFIX = ("storage", "v1", "object")
_STORAGE_ACCESS_SEGMENTS = frozenset({"public", "sign", "authenticated"})


def _filter_value(value: Any) -> str:
    """Percent-encode *value* so it can only ever be a single filter value.

    Without this, characters such as ``&`` or ``=`` in caller-supplied input
    would be interpreted as additional query parameters.
    """
    return quote(str(value), safe="")


def _parse_portfolio_ids(raw: Any) -> list:
    """Normalize a feed's ``portfolios`` column into a list of ids.

    Accepts a list, a JSON-encoded list (string) or an empty/null value.

    Raises:
        ValueError, TypeError: if *raw* is neither a list nor valid JSON.
    """
    if not raw:
        return []
    if isinstance(raw, list):
        return raw
    parsed = json.loads(raw)  # the column may arrive as a JSON string
    return parsed if isinstance(parsed, list) else []


def _storage_object_from_url(image_url: str) -> Optional[Tuple[str, str]]:
    """Extract ``(bucket_id, file_key)`` from a stored image URL.

    Supabase Storage URLs look like
    ``/storage/v1/object/<access>/<bucket>/<key...>``; for those the bucket is
    the segment after the optional access mode. Any other URL falls back to
    the previous layout (second path segment is the bucket).
    NOTE(review): confirm which URL shape is stored in ``Portfolio.image_url``.

    Returns:
        The bucket and object key, or ``None`` when the URL has no usable path.
    """
    parts = urlparse(image_url).path.strip("/").split("/")

    if tuple(parts[:3]) == _STORAGE_OBJECT_PREFIX:
        parts = parts[3:]
        if parts and parts[0] in _STORAGE_ACCESS_SEGMENTS:
            parts = parts[1:]
        if len(parts) < 2:
            return None
        return parts[0], "/".join(parts[1:])

    if len(parts) < 3:
        return None
    return parts[1], "/".join(parts[2:])


# Cache to reduce repeated calls
@lru_cache(maxsize=128)
def get_cached_admin_status(user_id: str) -> bool:
    """Return whether *user_id* is an admin, caching the answer.

    This is a blocking call (it uses ``requests``); async callers should run
    it in a worker thread.

    NOTE(review): results, including ``False`` returned after an upstream
    error, stay cached until evicted or the process restarts, so a change to
    ``is_admin`` is not picked up immediately.

    Returns:
        ``True`` only when the user row exists and ``is_admin`` is truthy.
    """
    import requests

    user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{_filter_value(user_id)}"
    response = requests.get(
        user_data_url,
        headers=SUPABASE_HEADERS,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        return False

    rows = response.json()
    if not rows:
        return False
    return bool(rows[0].get("is_admin", False))


async def get_user_permissions(user_id: str) -> Dict[str, bool]:
    """Fetch the permission flags of a user.

    Returns:
        A dict with ``is_admin``, ``view_users`` and ``manage_users``. All
        flags are ``False`` when the lookup fails or the user does not exist.
    """
    user_data_url = (
        f"{SUPABASE_URL}/rest/v1/User?id=eq.{_filter_value(user_id)}"
        "&select=is_admin,view_users,manage_users"
    )

    async with aiohttp.ClientSession() as session:
        async with session.get(user_data_url, headers=SUPABASE_HEADERS) as response:
            rows = await response.json() if response.status == 200 else None

    if not rows:
        return {"is_admin": False, "view_users": False, "manage_users": False}

    user_info = rows[0]
    # bool() so a null column never leaks None into the permission map.
    return {
        "is_admin": bool(user_info.get("is_admin", False)),
        "view_users": bool(user_info.get("view_users", False)),
        "manage_users": bool(user_info.get("manage_users", False))
    }


async def _fetch_authenticated_user_id(user_token: Optional[str]) -> str:
    """Resolve a user access token to the id of the user it belongs to.

    Raises:
        HTTPException: 401 when the token is missing, invalid or expired;
            400 when the auth response carries no user id.
    """
    if not user_token:
        # A missing header would otherwise be sent upstream as "Bearer None".
        raise HTTPException(status_code=401, detail="Token inválido ou expirado")

    headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json"
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
            if response.status != 200:
                raise HTTPException(status_code=401, detail="Token inválido ou expirado")
            user_data = await response.json()

    user_id = user_data.get("id")
    if not user_id:
        raise HTTPException(status_code=400, detail="ID do usuário não encontrado")
    return user_id


async def verify_admin_token(user_token: str) -> str:
    """Check that the token belongs to an administrator.

    Returns:
        The id of the authenticated admin user.

    Raises:
        HTTPException: 401/400 when the token cannot be resolved to a user,
            403 when the user is not an admin.
    """
    user_id = await _fetch_authenticated_user_id(user_token)

    # The admin lookup is blocking, so keep it off the event loop.
    is_admin = await asyncio.to_thread(get_cached_admin_status, user_id)
    if not is_admin:
        raise HTTPException(
            status_code=403,
            detail="Acesso negado: privilégios de administrador necessários",
        )
    return user_id


async def verify_token_with_permissions(
    user_token: str,
    required_permission: Optional[str] = None,
) -> Dict[str, Any]:
    """Validate the token and return the user's id and permissions.

    Args:
        user_token: Access token of the caller.
        required_permission: Name of a permission flag that must be truthy;
            when omitted only the token itself is validated.

    Returns:
        A dict with ``user_id``, ``is_admin`` and ``permissions``.

    Raises:
        HTTPException: 401/400 when the token cannot be resolved to a user,
            403 when the required permission is missing.
    """
    user_id = await _fetch_authenticated_user_id(user_token)
    permissions = await get_user_permissions(user_id)

    if required_permission and not permissions.get(required_permission, False):
        raise HTTPException(
            status_code=403,
            detail=f"Acesso negado: permissão '{required_permission}' necessária"
        )

    return {
        "user_id": user_id,
        "is_admin": permissions.get("is_admin", False),
        "permissions": permissions
    }


async def get_recent_users(
    limit: int = 50,
    search: Optional[str] = None,
    page: int = 0,
) -> Dict[str, Any]:
    """Fetch the most recent non-admin users, optionally filtered by name.

    Args:
        limit: Page size.
        search: Case-insensitive substring to match against the user name.
        page: Zero-based page index.

    Returns:
        ``{"users": [...], "has_next_page": bool}``. On any failure the error
        is logged and an empty page is returned.
    """
    try:
        offset = page * limit
        # Ask for one extra row: its presence tells us whether a next page exists.
        limit_plus_one = limit + 1

        query = (
            f"{SUPABASE_URL}/rest/v1/User?select=id,name,avatar,role,blurhash"
            f"&is_admin=eq.false&order=created_at.desc&limit={limit_plus_one}&offset={offset}"
        )

        if search:
            # Encode the term so it cannot inject extra query parameters.
            # No SQL-style quote doubling: the value is not spliced into SQL.
            query += f"&name=ilike.*{_filter_value(search)}*"

        headers = SUPABASE_HEADERS.copy()
        headers["Accept"] = "application/json; charset=utf-8"

        async with aiohttp.ClientSession() as session:
            async with session.get(query, headers=headers) as response:
                if response.status != 200:
                    logger.error("❌ Erro ao obter usuários: %s", response.status)
                    return {"users": [], "has_next_page": False}

                text = await response.text(encoding='utf-8')

        users_data = json.loads(text)
        return {
            "users": users_data[:limit],
            "has_next_page": len(users_data) > limit
        }

    except Exception as e:
        logger.exception("❌ Erro ao obter usuários: %s", e)
        return {"users": [], "has_next_page": False}


@router.get("/admin/users")
async def get_recent_users_endpoint(
    user_token: str = Header(None, alias="User-key"),
    limit: int = Query(50, ge=1, le=100),
    page: int = Query(0, ge=0),
    search: Optional[str] = Query(None)
):
    """List the most recent users, with name search and pagination.

    Requires the ``view_users`` permission.
    """
    try:
        await verify_token_with_permissions(user_token, "view_users")

        result = await get_recent_users(limit, search, page)
        return {
            "users": result["users"],
            "count": len(result["users"]),
            "has_next_page": result["has_next_page"]
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("❌ Erro ao obter usuários: %s", e)
        # Do not echo internal error text back to the client.
        raise HTTPException(status_code=500, detail="Erro interno do servidor")


@router.post("/admin/update-user")
async def update_user(
    request: Dict[str, Any],
    user_token: str = Header(None, alias="User-key")
):
    """Update ``name`` and/or ``fee`` of a specific user.

    Requires the ``manage_users`` permission. The update itself is sent with
    the service key so that it bypasses Supabase RLS. A ``fee`` that is not an
    integer is ignored.

    Raises:
        HTTPException: 400 for a missing ``user_id`` or no updatable field,
            404 when the user does not exist, the upstream status when
            Supabase rejects a request, 500 on unexpected errors.
    """
    try:
        user_info = await verify_token_with_permissions(user_token, "manage_users")
        admin_id = user_info["user_id"]

        user_id = request.get("user_id")
        name = request.get("name")
        fee = request.get("fee")

        if not user_id:
            raise HTTPException(status_code=400, detail="ID do usuário é obrigatório")

        if name is None and fee is None:
            raise HTTPException(
                status_code=400,
                detail="Pelo menos um campo para atualização (name ou fee) deve ser fornecido",
            )

        # user_id comes from the request body and ends up in a service-key
        # PATCH filter, so it must never be able to add filter clauses.
        id_filter = f"id=eq.{_filter_value(user_id)}"

        # Existence check uses the regular headers.
        user_query = f"{SUPABASE_URL}/rest/v1/User?select=name,fee&{id_filter}"

        async with aiohttp.ClientSession() as session:
            async with session.get(user_query, headers=SUPABASE_HEADERS) as response:
                if response.status != 200:
                    logger.error("❌ Erro ao verificar usuário: %s", response.status)
                    raise HTTPException(status_code=response.status, detail="Erro ao consultar usuário")
                user_data = await response.json()

            if not user_data:
                raise HTTPException(status_code=404, detail="Usuário não encontrado")
            current_user = user_data[0]

            # Only send fields whose value actually changes.
            update_fields = {}
            if name is not None and name != current_user.get("name"):
                update_fields["name"] = name

            # bool is a subclass of int; True/False must not be stored as a fee.
            fee_is_int = isinstance(fee, int) and not isinstance(fee, bool)
            if fee_is_int and fee != current_user.get("fee"):
                update_fields["fee"] = fee

            if not update_fields:
                return {"message": "Nenhuma alteração necessária", "user_id": user_id}

            update_url = f"{SUPABASE_URL}/rest/v1/User?{id_filter}"
            headers_with_prefer = SUPABASE_ROLE_HEADERS.copy()
            headers_with_prefer["Prefer"] = "return=representation"

            async with session.patch(
                update_url, json=update_fields, headers=headers_with_prefer
            ) as update_response:
                if update_response.status != 200:
                    logger.error(
                        "❌ Erro ao atualizar usuário: %s - %s",
                        update_response.status,
                        await update_response.text(),
                    )
                    raise HTTPException(
                        status_code=update_response.status,
                        detail="Erro ao atualizar usuário",
                    )
                updated_user_data = await update_response.json()

        raw_user = updated_user_data[0] if updated_user_data else {}
        updated_user = {
            "name": raw_user.get("name"),
            "avatar": raw_user.get("avatar"),
            "role": raw_user.get("role"),
            "fee": raw_user.get("fee")
        }

        logger.info("✅ Usuário %s atualizado com sucesso por %s: %s", user_id, admin_id, update_fields)
        return {
            "message": "Usuário atualizado com sucesso",
            "user": updated_user,
            "updated_by": admin_id
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("❌ Erro ao atualizar usuário: %s", e)
        raise HTTPException(status_code=500, detail="Erro interno do servidor")


@router.post("/admin/delete-feed")
async def delete_feed(
    request: FeedDeleteRequest,
    user_token: str = Header(None, alias="User-key")
):
    """Delete a feed, its related portfolios and their images in storage.

    Requires an admin token. Image removal is best-effort: a failure there is
    logged and does not prevent the feed from being deleted.

    Raises:
        HTTPException: 404 when the feed does not exist, the upstream status
            when Supabase rejects a request, 500 on unexpected errors.
    """
    try:
        await verify_admin_token(user_token)
        feed_id = request.feed_id

        headers = SUPABASE_ROLE_HEADERS.copy()
        headers["Accept"] = "application/json"

        async with aiohttp.ClientSession() as session:
            feed_query = f"{SUPABASE_URL}/rest/v1/Feeds?select=portfolios&limit=1&id=eq.{feed_id}"
            async with session.get(feed_query, headers=headers) as feed_response:
                if feed_response.status != 200:
                    raise HTTPException(status_code=feed_response.status, detail="Erro ao buscar feed")
                feed_data = await feed_response.json()

            if not feed_data:
                raise HTTPException(status_code=404, detail="Feed não encontrado")

            # Handles a list, a JSON string and a null column alike.
            portfolio_ids = _parse_portfolio_ids(feed_data[0].get("portfolios"))

            image_urls = []
            if portfolio_ids:
                ids_str = ",".join(str(pid) for pid in portfolio_ids)

                # Collect the image URLs before the rows disappear.
                portfolio_query = f"{SUPABASE_URL}/rest/v1/Portfolio?select=id,image_url&id=in.({ids_str})"
                async with session.get(portfolio_query, headers=headers) as portfolio_response:
                    if portfolio_response.status == 200:
                        portfolio_data = await portfolio_response.json()
                        # Skip rows whose image_url is missing or null.
                        image_urls = [item["image_url"] for item in portfolio_data if item.get("image_url")]
                    else:
                        logger.warning(
                            "⚠️ Não foi possível buscar imagens dos portfolios: %s",
                            portfolio_response.status,
                        )

                delete_portfolios_url = f"{SUPABASE_URL}/rest/v1/Portfolio?id=in.({ids_str})"
                async with session.delete(delete_portfolios_url, headers=headers) as delete_response:
                    if delete_response.status != 204:
                        raise HTTPException(
                            status_code=delete_response.status,
                            detail="Erro ao deletar portfolios",
                        )

            async def delete_image_from_storage(image_url: str):
                """Delete one stored image; log a warning if storage refuses."""
                location = _storage_object_from_url(image_url)
                if location is None:
                    return
                bucket_id, file_key = location

                delete_url = f"{SUPABASE_URL}/storage/v1/object/{bucket_id}/{file_key}"
                async with session.delete(delete_url, headers=headers) as delete_img_response:
                    if delete_img_response.status not in (200, 204):
                        logger.warning(f"❌ Falha ao deletar imagem: {file_key}")

            # return_exceptions keeps one failing image from aborting the feed delete.
            outcomes = await asyncio.gather(
                *(delete_image_from_storage(url) for url in image_urls),
                return_exceptions=True,
            )
            for url, outcome in zip(image_urls, outcomes):
                if isinstance(outcome, BaseException):
                    logger.warning("❌ Falha ao deletar imagem: %s (%r)", url, outcome)

            delete_feed_url = f"{SUPABASE_URL}/rest/v1/Feeds?id=eq.{feed_id}"
            async with session.delete(delete_feed_url, headers=headers) as delete_feed_response:
                if delete_feed_response.status != 204:
                    raise HTTPException(
                        status_code=delete_feed_response.status,
                        detail="Erro ao deletar feed",
                    )

        logger.info(f"✅ Feed {feed_id} e portfolios relacionados deletados com sucesso.")
        return {"message": f"Feed {feed_id} e portfolios deletados com sucesso."}

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("❌ Erro ao deletar feed: %s", e)
        raise HTTPException(status_code=500, detail="Erro interno do servidor")


def _first_portfolio_id(feed: Dict[str, Any]) -> Any:
    """Return the first portfolio id of *feed*, or ``None`` if it has none."""
    try:
        ids = _parse_portfolio_ids(feed["portfolios"])
    except (TypeError, ValueError):
        return None
    return ids[0] if ids else None


async def _fetch_thumbnails(
    session: aiohttp.ClientSession,
    headers: Dict[str, str],
    portfolio_ids: List[Any],
) -> Dict[str, Dict[str, Any]]:
    """Fetch image metadata for the given portfolio ids in batched queries.

    Returns:
        Thumbnail dicts keyed by ``str(portfolio_id)``. Ids that are not found,
        or whose batch request fails, are simply absent.
    """
    thumbnails: Dict[str, Dict[str, Any]] = {}
    unique_ids = list(dict.fromkeys(str(pid) for pid in portfolio_ids))

    for start in range(0, len(unique_ids), _THUMBNAIL_BATCH_SIZE):
        batch = unique_ids[start:start + _THUMBNAIL_BATCH_SIZE]
        ids_str = ",".join(_filter_value(pid) for pid in batch)
        portfolio_query = (
            f"{SUPABASE_URL}/rest/v1/Portfolio"
            f"?select=id,image_url,blurhash,width,height&id=in.({ids_str})"
        )
        async with session.get(portfolio_query, headers=headers) as portfolio_response:
            if portfolio_response.status != 200:
                continue
            for row in await portfolio_response.json():
                thumbnails[str(row["id"])] = {
                    "image_url": row["image_url"],
                    "blurhash": row["blurhash"],
                    "width": row["width"],
                    "height": row["height"]
                }

    return thumbnails


@router.get("/admin/user")
async def get_user_name(
    user_id: str = Query(..., description="ID do usuário"),
    user_token: str = Header(None, alias="User-key")
):
    """Return a user's profile together with their feeds.

    Requires the ``view_users`` permission. Each feed that has portfolios gets
    a ``thumbnail`` built from its first portfolio, when that row is found.

    Raises:
        HTTPException: 404 when the user does not exist, the upstream status
            when Supabase rejects a request, 500 on unexpected errors.
    """
    try:
        await verify_token_with_permissions(user_token, "view_users")

        encoded_id = _filter_value(user_id)
        user_query = f"{SUPABASE_URL}/rest/v1/User?select=name,avatar,role,fee&id=eq.{encoded_id}"
        feeds_query = (
            f"{SUPABASE_URL}/rest/v1/Feeds?select=id,portfolios,created_at,description,urls,user_id"
            f"&user_id=eq.{encoded_id}&order=created_at.desc"
        )

        headers = SUPABASE_HEADERS.copy()
        headers["Accept"] = "application/json; charset=utf-8"

        async with aiohttp.ClientSession() as session:

            async def fetch_rows(url: str) -> Tuple[int, Any]:
                """GET *url*; return the status and, on 200, the parsed body."""
                async with session.get(url, headers=headers) as resp:
                    body = await resp.json() if resp.status == 200 else None
                    return resp.status, body

            # gather() is what actually runs the two requests concurrently.
            (user_status, user_data), (feeds_status, feeds_data) = await asyncio.gather(
                fetch_rows(user_query),
                fetch_rows(feeds_query),
            )

            if user_status != 200 or feeds_status != 200:
                status = user_status if user_status != 200 else feeds_status
                raise HTTPException(status_code=status, detail="Erro ao consultar o Supabase")

            if not user_data:
                raise HTTPException(status_code=404, detail="Usuário não encontrado")

            # Batched lookups instead of one request per feed.
            first_ids = [_first_portfolio_id(feed) for feed in feeds_data]
            thumbnails = await _fetch_thumbnails(
                session, headers, [pid for pid in first_ids if pid is not None]
            )

        feeds_with_images = []
        for feed, first_id in zip(feeds_data, first_ids):
            feed_info = {
                "id": feed["id"],
                "created_at": feed["created_at"],
                "description": feed["description"],
                "urls": feed["urls"],
                "portfolios": feed["portfolios"]
            }
            thumbnail = thumbnails.get(str(first_id)) if first_id is not None else None
            if thumbnail is not None:
                feed_info["thumbnail"] = thumbnail
            feeds_with_images.append(feed_info)

        return {
            "user": user_data[0],
            "feeds": feeds_with_images,
            "feeds_count": len(feeds_with_images)
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("❌ Erro ao buscar usuário e feeds: %s", e)
        raise HTTPException(status_code=500, detail="Erro interno do servidor")