habulaj commited on
Commit
037071f
·
verified ·
1 Parent(s): 3a59e86

Update routes/users.py

Browse files
Files changed (1) hide show
  1. routes/users.py +68 -6
routes/users.py CHANGED
@@ -50,6 +50,23 @@ def get_cached_admin_status(user_id: str) -> bool:
50
  user_info = response.json()[0]
51
  return user_info.get("is_admin", False)
52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
  async def verify_admin_token(user_token: str) -> str:
54
  """Verifica se o token pertence a um administrador de forma assíncrona"""
55
  headers = {
@@ -75,13 +92,54 @@ async def verify_admin_token(user_token: str) -> str:
75
 
76
  return user_id
77
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78
  async def get_recent_users(limit: int = 50, search: Optional[str] = None, page: int = 0) -> Dict[str, Any]:
79
  """Obtém os usuários mais recentes da plataforma, com filtro opcional por nome e paginação"""
80
  try:
81
  offset = page * limit
82
  limit_plus_one = limit + 1
83
 
84
- query = f"{SUPABASE_URL}/rest/v1/User?select=id,name,avatar,role,blurhash&order=created_at.desc&limit={limit_plus_one}&offset={offset}"
 
85
 
86
  if search:
87
  search_term = search.replace("'", "''")
@@ -124,7 +182,9 @@ async def get_recent_users_endpoint(
124
  com suporte a busca por nome e paginação.
125
  """
126
  try:
127
- user_id = await verify_admin_token(user_token)
 
 
128
  result = await get_recent_users(limit, search, page)
129
 
130
  return {
@@ -151,8 +211,9 @@ async def update_user(
151
  Usa a chave de serviço para bypassar RLS do Supabase.
152
  """
153
  try:
154
- # Verificar se o usuário é um administrador
155
- admin_id = await verify_admin_token(user_token)
 
156
 
157
  # Validar os parâmetros da requisição
158
  user_id = request.get("user_id")
@@ -232,7 +293,7 @@ async def delete_feed(
232
  try:
233
  await verify_admin_token(user_token)
234
 
235
- feed_id = request.feed_id # <- AQUI!
236
 
237
  headers = SUPABASE_ROLE_HEADERS.copy()
238
  headers["Accept"] = "application/json"
@@ -312,7 +373,8 @@ async def get_user_name(
312
  incluindo seus feeds e uma imagem de portfólio para cada feed.
313
  """
314
  try:
315
- await verify_admin_token(user_token)
 
316
 
317
  # Buscar informações básicas do usuário
318
  user_query = f"{SUPABASE_URL}/rest/v1/User?select=name,avatar,role,fee&id=eq.{user_id}"
 
50
  user_info = response.json()[0]
51
  return user_info.get("is_admin", False)
52
 
53
+ # Nova função para verificar permissões de usuário
54
+ async def get_user_permissions(user_id: str) -> Dict[str, bool]:
55
+ """Obtém as permissões de um usuário"""
56
+ user_data_url = f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}&select=is_admin,view_users,manage_users"
57
+
58
+ async with aiohttp.ClientSession() as session:
59
+ async with session.get(user_data_url, headers=SUPABASE_HEADERS) as response:
60
+ if response.status != 200 or not await response.json():
61
+ return {"is_admin": False, "view_users": False, "manage_users": False}
62
+
63
+ user_info = (await response.json())[0]
64
+ return {
65
+ "is_admin": user_info.get("is_admin", False),
66
+ "view_users": user_info.get("view_users", False),
67
+ "manage_users": user_info.get("manage_users", False)
68
+ }
69
+
70
  async def verify_admin_token(user_token: str) -> str:
71
  """Verifica se o token pertence a um administrador de forma assíncrona"""
72
  headers = {
 
92
 
93
  return user_id
94
 
95
+ # Nova função para verificar token e retornar ID do usuário e permissões
96
+ async def verify_token_with_permissions(user_token: str, required_permission: Optional[str] = None) -> Dict[str, Any]:
97
+ """Verifica o token e retorna ID do usuário e suas permissões"""
98
+ headers = {
99
+ "Authorization": f"Bearer {user_token}",
100
+ "apikey": SUPABASE_KEY,
101
+ "Content-Type": "application/json"
102
+ }
103
+
104
+ async with aiohttp.ClientSession() as session:
105
+ async with session.get(f"{SUPABASE_URL}/auth/v1/user", headers=headers) as response:
106
+ if response.status != 200:
107
+ raise HTTPException(status_code=401, detail="Token inválido ou expirado")
108
+
109
+ user_data = await response.json()
110
+ user_id = user_data.get("id")
111
+ if not user_id:
112
+ raise HTTPException(status_code=400, detail="ID do usuário não encontrado")
113
+
114
+ # Obter permissões do usuário
115
+ permissions = await get_user_permissions(user_id)
116
+
117
+ # Verificar se é admin
118
+ is_admin = permissions.get("is_admin", False)
119
+
120
+ # Verificar permissão específica, se requisitada
121
+ if required_permission:
122
+ has_permission = permissions.get(required_permission, False)
123
+ if not has_permission:
124
+ raise HTTPException(
125
+ status_code=403,
126
+ detail=f"Acesso negado: permissão '{required_permission}' necessária"
127
+ )
128
+
129
+ return {
130
+ "user_id": user_id,
131
+ "is_admin": is_admin,
132
+ "permissions": permissions
133
+ }
134
+
135
  async def get_recent_users(limit: int = 50, search: Optional[str] = None, page: int = 0) -> Dict[str, Any]:
136
  """Obtém os usuários mais recentes da plataforma, com filtro opcional por nome e paginação"""
137
  try:
138
  offset = page * limit
139
  limit_plus_one = limit + 1
140
 
141
+ # Adicionada condição para filtrar apenas usuários não-admin
142
+ query = f"{SUPABASE_URL}/rest/v1/User?select=id,name,avatar,role,blurhash&is_admin=eq.false&order=created_at.desc&limit={limit_plus_one}&offset={offset}"
143
 
144
  if search:
145
  search_term = search.replace("'", "''")
 
182
  com suporte a busca por nome e paginação.
183
  """
184
  try:
185
+ # Verificar se o usuário tem permissão para visualizar usuários
186
+ user_info = await verify_token_with_permissions(user_token, "view_users")
187
+
188
  result = await get_recent_users(limit, search, page)
189
 
190
  return {
 
211
  Usa a chave de serviço para bypassar RLS do Supabase.
212
  """
213
  try:
214
+ # Verificar se o usuário tem permissão para gerenciar usuários
215
+ user_info = await verify_token_with_permissions(user_token, "manage_users")
216
+ admin_id = user_info["user_id"]
217
 
218
  # Validar os parâmetros da requisição
219
  user_id = request.get("user_id")
 
293
  try:
294
  await verify_admin_token(user_token)
295
 
296
+ feed_id = request.feed_id
297
 
298
  headers = SUPABASE_ROLE_HEADERS.copy()
299
  headers["Accept"] = "application/json"
 
373
  incluindo seus feeds e uma imagem de portfólio para cada feed.
374
  """
375
  try:
376
+ # Verificar se o usuário tem permissão para visualizar usuários
377
+ user_info = await verify_token_with_permissions(user_token, "view_users")
378
 
379
  # Buscar informações básicas do usuário
380
  user_query = f"{SUPABASE_URL}/rest/v1/User?select=name,avatar,role,fee&id=eq.{user_id}"