|
import os |
|
import stripe |
|
import logging |
|
import asyncio |
|
import aiohttp |
|
from datetime import datetime |
|
from pydantic import BaseModel |
|
from fastapi import APIRouter, HTTPException, Header, Query |
|
from functools import lru_cache |
|
from typing import List, Dict, Any, Optional |
|
|
|
# Router on which the admin endpoints of this module are registered.
router = APIRouter()

# Stripe SDK settings: pinned API version, secret key taken from the environment.
stripe.api_version = "2023-10-16"
stripe.api_key = os.environ.get("STRIPE_KEY")

# Supabase project URL plus the two keys read from SUPA_KEY and SUPA_SERVICE_KEY.
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
SUPABASE_KEY = os.environ.get("SUPA_KEY")
SUPABASE_ROLE_KEY = os.environ.get("SUPA_SERVICE_KEY")

# Fail at import time when any of the three secrets is missing or empty.
if not all((stripe.api_key, SUPABASE_KEY, SUPABASE_ROLE_KEY)):
    raise ValueError("❌ STRIPE_KEY, SUPA_KEY ou SUPA_SERVICE_KEY não foram definidos no ambiente!")


def _supabase_headers(api_key):
    """Return the request headers that authenticate a Supabase call with *api_key*."""
    return {
        "apikey": api_key,
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }


# Header sets for requests made with the SUPA_KEY and SUPA_SERVICE_KEY values.
SUPABASE_HEADERS = _supabase_headers(SUPABASE_KEY)
SUPABASE_ROLE_HEADERS = _supabase_headers(SUPABASE_ROLE_KEY)

# Root logging at INFO level and a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
@lru_cache(maxsize=128)
def get_cached_admin_status(user_id: str) -> bool:
    """Return whether the user identified by *user_id* is flagged as admin.

    Performs a blocking HTTP GET against the Supabase ``User`` table and reads
    the ``is_admin`` field of the first matching row.  Answers are memoised per
    ``user_id`` by ``lru_cache`` (up to 128 entries), so repeated calls with
    the same id do not issue a new request.

    Args:
        user_id: Value matched against the ``id`` column.

    Returns:
        ``True`` only when a row is found and its ``is_admin`` value is truthy.
        ``False`` when the response status is not 200, when no row matches, or
        when the flag is missing or falsy.

    Raises:
        requests.RequestException: On transport failures or when the request
            exceeds the timeout.  Raised exceptions are not stored in the
            cache, so the next call retries.
    """
    import requests

    response = requests.get(
        f"{SUPABASE_URL}/rest/v1/User",
        # Passing the filter through ``params`` lets requests percent-encode
        # the id instead of splicing it raw into the URL.
        params={"id": f"eq.{user_id}"},
        headers=SUPABASE_HEADERS,
        # Without a timeout a stalled connection would block the calling
        # thread indefinitely.
        timeout=10,
    )
    if response.status_code != 200:
        return False

    rows = response.json()  # decode the body once and reuse it
    if not rows:
        return False

    # NOTE(review): lru_cache keeps negative answers too (including the one
    # produced by a non-200 response) until the entry is evicted or
    # ``get_cached_admin_status.cache_clear()`` is called - confirm this is the
    # intended lifetime for a privilege check.
    return bool(rows[0].get("is_admin", False))
|
|
|
|
|
async def get_user_permissions(user_id: str) -> Dict[str, bool]:
    """Fetch the permission flags stored for a user.

    Queries the Supabase ``User`` table for the row whose ``id`` equals
    *user_id* and reads its ``is_admin``, ``view_users`` and ``manage_users``
    columns.

    Args:
        user_id: Value matched against the ``id`` column.

    Returns:
        A dict with the keys ``is_admin``, ``view_users`` and ``manage_users``.
        Every flag is ``False`` when the response status is not 200 or when no
        row is returned; a flag absent from the row also defaults to ``False``.
    """
    flag_names = ("is_admin", "view_users", "manage_users")
    endpoint = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}&select=is_admin,view_users,manage_users"

    async with aiohttp.ClientSession() as http:
        async with http.get(endpoint, headers=SUPABASE_HEADERS) as resp:
            # The body is only decoded for a 200 response.
            rows = await resp.json() if resp.status == 200 else None
            if not rows:
                return dict.fromkeys(flag_names, False)

            record = rows[0]
            return {flag: record.get(flag, False) for flag in flag_names}
|
|
|
async def verify_admin_token(user_token: str) -> str:
    """Validate an access token and require that its owner is an administrator.

    The token is sent as a bearer credential to the Supabase ``/auth/v1/user``
    endpoint; the ``id`` of the returned user is then checked with
    ``get_cached_admin_status`` (run in a worker thread because that helper
    performs blocking I/O).

    Args:
        user_token: Access token supplied by the caller.

    Returns:
        The id of the authenticated administrator.

    Raises:
        HTTPException: 401 when the token is missing/empty or the auth endpoint
            does not answer with status 200; 400 when the auth response carries
            no ``id``; 403 when the user is not an administrator.
    """
    # A missing or empty token can never be valid: reject it locally instead of
    # sending the literal "Bearer None" to the auth endpoint.
    if not user_token:
        raise HTTPException(status_code=401, detail="Token inválido ou expirado")

    headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json",
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
            if response.status != 200:
                raise HTTPException(status_code=401, detail="Token inválido ou expirado")
            user_data = await response.json()

    # The HTTP connection is released before the (possibly slow) admin lookup.
    user_id = user_data.get("id")
    if not user_id:
        raise HTTPException(status_code=400, detail="ID do usuário não encontrado")

    is_admin = await asyncio.to_thread(get_cached_admin_status, user_id)
    if not is_admin:
        raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários")

    return user_id
|
|
|
|
|
async def verify_token_with_permissions(user_token: str, required_permission: Optional[str] = None) -> Dict[str, Any]:
    """Validate an access token and return the user's id and permission flags.

    The token is sent as a bearer credential to the Supabase ``/auth/v1/user``
    endpoint; the permission flags of the returned user id are then loaded
    with ``get_user_permissions``.

    Args:
        user_token: Access token supplied by the caller.
        required_permission: Optional key of the permissions dict (for example
            ``"view_users"``) that must be truthy for the call to succeed.
            The ``is_admin`` flag does not bypass this check.

    Returns:
        A dict with ``user_id``, ``is_admin`` and the full ``permissions`` dict.

    Raises:
        HTTPException: 401 when the token is missing/empty or the auth endpoint
            does not answer with status 200; 400 when the auth response carries
            no ``id``; 403 when *required_permission* is given and not granted.
    """
    # The endpoints in this module declare the header with a default of None;
    # reject a missing token locally instead of sending "Bearer None" upstream.
    if not user_token:
        raise HTTPException(status_code=401, detail="Token inválido ou expirado")

    headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json",
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
            if response.status != 200:
                raise HTTPException(status_code=401, detail="Token inválido ou expirado")
            user_data = await response.json()

    user_id = user_data.get("id")
    if not user_id:
        raise HTTPException(status_code=400, detail="ID do usuário não encontrado")

    permissions = await get_user_permissions(user_id)

    if required_permission and not permissions.get(required_permission, False):
        raise HTTPException(
            status_code=403,
            detail=f"Acesso negado: permissão '{required_permission}' necessária",
        )

    return {
        "user_id": user_id,
        "is_admin": permissions.get("is_admin", False),
        "permissions": permissions,
    }
|
|
|
async def get_recent_users(limit: int = 50, search: Optional[str] = None, page: int = 0) -> Dict[str, Any]:
    """Return one page of the most recently created non-admin users.

    Rows come from the Supabase ``User`` table, filtered with
    ``is_admin=eq.false`` and ordered by ``created_at`` descending.  One row
    more than *limit* is requested so that the presence of a following page
    can be detected without a separate count query.

    Args:
        limit: Maximum number of users in the returned page.
        search: Optional text; when given, only users whose ``name`` matches
            ``ilike *search*`` are returned.
        page: Zero-based page index; the row offset is ``page * limit``.

    Returns:
        ``{"users": [...], "has_next_page": bool}``.  On a non-200 response or
        any exception the error is logged and an empty page is returned
        (best-effort behaviour, nothing is raised to the caller).
    """
    import json

    try:
        params = {
            "select": "id,name,avatar,role,blurhash,blocked,deleted_account",
            "is_admin": "eq.false",
            "order": "created_at.desc",
            "limit": str(limit + 1),  # one extra row reveals a next page
            "offset": str(page * limit),
        }
        if search:
            # The search text is sent as an encoded query-parameter value, so
            # characters such as "&", "=" or "#" cannot add or override
            # filters.  No SQL-style quote doubling is applied: the value is
            # not spliced into SQL here, and doubling "'" would make names
            # containing an apostrophe unfindable.
            params["name"] = f"ilike.*{search}*"

        headers = {**SUPABASE_HEADERS, "Accept": "application/json; charset=utf-8"}

        async with aiohttp.ClientSession() as session:
            async with session.get(f"{SUPABASE_URL}/rest/v1/User", params=params, headers=headers) as response:
                if response.status != 200:
                    logger.error(f"❌ Erro ao obter usuários: {response.status}")
                    return {"users": [], "has_next_page": False}

                # Decode explicitly as UTF-8 before parsing the JSON body.
                users_data = json.loads(await response.text(encoding="utf-8"))

        return {
            "users": users_data[:limit],
            "has_next_page": len(users_data) > limit,
        }

    except Exception as e:
        logger.error(f"❌ Erro ao obter usuários: {str(e)}")
        return {"users": [], "has_next_page": False}
|
|
|
@router.get("/admin/users")
async def get_recent_users_endpoint(
    user_token: str = Header(None, alias="User-key"),
    limit: int = Query(50, ge=1, le=100),
    page: int = Query(0, ge=0),
    search: Optional[str] = Query(None)
):
    """List the most recent platform users, with name search and pagination.

    The caller's token (``User-key`` header) must carry the ``view_users``
    permission.

    Returns:
        A dict with ``users`` (the page), ``count`` (number of users in the
        page) and ``has_next_page``.

    Raises:
        HTTPException: Whatever the token check raises, passed through
            unchanged; 500 carrying the exception text for any other failure.
    """
    try:
        # Authorisation only; the returned user info is not needed here.
        await verify_token_with_permissions(user_token, "view_users")

        listing = await get_recent_users(limit, search, page)
        page_users = listing["users"]
        return {
            "users": page_users,
            "count": len(page_users),
            "has_next_page": listing["has_next_page"],
        }
    except HTTPException:
        # Keep the status code chosen by the token check.
        raise
    except Exception as exc:
        logger.error(f"❌ Erro ao obter usuários: {str(exc)}")
        raise HTTPException(status_code=500, detail=str(exc))
|
|
|
# Display label and colour for each charge status handled explicitly.
_CHARGE_STATUS_STYLES = {
    "succeeded": {"label": "Succeeded", "color": "#15803d"},
    "pending": {"label": "Pending", "color": "#92400e"},
    "failed": {"label": "Failed", "color": "#991b1b"},
    "canceled": {"label": "Canceled", "color": "#374151"},
}
# Fallback used for any status missing from the table above.
_UNKNOWN_CHARGE_STATUS = {"label": "Unknown", "color": "#1f2937"}


async def _fetch_user_stripe_id(user_id: str) -> str:
    """Look up the ``stripe_id`` of a Supabase user, raising 404 when unavailable."""
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"{SUPABASE_URL}/rest/v1/User",
            # ``user_id`` comes straight from the query string; sending it via
            # ``params`` encodes it so it cannot inject extra filters.
            params={"id": f"eq.{user_id}", "select": "stripe_id"},
            headers=SUPABASE_HEADERS,
        ) as response:
            if response.status != 200:
                raise HTTPException(status_code=404, detail="User not found")
            rows = await response.json()

    stripe_id = rows[0].get("stripe_id") if rows else None
    if not stripe_id:
        raise HTTPException(status_code=404, detail="Stripe ID not found for this user")
    return stripe_id


async def _fetch_stylist_row(session, stylist_id: str) -> Optional[Dict[str, Any]]:
    """Return ``name``/``email``/``avatar`` of one user, or None when not found."""
    async with session.get(
        f"{SUPABASE_URL}/rest/v1/User",
        params={"id": f"eq.{stylist_id}", "select": "name,email,avatar"},
        headers=SUPABASE_HEADERS,
    ) as response:
        if response.status != 200:
            return None
        rows = await response.json()
        return rows[0] if rows else None


async def _fetch_stylists(stylist_ids: List[str]) -> Dict[str, Optional[Dict[str, Any]]]:
    """Fetch several stylists concurrently over one HTTP session, keyed by id."""
    if not stylist_ids:
        return {}
    async with aiohttp.ClientSession() as session:
        rows = await asyncio.gather(*(_fetch_stylist_row(session, sid) for sid in stylist_ids))
    return dict(zip(stylist_ids, rows))


def _serialize_charge(charge, stylist: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Convert one Stripe charge into the JSON shape returned by the endpoint."""
    status_id = charge["status"]
    style = _CHARGE_STATUS_STYLES.get(status_id, _UNKNOWN_CHARGE_STATUS)
    return {
        "id": charge["id"],
        # Amount is divided by 100 (assumes a two-decimal currency).
        "amount": charge["amount"] / 100,
        "currency": charge["currency"].upper(),
        "status_info": {
            "id": status_id,
            "label": style["label"],
            "color": style["color"],
        },
        # NOTE(review): naive timestamp in the server's local timezone.
        "created_at": datetime.fromtimestamp(charge["created"]).isoformat(),
        "description": charge.get("description"),
        # The key can be present with a null value; ``or {}`` avoids calling
        # ``.get`` on None, which a ``.get(key, {})`` default does not prevent.
        "payment_method": (charge.get("payment_method_details") or {}).get("type"),
        "receipt_url": charge.get("receipt_url"),
        "metadata": charge.get("metadata") or {},
        "stylist": stylist,
    }


async def _seller_financial_summary(stripe_id: str) -> Dict[str, Any]:
    """Build the revenue and upcoming-payout summary for a connected account."""
    # The Stripe SDK is synchronous; run it in a worker thread so the event
    # loop keeps serving other requests while Stripe answers.
    transfers = await asyncio.to_thread(stripe.Transfer.list, destination=stripe_id, limit=100)
    payouts = await asyncio.to_thread(
        stripe.Payout.list,
        stripe_account=stripe_id,
        limit=1,
        arrival_date={"gte": int(datetime.now().timestamp())},
    )

    transfer_items = transfers["data"]
    upcoming = payouts["data"][0] if payouts["data"] else None

    return {
        "stripe_id": stripe_id,
        "role": "stylist",
        # Sum over the transfers fetched above (at most 100).
        "total_revenue": sum(t["amount"] for t in transfer_items) / 100,
        "upcoming_payout": {
            "amount": upcoming["amount"] / 100 if upcoming else None,
            "arrival_date": datetime.fromtimestamp(upcoming["arrival_date"]).isoformat() if upcoming else None,
        },
        "transfers_count": len(transfer_items),
    }


async def _customer_charge_history(stripe_id: str) -> Dict[str, Any]:
    """Build the list of the 10 most recent charges of a customer."""
    charges = await asyncio.to_thread(stripe.Charge.list, customer=stripe_id, limit=100)
    charge_items = charges["data"]
    recent = sorted(charge_items, key=lambda c: c["created"], reverse=True)[:10]

    # Stylist id referenced by each charge (None when the metadata has none).
    charge_stylist_ids = [(c.get("metadata") or {}).get("stylist_id") for c in recent]
    # Look every distinct stylist up once, preserving first-seen order.
    unique_ids = list(dict.fromkeys(sid for sid in charge_stylist_ids if sid))
    stylists = await _fetch_stylists(unique_ids)

    return {
        "stripe_id": stripe_id,
        "role": "customer",
        "transfers": [
            _serialize_charge(charge, stylists.get(sid) if sid else None)
            for charge, sid in zip(recent, charge_stylist_ids)
        ],
        # Number of charges fetched above (at most 100), not only those listed.
        "count": len(charge_items),
    }


@router.get("/admin/user-transfers")
async def get_user_transfer_history(
    user_token: str = Header(None, alias="User-key"),
    user_id: str = Query(..., description="ID of the user in Supabase")
):
    """Return financial information for a user.

    The caller's token (``User-key`` header) must carry the ``view_users``
    permission.  The user's ``stripe_id`` is read from Supabase and decides
    the shape of the answer:

    - ``cus_...`` (customer): the 10 most recent charges among up to 100
      fetched, each with status display info and the stylist referenced by
      the charge metadata, plus ``count`` of fetched charges.
    - ``acct_...`` (seller): total revenue summed over up to 100 transfers,
      the number of those transfers, and the first payout whose arrival date
      is not in the past.

    Raises:
        HTTPException: Whatever the token check raises; 404 when the user or
            its ``stripe_id`` cannot be found; 400 when the ``stripe_id`` has
            neither prefix; 500 with a generic message for any other failure.
    """
    try:
        await verify_token_with_permissions(user_token, "view_users")

        stripe_id = await _fetch_user_stripe_id(user_id)

        if stripe_id.startswith("acct_"):
            return await _seller_financial_summary(stripe_id)
        if stripe_id.startswith("cus_"):
            return await _customer_charge_history(stripe_id)

        raise HTTPException(status_code=400, detail="Invalid stripe_id format.")

    except HTTPException:
        # Deliberate HTTP errors keep their own status code.
        raise
    except Exception as e:
        logger.error(f"❌ Error retrieving transfers: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal error while fetching financial data.")