Ananthakr1shnan commited on
Commit
6fef50e
·
verified ·
1 Parent(s): 7713855

Update src/components/auth.py

Browse files
Files changed (1) hide show
  1. src/components/auth.py +160 -227
src/components/auth.py CHANGED
@@ -1,5 +1,5 @@
1
  """
2
- Common issues and fixes for authentication errors in Hugging Face Spaces deployment
3
  """
4
 
5
  import jwt
@@ -8,119 +8,116 @@ import json
8
  import os
9
  from datetime import datetime, timedelta
10
  from typing import Optional, Dict, Any
11
- from functools import wraps
12
-
13
- # Import Flask components only when available
14
- try:
15
- from flask import request, jsonify, session, redirect, url_for
16
- FLASK_AVAILABLE = True
17
- except ImportError:
18
- FLASK_AVAILABLE = False
19
- request = None
20
- jsonify = None
21
- session = None
22
- redirect = None
23
- url_for = None
24
 
25
  class AuthManager:
26
  def __init__(self, secret_key: str = None):
27
  self.secret_key = secret_key or os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production')
28
 
29
- # HUGGING FACE FIX: Use absolute paths or ensure directory exists
30
- self.base_dir = os.path.abspath(os.path.dirname(__file__))
31
- self.data_dir = os.path.join(self.base_dir, 'data')
32
- self.users_file = os.path.join(self.data_dir, 'users.json')
33
- self.session_file = os.path.join(self.data_dir, 'active_sessions.json')
 
 
 
 
 
 
 
 
 
 
34
 
35
- self.active_sessions = {}
36
  self.ensure_users_file()
 
37
 
38
- # DEBUG: Print initialization info
39
- print(f"DEBUG: AuthManager initialized")
40
- print(f"DEBUG: Data directory: {self.data_dir}")
41
- print(f"DEBUG: Users file: {self.users_file}")
42
- print(f"DEBUG: Users file exists: {os.path.exists(self.users_file)}")
43
 
44
  def ensure_users_file(self):
45
  """Ensure users file exists with better error handling"""
46
  try:
47
- os.makedirs(self.data_dir, exist_ok=True)
48
- print(f"DEBUG: Data directory created/exists: {self.data_dir}")
49
 
50
- if not os.path.exists(self.users_file):
51
  with open(self.users_file, 'w') as f:
52
  json.dump({}, f)
53
- print(f"DEBUG: Created empty users file: {self.users_file}")
54
- else:
55
- print(f"DEBUG: Users file already exists: {self.users_file}")
56
-
 
 
 
57
  except Exception as e:
58
- print(f"ERROR: Failed to create users file: {e}")
59
- # Create fallback in-memory storage
60
- self.users_file = None
61
- print("DEBUG: Using in-memory user storage as fallback")
 
62
 
63
- def hash_password(self, password: str) -> str:
64
- """Hash password with bcrypt"""
65
  try:
66
- return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
 
 
67
  except Exception as e:
68
- print(f"ERROR: Password hashing failed: {e}")
69
- raise
 
 
 
70
 
71
  def verify_password(self, password: str, hashed: str) -> bool:
72
  """Verify password against hash"""
73
  try:
74
  return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
75
  except Exception as e:
76
- print(f"ERROR: Password verification failed: {e}")
77
  return False
78
 
79
  def load_users(self) -> Dict[str, Any]:
80
- """Load users from file with better error handling"""
81
- if not self.users_file:
82
- print("DEBUG: No users file, returning empty dict")
83
- return {}
84
-
85
  try:
86
- with open(self.users_file, 'r') as f:
87
- users = json.load(f)
88
- print(f"DEBUG: Loaded {len(users)} users from file")
89
- return users
90
- except FileNotFoundError:
91
- print("DEBUG: Users file not found, returning empty dict")
92
  return {}
93
  except Exception as e:
94
- print(f"ERROR: Failed to load users: {e}")
95
  return {}
96
 
97
  def save_users(self, users: Dict[str, Any]):
98
- """Save users to file with better error handling"""
99
- if not self.users_file:
100
- print("DEBUG: No users file configured, cannot save")
101
  return
102
-
103
  try:
104
  with open(self.users_file, 'w') as f:
105
  json.dump(users, f, indent=2)
106
- print(f"DEBUG: Saved {len(users)} users to file")
107
  except Exception as e:
108
- print(f"ERROR: Failed to save users: {e}")
109
 
110
  def create_user(self, username: str, email: str, password: str) -> Dict[str, Any]:
111
- """Create a new user with debug info"""
112
- print(f"DEBUG: Creating user '{username}' with email '{email}'")
113
-
114
  users = self.load_users()
115
 
116
  if username in users:
117
- print(f"DEBUG: Username '{username}' already exists")
118
  return {'success': False, 'error': 'Username already exists'}
119
 
120
  # Check if email already exists
121
  for user_data in users.values():
122
  if user_data.get('email') == email:
123
- print(f"DEBUG: Email '{email}' already registered")
124
  return {'success': False, 'error': 'Email already registered'}
125
 
126
  # Create user
@@ -134,34 +131,30 @@ class AuthManager:
134
  }
135
 
136
  self.save_users(users)
137
- print(f"DEBUG: User '{username}' created successfully with ID '{user_id}'")
138
  return {'success': True, 'user_id': user_id}
139
 
140
  def authenticate_user(self, username: str, password: str) -> Dict[str, Any]:
141
- """Authenticate user credentials with detailed debug info"""
142
- print(f"DEBUG: Authenticating user '{username}'")
143
 
144
  users = self.load_users()
145
- print(f"DEBUG: Total users in database: {len(users)}")
146
- print(f"DEBUG: Available usernames: {list(users.keys())}")
147
 
148
  if username not in users:
149
- print(f"DEBUG: Username '{username}' not found in database")
 
150
  return {'success': False, 'error': 'Invalid username or password'}
151
 
152
  user = users[username]
153
- print(f"DEBUG: Found user '{username}', checking password...")
154
 
155
  if not self.verify_password(password, user['password_hash']):
156
- print(f"DEBUG: Password verification failed for '{username}'")
157
  return {'success': False, 'error': 'Invalid username or password'}
158
 
159
  if not user.get('is_active', True):
160
- print(f"DEBUG: User '{username}' is not active")
161
  return {'success': False, 'error': 'Account is disabled'}
162
 
163
- print(f"DEBUG: User '{username}' authenticated successfully")
164
-
165
  # Generate JWT token
166
  try:
167
  token = jwt.encode({
@@ -170,11 +163,10 @@ class AuthManager:
170
  'exp': datetime.utcnow() + timedelta(hours=8)
171
  }, self.secret_key, algorithm='HS256')
172
 
173
- print(f"DEBUG: JWT token generated for '{username}'")
174
-
175
  # Track active session
176
  self.add_active_session(user['user_id'], token)
177
 
 
178
  return {
179
  'success': True,
180
  'token': token,
@@ -182,42 +174,34 @@ class AuthManager:
182
  'username': username
183
  }
184
  except Exception as e:
185
- print(f"ERROR: JWT token generation failed: {e}")
186
  return {'success': False, 'error': 'Authentication failed'}
187
 
188
  def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
189
- """Verify JWT token with debug info"""
190
- print(f"DEBUG: Verifying token...")
191
-
192
  try:
193
  payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
194
  user_id = payload.get('user_id')
195
- print(f"DEBUG: Token decoded successfully for user_id: {user_id}")
196
 
197
  # Check if session is still active
198
  if not self.is_session_active(user_id, token):
199
- print(f"DEBUG: Session not active for user_id: {user_id}")
200
  return None
201
 
202
  # Update session activity
203
  self.update_session_activity(user_id)
204
- print(f"DEBUG: Token verified successfully for user_id: {user_id}")
205
-
206
  return payload
207
  except jwt.ExpiredSignatureError:
208
- print("DEBUG: Token expired")
209
  return None
210
- except jwt.InvalidTokenError as e:
211
- print(f"DEBUG: Invalid token: {e}")
212
  return None
213
  except Exception as e:
214
- print(f"ERROR: Token verification failed: {e}")
215
  return None
216
 
217
  def create_default_admin(self) -> Dict[str, Any]:
218
  """Create default admin user if it doesn't exist"""
219
- print("DEBUG: Creating default admin user...")
220
-
221
  users = self.load_users()
222
 
223
  admin_username = "admin"
@@ -225,59 +209,54 @@ class AuthManager:
225
 
226
  # Check if admin already exists
227
  if admin_username in users:
228
- print("DEBUG: Admin user already exists")
229
  return {'success': True, 'message': 'Admin user already exists'}
230
 
231
- # Check if user_id already exists
232
- for user_data in users.values():
233
- if user_data.get('user_id') == admin_user_id:
234
- print("DEBUG: Admin user ID already exists")
235
- return {'success': True, 'message': 'Admin user ID already exists'}
236
-
237
  # Create admin user
238
- users[admin_username] = {
239
- 'user_id': admin_user_id,
240
- 'email': '[email protected]',
241
- 'password_hash': self.hash_password('admin123'),
242
- 'created_at': datetime.now().isoformat(),
243
- 'is_active': True,
244
- 'is_admin': True
245
- }
246
-
247
- self.save_users(users)
248
- print("DEBUG: Default admin user created successfully")
249
-
250
- return {
251
- 'success': True,
252
- 'message': 'Default admin user created',
253
- 'username': admin_username,
254
- 'password': 'admin123',
255
- 'note': 'Please change the default password after first login'
256
- }
 
257
 
258
  def load_active_sessions(self) -> Dict[str, Any]:
259
- """Load active sessions from file"""
260
- if not self.session_file:
261
- return {}
262
-
263
  try:
264
- if os.path.exists(self.session_file):
265
  with open(self.session_file, 'r') as f:
266
  return json.load(f)
267
  except Exception as e:
268
- print(f"ERROR: Failed to load active sessions: {e}")
269
  return {}
270
 
271
  def save_active_sessions(self, sessions: Dict[str, Any]):
272
- """Save active sessions to file"""
273
- if not self.session_file:
 
274
  return
275
-
276
  try:
277
  with open(self.session_file, 'w') as f:
278
  json.dump(sessions, f, indent=2)
279
  except Exception as e:
280
- print(f"ERROR: Failed to save active sessions: {e}")
281
 
282
  def add_active_session(self, user_id: str, token: str):
283
  """Add an active session"""
@@ -288,132 +267,86 @@ class AuthManager:
288
  'last_activity': datetime.now().isoformat()
289
  }
290
  self.save_active_sessions(sessions)
291
- print(f"DEBUG: Added active session for user_id: {user_id}")
292
-
293
- def remove_active_session(self, user_id: str):
294
- """Remove an active session"""
295
- sessions = self.load_active_sessions()
296
- if user_id in sessions:
297
- del sessions[user_id]
298
- self.save_active_sessions(sessions)
299
- print(f"DEBUG: Removed active session for user_id: {user_id}")
300
 
301
  def is_session_active(self, user_id: str, token: str) -> bool:
302
  """Check if a session is active"""
303
  sessions = self.load_active_sessions()
304
  if user_id not in sessions:
305
- print(f"DEBUG: No session found for user_id: {user_id}")
306
  return False
307
 
308
  session = sessions[user_id]
309
  if session.get('token') != token:
310
- print(f"DEBUG: Token mismatch for user_id: {user_id}")
311
  return False
312
 
313
  # Check if session is expired
314
  try:
315
  created_at = datetime.fromisoformat(session['created_at'])
316
  if datetime.now() - created_at > timedelta(hours=8):
317
- print(f"DEBUG: Session expired for user_id: {user_id}")
318
  self.remove_active_session(user_id)
319
  return False
320
- except Exception as e:
321
- print(f"ERROR: Failed to check session expiry: {e}")
322
  return False
323
 
324
  return True
325
 
 
 
 
 
 
 
 
326
  def update_session_activity(self, user_id: str):
327
  """Update last activity time for a session"""
328
  sessions = self.load_active_sessions()
329
  if user_id in sessions:
330
  sessions[user_id]['last_activity'] = datetime.now().isoformat()
331
  self.save_active_sessions(sessions)
332
-
333
- # Global auth manager
334
- auth_manager = AuthManager()
335
-
336
- def require_auth(f):
337
- """Decorator to require authentication"""
338
- @wraps(f)
339
- def decorated_function(*args, **kwargs):
340
- if not FLASK_AVAILABLE:
341
- return f(*args, **kwargs)
342
-
343
- user = auth_manager.get_current_user(request)
344
- if not user:
345
- print("DEBUG: Authentication required but no user found")
346
- if request.is_json:
347
- return jsonify({'success': False, 'error': 'Authentication required'}), 401
348
- else:
349
- return redirect(url_for('login'))
350
- return f(*args, **kwargs)
351
- return decorated_function
352
-
353
- def get_current_user() -> Optional[Dict[str, Any]]:
354
- """Get current authenticated user"""
355
- if not FLASK_AVAILABLE:
356
- return None
357
- return auth_manager.get_current_user(request)
358
-
359
- # ADDITIONAL DEBUGGING FUNCTIONS
360
-
361
- def debug_auth_status():
362
- """Debug function to check authentication status"""
363
- print("=== AUTH DEBUG STATUS ===")
364
- users = auth_manager.load_users()
365
- print(f"Total users: {len(users)}")
366
- for username, user_data in users.items():
367
- print(f" - {username}: {user_data.get('user_id', 'No ID')}")
368
 
369
- sessions = auth_manager.load_active_sessions()
370
- print(f"Active sessions: {len(sessions)}")
371
- for user_id, session_data in sessions.items():
372
- print(f" - {user_id}: {session_data.get('created_at', 'No timestamp')}")
373
 
374
- print(f"Users file exists: {os.path.exists(auth_manager.users_file) if auth_manager.users_file else 'No file path'}")
375
- print(f"Session file exists: {os.path.exists(auth_manager.session_file) if auth_manager.session_file else 'No file path'}")
376
- print("========================")
377
-
378
- # FLASK ROUTE EXAMPLE WITH DEBUG
379
- def create_debug_login_route(app):
380
- """Example login route with debugging"""
381
- @app.route('/api/auth/login', methods=['POST'])
382
- def login():
383
- print("DEBUG: Login endpoint called")
384
 
385
- try:
386
- data = request.get_json()
387
- print(f"DEBUG: Request data: {data}")
388
-
389
- if not data:
390
- print("DEBUG: No JSON data received")
391
- return jsonify({'success': False, 'error': 'No data provided'}), 400
392
-
393
- username = data.get('username')
394
- password = data.get('password')
395
-
396
- print(f"DEBUG: Login attempt for username: {username}")
397
-
398
- if not username or not password:
399
- print("DEBUG: Missing username or password")
400
- return jsonify({'success': False, 'error': 'Username and password required'}), 400
401
-
402
- # Ensure admin user exists
403
- admin_result = auth_manager.create_default_admin()
404
- print(f"DEBUG: Admin creation result: {admin_result}")
405
-
406
- # Authenticate user
407
- result = auth_manager.authenticate_user(username, password)
408
- print(f"DEBUG: Authentication result: {result}")
409
-
410
- if result['success']:
411
- print("DEBUG: Authentication successful")
412
- return jsonify(result), 200
413
- else:
414
- print("DEBUG: Authentication failed")
415
- return jsonify(result), 401
416
-
417
- except Exception as e:
418
- print(f"ERROR: Login route exception: {e}")
419
- return jsonify({'success': False, 'error': 'Internal server error'}), 500
 
 
 
1
  """
2
+ FastAPI compatible authentication module
3
  """
4
 
5
  import jwt
 
8
  import os
9
  from datetime import datetime, timedelta
10
  from typing import Optional, Dict, Any
11
+ from pathlib import Path
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
  class AuthManager:
14
  def __init__(self, secret_key: str = None):
15
  self.secret_key = secret_key or os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production')
16
 
17
+ # Use absolute paths and handle Hugging Face Spaces
18
+ self.base_dir = Path(__file__).parent.parent # Go up from src/components
19
+ self.data_dir = self.base_dir / 'data'
20
+
21
+ # Fallback to current directory if base_dir doesn't work
22
+ if not self.data_dir.exists():
23
+ self.data_dir = Path.cwd() / 'data'
24
+
25
+ self.users_file = self.data_dir / 'users.json'
26
+ self.session_file = self.data_dir / 'active_sessions.json'
27
+
28
+ # In-memory fallback for read-only environments
29
+ self.users_memory = {}
30
+ self.sessions_memory = {}
31
+ self.use_memory = False
32
 
 
33
  self.ensure_users_file()
34
+ self.ensure_admin_user() # Ensure admin exists on startup
35
 
36
+ print(f"✅ AuthManager initialized")
37
+ print(f"📁 Data directory: {self.data_dir}")
38
+ print(f"📄 Users file: {self.users_file}")
39
+ print(f"💾 Using memory storage: {self.use_memory}")
 
40
 
41
  def ensure_users_file(self):
42
  """Ensure users file exists with better error handling"""
43
  try:
44
+ self.data_dir.mkdir(parents=True, exist_ok=True)
 
45
 
46
+ if not self.users_file.exists():
47
  with open(self.users_file, 'w') as f:
48
  json.dump({}, f)
49
+ print(f" Created users file: {self.users_file}")
50
+
51
+ # Test write permissions
52
+ test_data = self.load_users()
53
+ self.save_users(test_data)
54
+ print(f"✅ Write permissions confirmed")
55
+
56
  except Exception as e:
57
+ print(f"⚠️ File system error: {e}")
58
+ print(f"🔄 Switching to in-memory storage")
59
+ self.use_memory = True
60
+ self.users_memory = {}
61
+ self.sessions_memory = {}
62
 
63
+ def ensure_admin_user(self):
64
+ """Ensure admin user exists on startup"""
65
  try:
66
+ result = self.create_default_admin()
67
+ if result.get('success'):
68
+ print(f"✅ Admin user ready: {result.get('message', 'Available')}")
69
  except Exception as e:
70
+ print(f"⚠️ Admin user creation failed: {e}")
71
+
72
+ def hash_password(self, password: str) -> str:
73
+ """Hash password with bcrypt"""
74
+ return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
75
 
76
  def verify_password(self, password: str, hashed: str) -> bool:
77
  """Verify password against hash"""
78
  try:
79
  return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
80
  except Exception as e:
81
+ print(f" Password verification error: {e}")
82
  return False
83
 
84
  def load_users(self) -> Dict[str, Any]:
85
+ """Load users from file or memory"""
86
+ if self.use_memory:
87
+ return self.users_memory.copy()
88
+
 
89
  try:
90
+ if self.users_file.exists():
91
+ with open(self.users_file, 'r') as f:
92
+ users = json.load(f)
93
+ return users
 
 
94
  return {}
95
  except Exception as e:
96
+ print(f" Error loading users: {e}")
97
  return {}
98
 
99
  def save_users(self, users: Dict[str, Any]):
100
+ """Save users to file or memory"""
101
+ if self.use_memory:
102
+ self.users_memory = users.copy()
103
  return
104
+
105
  try:
106
  with open(self.users_file, 'w') as f:
107
  json.dump(users, f, indent=2)
 
108
  except Exception as e:
109
+ print(f" Error saving users: {e}")
110
 
111
  def create_user(self, username: str, email: str, password: str) -> Dict[str, Any]:
112
+ """Create a new user"""
 
 
113
  users = self.load_users()
114
 
115
  if username in users:
 
116
  return {'success': False, 'error': 'Username already exists'}
117
 
118
  # Check if email already exists
119
  for user_data in users.values():
120
  if user_data.get('email') == email:
 
121
  return {'success': False, 'error': 'Email already registered'}
122
 
123
  # Create user
 
131
  }
132
 
133
  self.save_users(users)
 
134
  return {'success': True, 'user_id': user_id}
135
 
136
  def authenticate_user(self, username: str, password: str) -> Dict[str, Any]:
137
+ """Authenticate user credentials with detailed logging"""
138
+ print(f"🔐 Authentication attempt for: {username}")
139
 
140
  users = self.load_users()
141
+ print(f"📊 Total users in database: {len(users)}")
 
142
 
143
  if username not in users:
144
+ print(f" Username '{username}' not found")
145
+ print(f"📋 Available usernames: {list(users.keys())}")
146
  return {'success': False, 'error': 'Invalid username or password'}
147
 
148
  user = users[username]
 
149
 
150
  if not self.verify_password(password, user['password_hash']):
151
+ print(f" Invalid password for '{username}'")
152
  return {'success': False, 'error': 'Invalid username or password'}
153
 
154
  if not user.get('is_active', True):
155
+ print(f" User '{username}' is not active")
156
  return {'success': False, 'error': 'Account is disabled'}
157
 
 
 
158
  # Generate JWT token
159
  try:
160
  token = jwt.encode({
 
163
  'exp': datetime.utcnow() + timedelta(hours=8)
164
  }, self.secret_key, algorithm='HS256')
165
 
 
 
166
  # Track active session
167
  self.add_active_session(user['user_id'], token)
168
 
169
+ print(f"✅ Authentication successful for '{username}'")
170
  return {
171
  'success': True,
172
  'token': token,
 
174
  'username': username
175
  }
176
  except Exception as e:
177
+ print(f" JWT generation failed: {e}")
178
  return {'success': False, 'error': 'Authentication failed'}
179
 
180
  def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
181
+ """Verify JWT token"""
 
 
182
  try:
183
  payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
184
  user_id = payload.get('user_id')
 
185
 
186
  # Check if session is still active
187
  if not self.is_session_active(user_id, token):
 
188
  return None
189
 
190
  # Update session activity
191
  self.update_session_activity(user_id)
 
 
192
  return payload
193
  except jwt.ExpiredSignatureError:
194
+ print("🕐 Token expired")
195
  return None
196
+ except jwt.InvalidTokenError:
197
+ print(" Invalid token")
198
  return None
199
  except Exception as e:
200
+ print(f" Token verification error: {e}")
201
  return None
202
 
203
  def create_default_admin(self) -> Dict[str, Any]:
204
  """Create default admin user if it doesn't exist"""
 
 
205
  users = self.load_users()
206
 
207
  admin_username = "admin"
 
209
 
210
  # Check if admin already exists
211
  if admin_username in users:
 
212
  return {'success': True, 'message': 'Admin user already exists'}
213
 
 
 
 
 
 
 
214
  # Create admin user
215
+ try:
216
+ users[admin_username] = {
217
+ 'user_id': admin_user_id,
218
+ 'email': 'admin@researchmate.local',
219
+ 'password_hash': self.hash_password('admin123'),
220
+ 'created_at': datetime.now().isoformat(),
221
+ 'is_active': True,
222
+ 'is_admin': True
223
+ }
224
+
225
+ self.save_users(users)
226
+ return {
227
+ 'success': True,
228
+ 'message': 'Default admin user created',
229
+ 'username': admin_username,
230
+ 'password': 'admin123'
231
+ }
232
+ except Exception as e:
233
+ print(f"❌ Failed to create admin user: {e}")
234
+ return {'success': False, 'error': str(e)}
235
 
236
  def load_active_sessions(self) -> Dict[str, Any]:
237
+ """Load active sessions"""
238
+ if self.use_memory:
239
+ return self.sessions_memory.copy()
240
+
241
  try:
242
+ if self.session_file.exists():
243
  with open(self.session_file, 'r') as f:
244
  return json.load(f)
245
  except Exception as e:
246
+ print(f" Error loading sessions: {e}")
247
  return {}
248
 
249
  def save_active_sessions(self, sessions: Dict[str, Any]):
250
+ """Save active sessions"""
251
+ if self.use_memory:
252
+ self.sessions_memory = sessions.copy()
253
  return
254
+
255
  try:
256
  with open(self.session_file, 'w') as f:
257
  json.dump(sessions, f, indent=2)
258
  except Exception as e:
259
+ print(f" Error saving sessions: {e}")
260
 
261
  def add_active_session(self, user_id: str, token: str):
262
  """Add an active session"""
 
267
  'last_activity': datetime.now().isoformat()
268
  }
269
  self.save_active_sessions(sessions)
 
 
 
 
 
 
 
 
 
270
 
271
  def is_session_active(self, user_id: str, token: str) -> bool:
272
  """Check if a session is active"""
273
  sessions = self.load_active_sessions()
274
  if user_id not in sessions:
 
275
  return False
276
 
277
  session = sessions[user_id]
278
  if session.get('token') != token:
 
279
  return False
280
 
281
  # Check if session is expired
282
  try:
283
  created_at = datetime.fromisoformat(session['created_at'])
284
  if datetime.now() - created_at > timedelta(hours=8):
 
285
  self.remove_active_session(user_id)
286
  return False
287
+ except:
 
288
  return False
289
 
290
  return True
291
 
292
+ def remove_active_session(self, user_id: str):
293
+ """Remove an active session"""
294
+ sessions = self.load_active_sessions()
295
+ if user_id in sessions:
296
+ del sessions[user_id]
297
+ self.save_active_sessions(sessions)
298
+
299
  def update_session_activity(self, user_id: str):
300
  """Update last activity time for a session"""
301
  sessions = self.load_active_sessions()
302
  if user_id in sessions:
303
  sessions[user_id]['last_activity'] = datetime.now().isoformat()
304
  self.save_active_sessions(sessions)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
305
 
306
+ def logout_user(self, user_id: str):
307
+ """Logout user and invalidate session"""
308
+ self.remove_active_session(user_id)
309
+ return {'success': True, 'message': 'Logged out successfully'}
310
 
311
+ def cleanup_expired_sessions(self):
312
+ """Clean up expired sessions"""
313
+ sessions = self.load_active_sessions()
314
+ current_time = datetime.now()
 
 
 
 
 
 
315
 
316
+ expired_sessions = []
317
+ for user_id, session in sessions.items():
318
+ try:
319
+ created_at = datetime.fromisoformat(session['created_at'])
320
+ if current_time - created_at > timedelta(hours=8):
321
+ expired_sessions.append(user_id)
322
+ except:
323
+ expired_sessions.append(user_id)
324
+
325
+ for user_id in expired_sessions:
326
+ del sessions[user_id]
327
+
328
+ if expired_sessions:
329
+ self.save_active_sessions(sessions)
330
+
331
+ return len(expired_sessions)
332
+
333
+ def debug_status(self):
334
+ """Debug authentication status"""
335
+ users = self.load_users()
336
+ sessions = self.load_active_sessions()
337
+
338
+ print("=== AUTH DEBUG STATUS ===")
339
+ print(f"Storage mode: {'Memory' if self.use_memory else 'File'}")
340
+ print(f"Users file exists: {self.users_file.exists() if not self.use_memory else 'N/A'}")
341
+ print(f"Total users: {len(users)}")
342
+ print(f"Active sessions: {len(sessions)}")
343
+
344
+ if users:
345
+ print("Users:")
346
+ for username, user_data in users.items():
347
+ print(f" - {username}: {user_data.get('user_id', 'No ID')}")
348
+
349
+ print("========================")
350
+
351
+ # Global auth manager instance
352
+ auth_manager = AuthManager()