# Page header captured along with the file (not part of the module):
#   Ananthakr1shnan - "Update src/components/auth.py" - 6fef50e (verified)
#   raw / history / blame - 12.9 kB
"""
FastAPI compatible authentication module
"""
import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Optional

import bcrypt
import jwt
class AuthManager:
    """Manage users, password hashes and JWT-backed sessions.

    Users and active sessions are stored as JSON files inside a ``data``
    directory. When that directory or the users file cannot be created or
    opened for writing, the manager switches to in-process dictionaries
    (``use_memory``), in which case nothing is written to disk.

    Most methods report problems through ``print`` and through
    ``{'success': False, 'error': ...}`` result dictionaries instead of
    raising.
    """

    # Single source of truth for how long a login stays valid; used both for
    # the JWT ``exp`` claim and for the server-side session records.
    SESSION_LIFETIME = timedelta(hours=8)

    def __init__(self, secret_key: Optional[str] = None):
        """Resolve storage locations and make sure an admin account exists.

        Args:
            secret_key: Key used to sign and verify JWTs. When omitted, the
                ``SECRET_KEY`` environment variable is used, and failing
                that a built-in development key.
        """
        # NOTE(review): the built-in fallback key is a literal in the source;
        # always provide a real secret outside of local development.
        self.secret_key = secret_key or os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production')

        # Use absolute paths and handle Hugging Face Spaces
        self.base_dir = Path(__file__).parent.parent  # Go up from src/components
        self.data_dir = self.base_dir / 'data'

        # Fallback to current directory if base_dir doesn't work
        if not self.data_dir.exists():
            self.data_dir = Path.cwd() / 'data'

        self.users_file = self.data_dir / 'users.json'
        self.session_file = self.data_dir / 'active_sessions.json'

        # In-memory fallback for read-only environments
        self.users_memory = {}
        self.sessions_memory = {}
        self.use_memory = False

        self.ensure_users_file()
        self.ensure_admin_user()  # Ensure admin exists on startup

        print("βœ… AuthManager initialized")
        print(f"πŸ“ Data directory: {self.data_dir}")
        print(f"πŸ“„ Users file: {self.users_file}")
        print(f"πŸ’Ύ Using memory storage: {self.use_memory}")

    # ------------------------------------------------------------------
    # Storage helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _read_json(path: Path, what: str) -> Dict[str, Any]:
        """Return the JSON object stored at *path*, or ``{}`` if unavailable.

        A missing file, an unreadable file, invalid JSON, or a top-level
        value that is not an object all yield an empty dict; every case
        except the missing file is reported with ``print``.
        """
        try:
            if path.exists():
                with open(path, 'r') as f:
                    data = json.load(f)
                if isinstance(data, dict):
                    return data
                print(f"❌ Error loading {what}: expected a JSON object")
        except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
            print(f"❌ Error loading {what}: {e}")
        return {}

    @staticmethod
    def _write_json(path: Path, data: Dict[str, Any], what: str) -> bool:
        """Write *data* to *path* as indented JSON; return whether it worked."""
        try:
            # Serialise before opening the file so that a serialisation
            # error cannot leave a truncated file behind.
            text = json.dumps(data, indent=2)
            with open(path, 'w') as f:
                f.write(text)
        except (OSError, TypeError, ValueError) as e:
            print(f"❌ Error saving {what}: {e}")
            return False
        return True

    def ensure_users_file(self):
        """Create the data directory and users file, verifying writability.

        On any file-system error the manager switches to in-memory storage
        and resets both in-memory stores.
        """
        try:
            self.data_dir.mkdir(parents=True, exist_ok=True)

            if not self.users_file.exists():
                with open(self.users_file, 'w') as f:
                    json.dump({}, f)
                print(f"βœ… Created users file: {self.users_file}")

            # Probe write access in append mode. This raises on a read-only
            # file but never alters existing content, so a users file that
            # currently fails to parse is not wiped by the check itself.
            with open(self.users_file, 'a'):
                pass
            print("βœ… Write permissions confirmed")

        except OSError as e:
            print(f"⚠️ File system error: {e}")
            print("πŸ”„ Switching to in-memory storage")
            self.use_memory = True
            self.users_memory = {}
            self.sessions_memory = {}

    def ensure_admin_user(self):
        """Ensure admin user exists on startup; failures are only logged."""
        try:
            result = self.create_default_admin()
            if result.get('success'):
                print(f"βœ… Admin user ready: {result.get('message', 'Available')}")
        except Exception as e:
            print(f"⚠️ Admin user creation failed: {e}")

    # ------------------------------------------------------------------
    # Passwords
    # ------------------------------------------------------------------
    def hash_password(self, password: str) -> str:
        """Return a bcrypt hash of *password* as a UTF-8 string."""
        return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')

    def verify_password(self, password: str, hashed: str) -> bool:
        """Return True when *password* matches the bcrypt hash *hashed*.

        Any error raised while checking (for example a malformed hash) is
        logged and treated as a mismatch.
        """
        try:
            return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
        except Exception as e:
            print(f"❌ Password verification error: {e}")
            return False

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------
    def load_users(self) -> Dict[str, Any]:
        """Load users from file or memory, keyed by username.

        Returns an empty dict when the file is missing or unreadable.
        """
        if self.use_memory:
            return self.users_memory.copy()
        return self._read_json(self.users_file, 'users')

    def save_users(self, users: Dict[str, Any]) -> bool:
        """Save users to file or memory.

        Returns:
            True when the data was stored, False when writing failed (the
            error is also printed).
        """
        if self.use_memory:
            self.users_memory = users.copy()
            return True
        return self._write_json(self.users_file, users, 'users')

    @staticmethod
    def _next_user_id(users: Dict[str, Any]) -> str:
        """Return a ``user_<n>`` id not used by any record in *users*.

        Starts at ``len(users) + 1`` and counts upwards past ids that are
        already taken, because sessions are keyed by user id and a duplicate
        would let two accounts share one session slot.
        """
        taken = {record.get('user_id') for record in users.values()}
        number = len(users) + 1
        while f"user_{number}" in taken:
            number += 1
        return f"user_{number}"

    def create_user(self, username: str, email: str, password: str) -> Dict[str, Any]:
        """Create a new user.

        Returns:
            ``{'success': True, 'user_id': ...}`` on success, otherwise
            ``{'success': False, 'error': ...}`` when the username or email
            is already taken or the user store could not be written.
        """
        users = self.load_users()

        if username in users:
            return {'success': False, 'error': 'Username already exists'}

        # Check if email already exists
        if any(record.get('email') == email for record in users.values()):
            return {'success': False, 'error': 'Email already registered'}

        # Create user
        user_id = self._next_user_id(users)
        users[username] = {
            'user_id': user_id,
            'email': email,
            'password_hash': self.hash_password(password),
            'created_at': datetime.now().isoformat(),
            'is_active': True
        }

        # Do not claim success for an account that was never persisted.
        if not self.save_users(users):
            return {'success': False, 'error': 'Failed to save user'}
        return {'success': True, 'user_id': user_id}

    def authenticate_user(self, username: str, password: str) -> Dict[str, Any]:
        """Check credentials and, on success, issue a JWT and record a session.

        Returns:
            On success ``{'success': True, 'token', 'user_id', 'username'}``;
            otherwise ``{'success': False, 'error': ...}``. Unknown user and
            wrong password deliberately share one error message.
        """
        print(f"πŸ” Authentication attempt for: {username}")

        users = self.load_users()
        print(f"πŸ“Š Total users in database: {len(users)}")

        user = users.get(username)
        if user is None:
            # The full list of registered usernames is intentionally not
            # logged here: it would expose every account name in the logs
            # on each failed login.
            print(f"❌ Username '{username}' not found")
            return {'success': False, 'error': 'Invalid username or password'}

        # .get() so that a record without a hash counts as a failed login
        # (verify_password logs it and returns False) instead of a KeyError.
        if not self.verify_password(password, user.get('password_hash')):
            print(f"❌ Invalid password for '{username}'")
            return {'success': False, 'error': 'Invalid username or password'}

        if not user.get('is_active', True):
            print(f"❌ User '{username}' is not active")
            return {'success': False, 'error': 'Account is disabled'}

        # Generate JWT token
        try:
            token = jwt.encode({
                'user_id': user['user_id'],
                'username': username,
                'exp': datetime.now(timezone.utc) + self.SESSION_LIFETIME
            }, self.secret_key, algorithm='HS256')
            # PyJWT 1.x returns bytes; sessions are stored as JSON and
            # compared against str tokens, so normalise to str.
            if isinstance(token, bytes):
                token = token.decode('utf-8')

            # Track active session
            self.add_active_session(user['user_id'], token)

            print(f"βœ… Authentication successful for '{username}'")
            return {
                'success': True,
                'token': token,
                'user_id': user['user_id'],
                'username': username
            }
        except Exception as e:
            print(f"❌ JWT generation failed: {e}")
            return {'success': False, 'error': 'Authentication failed'}

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate *token* and return its payload, or None if unusable.

        A token is accepted only when its signature and expiry are valid
        and it matches the session currently recorded for its user. A
        successful check refreshes the session's last-activity time.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            user_id = payload.get('user_id')

            # Check if session is still active
            if not self.is_session_active(user_id, token):
                return None

            # Update session activity
            self.update_session_activity(user_id)
            return payload

        except jwt.ExpiredSignatureError:
            print("πŸ• Token expired")
            return None
        except jwt.InvalidTokenError:
            print("❌ Invalid token")
            return None
        except Exception as e:
            print(f"❌ Token verification error: {e}")
            return None

    def create_default_admin(self) -> Dict[str, Any]:
        """Create default admin user if it doesn't exist.

        Returns:
            A result dict with ``success`` and either ``message`` (plus the
            credentials when the account was just created) or ``error``.
        """
        users = self.load_users()
        admin_username = "admin"
        admin_user_id = "admin_user"

        # Check if admin already exists
        if admin_username in users:
            return {'success': True, 'message': 'Admin user already exists'}

        # NOTE(review): the default password is hard-coded and is returned
        # in clear text below; callers should change it after first login.
        try:
            users[admin_username] = {
                'user_id': admin_user_id,
                'email': '[email protected]',
                'password_hash': self.hash_password('admin123'),
                'created_at': datetime.now().isoformat(),
                'is_active': True,
                'is_admin': True
            }
            if not self.save_users(users):
                return {'success': False, 'error': 'Failed to save admin user'}
            return {
                'success': True,
                'message': 'Default admin user created',
                'username': admin_username,
                'password': 'admin123'
            }
        except Exception as e:
            print(f"❌ Failed to create admin user: {e}")
            return {'success': False, 'error': str(e)}

    # ------------------------------------------------------------------
    # Sessions
    # ------------------------------------------------------------------
    def load_active_sessions(self) -> Dict[str, Any]:
        """Load active sessions, keyed by user id (one session per user)."""
        if self.use_memory:
            return self.sessions_memory.copy()
        return self._read_json(self.session_file, 'sessions')

    def save_active_sessions(self, sessions: Dict[str, Any]) -> bool:
        """Save active sessions; return True when they were stored."""
        if self.use_memory:
            self.sessions_memory = sessions.copy()
            return True
        return self._write_json(self.session_file, sessions, 'sessions')

    @staticmethod
    def _session_age(session: Any, now: datetime) -> Optional[timedelta]:
        """Return how long ago *session* was created, or None if malformed.

        Malformed covers a missing or non-string ``created_at``, an
        unparsable timestamp, and a timestamp that cannot be subtracted
        from *now* (e.g. timezone-aware vs. naive).
        """
        try:
            return now - datetime.fromisoformat(session['created_at'])
        except (KeyError, TypeError, ValueError):
            return None

    def add_active_session(self, user_id: str, token: str):
        """Record *token* as the active session for *user_id*.

        Any previous session stored for the same user is replaced.
        """
        sessions = self.load_active_sessions()
        timestamp = datetime.now().isoformat()
        sessions[user_id] = {
            'token': token,
            'created_at': timestamp,
            'last_activity': timestamp
        }
        self.save_active_sessions(sessions)

    def is_session_active(self, user_id: str, token: str) -> bool:
        """Return True when *token* is the current, unexpired session of *user_id*.

        An expired session is removed from the store as a side effect.
        """
        session = self.load_active_sessions().get(user_id)
        if session is None or session.get('token') != token:
            return False

        age = self._session_age(session, datetime.now())
        if age is None:
            # Unreadable creation time: treat the session as invalid.
            return False
        if age > self.SESSION_LIFETIME:
            self.remove_active_session(user_id)
            return False
        return True

    def remove_active_session(self, user_id: str):
        """Remove the session of *user_id*, if one is recorded."""
        sessions = self.load_active_sessions()
        if user_id in sessions:
            del sessions[user_id]
            self.save_active_sessions(sessions)

    def update_session_activity(self, user_id: str):
        """Update last activity time for the session of *user_id*."""
        sessions = self.load_active_sessions()
        if user_id in sessions:
            sessions[user_id]['last_activity'] = datetime.now().isoformat()
            self.save_active_sessions(sessions)

    def logout_user(self, user_id: str):
        """Logout user and invalidate session."""
        self.remove_active_session(user_id)
        return {'success': True, 'message': 'Logged out successfully'}

    def cleanup_expired_sessions(self):
        """Drop expired or malformed sessions and return how many were removed."""
        sessions = self.load_active_sessions()
        now = datetime.now()

        expired_ids = []
        for user_id, session in sessions.items():
            age = self._session_age(session, now)
            if age is None or age > self.SESSION_LIFETIME:
                expired_ids.append(user_id)

        for user_id in expired_ids:
            del sessions[user_id]

        # Only touch the store when something actually changed.
        if expired_ids:
            self.save_active_sessions(sessions)

        return len(expired_ids)

    def debug_status(self):
        """Print the storage mode, user count, session count and user ids."""
        users = self.load_users()
        sessions = self.load_active_sessions()

        print("=== AUTH DEBUG STATUS ===")
        print(f"Storage mode: {'Memory' if self.use_memory else 'File'}")
        print(f"Users file exists: {self.users_file.exists() if not self.use_memory else 'N/A'}")
        print(f"Total users: {len(users)}")
        print(f"Active sessions: {len(sessions)}")

        if users:
            print("Users:")
            for username, user_data in users.items():
                print(f" - {username}: {user_data.get('user_id', 'No ID')}")
        print("========================")
# Global auth manager instance.
# NOTE: this runs AuthManager.__init__ at import time, which resolves the
# data directory, creates/probes the users file and ensures the default
# admin user exists (falling back to in-memory storage on file errors).
auth_manager = AuthManager()