Spaces:
Sleeping
Sleeping
""" | |
FastAPI compatible authentication module | |
""" | |
import jwt | |
import bcrypt | |
import json | |
import os | |
from datetime import datetime, timedelta | |
from typing import Optional, Dict, Any | |
from pathlib import Path | |
class AuthManager: | |
def __init__(self, secret_key: str = None): | |
self.secret_key = secret_key or os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production') | |
# Use absolute paths and handle Hugging Face Spaces | |
self.base_dir = Path(__file__).parent.parent # Go up from src/components | |
self.data_dir = self.base_dir / 'data' | |
# Fallback to current directory if base_dir doesn't work | |
if not self.data_dir.exists(): | |
self.data_dir = Path.cwd() / 'data' | |
self.users_file = self.data_dir / 'users.json' | |
self.session_file = self.data_dir / 'active_sessions.json' | |
# In-memory fallback for read-only environments | |
self.users_memory = {} | |
self.sessions_memory = {} | |
self.use_memory = False | |
self.ensure_users_file() | |
self.ensure_admin_user() # Ensure admin exists on startup | |
print(f"β AuthManager initialized") | |
print(f"π Data directory: {self.data_dir}") | |
print(f"π Users file: {self.users_file}") | |
print(f"πΎ Using memory storage: {self.use_memory}") | |
def ensure_users_file(self): | |
"""Ensure users file exists with better error handling""" | |
try: | |
self.data_dir.mkdir(parents=True, exist_ok=True) | |
if not self.users_file.exists(): | |
with open(self.users_file, 'w') as f: | |
json.dump({}, f) | |
print(f"β Created users file: {self.users_file}") | |
# Test write permissions | |
test_data = self.load_users() | |
self.save_users(test_data) | |
print(f"β Write permissions confirmed") | |
except Exception as e: | |
print(f"β οΈ File system error: {e}") | |
print(f"π Switching to in-memory storage") | |
self.use_memory = True | |
self.users_memory = {} | |
self.sessions_memory = {} | |
def ensure_admin_user(self): | |
"""Ensure admin user exists on startup""" | |
try: | |
result = self.create_default_admin() | |
if result.get('success'): | |
print(f"β Admin user ready: {result.get('message', 'Available')}") | |
except Exception as e: | |
print(f"β οΈ Admin user creation failed: {e}") | |
def hash_password(self, password: str) -> str: | |
"""Hash password with bcrypt""" | |
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') | |
def verify_password(self, password: str, hashed: str) -> bool: | |
"""Verify password against hash""" | |
try: | |
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) | |
except Exception as e: | |
print(f"β Password verification error: {e}") | |
return False | |
def load_users(self) -> Dict[str, Any]: | |
"""Load users from file or memory""" | |
if self.use_memory: | |
return self.users_memory.copy() | |
try: | |
if self.users_file.exists(): | |
with open(self.users_file, 'r') as f: | |
users = json.load(f) | |
return users | |
return {} | |
except Exception as e: | |
print(f"β Error loading users: {e}") | |
return {} | |
def save_users(self, users: Dict[str, Any]): | |
"""Save users to file or memory""" | |
if self.use_memory: | |
self.users_memory = users.copy() | |
return | |
try: | |
with open(self.users_file, 'w') as f: | |
json.dump(users, f, indent=2) | |
except Exception as e: | |
print(f"β Error saving users: {e}") | |
def create_user(self, username: str, email: str, password: str) -> Dict[str, Any]: | |
"""Create a new user""" | |
users = self.load_users() | |
if username in users: | |
return {'success': False, 'error': 'Username already exists'} | |
# Check if email already exists | |
for user_data in users.values(): | |
if user_data.get('email') == email: | |
return {'success': False, 'error': 'Email already registered'} | |
# Create user | |
user_id = f"user_{len(users) + 1}" | |
users[username] = { | |
'user_id': user_id, | |
'email': email, | |
'password_hash': self.hash_password(password), | |
'created_at': datetime.now().isoformat(), | |
'is_active': True | |
} | |
self.save_users(users) | |
return {'success': True, 'user_id': user_id} | |
def authenticate_user(self, username: str, password: str) -> Dict[str, Any]: | |
"""Authenticate user credentials with detailed logging""" | |
print(f"π Authentication attempt for: {username}") | |
users = self.load_users() | |
print(f"π Total users in database: {len(users)}") | |
if username not in users: | |
print(f"β Username '{username}' not found") | |
print(f"π Available usernames: {list(users.keys())}") | |
return {'success': False, 'error': 'Invalid username or password'} | |
user = users[username] | |
if not self.verify_password(password, user['password_hash']): | |
print(f"β Invalid password for '{username}'") | |
return {'success': False, 'error': 'Invalid username or password'} | |
if not user.get('is_active', True): | |
print(f"β User '{username}' is not active") | |
return {'success': False, 'error': 'Account is disabled'} | |
# Generate JWT token | |
try: | |
token = jwt.encode({ | |
'user_id': user['user_id'], | |
'username': username, | |
'exp': datetime.utcnow() + timedelta(hours=8) | |
}, self.secret_key, algorithm='HS256') | |
# Track active session | |
self.add_active_session(user['user_id'], token) | |
print(f"β Authentication successful for '{username}'") | |
return { | |
'success': True, | |
'token': token, | |
'user_id': user['user_id'], | |
'username': username | |
} | |
except Exception as e: | |
print(f"β JWT generation failed: {e}") | |
return {'success': False, 'error': 'Authentication failed'} | |
def verify_token(self, token: str) -> Optional[Dict[str, Any]]: | |
"""Verify JWT token""" | |
try: | |
payload = jwt.decode(token, self.secret_key, algorithms=['HS256']) | |
user_id = payload.get('user_id') | |
# Check if session is still active | |
if not self.is_session_active(user_id, token): | |
return None | |
# Update session activity | |
self.update_session_activity(user_id) | |
return payload | |
except jwt.ExpiredSignatureError: | |
print("π Token expired") | |
return None | |
except jwt.InvalidTokenError: | |
print("β Invalid token") | |
return None | |
except Exception as e: | |
print(f"β Token verification error: {e}") | |
return None | |
def create_default_admin(self) -> Dict[str, Any]: | |
"""Create default admin user if it doesn't exist""" | |
users = self.load_users() | |
admin_username = "admin" | |
admin_user_id = "admin_user" | |
# Check if admin already exists | |
if admin_username in users: | |
return {'success': True, 'message': 'Admin user already exists'} | |
# Create admin user | |
try: | |
users[admin_username] = { | |
'user_id': admin_user_id, | |
'email': '[email protected]', | |
'password_hash': self.hash_password('admin123'), | |
'created_at': datetime.now().isoformat(), | |
'is_active': True, | |
'is_admin': True | |
} | |
self.save_users(users) | |
return { | |
'success': True, | |
'message': 'Default admin user created', | |
'username': admin_username, | |
'password': 'admin123' | |
} | |
except Exception as e: | |
print(f"β Failed to create admin user: {e}") | |
return {'success': False, 'error': str(e)} | |
def load_active_sessions(self) -> Dict[str, Any]: | |
"""Load active sessions""" | |
if self.use_memory: | |
return self.sessions_memory.copy() | |
try: | |
if self.session_file.exists(): | |
with open(self.session_file, 'r') as f: | |
return json.load(f) | |
except Exception as e: | |
print(f"β Error loading sessions: {e}") | |
return {} | |
def save_active_sessions(self, sessions: Dict[str, Any]): | |
"""Save active sessions""" | |
if self.use_memory: | |
self.sessions_memory = sessions.copy() | |
return | |
try: | |
with open(self.session_file, 'w') as f: | |
json.dump(sessions, f, indent=2) | |
except Exception as e: | |
print(f"β Error saving sessions: {e}") | |
def add_active_session(self, user_id: str, token: str): | |
"""Add an active session""" | |
sessions = self.load_active_sessions() | |
sessions[user_id] = { | |
'token': token, | |
'created_at': datetime.now().isoformat(), | |
'last_activity': datetime.now().isoformat() | |
} | |
self.save_active_sessions(sessions) | |
def is_session_active(self, user_id: str, token: str) -> bool: | |
"""Check if a session is active""" | |
sessions = self.load_active_sessions() | |
if user_id not in sessions: | |
return False | |
session = sessions[user_id] | |
if session.get('token') != token: | |
return False | |
# Check if session is expired | |
try: | |
created_at = datetime.fromisoformat(session['created_at']) | |
if datetime.now() - created_at > timedelta(hours=8): | |
self.remove_active_session(user_id) | |
return False | |
except: | |
return False | |
return True | |
def remove_active_session(self, user_id: str): | |
"""Remove an active session""" | |
sessions = self.load_active_sessions() | |
if user_id in sessions: | |
del sessions[user_id] | |
self.save_active_sessions(sessions) | |
def update_session_activity(self, user_id: str): | |
"""Update last activity time for a session""" | |
sessions = self.load_active_sessions() | |
if user_id in sessions: | |
sessions[user_id]['last_activity'] = datetime.now().isoformat() | |
self.save_active_sessions(sessions) | |
def logout_user(self, user_id: str): | |
"""Logout user and invalidate session""" | |
self.remove_active_session(user_id) | |
return {'success': True, 'message': 'Logged out successfully'} | |
def cleanup_expired_sessions(self): | |
"""Clean up expired sessions""" | |
sessions = self.load_active_sessions() | |
current_time = datetime.now() | |
expired_sessions = [] | |
for user_id, session in sessions.items(): | |
try: | |
created_at = datetime.fromisoformat(session['created_at']) | |
if current_time - created_at > timedelta(hours=8): | |
expired_sessions.append(user_id) | |
except: | |
expired_sessions.append(user_id) | |
for user_id in expired_sessions: | |
del sessions[user_id] | |
if expired_sessions: | |
self.save_active_sessions(sessions) | |
return len(expired_sessions) | |
def debug_status(self): | |
"""Debug authentication status""" | |
users = self.load_users() | |
sessions = self.load_active_sessions() | |
print("=== AUTH DEBUG STATUS ===") | |
print(f"Storage mode: {'Memory' if self.use_memory else 'File'}") | |
print(f"Users file exists: {self.users_file.exists() if not self.use_memory else 'N/A'}") | |
print(f"Total users: {len(users)}") | |
print(f"Active sessions: {len(sessions)}") | |
if users: | |
print("Users:") | |
for username, user_data in users.items(): | |
print(f" - {username}: {user_data.get('user_id', 'No ID')}") | |
print("========================") | |
# Global auth manager instance | |
auth_manager = AuthManager() |