connect / routes /approvals.py
habulaj's picture
Update routes/approvals.py
a70d8a7 verified
import os
import logging
import aiohttp
import asyncio
from fastapi import APIRouter, HTTPException, Header
from typing import Dict, Any, Optional
from functools import lru_cache
from pydantic import BaseModel
import requests
from collections import defaultdict
# Router on which this module's account-approval endpoints are registered.
router = APIRouter()
# 📦 Models
class ApproveAccountRequest(BaseModel):
    """Request body for the approve-account endpoint."""

    # ID of the user whose account should be approved.
    user_id: str
class RejectAccountRequest(BaseModel):
    """Request body for the reject-account endpoint."""

    # ID of the user whose account should be rejected.
    user_id: str
    # Optional explanation for the rejection; defaults to None when omitted.
    reason: Optional[str] = None
# 🔥 Supabase configuration (both keys are read from environment secrets)
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
SUPABASE_KEY = os.environ.get("SUPA_KEY")
SUPABASE_ROLE_KEY = os.environ.get("SUPA_SERVICE_KEY")

# Refuse to load the module when either key is missing or empty.
if not (SUPABASE_KEY and SUPABASE_ROLE_KEY):
    raise ValueError("❌ SUPA_KEY ou SUPA_SERVICE_KEY não foram definidos no ambiente!")

# Headers for requests authenticated with SUPA_KEY.
SUPABASE_HEADERS = {
    "apikey": SUPABASE_KEY,
    "Authorization": f"Bearer {SUPABASE_KEY}",
    "Content-Type": "application/json",
}

# Headers for requests authenticated with SUPA_SERVICE_KEY.
SUPABASE_ROLE_HEADERS = {
    "apikey": SUPABASE_ROLE_KEY,
    "Authorization": f"Bearer {SUPABASE_ROLE_KEY}",
    "Content-Type": "application/json",
}
# ⚙️ Logging: set the root logger to INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 🔒 Admin status cache
@lru_cache(maxsize=128)
def get_cached_admin_status(user_id: str) -> bool:
    """Return whether the ``User`` row for ``user_id`` has ``is_admin`` set.

    This is a blocking ``requests`` call; async callers should run it in a
    worker thread.

    NOTE: ``lru_cache`` memoizes every returned value, including the ``False``
    produced by a non-200 reply or an empty result. A cached answer is reused
    until it is evicted or ``get_cached_admin_status.cache_clear()`` is
    called, so later changes to the row are not seen in the meantime.

    Args:
        user_id: ID of the row to look up in the ``User`` table.

    Returns:
        True when the row exists and its ``is_admin`` value is truthy;
        False when the request does not return 200, no row matches, or the
        flag is missing, null, or false.

    Raises:
        requests.RequestException: On connection errors or when the request
            exceeds the timeout (exceptions are not cached).
    """
    response = requests.get(
        f"{SUPABASE_URL}/rest/v1/User",
        # Passing the filter through ``params`` URL-encodes the id instead of
        # splicing it raw into the query string.
        params={"id": f"eq.{user_id}"},
        headers=SUPABASE_HEADERS,
        # Without a timeout ``requests`` waits indefinitely on a stalled server.
        timeout=10,
    )
    if response.status_code != 200:
        return False

    rows = response.json()  # decode the body once
    if not rows:
        return False

    # Coerce so a null column yields False rather than None.
    return bool(rows[0].get("is_admin", False))
# Fetch the permission flags stored on a user row
async def get_user_permissions(user_id: str) -> Dict[str, bool]:
    """Fetch the ``is_admin`` and ``approve_stylists`` flags for a user.

    Args:
        user_id: ID of the row to look up in the ``User`` table.

    Returns:
        A dict with the keys ``is_admin`` and ``approve_stylists``. Both are
        False when the request does not return 200 or no row matches. Values
        are coerced to ``bool`` so a null column is reported as False.
    """
    no_permissions = {"is_admin": False, "approve_stylists": False}
    # ``params`` lets the HTTP client URL-encode the id instead of splicing it
    # raw into the query string.
    query = {"id": f"eq.{user_id}", "select": "is_admin,approve_stylists"}

    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"{SUPABASE_URL}/rest/v1/User",
            params=query,
            headers=SUPABASE_HEADERS,
        ) as response:
            if response.status != 200:
                return no_permissions
            rows = await response.json()  # decode the body once

    if not rows:
        return no_permissions

    row = rows[0]
    return {
        "is_admin": bool(row.get("is_admin", False)),
        "approve_stylists": bool(row.get("approve_stylists", False)),
    }
# 🔐 Token verification with an optional required permission
async def verify_token_with_permissions(
    user_token: str, required_permission: Optional[str] = None
) -> Dict[str, Any]:
    """Validate a user token and optionally require one permission flag.

    The token is sent to the Supabase ``/auth/v1/user`` endpoint to resolve
    the user ID, then the user's permission flags are loaded with
    ``get_user_permissions``.

    Args:
        user_token: Bearer token supplied by the client. ``None`` or an empty
            string is rejected without contacting Supabase.
        required_permission: Key that must be truthy in the permissions dict
            (for example ``"approve_stylists"``). No check is made when it is
            ``None`` or empty.

    Returns:
        ``{"user_id": <id>, "permissions": <dict of flags>}``.

    Raises:
        HTTPException: 401 when the token is missing or the auth endpoint
            does not return 200; 400 when the auth payload has no ``id``;
            403 when the required permission is absent or falsy.
    """
    # The endpoints declare the header with a default of None; reject that
    # here instead of sending a literal "Bearer None" upstream.
    if not user_token:
        raise HTTPException(status_code=401, detail="Token inválido ou expirado")

    auth_headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json",
    }
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=auth_headers) as response:
            if response.status != 200:
                raise HTTPException(status_code=401, detail="Token inválido ou expirado")
            user_data = await response.json()

    user_id = user_data.get("id")
    if not user_id:
        raise HTTPException(status_code=400, detail="ID do usuário não encontrado")

    permissions = await get_user_permissions(user_id)

    if required_permission and not permissions.get(required_permission, False):
        raise HTTPException(
            status_code=403,
            detail=f"Acesso negado: permissão '{required_permission}' necessária",
        )

    return {"user_id": user_id, "permissions": permissions}
# 🔐 Admin token verification (kept for compatibility)
async def verify_admin_token(user_token: str) -> str:
    """Validate a user token and require that the user is an admin.

    The token is sent to the Supabase ``/auth/v1/user`` endpoint to resolve
    the user ID. The admin flag is then read through
    ``get_cached_admin_status``, which is run in a worker thread.

    Args:
        user_token: Bearer token supplied by the client. ``None`` or an empty
            string is rejected without contacting Supabase.

    Returns:
        The ID of the authenticated admin user.

    Raises:
        HTTPException: 401 when the token is missing or the auth endpoint
            does not return 200; 400 when the auth payload has no ``id``;
            403 when the user is not an admin.
    """
    # Reject a missing token here instead of sending a literal "Bearer None"
    # upstream.
    if not user_token:
        raise HTTPException(status_code=401, detail="Token inválido ou expirado")

    auth_headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json",
    }
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=auth_headers) as response:
            if response.status != 200:
                raise HTTPException(status_code=401, detail="Token inválido ou expirado")
            user_data = await response.json()

    user_id = user_data.get("id")
    if not user_id:
        raise HTTPException(status_code=400, detail="ID do usuário não encontrado")

    # The cached lookup is synchronous, so it runs off the event loop.
    is_admin = await asyncio.to_thread(get_cached_admin_status, user_id)
    if not is_admin:
        raise HTTPException(
            status_code=403,
            detail="Acesso negado: privilégios de administrador necessários",
        )

    return user_id
# 📥 Fetch pending users together with their answers
async def _fetch_rows_or_empty(
    session: "aiohttp.ClientSession", url: str, headers: Dict[str, str], warning: str
) -> list:
    """Return the JSON rows at ``url``; on a non-200 reply log ``warning`` and return []."""
    async with session.get(url, headers=headers) as response:
        if response.status != 200:
            logger.warning(warning)
            return []
        return await response.json()


async def get_users_pending_approval() -> Dict[str, Any]:
    """List users awaiting approval, each with their answers attached.

    Selects users with ``approved_account`` false, ``finished_onboarding``
    true and a null or empty ``approval_reason``. It then loads their rows
    from the ``User answers`` table and the ``Onboarding`` question titles.
    Both secondary lookups are best-effort: if either fails, a warning is
    logged and the users are returned with whatever could be loaded.

    Returns:
        ``{"users": [...], "count": <int>}``. Each user dict gains an
        ``answers`` list of ``{"question": <title>, "answer": <answers>}``
        entries. A question ID with no known title is labelled
        ``"Pergunta desconhecida"``.

    Raises:
        HTTPException: 500 with detail ``"Erro ao consultar usuários"`` when
            the user query does not return 200; 500 with detail
            ``"Erro interno do servidor"`` on any other unexpected error.
    """
    try:
        query_users = (
            f"{SUPABASE_URL}/rest/v1/User"
            "?select=id,name,email,avatar,role,blurhash"
            "&approved_account=eq.false"
            "&finished_onboarding=eq.true"
            "&or=(approval_reason.is.null,approval_reason.eq.)"
        )
        headers = SUPABASE_HEADERS.copy()
        headers["Accept"] = "application/json; charset=utf-8"

        async with aiohttp.ClientSession() as session:
            async with session.get(query_users, headers=headers) as response:
                if response.status != 200:
                    logger.error(f"❌ Erro ao buscar usuários pendentes: {response.status}")
                    raise HTTPException(status_code=500, detail="Erro ao consultar usuários")
                users = await response.json()

            if not users:
                return {"users": [], "count": 0}

            # str() keeps the join working if ids are not strings.
            user_ids_str = ",".join(str(user["id"]) for user in users)
            query_answers = (
                f"{SUPABASE_URL}/rest/v1/User answers"
                f"?select=user_id,question_id,answers"
                f"&user_id=in.({user_ids_str})"
            )
            user_answers = await _fetch_rows_or_empty(
                session, query_answers, headers, "⚠️ Erro ao buscar respostas de usuários"
            )

            query_questions = f"{SUPABASE_URL}/rest/v1/Onboarding?select=id,title"
            questions = await _fetch_rows_or_empty(
                session, query_questions, headers, "⚠️ Erro ao buscar perguntas"
            )

        question_map = {q["id"]: q["title"] for q in questions}

        # Group answers by user, replacing each question ID with its title.
        grouped_answers = defaultdict(list)
        for answer in user_answers:
            title = question_map.get(answer["question_id"], "Pergunta desconhecida")
            grouped_answers[answer["user_id"]].append({
                "question": title,
                "answer": answer.get("answers", []),
            })

        for user in users:
            user["answers"] = grouped_answers.get(user["id"], [])

        return {"users": users, "count": len(users)}
    except HTTPException:
        # Let the specific error raised above through unchanged; previously the
        # generic handler below replaced its detail.
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao buscar usuários e respostas: {str(e)}")
        raise HTTPException(status_code=500, detail="Erro interno do servidor") from e
# ✅ Approve account
@router.post("/admin/approve-account")
async def approve_account(
    body: ApproveAccountRequest,
    user_token: str = Header(None, alias="User-key"),
):
    """Set ``approved_account`` to true for the user named in the request.

    The caller's token (``User-key`` header) must carry the
    ``approve_stylists`` permission. The update is sent to the ``User`` table
    with the service-role headers.

    NOTE(review): only the 204 status is checked, which does not by itself
    show that a row matched the id. Confirm whether unknown ids should
    produce a 404.

    Args:
        body: Request payload containing the ``user_id`` to approve.
        user_token: Token read from the ``User-key`` header.

    Returns:
        ``{"status": "success", "message": "Conta aprovada com sucesso."}``.

    Raises:
        HTTPException: Whatever token verification raises (401/400/403);
            500 when the update does not return 204 or an unexpected error
            occurs.
    """
    try:
        # Only the permission check matters here; its return value is unused.
        await verify_token_with_permissions(user_token, "approve_stylists")

        async with aiohttp.ClientSession() as session:
            async with session.patch(
                f"{SUPABASE_URL}/rest/v1/User",
                # ``user_id`` is client input and this request uses the
                # service-role key. ``params`` URL-encodes it so it cannot add
                # or alter query parameters.
                params={"id": f"eq.{body.user_id}"},
                headers=SUPABASE_ROLE_HEADERS,
                json={"approved_account": True},
            ) as response:
                if response.status != 204:
                    logger.error(f"❌ Erro ao aprovar conta: {response.status}")
                    raise HTTPException(status_code=500, detail="Erro ao aprovar a conta")

        return {"status": "success", "message": "Conta aprovada com sucesso."}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao aprovar conta: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
# ❌ Reject account
@router.post("/admin/reject-account")
async def reject_account(
    body: RejectAccountRequest,
    user_token: str = Header(None, alias="User-key"),
):
    """Set ``approved_account`` to false and record the rejection reason.

    The caller's token (``User-key`` header) must carry the
    ``approve_stylists`` permission. When no reason is supplied,
    ``"No reason specified"`` is stored in ``approval_reason``.

    NOTE(review): only the 204 status is checked, which does not by itself
    show that a row matched the id. Confirm whether unknown ids should
    produce a 404.

    Args:
        body: Request payload with the ``user_id`` to reject and an optional
            ``reason``.
        user_token: Token read from the ``User-key`` header.

    Returns:
        ``{"status": "success", "message": "Conta reprovada com sucesso."}``.

    Raises:
        HTTPException: Whatever token verification raises (401/400/403);
            500 when the update does not return 204 or an unexpected error
            occurs.
    """
    try:
        # Only the permission check matters here; its return value is unused.
        await verify_token_with_permissions(user_token, "approve_stylists")

        payload = {
            "approved_account": False,
            "approval_reason": body.reason or "No reason specified",
        }
        async with aiohttp.ClientSession() as session:
            async with session.patch(
                f"{SUPABASE_URL}/rest/v1/User",
                # ``user_id`` is client input and this request uses the
                # service-role key. ``params`` URL-encodes it so it cannot add
                # or alter query parameters.
                params={"id": f"eq.{body.user_id}"},
                headers=SUPABASE_ROLE_HEADERS,
                json=payload,
            ) as response:
                if response.status != 204:
                    logger.error(f"❌ Erro ao reprovar conta: {response.status}")
                    raise HTTPException(status_code=500, detail="Erro ao reprovar a conta")

        return {"status": "success", "message": "Conta reprovada com sucesso."}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Erro ao reprovar conta: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
# 🔍 List pending accounts
@router.get("/admin/pending-approvals")
async def pending_approvals_endpoint(
    user_token: str = Header(None, alias="User-key"),
):
    """Return the users awaiting approval.

    The caller's token (``User-key`` header) must carry the
    ``approve_stylists`` permission. The response is whatever
    ``get_users_pending_approval`` returns. ``HTTPException``s pass through
    unchanged; any other error is logged and reported as a 500 whose detail
    is the exception text.
    """
    try:
        # Permission gate; the verification result itself is not needed.
        await verify_token_with_permissions(user_token, "approve_stylists")
        pending = await get_users_pending_approval()
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"❌ Erro no endpoint de aprovações: {str(exc)}")
        raise HTTPException(status_code=500, detail=str(exc))
    return pending