|
import os |
|
import logging |
|
import aiohttp |
|
from datetime import datetime, timedelta |
|
import pytz |
|
from fastapi import APIRouter, HTTPException, Query, Header |
|
from typing import Dict, Any |
|
|
|
router = APIRouter() |
|
|
|
|
|
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co" |
|
SUPABASE_KEY = os.getenv("SUPA_KEY") |
|
SUPABASE_ROLE_KEY = os.getenv("SUPA_SERVICE_KEY") |
|
|
|
if not SUPABASE_KEY or not SUPABASE_ROLE_KEY: |
|
raise ValueError("❌ SUPA_KEY ou SUPA_SERVICE_KEY não foram definidos no ambiente!") |
|
|
|
SUPABASE_HEADERS = { |
|
"apikey": SUPABASE_KEY, |
|
"Authorization": f"Bearer {SUPABASE_KEY}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
SUPABASE_ROLE_HEADERS = { |
|
"apikey": SUPABASE_ROLE_KEY, |
|
"Authorization": f"Bearer {SUPABASE_ROLE_KEY}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
async def verify_token_with_permissions(user_token: str, required_permission: str = None) -> Dict[str, Any]: |
|
headers = { |
|
"Authorization": f"Bearer {user_token}", |
|
"apikey": SUPABASE_KEY, |
|
"Content-Type": "application/json" |
|
} |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response: |
|
if response.status != 200: |
|
raise HTTPException(status_code=401, detail="Token inválido ou expirado") |
|
|
|
user_data = await response.json() |
|
user_id = user_data.get("id") |
|
if not user_id: |
|
raise HTTPException(status_code=400, detail="ID do usuário não encontrado") |
|
|
|
user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}&select=is_admin,edit_onboarding" |
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(user_data_url, headers=SUPABASE_HEADERS) as response: |
|
if response.status != 200 or not await response.json(): |
|
raise HTTPException(status_code=403, detail="Acesso negado: não foi possível verificar permissões") |
|
|
|
user_info = (await response.json())[0] |
|
is_admin = user_info.get("is_admin", False) |
|
|
|
if not is_admin: |
|
raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários") |
|
|
|
if required_permission: |
|
has_permission = user_info.get(required_permission, False) |
|
if not has_permission: |
|
raise HTTPException( |
|
status_code=403, |
|
detail=f"Acesso negado: permissão '{required_permission}' necessária" |
|
) |
|
|
|
return { |
|
"user_id": user_id, |
|
"is_admin": is_admin, |
|
"permissions": user_info |
|
} |
|
|
|
|
|
@router.get("/logs") |
|
async def get_logs( |
|
page: int = Query(0, ge=0), |
|
user_token: str = Header(..., alias="User-key") |
|
): |
|
""" |
|
Retorna uma lista paginada de logs com informações do usuário. |
|
Requer permissão de admin. |
|
Cada página contém no máximo 50 logs. |
|
""" |
|
try: |
|
await verify_token_with_permissions(user_token) |
|
|
|
limit = 50 |
|
offset = page * limit |
|
|
|
query_url = f"{SUPABASE_URL}/rest/v1/Logs?order=created_at.desc&limit={limit}&offset={offset}&select=id,user,action,reference,old_data,new_data,created_at" |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(query_url, headers=SUPABASE_HEADERS) as response: |
|
if response.status != 200: |
|
detail = await response.text() |
|
logger.error(f"❌ Erro ao buscar logs: {detail}") |
|
raise HTTPException(status_code=response.status, detail="Erro ao buscar logs") |
|
|
|
logs = await response.json() |
|
|
|
if not logs: |
|
return { |
|
"logs": [], |
|
"page": page, |
|
"has_next": False |
|
} |
|
|
|
|
|
user_ids = list({log["user"] for log in logs if log.get("user")}) |
|
user_info_map = {} |
|
|
|
if user_ids: |
|
user_ids_query = ",".join(f'"{uid}"' for uid in user_ids) |
|
users_url = f"{SUPABASE_URL}/rest/v1/User?id=in.({user_ids_query})&select=id,email,avatar,name" |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(users_url, headers=SUPABASE_HEADERS) as response: |
|
if response.status == 200: |
|
users_data = await response.json() |
|
user_info_map = {user["id"]: user for user in users_data} |
|
else: |
|
logger.warning("⚠️ Erro ao buscar dados dos usuários") |
|
|
|
|
|
def format_relative_date(iso_date: str) -> str: |
|
ny_tz = pytz.timezone("America/New_York") |
|
try: |
|
created_dt = datetime.fromisoformat(iso_date) |
|
created_dt = created_dt.astimezone(ny_tz) |
|
except Exception: |
|
return iso_date |
|
|
|
now = datetime.now(ny_tz) |
|
diff = now - created_dt |
|
|
|
if diff < timedelta(seconds=60): |
|
return "just now" |
|
elif diff < timedelta(hours=1): |
|
minutes = int(diff.total_seconds() // 60) |
|
return f"{minutes} minute{'s' if minutes != 1 else ''} ago" |
|
elif diff < timedelta(hours=24): |
|
hours = int(diff.total_seconds() // 3600) |
|
return f"{hours} hour{'s' if hours != 1 else ''} ago" |
|
elif (now.date() - created_dt.date()).days == 1: |
|
return f"Yesterday at {created_dt.strftime('%I:%M %p').lstrip('0')}" |
|
else: |
|
return created_dt.strftime("%m/%d/%Y") |
|
|
|
|
|
def generate_message(log, user_info): |
|
action = log.get("action") |
|
reference = log.get("reference") |
|
old_data = log.get("old_data") or {} |
|
new_data = log.get("new_data") or {} |
|
|
|
if isinstance(old_data, str): |
|
try: |
|
old_data = json.loads(old_data) |
|
except Exception: |
|
old_data = {} |
|
if isinstance(new_data, str): |
|
try: |
|
new_data = json.loads(new_data) |
|
except Exception: |
|
new_data = {} |
|
|
|
email = user_info.get("email", "Someone") |
|
target_name = None |
|
|
|
if reference == "User": |
|
target_name = old_data.get("name") or old_data.get("email") or new_data.get("name") or new_data.get("email") |
|
elif reference == "Approval": |
|
target_name = old_data.get("name") |
|
elif reference == "Collaboration": |
|
target_name = new_data.get("email") or old_data.get("email") |
|
elif reference == "Onboarding": |
|
target_name = None |
|
|
|
if action == "update" and reference == "User": |
|
return f"{email} updated profile information of {target_name or 'a user'}" |
|
elif action == "approve" and reference == "Approval": |
|
return f"{email} approved stylist {target_name or ''}" |
|
elif action == "deny" and reference == "Approval": |
|
return f"{email} denied stylist {target_name or ''}" |
|
elif action == "update" and reference == "Onboarding": |
|
return f"{email} updated an onboarding question" |
|
elif action == "add" and reference == "Onboarding": |
|
return f"{email} added a new onboarding question" |
|
elif action == "delete" and reference == "Onboarding": |
|
return f"{email} deleted an onboarding question" |
|
elif action == "add" and reference == "Collaboration": |
|
return f"{email} added {target_name or 'a collaborator'} to the team" |
|
elif action == "delete" and reference == "Collaboration": |
|
return f"{email} removed {target_name or 'a collaborator'} from the team" |
|
elif action == "update" and reference == "Collaboration": |
|
return f"{email} updated permissions of {target_name or 'a collaborator'}" |
|
elif action == "update" and reference == "Tickets": |
|
transferred_to = new_data.get("agent_email") |
|
if transferred_to: |
|
return f"{email} transferred a ticket to {transferred_to}" |
|
elif action == "delete" and reference == "Tickets": |
|
return f"{email} closed a ticket" |
|
|
|
return f"{email} performed action: {action} on {reference}" |
|
|
|
|
|
enriched_logs = [] |
|
for log in logs: |
|
user_raw = user_info_map.get(log["user"], {}) |
|
enriched_logs.append({ |
|
"id": log["id"], |
|
"user": { |
|
"id": user_raw.get("id"), |
|
"email": user_raw.get("email"), |
|
"avatar": user_raw.get("avatar") |
|
}, |
|
"action": log["action"], |
|
"reference": log["reference"], |
|
"old_data": log["old_data"], |
|
"new_data": log["new_data"], |
|
"created_at": log["created_at"], |
|
"message": generate_message(log, user_raw), |
|
"formatted_date": format_relative_date(log["created_at"]) |
|
}) |
|
|
|
has_next = len(logs) == limit |
|
|
|
return { |
|
"logs": enriched_logs, |
|
"page": page, |
|
"has_next": has_next |
|
} |
|
|
|
except HTTPException as he: |
|
raise he |
|
except Exception as e: |
|
logger.error(f"❌ Erro interno ao buscar logs: {str(e)}") |
|
raise HTTPException(status_code=500, detail="Erro interno do servidor") |