"""Password hashing helpers and user creation / authentication routines."""

from typing import Optional

from passlib.context import CryptContext
from sqlmodel import Session, select

from models.db import get_session_context  # Using context manager for direct use
from models.user import User, UserCreate
from services.logger import app_logger

# Single shared hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``.

    Verification is delegated to the module-level ``pwd_context``.
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by the module-level ``pwd_context``."""
    return pwd_context.hash(password)


def get_user(db: Session, username: str) -> Optional[User]:
    """Look up a user by exact username.

    Args:
        db: Open session used to run the query.
        username: Value compared for equality against ``User.username``.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    statement = select(User).where(User.username == username)
    return db.exec(statement).first()


def create_user_in_db(user_data: UserCreate) -> Optional[User]:
    """Create and persist a new user from ``user_data``.

    Args:
        user_data: Supplies ``username``, ``password``, ``email`` and
            ``full_name`` for the new record.

    Returns:
        The persisted and refreshed ``User``; ``None`` when the username is
        already taken or when any exception is raised while talking to the
        database (the error is logged, not re-raised).

    Note:
        Hashing the password and building the ``User`` object happen before
        the ``try`` block, so errors raised there propagate to the caller.
    """
    hashed_password = get_password_hash(user_data.password)
    db_user = User(
        username=user_data.username,
        hashed_password=hashed_password,
        email=user_data.email,
        full_name=user_data.full_name,
    )
    try:
        with get_session_context() as db:
            # Check if user already exists
            if get_user(db, user_data.username):
                app_logger.warning(f"User {user_data.username} already exists.")
                return None  # Or raise an exception
            db.add(db_user)
            # Commit is handled by context manager, but explicit commit for return
            db.commit()
            # Reload the row so the returned object carries its stored state.
            db.refresh(db_user)
            # NOTE(review): db_user outlives the session opened above; whether
            # its attributes remain loaded after the context exits depends on
            # how get_session_context is implemented -- confirm.
            return db_user
    except Exception as e:
        # Deliberately broad: callers receive None for any failure in the
        # block above (including a duplicate insert that slips past the check).
        # NOTE(review): only str(e) is logged, so the traceback is lost. If
        # app_logger is a standard logging.Logger, consider
        # app_logger.exception(...) here -- confirm the logger type first.
        app_logger.error(f"Error creating user {user_data.username}: {e}")
        return None


def authenticate_user(username: str, password: str) -> Optional[User]:
    """Return the user matching ``username`` if ``password`` is correct.

    Args:
        username: Username to look up.
        password: Plain-text password to verify against the stored hash.

    Returns:
        The matching ``User`` on success; ``None`` when the username is
        unknown or the password does not verify.
    """
    with get_session_context() as db:
        user = get_user(db, username)
        if not user:
            # Spend roughly the same time as a real verification so response
            # timing does not reveal whether the username exists.
            pwd_context.dummy_verify()
            return None
        if not verify_password(password, user.hashed_password):
            return None
        # NOTE(review): the returned User outlives the session opened here;
        # confirm get_session_context leaves its attributes usable on exit.
        return user