|
import os |
|
import logging |
|
import asyncio |
|
import aiohttp |
|
from pydantic import BaseModel |
|
from fastapi import APIRouter, HTTPException, Header, Query |
|
from functools import lru_cache |
|
from typing import List, Dict, Any, Optional |
|
|
|
router = APIRouter() |
|
|
|
|
|
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co" |
|
SUPABASE_KEY = os.getenv("SUPA_KEY") |
|
SUPABASE_ROLE_KEY = os.getenv("SUPA_SERVICE_KEY") |
|
|
|
class FeedDeleteRequest(BaseModel): |
|
feed_id: int |
|
|
|
if not SUPABASE_KEY or not SUPABASE_ROLE_KEY: |
|
raise ValueError("❌ SUPA_KEY ou SUPA_SERVICE_KEY não foram definidos no ambiente!") |
|
|
|
SUPABASE_HEADERS = { |
|
"apikey": SUPABASE_KEY, |
|
"Authorization": f"Bearer {SUPABASE_KEY}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
SUPABASE_ROLE_HEADERS = { |
|
"apikey": SUPABASE_ROLE_KEY, |
|
"Authorization": f"Bearer {SUPABASE_ROLE_KEY}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
@lru_cache(maxsize=128) |
|
def get_cached_admin_status(user_id: str) -> bool: |
|
"""Obtém e armazena em cache se um usuário é admin""" |
|
import requests |
|
user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}" |
|
response = requests.get(user_data_url, headers=SUPABASE_HEADERS) |
|
|
|
if response.status_code != 200 or not response.json(): |
|
return False |
|
|
|
user_info = response.json()[0] |
|
return user_info.get("is_admin", False) |
|
|
|
|
|
async def get_user_permissions(user_id: str) -> Dict[str, bool]: |
|
"""Obtém as permissões de um usuário""" |
|
user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}&select=is_admin,view_users,manage_users" |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(user_data_url, headers=SUPABASE_HEADERS) as response: |
|
if response.status != 200 or not await response.json(): |
|
return {"is_admin": False, "view_users": False, "manage_users": False} |
|
|
|
user_info = (await response.json())[0] |
|
return { |
|
"is_admin": user_info.get("is_admin", False), |
|
"view_users": user_info.get("view_users", False), |
|
"manage_users": user_info.get("manage_users", False) |
|
} |
|
|
|
async def verify_admin_token(user_token: str) -> str: |
|
"""Verifica se o token pertence a um administrador de forma assíncrona""" |
|
headers = { |
|
"Authorization": f"Bearer {user_token}", |
|
"apikey": SUPABASE_KEY, |
|
"Content-Type": "application/json" |
|
} |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response: |
|
if response.status != 200: |
|
raise HTTPException(status_code=401, detail="Token inválido ou expirado") |
|
|
|
user_data = await response.json() |
|
user_id = user_data.get("id") |
|
if not user_id: |
|
raise HTTPException(status_code=400, detail="ID do usuário não encontrado") |
|
|
|
is_admin = await asyncio.to_thread(get_cached_admin_status, user_id) |
|
|
|
if not is_admin: |
|
raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários") |
|
|
|
return user_id |
|
|
|
|
|
async def verify_token_with_permissions(user_token: str, required_permission: Optional[str] = None) -> Dict[str, Any]: |
|
"""Verifica o token e retorna ID do usuário e suas permissões""" |
|
headers = { |
|
"Authorization": f"Bearer {user_token}", |
|
"apikey": SUPABASE_KEY, |
|
"Content-Type": "application/json" |
|
} |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response: |
|
if response.status != 200: |
|
raise HTTPException(status_code=401, detail="Token inválido ou expirado") |
|
|
|
user_data = await response.json() |
|
user_id = user_data.get("id") |
|
if not user_id: |
|
raise HTTPException(status_code=400, detail="ID do usuário não encontrado") |
|
|
|
|
|
permissions = await get_user_permissions(user_id) |
|
|
|
|
|
is_admin = permissions.get("is_admin", False) |
|
|
|
|
|
if required_permission: |
|
has_permission = permissions.get(required_permission, False) |
|
if not has_permission: |
|
raise HTTPException( |
|
status_code=403, |
|
detail=f"Acesso negado: permissão '{required_permission}' necessária" |
|
) |
|
|
|
return { |
|
"user_id": user_id, |
|
"is_admin": is_admin, |
|
"permissions": permissions |
|
} |
|
|
|
async def get_recent_users(limit: int = 50, search: Optional[str] = None, page: int = 0) -> Dict[str, Any]: |
|
"""Obtém os usuários mais recentes da plataforma, com filtro opcional por nome e paginação""" |
|
try: |
|
offset = page * limit |
|
limit_plus_one = limit + 1 |
|
|
|
|
|
query = f"{SUPABASE_URL}/rest/v1/User?select=id,name,avatar,role,blurhash&is_admin=eq.false&order=created_at.desc&limit={limit_plus_one}&offset={offset}" |
|
|
|
if search: |
|
search_term = search.replace("'", "''") |
|
query += f"&name=ilike.*{search_term}*" |
|
|
|
headers = SUPABASE_HEADERS.copy() |
|
headers["Accept"] = "application/json; charset=utf-8" |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(query, headers=headers) as response: |
|
if response.status != 200: |
|
logger.error(f"❌ Erro ao obter usuários: {response.status}") |
|
return {"users": [], "has_next_page": False} |
|
|
|
text = await response.text(encoding='utf-8') |
|
import json |
|
users_data = json.loads(text) |
|
|
|
has_next_page = len(users_data) > limit |
|
users = users_data[:limit] |
|
|
|
return { |
|
"users": users, |
|
"has_next_page": has_next_page |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Erro ao obter usuários: {str(e)}") |
|
return {"users": [], "has_next_page": False} |
|
|
|
@router.get("/admin/users") |
|
async def get_recent_users_endpoint( |
|
user_token: str = Header(None, alias="User-key"), |
|
limit: int = Query(50, ge=1, le=100), |
|
page: int = Query(0, ge=0), |
|
search: Optional[str] = Query(None) |
|
): |
|
""" |
|
Endpoint para obter os usuários mais recentes da plataforma, |
|
com suporte a busca por nome e paginação. |
|
""" |
|
try: |
|
|
|
user_info = await verify_token_with_permissions(user_token, "view_users") |
|
|
|
result = await get_recent_users(limit, search, page) |
|
|
|
return { |
|
"users": result["users"], |
|
"count": len(result["users"]), |
|
"has_next_page": result["has_next_page"] |
|
} |
|
|
|
except HTTPException as he: |
|
raise he |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Erro ao obter usuários: {str(e)}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
@router.post("/admin/update-user") |
|
async def update_user( |
|
request: Dict[str, Any], |
|
user_token: str = Header(None, alias="User-key") |
|
): |
|
""" |
|
Endpoint para atualizar informações de um usuário específico. |
|
Atualmente suporta atualização de name e fee. |
|
Usa a chave de serviço para bypassar RLS do Supabase. |
|
""" |
|
try: |
|
|
|
user_info = await verify_token_with_permissions(user_token, "manage_users") |
|
admin_id = user_info["user_id"] |
|
|
|
|
|
user_id = request.get("user_id") |
|
name = request.get("name") |
|
fee = request.get("fee") |
|
|
|
if not user_id: |
|
raise HTTPException(status_code=400, detail="ID do usuário é obrigatório") |
|
|
|
if name is None and fee is None: |
|
raise HTTPException(status_code=400, detail="Pelo menos um campo para atualização (name ou fee) deve ser fornecido") |
|
|
|
|
|
user_query = f"{SUPABASE_URL}/rest/v1/User?select=name,fee&id=eq.{user_id}" |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(user_query, headers=SUPABASE_HEADERS) as response: |
|
if response.status != 200: |
|
logger.error(f"❌ Erro ao verificar usuário: {response.status}") |
|
raise HTTPException(status_code=response.status, detail="Erro ao consultar usuário") |
|
|
|
user_data = await response.json() |
|
|
|
if not user_data: |
|
raise HTTPException(status_code=404, detail="Usuário não encontrado") |
|
|
|
current_user = user_data[0] |
|
|
|
|
|
update_fields = {} |
|
|
|
if name is not None and name != current_user.get("name"): |
|
update_fields["name"] = name |
|
|
|
if fee is not None and isinstance(fee, int) and fee != current_user.get("fee"): |
|
update_fields["fee"] = fee |
|
|
|
|
|
if not update_fields: |
|
return {"message": "Nenhuma alteração necessária", "user_id": user_id} |
|
|
|
|
|
update_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}" |
|
|
|
headers_with_prefer = SUPABASE_ROLE_HEADERS.copy() |
|
headers_with_prefer["Prefer"] = "return=representation" |
|
|
|
async with session.patch(update_url, json=update_fields, headers=headers_with_prefer) as update_response: |
|
if update_response.status != 200: |
|
logger.error(f"❌ Erro ao atualizar usuário: {update_response.status} - {await update_response.text()}") |
|
raise HTTPException(status_code=update_response.status, detail="Erro ao atualizar usuário") |
|
|
|
updated_user_data = await update_response.json() |
|
raw_user = updated_user_data[0] if updated_user_data else {} |
|
updated_user = { |
|
"name": raw_user.get("name"), |
|
"avatar": raw_user.get("avatar"), |
|
"role": raw_user.get("role"), |
|
"fee": raw_user.get("fee") |
|
} |
|
|
|
logger.info(f"✅ Usuário {user_id} atualizado com sucesso por {admin_id}: {update_fields}") |
|
|
|
return { |
|
"message": "Usuário atualizado com sucesso", |
|
"user": updated_user, |
|
"updated_by": admin_id |
|
} |
|
|
|
except HTTPException as he: |
|
raise he |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Erro ao atualizar usuário: {str(e)}") |
|
raise HTTPException(status_code=500, detail="Erro interno do servidor") |
|
|
|
@router.post("/admin/delete-feed") |
|
async def delete_feed( |
|
request: FeedDeleteRequest, |
|
user_token: str = Header(None, alias="User-key") |
|
): |
|
""" |
|
Deleta um feed, seus portfolios relacionados e as imagens no bucket. |
|
""" |
|
try: |
|
await verify_admin_token(user_token) |
|
|
|
feed_id = request.feed_id |
|
|
|
headers = SUPABASE_ROLE_HEADERS.copy() |
|
headers["Accept"] = "application/json" |
|
|
|
async with aiohttp.ClientSession() as session: |
|
|
|
feed_query = f"{SUPABASE_URL}/rest/v1/Feeds?select=portfolios&limit=1&id=eq.{feed_id}" |
|
async with session.get(feed_query, headers=headers) as feed_response: |
|
if feed_response.status != 200: |
|
raise HTTPException(status_code=feed_response.status, detail="Erro ao buscar feed") |
|
|
|
feed_data = await feed_response.json() |
|
if not feed_data: |
|
raise HTTPException(status_code=404, detail="Feed não encontrado") |
|
|
|
portfolio_ids = feed_data[0].get("portfolios", []) |
|
if not isinstance(portfolio_ids, list): |
|
import json |
|
portfolio_ids = json.loads(portfolio_ids) |
|
|
|
|
|
image_urls = [] |
|
if portfolio_ids: |
|
ids_str = ",".join([str(pid) for pid in portfolio_ids]) |
|
portfolio_query = f"{SUPABASE_URL}/rest/v1/Portfolio?select=id,image_url&id=in.({ids_str})" |
|
async with session.get(portfolio_query, headers=headers) as portfolio_response: |
|
if portfolio_response.status == 200: |
|
portfolio_data = await portfolio_response.json() |
|
image_urls = [item["image_url"] for item in portfolio_data if "image_url" in item] |
|
|
|
|
|
if portfolio_ids: |
|
delete_portfolios_url = f"{SUPABASE_URL}/rest/v1/Portfolio?id=in.({','.join(map(str, portfolio_ids))})" |
|
async with session.delete(delete_portfolios_url, headers=headers) as delete_response: |
|
if delete_response.status != 204: |
|
raise HTTPException(status_code=delete_response.status, detail="Erro ao deletar portfolios") |
|
|
|
|
|
async def delete_image_from_storage(image_url: str): |
|
from urllib.parse import urlparse |
|
path = urlparse(image_url).path |
|
|
|
parts = path.strip("/").split("/") |
|
if len(parts) < 3: |
|
return |
|
bucket_id = parts[1] |
|
file_key = "/".join(parts[2:]) |
|
delete_url = f"{SUPABASE_URL}/storage/v1/object/{bucket_id}/{file_key}" |
|
async with session.delete(delete_url, headers=headers) as delete_img_response: |
|
if delete_img_response.status not in (200, 204): |
|
logger.warning(f"❌ Falha ao deletar imagem: {file_key}") |
|
|
|
await asyncio.gather(*[delete_image_from_storage(url) for url in image_urls]) |
|
|
|
|
|
delete_feed_url = f"{SUPABASE_URL}/rest/v1/Feeds?id=eq.{feed_id}" |
|
async with session.delete(delete_feed_url, headers=headers) as delete_feed_response: |
|
if delete_feed_response.status != 204: |
|
raise HTTPException(status_code=delete_feed_response.status, detail="Erro ao deletar feed") |
|
|
|
logger.info(f"✅ Feed {feed_id} e portfolios relacionados deletados com sucesso.") |
|
return {"message": f"Feed {feed_id} e portfolios deletados com sucesso."} |
|
|
|
except HTTPException as he: |
|
raise he |
|
except Exception as e: |
|
logger.error(f"❌ Erro ao deletar feed: {str(e)}") |
|
raise HTTPException(status_code=500, detail="Erro interno do servidor") |
|
|
|
@router.get("/admin/user") |
|
async def get_user_name( |
|
user_id: str = Query(..., description="ID do usuário"), |
|
user_token: str = Header(None, alias="User-key") |
|
): |
|
""" |
|
Endpoint para obter informações de um usuário específico a partir do ID, |
|
incluindo seus feeds e uma imagem de portfólio para cada feed. |
|
""" |
|
try: |
|
|
|
user_info = await verify_token_with_permissions(user_token, "view_users") |
|
|
|
|
|
user_query = f"{SUPABASE_URL}/rest/v1/User?select=name,avatar,role,fee&id=eq.{user_id}" |
|
|
|
|
|
feeds_query = f"{SUPABASE_URL}/rest/v1/Feeds?select=id,portfolios,created_at,description,urls,user_id&user_id=eq.{user_id}&order=created_at.desc" |
|
|
|
headers = SUPABASE_HEADERS.copy() |
|
headers["Accept"] = "application/json; charset=utf-8" |
|
|
|
async with aiohttp.ClientSession() as session: |
|
|
|
user_task = session.get(user_query, headers=headers) |
|
feeds_task = session.get(feeds_query, headers=headers) |
|
|
|
async with user_task as user_response, feeds_task as feeds_response: |
|
if user_response.status != 200 or feeds_response.status != 200: |
|
status = user_response.status if user_response.status != 200 else feeds_response.status |
|
raise HTTPException(status_code=status, detail="Erro ao consultar o Supabase") |
|
|
|
user_data = await user_response.json() |
|
feeds_data = await feeds_response.json() |
|
|
|
if not user_data: |
|
raise HTTPException(status_code=404, detail="Usuário não encontrado") |
|
|
|
|
|
feeds_with_images = [] |
|
for feed in feeds_data: |
|
feed_info = { |
|
"id": feed["id"], |
|
"created_at": feed["created_at"], |
|
"description": feed["description"], |
|
"urls": feed["urls"], |
|
"portfolios": feed["portfolios"] |
|
} |
|
|
|
|
|
if feed["portfolios"] and len(feed["portfolios"]) > 0: |
|
|
|
first_portfolio_id = feed["portfolios"][0] |
|
|
|
|
|
portfolio_query = f"{SUPABASE_URL}/rest/v1/Portfolio?select=image_url,blurhash,width,height&id=eq.{first_portfolio_id}" |
|
|
|
async with session.get(portfolio_query, headers=headers) as portfolio_response: |
|
if portfolio_response.status == 200: |
|
portfolio_data = await portfolio_response.json() |
|
|
|
if portfolio_data and len(portfolio_data) > 0: |
|
feed_info["thumbnail"] = { |
|
"image_url": portfolio_data[0]["image_url"], |
|
"blurhash": portfolio_data[0]["blurhash"], |
|
"width": portfolio_data[0]["width"], |
|
"height": portfolio_data[0]["height"] |
|
} |
|
|
|
feeds_with_images.append(feed_info) |
|
|
|
|
|
response = { |
|
"user": user_data[0], |
|
"feeds": feeds_with_images, |
|
"feeds_count": len(feeds_with_images) |
|
} |
|
|
|
return response |
|
|
|
except HTTPException as he: |
|
raise he |
|
except Exception as e: |
|
logger.error(f"❌ Erro ao buscar usuário e feeds: {str(e)}") |
|
raise HTTPException(status_code=500, detail="Erro interno do servidor") |