""" FastAPI compatible authentication module """ import jwt import bcrypt import json import os from datetime import datetime, timedelta from typing import Optional, Dict, Any from pathlib import Path class AuthManager: def __init__(self, secret_key: str = None): self.secret_key = secret_key or os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production') # Use absolute paths and handle Hugging Face Spaces self.base_dir = Path(__file__).parent.parent # Go up from src/components self.data_dir = self.base_dir / 'data' # Fallback to current directory if base_dir doesn't work if not self.data_dir.exists(): self.data_dir = Path.cwd() / 'data' self.users_file = self.data_dir / 'users.json' self.session_file = self.data_dir / 'active_sessions.json' # In-memory fallback for read-only environments self.users_memory = {} self.sessions_memory = {} self.use_memory = False self.ensure_users_file() self.ensure_admin_user() # Ensure admin exists on startup print(f"✅ AuthManager initialized") print(f"📁 Data directory: {self.data_dir}") print(f"📄 Users file: {self.users_file}") print(f"💾 Using memory storage: {self.use_memory}") def ensure_users_file(self): """Ensure users file exists with better error handling""" try: self.data_dir.mkdir(parents=True, exist_ok=True) if not self.users_file.exists(): with open(self.users_file, 'w') as f: json.dump({}, f) print(f"✅ Created users file: {self.users_file}") # Test write permissions test_data = self.load_users() self.save_users(test_data) print(f"✅ Write permissions confirmed") except Exception as e: print(f"⚠️ File system error: {e}") print(f"🔄 Switching to in-memory storage") self.use_memory = True self.users_memory = {} self.sessions_memory = {} def ensure_admin_user(self): """Ensure admin user exists on startup""" try: result = self.create_default_admin() if result.get('success'): print(f"✅ Admin user ready: {result.get('message', 'Available')}") except Exception as e: print(f"⚠️ Admin user creation failed: {e}") def hash_password(self, password: str) -> str: """Hash password with bcrypt""" return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') def verify_password(self, password: str, hashed: str) -> bool: """Verify password against hash""" try: return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) except Exception as e: print(f"❌ Password verification error: {e}") return False def load_users(self) -> Dict[str, Any]: """Load users from file or memory""" if self.use_memory: return self.users_memory.copy() try: if self.users_file.exists(): with open(self.users_file, 'r') as f: users = json.load(f) return users return {} except Exception as e: print(f"❌ Error loading users: {e}") return {} def save_users(self, users: Dict[str, Any]): """Save users to file or memory""" if self.use_memory: self.users_memory = users.copy() return try: with open(self.users_file, 'w') as f: json.dump(users, f, indent=2) except Exception as e: print(f"❌ Error saving users: {e}") def create_user(self, username: str, email: str, password: str) -> Dict[str, Any]: """Create a new user""" users = self.load_users() if username in users: return {'success': False, 'error': 'Username already exists'} # Check if email already exists for user_data in users.values(): if user_data.get('email') == email: return {'success': False, 'error': 'Email already registered'} # Create user user_id = f"user_{len(users) + 1}" users[username] = { 'user_id': user_id, 'email': email, 'password_hash': self.hash_password(password), 'created_at': datetime.now().isoformat(), 'is_active': True } self.save_users(users) return {'success': True, 'user_id': user_id} def authenticate_user(self, username: str, password: str) -> Dict[str, Any]: """Authenticate user credentials with detailed logging""" print(f"🔐 Authentication attempt for: {username}") users = self.load_users() print(f"📊 Total users in database: {len(users)}") if username not in users: print(f"❌ Username '{username}' not found") print(f"📋 Available usernames: {list(users.keys())}") return {'success': False, 'error': 'Invalid username or password'} user = users[username] if not self.verify_password(password, user['password_hash']): print(f"❌ Invalid password for '{username}'") return {'success': False, 'error': 'Invalid username or password'} if not user.get('is_active', True): print(f"❌ User '{username}' is not active") return {'success': False, 'error': 'Account is disabled'} # Generate JWT token try: token = jwt.encode({ 'user_id': user['user_id'], 'username': username, 'exp': datetime.utcnow() + timedelta(hours=8) }, self.secret_key, algorithm='HS256') # Track active session self.add_active_session(user['user_id'], token) print(f"✅ Authentication successful for '{username}'") return { 'success': True, 'token': token, 'user_id': user['user_id'], 'username': username } except Exception as e: print(f"❌ JWT generation failed: {e}") return {'success': False, 'error': 'Authentication failed'} def verify_token(self, token: str) -> Optional[Dict[str, Any]]: """Verify JWT token""" try: payload = jwt.decode(token, self.secret_key, algorithms=['HS256']) user_id = payload.get('user_id') # Check if session is still active if not self.is_session_active(user_id, token): return None # Update session activity self.update_session_activity(user_id) return payload except jwt.ExpiredSignatureError: print("🕐 Token expired") return None except jwt.InvalidTokenError: print("❌ Invalid token") return None except Exception as e: print(f"❌ Token verification error: {e}") return None def create_default_admin(self) -> Dict[str, Any]: """Create default admin user if it doesn't exist""" users = self.load_users() admin_username = "admin" admin_user_id = "admin_user" # Check if admin already exists if admin_username in users: return {'success': True, 'message': 'Admin user already exists'} # Create admin user try: users[admin_username] = { 'user_id': admin_user_id, 'email': 'admin@researchmate.local', 'password_hash': self.hash_password('admin123'), 'created_at': datetime.now().isoformat(), 'is_active': True, 'is_admin': True } self.save_users(users) return { 'success': True, 'message': 'Default admin user created', 'username': admin_username, 'password': 'admin123' } except Exception as e: print(f"❌ Failed to create admin user: {e}") return {'success': False, 'error': str(e)} def load_active_sessions(self) -> Dict[str, Any]: """Load active sessions""" if self.use_memory: return self.sessions_memory.copy() try: if self.session_file.exists(): with open(self.session_file, 'r') as f: return json.load(f) except Exception as e: print(f"❌ Error loading sessions: {e}") return {} def save_active_sessions(self, sessions: Dict[str, Any]): """Save active sessions""" if self.use_memory: self.sessions_memory = sessions.copy() return try: with open(self.session_file, 'w') as f: json.dump(sessions, f, indent=2) except Exception as e: print(f"❌ Error saving sessions: {e}") def add_active_session(self, user_id: str, token: str): """Add an active session""" sessions = self.load_active_sessions() sessions[user_id] = { 'token': token, 'created_at': datetime.now().isoformat(), 'last_activity': datetime.now().isoformat() } self.save_active_sessions(sessions) def is_session_active(self, user_id: str, token: str) -> bool: """Check if a session is active""" sessions = self.load_active_sessions() if user_id not in sessions: return False session = sessions[user_id] if session.get('token') != token: return False # Check if session is expired try: created_at = datetime.fromisoformat(session['created_at']) if datetime.now() - created_at > timedelta(hours=8): self.remove_active_session(user_id) return False except: return False return True def remove_active_session(self, user_id: str): """Remove an active session""" sessions = self.load_active_sessions() if user_id in sessions: del sessions[user_id] self.save_active_sessions(sessions) def update_session_activity(self, user_id: str): """Update last activity time for a session""" sessions = self.load_active_sessions() if user_id in sessions: sessions[user_id]['last_activity'] = datetime.now().isoformat() self.save_active_sessions(sessions) def logout_user(self, user_id: str): """Logout user and invalidate session""" self.remove_active_session(user_id) return {'success': True, 'message': 'Logged out successfully'} def cleanup_expired_sessions(self): """Clean up expired sessions""" sessions = self.load_active_sessions() current_time = datetime.now() expired_sessions = [] for user_id, session in sessions.items(): try: created_at = datetime.fromisoformat(session['created_at']) if current_time - created_at > timedelta(hours=8): expired_sessions.append(user_id) except: expired_sessions.append(user_id) for user_id in expired_sessions: del sessions[user_id] if expired_sessions: self.save_active_sessions(sessions) return len(expired_sessions) def debug_status(self): """Debug authentication status""" users = self.load_users() sessions = self.load_active_sessions() print("=== AUTH DEBUG STATUS ===") print(f"Storage mode: {'Memory' if self.use_memory else 'File'}") print(f"Users file exists: {self.users_file.exists() if not self.use_memory else 'N/A'}") print(f"Total users: {len(users)}") print(f"Active sessions: {len(sessions)}") if users: print("Users:") for username, user_data in users.items(): print(f" - {username}: {user_data.get('user_id', 'No ID')}") print("========================") # Global auth manager instance auth_manager = AuthManager()