MedQA / services /auth.py
mgbam's picture
Update services/auth.py
5fcb473 verified
raw
history blame
10.4 kB
# /home/user/app/services/auth.py
from typing import Optional
import bcrypt
from sqlmodel import select, Session # Assuming Session is also imported if used for type hints
from sqlalchemy.exc import IntegrityError # For catching DB unique constraint violations
from models import User, UserCreate, get_session_context
from services.logger import app_logger
# --- Password Hashing Utilities (ensure these are robust) ---
def hash_password(password: str) -> str:
app_logger.debug("Attempting to hash password.")
try:
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
hashed_bytes = bcrypt.hashpw(password_bytes, salt)
hashed_password_str = hashed_bytes.decode('utf-8')
app_logger.debug("Password hashed successfully.")
return hashed_password_str
except Exception as e:
app_logger.error(f"CRITICAL: Password hashing failed: {e}", exc_info=True)
# This exception should be caught by the caller (create_user_in_db)
raise ValueError("Password hashing process failed") from e
def verify_password(plain_password: str, hashed_password: str) -> bool:
# (This function is for login, ensure it's also robust)
app_logger.debug("Attempting to verify password.")
try:
plain_password_bytes = plain_password.encode('utf-8')
hashed_password_bytes = hashed_password.encode('utf-8')
is_valid = bcrypt.checkpw(plain_password_bytes, hashed_password_bytes)
app_logger.debug(f"Password verification result: {is_valid}")
return is_valid
except Exception as e:
app_logger.error(f"CRITICAL: Password verification failed: {e}", exc_info=True)
return False
# --- User Creation with Enhanced Error Handling & Logging ---
def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
app_logger.info(f"Attempting to create user in DB: Username='{user_create_data.username}', Email='{user_create_data.email}'")
# Pre-validation (optional, but good practice)
if not user_create_data.username or not user_create_data.password:
app_logger.warning("Signup attempt with empty username or password.")
# This should ideally be caught by frontend validation, but good to have a server-side check.
return None # Or raise a specific validation error
try:
with get_session_context() as session: # `session` is a SQLModel Session
app_logger.debug("Database session obtained for user creation.")
# 1. Check if username already exists
app_logger.debug(f"Checking for existing username: {user_create_data.username}")
statement_username = select(User).where(User.username == user_create_data.username)
existing_user_by_username = session.exec(statement_username).first()
if existing_user_by_username:
app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
# Consider returning a more specific error indicator or raising a custom exception
return None
# 2. Check if email already exists (if provided and should be unique)
if user_create_data.email:
app_logger.debug(f"Checking for existing email: {user_create_data.email}")
statement_email = select(User).where(User.email == user_create_data.email)
existing_user_by_email = session.exec(statement_email).first()
if existing_user_by_email:
app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
return None
# 3. Hash the password
app_logger.debug("Attempting to hash password for new user.")
try:
hashed_pw = hash_password(user_create_data.password)
except ValueError as e_hash: # Catch specific hashing failure
app_logger.error(f"Password hashing failed for user '{user_create_data.username}': {e_hash}", exc_info=True)
return None # Fail signup if password cannot be hashed
# 4. Create the new user object
app_logger.debug("Creating User ORM object.")
db_user = User(
username=user_create_data.username,
email=user_create_data.email,
# Ensure UserCreate and User models have these fields if you use them
full_name=getattr(user_create_data, 'full_name', None),
disabled=getattr(user_create_data, 'disabled', False),
hashed_password=hashed_pw
)
# 5. Add to session and attempt commit (via context manager)
app_logger.debug(f"Adding new user ORM object to session for username: {db_user.username}")
session.add(db_user)
# The commit will be attempted when the 'get_session_context' exits.
# If IntegrityError (like unique constraint) occurs, it will be caught by the outer try-except.
# We need to refresh to get DB-generated values like ID.
# This refresh should happen *before* the commit if the ID is needed immediately
# by the caller, but in SQLModel, the refresh is often done after a successful commit
# to get all DB state. Since the context manager handles the commit,
# we can try to refresh here, anticipating the commit.
# However, if the commit fails, this refresh might also be problematic.
# A common pattern is: add, then let context manager commit, then if successful, re-fetch or use passed-in data.
# For now, let's assume the context manager handles commit on exit.
# If the user object is needed with its ID *immediately after this function returns successfully*,
# and before another session, then a refresh after commit is essential.
# The current design: returns User obj, app.py uses primitive data from form.
# For now, let's log before the context manager attempts commit.
app_logger.info(f"User object for '{db_user.username}' added to session. Commit will be attempted by context manager.")
# If you need the ID right away, you'd commit explicitly and refresh here:
# session.commit()
# session.refresh(db_user)
# The `get_session_context` will handle the commit. If it fails (e.g. IntegrityError),
# it will rollback and raise the exception, caught by the outer `except` block.
# If successful, we need to ensure the returned object has its ID.
# A common way to ensure the ID is populated is to flush and then refresh,
# or ensure the session is configured to load IDs after insert.
# SQLModel typically handles this well if primary_key=True, default=None.
# Let's rely on the session returning the object with its ID after add and successful commit.
# To be absolutely sure the ID is available if the commit in context manager works:
# One strategy is to commit here then refresh.
# Another is to let the context handle commit, then if this function must return the ID,
# it should re-fetch, or the calling code should handle that if it needs a "live" object later.
# Since `app.py` uses the input username for the success message, this is less critical *immediately*.
# The object `db_user` should have its ID populated after the context manager successfully commits.
# If `create_user_in_db` is expected to return a fully usable object with ID,
# an explicit commit and refresh *within* the `with` block (before returning) is safest.
session.flush() # Flushes to DB, assigns ID if auto-increment
session.refresh(db_user) # Refreshes from DB state, ensuring ID and other defaults are loaded
app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) prepared for commit. Returning object.")
return db_user # This object will be committed by the context manager if no errors.
except IntegrityError as ie: # Catch specific database integrity errors (like unique constraints not caught above)
app_logger.error(f"Database IntegrityError during user creation for '{user_create_data.username}': {ie}", exc_info=True)
# session.rollback() is handled by get_session_context
return None
except ValueError as ve: # Catch value errors, e.g. from hashing if not caught internally
app_logger.error(f"ValueError during user creation for '{user_create_data.username}': {ve}", exc_info=True)
return None
except Exception as e:
# This is a catch-all for any other unexpected errors.
app_logger.error(f"CRITICAL UNEXPECTED error during user creation for '{user_create_data.username}': {e}", exc_info=True)
# session.rollback() is handled by get_session_context
return None
# --- User Authentication (ensure this is robust too) ---
def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
# (Keep the robust version of authenticate_user from previous response)
app_logger.info(f"Attempting to authenticate user: {username_in}")
try:
with get_session_context() as session:
statement = select(User).where(User.username == username_in)
user = session.exec(statement).first()
if not user:
app_logger.warning(f"Authentication failed: User '{username_in}' not found.")
return None
if hasattr(user, 'disabled') and user.disabled:
app_logger.warning(f"Authentication failed: User '{username_in}' is disabled.")
return None
if not verify_password(password_in, user.hashed_password):
app_logger.warning(f"Authentication failed: Invalid password for user '{username_in}'.")
return None
app_logger.info(f"User '{user.username}' (ID: {user.id}) authenticated successfully.")
return user
except Exception as e:
app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {e}", exc_info=True)
return None