File size: 4,573 Bytes
5844082
 
 
 
a0c6b50
5844082
a0c6b50
5844082
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c314dfe
 
5844082
c314dfe
edd6ece
c314dfe
edd6ece
a0c6b50
 
 
 
 
5844082
 
 
 
a0c6b50
5844082
a0c6b50
c314dfe
5844082
 
 
 
c314dfe
 
edd6ece
c314dfe
 
 
 
 
5844082
 
a0c6b50
c314dfe
5844082
a54bb56
5844082
a54bb56
a0c6b50
c314dfe
a0c6b50
5844082
 
c314dfe
 
5844082
 
 
c314dfe
5844082
 
c314dfe
 
 
5844082
 
 
 
 
 
a0c6b50
c314dfe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import os
import logging
import asyncio
import aiohttp
from fastapi import APIRouter, HTTPException, Header, Query
from functools import lru_cache
from typing import List, Dict, Any, Optional

router = APIRouter()

# Configuração do Supabase
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
SUPABASE_KEY = os.getenv("SUPA_KEY")

if not SUPABASE_KEY:
    raise ValueError("❌ SUPA_KEY não foi definido no ambiente!")

SUPABASE_HEADERS = {
    "apikey": SUPABASE_KEY,
    "Authorization": f"Bearer {SUPABASE_KEY}",
    "Content-Type": "application/json"
}

# Configuração do logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Cache para reduzir chamadas repetidas
@lru_cache(maxsize=128)
def get_cached_admin_status(user_id: str) -> bool:
    """Obtém e armazena em cache se um usuário é admin"""
    import requests
    user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}"
    response = requests.get(user_data_url, headers=SUPABASE_HEADERS)
    
    if response.status_code != 200 or not response.json():
        return False
    
    user_info = response.json()[0]
    return user_info.get("is_admin", False)

async def verify_admin_token(user_token: str) -> str:
    """Verifica se o token pertence a um administrador de forma assíncrona"""
    headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json"
    }
    
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
            if response.status != 200:
                raise HTTPException(status_code=401, detail="Token inválido ou expirado")
            
            user_data = await response.json()
            user_id = user_data.get("id")
            if not user_id:
                raise HTTPException(status_code=400, detail="ID do usuário não encontrado")
    
    is_admin = await asyncio.to_thread(get_cached_admin_status, user_id)
    
    if not is_admin:
        raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários")
    
    return user_id

async def get_recent_users(limit: int = 50, search: Optional[str] = None, page: int = 0) -> Dict[str, Any]:
    """Obtém os usuários mais recentes da plataforma, com filtro opcional por nome e paginação"""
    try:
        offset = page * limit
        limit_plus_one = limit + 1
        
        query = f"{SUPABASE_URL}/rest/v1/User?select=name,avatar&order=created_at.desc&limit={limit_plus_one}&offset={offset}"
        
        if search:
            search_term = search.replace("'", "''")
            query += f"&name=ilike.*{search_term}*"
        
        headers = SUPABASE_HEADERS.copy()
        headers["Accept"] = "application/json; charset=utf-8"
        
        async with aiohttp.ClientSession() as session:
            async with session.get(query, headers=headers) as response:
                if response.status != 200:
                    logger.error(f"❌ Erro ao obter usuários: {response.status}")
                    return {"users": [], "has_next_page": False}
                
                text = await response.text(encoding='utf-8')
                import json
                users_data = json.loads(text)
        
        has_next_page = len(users_data) > limit
        users = users_data[:limit]
        
        return {
            "users": users,
            "has_next_page": has_next_page
        }
        
    except Exception as e:
        logger.error(f"❌ Erro ao obter usuários: {str(e)}")
        return {"users": [], "has_next_page": False}

@router.get("/admin/users")
async def get_recent_users_endpoint(
    user_token: str = Header(None, alias="User-key"),
    limit: int = Query(50, ge=1, le=100),
    page: int = Query(0, ge=0),
    search: Optional[str] = Query(None)
):
    """
    Endpoint para obter os usuários mais recentes da plataforma,
    com suporte a busca por nome e paginação.
    """
    try:
        user_id = await verify_admin_token(user_token)
        result = await get_recent_users(limit, search, page)
        
        return {
            "users": result["users"],
            "count": len(result["users"]),
            "has_next_page": result["has_next_page"]
        }
        
    except HTTPException as he:
        raise he
        
    except Exception as e:
        logger.error(f"❌ Erro ao obter usuários: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))