import os import stripe import logging import asyncio import aiohttp from datetime import datetime from pydantic import BaseModel from fastapi import APIRouter, HTTPException, Header, Query from functools import lru_cache from typing import List, Dict, Any, Optional router = APIRouter() # 🔥 Pegando as chaves do ambiente stripe.api_key = os.getenv("STRIPE_KEY") # Lendo do ambiente stripe.api_version = "2023-10-16" # Configuração do Supabase SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co" SUPABASE_KEY = os.getenv("SUPA_KEY") SUPABASE_ROLE_KEY = os.getenv("SUPA_SERVICE_KEY") if not stripe.api_key or not SUPABASE_KEY or not SUPABASE_ROLE_KEY: raise ValueError("❌ STRIPE_KEY, SUPA_KEY ou SUPA_SERVICE_KEY não foram definidos no ambiente!") SUPABASE_HEADERS = { "apikey": SUPABASE_KEY, "Authorization": f"Bearer {SUPABASE_KEY}", "Content-Type": "application/json" } SUPABASE_ROLE_HEADERS = { "apikey": SUPABASE_ROLE_KEY, "Authorization": f"Bearer {SUPABASE_ROLE_KEY}", "Content-Type": "application/json" } # Configuração do logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Cache para reduzir chamadas repetidas @lru_cache(maxsize=128) def get_cached_admin_status(user_id: str) -> bool: """Obtém e armazena em cache se um usuário é admin""" import requests user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}" response = requests.get(user_data_url, headers=SUPABASE_HEADERS) if response.status_code != 200 or not response.json(): return False user_info = response.json()[0] return user_info.get("is_admin", False) # Nova função para verificar permissões de usuário async def get_user_permissions(user_id: str) -> Dict[str, bool]: """Obtém as permissões de um usuário""" user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}&select=is_admin,view_users,manage_users" async with aiohttp.ClientSession() as session: async with session.get(user_data_url, headers=SUPABASE_HEADERS) as response: if response.status != 200 or not await response.json(): return {"is_admin": False, "view_users": False, "manage_users": False} user_info = (await response.json())[0] return { "is_admin": user_info.get("is_admin", False), "view_users": user_info.get("view_users", False), "manage_users": user_info.get("manage_users", False) } async def verify_admin_token(user_token: str) -> str: """Verifica se o token pertence a um administrador de forma assíncrona""" headers = { "Authorization": f"Bearer {user_token}", "apikey": SUPABASE_KEY, "Content-Type": "application/json" } async with aiohttp.ClientSession() as session: async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response: if response.status != 200: raise HTTPException(status_code=401, detail="Token inválido ou expirado") user_data = await response.json() user_id = user_data.get("id") if not user_id: raise HTTPException(status_code=400, detail="ID do usuário não encontrado") is_admin = await asyncio.to_thread(get_cached_admin_status, user_id) if not is_admin: raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários") return user_id # Nova função para verificar token e retornar ID do usuário e permissões async def verify_token_with_permissions(user_token: str, required_permission: Optional[str] = None) -> Dict[str, Any]: """Verifica o token e retorna ID do usuário e suas permissões""" headers = { "Authorization": f"Bearer {user_token}", "apikey": SUPABASE_KEY, "Content-Type": "application/json" } async with aiohttp.ClientSession() as session: async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response: if response.status != 200: raise HTTPException(status_code=401, detail="Token inválido ou expirado") user_data = await response.json() user_id = user_data.get("id") if not user_id: raise HTTPException(status_code=400, detail="ID do usuário não encontrado") # Obter permissões do usuário permissions = await get_user_permissions(user_id) # Verificar se é admin is_admin = permissions.get("is_admin", False) # Verificar permissão específica, se requisitada if required_permission: has_permission = permissions.get(required_permission, False) if not has_permission: raise HTTPException( status_code=403, detail=f"Acesso negado: permissão '{required_permission}' necessária" ) return { "user_id": user_id, "is_admin": is_admin, "permissions": permissions } async def get_recent_users(limit: int = 50, search: Optional[str] = None, page: int = 0) -> Dict[str, Any]: """Obtém os usuários mais recentes da plataforma, com filtro opcional por nome e paginação""" try: offset = page * limit limit_plus_one = limit + 1 # Adicionada condição para filtrar apenas usuários não-admin query = f"{SUPABASE_URL}/rest/v1/User?select=id,name,avatar,role,blurhash,blocked,deleted_account&is_admin=eq.false&order=created_at.desc&limit={limit_plus_one}&offset={offset}" if search: search_term = search.replace("'", "''") query += f"&name=ilike.*{search_term}*" headers = SUPABASE_HEADERS.copy() headers["Accept"] = "application/json; charset=utf-8" async with aiohttp.ClientSession() as session: async with session.get(query, headers=headers) as response: if response.status != 200: logger.error(f"❌ Erro ao obter usuários: {response.status}") return {"users": [], "has_next_page": False} text = await response.text(encoding='utf-8') import json users_data = json.loads(text) has_next_page = len(users_data) > limit users = users_data[:limit] return { "users": users, "has_next_page": has_next_page } except Exception as e: logger.error(f"❌ Erro ao obter usuários: {str(e)}") return {"users": [], "has_next_page": False} @router.get("/admin/users") async def get_recent_users_endpoint( user_token: str = Header(None, alias="User-key"), limit: int = Query(50, ge=1, le=100), page: int = Query(0, ge=0), search: Optional[str] = Query(None) ): """ Endpoint para obter os usuários mais recentes da plataforma, com suporte a busca por nome e paginação. """ try: # Verificar se o usuário tem permissão para visualizar usuários user_info = await verify_token_with_permissions(user_token, "view_users") result = await get_recent_users(limit, search, page) return { "users": result["users"], "count": len(result["users"]), "has_next_page": result["has_next_page"] } except HTTPException as he: raise he except Exception as e: logger.error(f"❌ Erro ao obter usuários: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/admin/user-transfers") async def get_user_transfer_history( user_token: str = Header(None, alias="User-key"), user_id: str = Query(..., description="ID of the user in Supabase") ): """ Returns financial information for a user: - If the user is a customer (stripe_id starts with 'cus_'), returns last 10 charges. - If the user is a seller (stripe_id starts with 'acct_'), returns total revenue and upcoming payout info. """ try: # Verify admin permission await verify_token_with_permissions(user_token, "view_users") # Get user's stripe_id user_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}&select=stripe_id" async with aiohttp.ClientSession() as session: async with session.get(user_url, headers=SUPABASE_HEADERS) as response: if response.status != 200: raise HTTPException(status_code=404, detail="User not found") data = await response.json() if not data or not data[0].get("stripe_id"): raise HTTPException(status_code=404, detail="Stripe ID not found for this user") stripe_id = data[0]["stripe_id"] # Se for estilista (Stripe Account) if stripe_id.startswith("acct_"): # List all transfers to this connected account transfers = stripe.Transfer.list(destination=stripe_id, limit=100) total_revenue = sum([t["amount"] for t in transfers["data"]]) / 100 # Próximo pagamento (caso exista) upcoming_payouts = stripe.Payout.list( destination=None, # Para connected accounts, isso vem automaticamente stripe_account=stripe_id, limit=1, arrival_date={"gte": int(datetime.now().timestamp())} ) upcoming = upcoming_payouts["data"][0] if upcoming_payouts["data"] else None return { "stripe_id": stripe_id, "role": "stylist", "total_revenue": total_revenue, "upcoming_payout": { "amount": upcoming["amount"] / 100 if upcoming else None, "arrival_date": datetime.fromtimestamp(upcoming["arrival_date"]).isoformat() if upcoming else None, }, "transfers_count": len(transfers["data"]) } # Se for cliente (Stripe Customer) elif stripe_id.startswith("cus_"): all_charges = stripe.Charge.list(customer=stripe_id, limit=100) total_count = len(all_charges["data"]) recent_charges = sorted(all_charges["data"], key=lambda c: c["created"], reverse=True)[:10] # Status mapping (id → label + dark color) status_map = { "succeeded": {"label": "Succeeded", "color": "#15803d"}, "pending": {"label": "Pending", "color": "#92400e"}, "failed": {"label": "Failed", "color": "#991b1b"}, "canceled": {"label": "Canceled", "color": "#374151"}, } async def fetch_stylist(stylist_id: str) -> Optional[Dict[str, Any]]: url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{stylist_id}&select=name,email,avatar" async with aiohttp.ClientSession() as session: async with session.get(url, headers=SUPABASE_HEADERS) as response: if response.status == 200: data = await response.json() if data: return data[0] return None # Buscar stylists tasks = [] for charge in recent_charges: metadata = charge.get("metadata", {}) stylist_id = metadata.get("stylist_id") task = fetch_stylist(stylist_id) if stylist_id else None tasks.append(task) stylist_data_list = await asyncio.gather(*[t if t else asyncio.sleep(0) for t in tasks]) # Formatando transfers = [] for i, charge in enumerate(recent_charges): metadata = charge.get("metadata", {}) stylist_info = stylist_data_list[i] if i < len(stylist_data_list) else None status_id = charge["status"] status_info = status_map.get(status_id, { "label": "Unknown", "color": "#1f2937" }) transfers.append({ "id": charge["id"], "amount": charge["amount"] / 100, "currency": charge["currency"].upper(), "status_info": { "id": status_id, "label": status_info["label"], "color": status_info["color"] }, "created_at": datetime.fromtimestamp(charge["created"]).isoformat(), "description": charge.get("description"), "payment_method": charge.get("payment_method_details", {}).get("type"), "receipt_url": charge.get("receipt_url"), "metadata": metadata, "stylist": stylist_info }) return { "stripe_id": stripe_id, "role": "customer", "transfers": transfers, "count": total_count } else: raise HTTPException(status_code=400, detail="Invalid stripe_id format.") except HTTPException as he: raise he except Exception as e: logger.error(f"❌ Error retrieving transfers: {str(e)}") raise HTTPException(status_code=500, detail="Internal error while fetching financial data.")