connect / routes /logs.py
habulaj's picture
Update routes/logs.py
a4d58e5 verified
import os
import logging
import aiohttp
from datetime import datetime, timedelta
import pytz
from fastapi import APIRouter, HTTPException, Query, Header
from typing import Dict, Any
router = APIRouter()
# Supabase configs
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
SUPABASE_KEY = os.getenv("SUPA_KEY")
SUPABASE_ROLE_KEY = os.getenv("SUPA_SERVICE_KEY")
if not SUPABASE_KEY or not SUPABASE_ROLE_KEY:
raise ValueError("❌ SUPA_KEY ou SUPA_SERVICE_KEY não foram definidos no ambiente!")
SUPABASE_HEADERS = {
"apikey": SUPABASE_KEY,
"Authorization": f"Bearer {SUPABASE_KEY}",
"Content-Type": "application/json"
}
SUPABASE_ROLE_HEADERS = {
"apikey": SUPABASE_ROLE_KEY,
"Authorization": f"Bearer {SUPABASE_ROLE_KEY}",
"Content-Type": "application/json"
}
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Verificação de token e permissões
async def verify_token_with_permissions(user_token: str, required_permission: str = None) -> Dict[str, Any]:
headers = {
"Authorization": f"Bearer {user_token}",
"apikey": SUPABASE_KEY,
"Content-Type": "application/json"
}
async with aiohttp.ClientSession() as session:
async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
if response.status != 200:
raise HTTPException(status_code=401, detail="Token inválido ou expirado")
user_data = await response.json()
user_id = user_data.get("id")
if not user_id:
raise HTTPException(status_code=400, detail="ID do usuário não encontrado")
user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}&select=is_admin,edit_onboarding"
async with aiohttp.ClientSession() as session:
async with session.get(user_data_url, headers=SUPABASE_HEADERS) as response:
if response.status != 200 or not await response.json():
raise HTTPException(status_code=403, detail="Acesso negado: não foi possível verificar permissões")
user_info = (await response.json())[0]
is_admin = user_info.get("is_admin", False)
if not is_admin:
raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários")
if required_permission:
has_permission = user_info.get(required_permission, False)
if not has_permission:
raise HTTPException(
status_code=403,
detail=f"Acesso negado: permissão '{required_permission}' necessária"
)
return {
"user_id": user_id,
"is_admin": is_admin,
"permissions": user_info
}
# Rota para buscar logs
@router.get("/logs")
async def get_logs(
page: int = Query(0, ge=0),
user_token: str = Header(..., alias="User-key")
):
"""
Retorna uma lista paginada de logs com informações do usuário.
Requer permissão de admin.
Cada página contém no máximo 50 logs.
"""
try:
await verify_token_with_permissions(user_token)
limit = 50
offset = page * limit
query_url = f"{SUPABASE_URL}/rest/v1/Logs?order=created_at.desc&limit={limit}&offset={offset}&select=id,user,action,reference,old_data,new_data,created_at"
async with aiohttp.ClientSession() as session:
async with session.get(query_url, headers=SUPABASE_HEADERS) as response:
if response.status != 200:
detail = await response.text()
logger.error(f"❌ Erro ao buscar logs: {detail}")
raise HTTPException(status_code=response.status, detail="Erro ao buscar logs")
logs = await response.json()
if not logs:
return {
"logs": [],
"page": page,
"has_next": False
}
# Buscar dados dos usuários
user_ids = list({log["user"] for log in logs if log.get("user")})
user_info_map = {}
if user_ids:
user_ids_query = ",".join(f'"{uid}"' for uid in user_ids)
users_url = f"{SUPABASE_URL}/rest/v1/User?id=in.({user_ids_query})&select=id,email,avatar,name"
async with aiohttp.ClientSession() as session:
async with session.get(users_url, headers=SUPABASE_HEADERS) as response:
if response.status == 200:
users_data = await response.json()
user_info_map = {user["id"]: user for user in users_data}
else:
logger.warning("⚠️ Erro ao buscar dados dos usuários")
def format_relative_date(iso_date: str) -> str:
ny_tz = pytz.timezone("America/New_York")
try:
created_dt = datetime.fromisoformat(iso_date)
created_dt = created_dt.astimezone(ny_tz)
except Exception:
return iso_date
now = datetime.now(ny_tz)
diff = now - created_dt
if diff < timedelta(seconds=60):
return "just now"
elif diff < timedelta(hours=1):
minutes = int(diff.total_seconds() // 60)
return f"{minutes} minute{'s' if minutes != 1 else ''} ago"
elif diff < timedelta(hours=24):
hours = int(diff.total_seconds() // 3600)
return f"{hours} hour{'s' if hours != 1 else ''} ago"
elif (now.date() - created_dt.date()).days == 1:
return f"Yesterday at {created_dt.strftime('%I:%M %p').lstrip('0')}"
else:
return created_dt.strftime("%m/%d/%Y")
# Função para gerar mensagem descritiva
def generate_message(log, user_info):
action = log.get("action")
reference = log.get("reference")
old_data = log.get("old_data") or {}
new_data = log.get("new_data") or {}
if isinstance(old_data, str):
try:
old_data = json.loads(old_data)
except Exception:
old_data = {}
if isinstance(new_data, str):
try:
new_data = json.loads(new_data)
except Exception:
new_data = {}
email = user_info.get("email", "Someone")
target_name = None
if reference == "User":
target_name = old_data.get("name") or old_data.get("email") or new_data.get("name") or new_data.get("email")
elif reference == "Approval":
target_name = old_data.get("name")
elif reference == "Collaboration":
target_name = new_data.get("email") or old_data.get("email")
elif reference == "Onboarding":
target_name = None
if action == "update" and reference == "User":
return f"{email} updated profile information of {target_name or 'a user'}"
elif action == "approve" and reference == "Approval":
return f"{email} approved stylist {target_name or ''}"
elif action == "deny" and reference == "Approval":
return f"{email} denied stylist {target_name or ''}"
elif action == "update" and reference == "Onboarding":
return f"{email} updated an onboarding question"
elif action == "add" and reference == "Onboarding":
return f"{email} added a new onboarding question"
elif action == "delete" and reference == "Onboarding":
return f"{email} deleted an onboarding question"
elif action == "add" and reference == "Collaboration":
return f"{email} added {target_name or 'a collaborator'} to the team"
elif action == "delete" and reference == "Collaboration":
return f"{email} removed {target_name or 'a collaborator'} from the team"
elif action == "update" and reference == "Collaboration":
return f"{email} updated permissions of {target_name or 'a collaborator'}"
elif action == "update" and reference == "Tickets":
transferred_to = new_data.get("agent_email")
if transferred_to:
return f"{email} transferred a ticket to {transferred_to}"
elif action == "delete" and reference == "Tickets":
return f"{email} closed a ticket"
return f"{email} performed action: {action} on {reference}"
# Enriquecer logs com mensagens e datas formatadas
enriched_logs = []
for log in logs:
user_raw = user_info_map.get(log["user"], {})
enriched_logs.append({
"id": log["id"],
"user": {
"id": user_raw.get("id"),
"email": user_raw.get("email"),
"avatar": user_raw.get("avatar")
},
"action": log["action"],
"reference": log["reference"],
"old_data": log["old_data"],
"new_data": log["new_data"],
"created_at": log["created_at"],
"message": generate_message(log, user_raw),
"formatted_date": format_relative_date(log["created_at"])
})
has_next = len(logs) == limit
return {
"logs": enriched_logs,
"page": page,
"has_next": has_next
}
except HTTPException as he:
raise he
except Exception as e:
logger.error(f"❌ Erro interno ao buscar logs: {str(e)}")
raise HTTPException(status_code=500, detail="Erro interno do servidor")