habulaj commited on
Commit
8fb497a
·
verified ·
1 Parent(s): 9b1d7c0

Update routes/logs.py

Browse files
Files changed (1) hide show
  1. routes/logs.py +78 -15
routes/logs.py CHANGED
@@ -1,13 +1,80 @@
 
1
  import logging
2
  import aiohttp
3
  from fastapi import APIRouter, HTTPException, Query, Header
4
- from typing import List, Dict, Any
5
-
6
- from . import SUPABASE_URL, SUPABASE_HEADERS, verify_token_with_permissions
7
 
8
  router = APIRouter()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  logger = logging.getLogger(__name__)
10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  @router.get("/logs")
12
  async def get_logs(
13
  page: int = Query(0, ge=0),
@@ -19,7 +86,6 @@ async def get_logs(
19
  Cada página contém no máximo 50 logs.
20
  """
21
  try:
22
- # Verifica permissão de administrador
23
  await verify_token_with_permissions(user_token)
24
 
25
  limit = 50
@@ -36,7 +102,6 @@ async def get_logs(
36
 
37
  logs = await response.json()
38
 
39
- # Se não houver registros, retorna lista vazia
40
  if not logs:
41
  return {
42
  "logs": [],
@@ -44,24 +109,22 @@ async def get_logs(
44
  "has_next": False
45
  }
46
 
47
- # Coletar todos os user_ids distintos
48
  user_ids = list({log["user"] for log in logs if log.get("user")})
49
- if not user_ids:
50
- user_info_map = {}
51
- else:
52
  user_ids_query = ",".join(f'"{uid}"' for uid in user_ids)
53
  users_url = f"{SUPABASE_URL}/rest/v1/User?id=in.({user_ids_query})&select=id,name,avatar"
54
 
55
  async with aiohttp.ClientSession() as session:
56
  async with session.get(users_url, headers=SUPABASE_HEADERS) as response:
57
- if response.status != 200:
58
- logger.warning("⚠️ Erro ao buscar dados dos usuários")
59
- user_info_map = {}
60
- else:
61
  users_data = await response.json()
62
  user_info_map = {user["id"]: {"name": user["name"], "avatar": user["avatar"]} for user in users_data}
 
 
63
 
64
- # Montar resposta com user_info
65
  enriched_logs = []
66
  for log in logs:
67
  user_data = user_info_map.get(log["user"], {"name": None, "avatar": None})
@@ -91,4 +154,4 @@ async def get_logs(
91
  raise he
92
  except Exception as e:
93
  logger.error(f"❌ Erro interno ao buscar logs: {str(e)}")
94
- raise HTTPException(status_code=500, detail="Erro interno do servidor")
 
1
+ import os
2
  import logging
3
  import aiohttp
4
  from fastapi import APIRouter, HTTPException, Query, Header
5
+ from typing import Dict, Any
 
 
6
 
7
  router = APIRouter()
8
+
9
+ # Supabase configs
10
+ SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
11
+ SUPABASE_KEY = os.getenv("SUPA_KEY")
12
+ SUPABASE_ROLE_KEY = os.getenv("SUPA_SERVICE_KEY")
13
+
14
+ if not SUPABASE_KEY or not SUPABASE_ROLE_KEY:
15
+ raise ValueError("❌ SUPA_KEY ou SUPA_SERVICE_KEY não foram definidos no ambiente!")
16
+
17
+ SUPABASE_HEADERS = {
18
+ "apikey": SUPABASE_KEY,
19
+ "Authorization": f"Bearer {SUPABASE_KEY}",
20
+ "Content-Type": "application/json"
21
+ }
22
+
23
+ SUPABASE_ROLE_HEADERS = {
24
+ "apikey": SUPABASE_ROLE_KEY,
25
+ "Authorization": f"Bearer {SUPABASE_ROLE_KEY}",
26
+ "Content-Type": "application/json"
27
+ }
28
+
29
+ # Logging
30
+ logging.basicConfig(level=logging.INFO)
31
  logger = logging.getLogger(__name__)
32
 
33
+ # Verificação de token e permissões
34
+ async def verify_token_with_permissions(user_token: str, required_permission: str = None) -> Dict[str, Any]:
35
+ headers = {
36
+ "Authorization": f"Bearer {user_token}",
37
+ "apikey": SUPABASE_KEY,
38
+ "Content-Type": "application/json"
39
+ }
40
+
41
+ async with aiohttp.ClientSession() as session:
42
+ async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
43
+ if response.status != 200:
44
+ raise HTTPException(status_code=401, detail="Token inválido ou expirado")
45
+
46
+ user_data = await response.json()
47
+ user_id = user_data.get("id")
48
+ if not user_id:
49
+ raise HTTPException(status_code=400, detail="ID do usuário não encontrado")
50
+
51
+ user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}&select=is_admin,edit_onboarding"
52
+ async with aiohttp.ClientSession() as session:
53
+ async with session.get(user_data_url, headers=SUPABASE_HEADERS) as response:
54
+ if response.status != 200 or not await response.json():
55
+ raise HTTPException(status_code=403, detail="Acesso negado: não foi possível verificar permissões")
56
+
57
+ user_info = (await response.json())[0]
58
+ is_admin = user_info.get("is_admin", False)
59
+
60
+ if not is_admin:
61
+ raise HTTPException(status_code=403, detail="Acesso negado: privilégios de administrador necessários")
62
+
63
+ if required_permission:
64
+ has_permission = user_info.get(required_permission, False)
65
+ if not has_permission:
66
+ raise HTTPException(
67
+ status_code=403,
68
+ detail=f"Acesso negado: permissão '{required_permission}' necessária"
69
+ )
70
+
71
+ return {
72
+ "user_id": user_id,
73
+ "is_admin": is_admin,
74
+ "permissions": user_info
75
+ }
76
+
77
+ # Rota para buscar logs
78
  @router.get("/logs")
79
  async def get_logs(
80
  page: int = Query(0, ge=0),
 
86
  Cada página contém no máximo 50 logs.
87
  """
88
  try:
 
89
  await verify_token_with_permissions(user_token)
90
 
91
  limit = 50
 
102
 
103
  logs = await response.json()
104
 
 
105
  if not logs:
106
  return {
107
  "logs": [],
 
109
  "has_next": False
110
  }
111
 
112
+ # Pegar dados dos usuários responsáveis pelos logs
113
  user_ids = list({log["user"] for log in logs if log.get("user")})
114
+ user_info_map = {}
115
+
116
+ if user_ids:
117
  user_ids_query = ",".join(f'"{uid}"' for uid in user_ids)
118
  users_url = f"{SUPABASE_URL}/rest/v1/User?id=in.({user_ids_query})&select=id,name,avatar"
119
 
120
  async with aiohttp.ClientSession() as session:
121
  async with session.get(users_url, headers=SUPABASE_HEADERS) as response:
122
+ if response.status == 200:
 
 
 
123
  users_data = await response.json()
124
  user_info_map = {user["id"]: {"name": user["name"], "avatar": user["avatar"]} for user in users_data}
125
+ else:
126
+ logger.warning("⚠️ Erro ao buscar dados dos usuários")
127
 
 
128
  enriched_logs = []
129
  for log in logs:
130
  user_data = user_info_map.get(log["user"], {"name": None, "avatar": None})
 
154
  raise he
155
  except Exception as e:
156
  logger.error(f"❌ Erro interno ao buscar logs: {str(e)}")
157
+ raise HTTPException(status_code=500, detail="Erro interno do servidor")