import os import logging import aiohttp from datetime import datetime, timedelta import pytz from fastapi import APIRouter, HTTPException, Query, Header from typing import Dict, Any router = APIRouter() # Supabase configs SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co" SUPABASE_KEY = os.getenv("SUPA_KEY") SUPABASE_ROLE_KEY = os.getenv("SUPA_SERVICE_KEY") if not SUPABASE_KEY or not SUPABASE_ROLE_KEY: raise ValueError("❌ SUPA_KEY ou SUPA_SERVICE_KEY não foram definidos no ambiente!") SUPABASE_HEADERS = { "apikey": SUPABASE_KEY, "Authorization": f"Bearer {SUPABASE_KEY}", "Content-Type": "application/json" } SUPABASE_ROLE_HEADERS = { "apikey": SUPABASE_ROLE_KEY, "Authorization": f"Bearer {SUPABASE_ROLE_KEY}", "Content-Type": "application/json" } # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Verificação de token e permissões async def verify_token_with_permissions(user_token: str, required_permission: str = None) -> Dict[str, Any]: headers = { "Authorization": f"Bearer {user_token}", "apikey": SUPABASE_KEY, "Content-Type": "application/json" } async with aiohttp.ClientSession() as session: async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response: if response.status != 200: raise HTTPException(status_code=401, detail="Token inválido ou expirado") user_data = await response.json() user_id = user_data.get("id") if not user_id: raise HTTPException(status_code=400, detail="ID do usuário não encontrado") user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}&select=is_admin,edit_onboarding" async with aiohttp.ClientSession() as session: async with session.get(user_data_url, headers=SUPABASE_HEADERS) as response: if response.status != 200 or not await response.json(): raise HTTPException(status_code=403, detail="Acesso negado: não foi possível verificar permissões") user_info = (await response.json())[0] is_admin = user_info.get("is_admin", False) if not is_admin: raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários") if required_permission: has_permission = user_info.get(required_permission, False) if not has_permission: raise HTTPException( status_code=403, detail=f"Acesso negado: permissão '{required_permission}' necessária" ) return { "user_id": user_id, "is_admin": is_admin, "permissions": user_info } # Rota para buscar logs @router.get("/logs") async def get_logs( page: int = Query(0, ge=0), user_token: str = Header(..., alias="User-key") ): """ Retorna uma lista paginada de logs com informações do usuário. Requer permissão de admin. Cada página contém no máximo 50 logs. """ try: await verify_token_with_permissions(user_token) limit = 50 offset = page * limit query_url = f"{SUPABASE_URL}/rest/v1/Logs?order=created_at.desc&limit={limit}&offset={offset}&select=id,user,action,reference,old_data,new_data,created_at" async with aiohttp.ClientSession() as session: async with session.get(query_url, headers=SUPABASE_HEADERS) as response: if response.status != 200: detail = await response.text() logger.error(f"❌ Erro ao buscar logs: {detail}") raise HTTPException(status_code=response.status, detail="Erro ao buscar logs") logs = await response.json() if not logs: return { "logs": [], "page": page, "has_next": False } # Buscar dados dos usuários user_ids = list({log["user"] for log in logs if log.get("user")}) user_info_map = {} if user_ids: user_ids_query = ",".join(f'"{uid}"' for uid in user_ids) users_url = f"{SUPABASE_URL}/rest/v1/User?id=in.({user_ids_query})&select=id,email,avatar,name" async with aiohttp.ClientSession() as session: async with session.get(users_url, headers=SUPABASE_HEADERS) as response: if response.status == 200: users_data = await response.json() user_info_map = {user["id"]: user for user in users_data} else: logger.warning("⚠️ Erro ao buscar dados dos usuários") def format_relative_date(iso_date: str) -> str: ny_tz = pytz.timezone("America/New_York") try: created_dt = datetime.fromisoformat(iso_date) created_dt = created_dt.astimezone(ny_tz) except Exception: return iso_date now = datetime.now(ny_tz) diff = now - created_dt if diff < timedelta(seconds=60): return "just now" elif diff < timedelta(hours=1): minutes = int(diff.total_seconds() // 60) return f"{minutes} minute{'s' if minutes != 1 else ''} ago" elif diff < timedelta(hours=24): hours = int(diff.total_seconds() // 3600) return f"{hours} hour{'s' if hours != 1 else ''} ago" elif (now.date() - created_dt.date()).days == 1: return f"Yesterday at {created_dt.strftime('%I:%M %p').lstrip('0')}" else: return created_dt.strftime("%m/%d/%Y") # Função para gerar mensagem descritiva def generate_message(log, user_info): action = log.get("action") reference = log.get("reference") old_data = log.get("old_data") or {} new_data = log.get("new_data") or {} if isinstance(old_data, str): try: old_data = json.loads(old_data) except Exception: old_data = {} if isinstance(new_data, str): try: new_data = json.loads(new_data) except Exception: new_data = {} email = user_info.get("email", "Someone") target_name = None if reference == "User": target_name = old_data.get("name") or old_data.get("email") or new_data.get("name") or new_data.get("email") elif reference == "Approval": target_name = old_data.get("name") elif reference == "Collaboration": target_name = new_data.get("email") or old_data.get("email") elif reference == "Onboarding": target_name = None if action == "update" and reference == "User": return f"{email} updated profile information of {target_name or 'a user'}" elif action == "approve" and reference == "Approval": return f"{email} approved stylist {target_name or ''}" elif action == "deny" and reference == "Approval": return f"{email} denied stylist {target_name or ''}" elif action == "update" and reference == "Onboarding": return f"{email} updated an onboarding question" elif action == "add" and reference == "Onboarding": return f"{email} added a new onboarding question" elif action == "delete" and reference == "Onboarding": return f"{email} deleted an onboarding question" elif action == "add" and reference == "Collaboration": return f"{email} added {target_name or 'a collaborator'} to the team" elif action == "delete" and reference == "Collaboration": return f"{email} removed {target_name or 'a collaborator'} from the team" elif action == "update" and reference == "Collaboration": return f"{email} updated permissions of {target_name or 'a collaborator'}" elif action == "update" and reference == "Tickets": transferred_to = new_data.get("agent_email") if transferred_to: return f"{email} transferred a ticket to {transferred_to}" elif action == "delete" and reference == "Tickets": return f"{email} closed a ticket" return f"{email} performed action: {action} on {reference}" # Enriquecer logs com mensagens e datas formatadas enriched_logs = [] for log in logs: user_raw = user_info_map.get(log["user"], {}) enriched_logs.append({ "id": log["id"], "user": { "id": user_raw.get("id"), "email": user_raw.get("email"), "avatar": user_raw.get("avatar") }, "action": log["action"], "reference": log["reference"], "old_data": log["old_data"], "new_data": log["new_data"], "created_at": log["created_at"], "message": generate_message(log, user_raw), "formatted_date": format_relative_date(log["created_at"]) }) has_next = len(logs) == limit return { "logs": enriched_logs, "page": page, "has_next": has_next } except HTTPException as he: raise he except Exception as e: logger.error(f"❌ Erro interno ao buscar logs: {str(e)}") raise HTTPException(status_code=500, detail="Erro interno do servidor")