MedQA / services /auth.py
mgbam's picture
Update services/auth.py
acf2fb5 verified
raw
history blame
6.1 kB
from passlib.context import CryptContext
from sqlmodel import Session, select
from typing import Optional
from models.user import User, UserCreate # Direct import
from models.db import get_session_context
from services.logger import app_logger
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def get_user_by_username(db: Session, username: str) -> Optional[User]:
"""Fetches a user by username."""
statement = select(User).where(User.username == username)
user = db.exec(statement).first()
return user
def create_user_in_db(user_data: UserCreate) -> Optional[User]:
"""
Creates a new user in the database.
Returns the User object with basic attributes loaded, suitable for immediate
use even after the session creating it has closed.
"""
hashed_password = get_password_hash(user_data.password)
# Create the Python object instance.
# Initialize relationships like chat_sessions to an empty list if appropriate,
# as they won't be populated from the DB for a new user yet.
new_user_instance = User(
username=user_data.username,
hashed_password=hashed_password,
email=user_data.email,
full_name=user_data.full_name,
chat_sessions=[] # Good practice to initialize for new instances
)
created_user_id: Optional[int] = None
try:
with get_session_context() as db:
# Check if user already exists
existing_user = get_user_by_username(db, user_data.username)
if existing_user:
app_logger.warning(f"User {user_data.username} already exists.")
return None # User already exists
db.add(new_user_instance)
db.commit() # Commit to save the user and allow DB to generate ID
db.refresh(new_user_instance) # Refresh to get all attributes, especially the DB-generated ID
# Store the ID so we can re-fetch a "clean" instance if needed,
# or to ensure we are working with the persisted state.
created_user_id = new_user_instance.id
# To ensure the returned object is safe for use after this session closes,
# especially for attributes that might be displayed immediately,
# we can load them explicitly or re-fetch.
# For simple attributes, refresh should be enough.
# We will return a freshly fetched instance to be absolutely sure.
except Exception as e:
app_logger.error(f"Error during database operation for user {user_data.username}: {e}")
return None # DB operation failed
# If user was created, fetch a fresh instance to return.
# This new instance will be detached but will have its basic attributes loaded.
if created_user_id is not None:
try:
with get_session_context() as db:
# .get() is efficient for fetching by primary key
final_user_to_return = db.get(User, created_user_id)
if final_user_to_return:
# "Touch" attributes that will be used immediately after this function returns
# to ensure they are loaded before this (new) session closes.
_ = final_user_to_return.id
_ = final_user_to_return.username
_ = final_user_to_return.email
_ = final_user_to_return.full_name
# Do NOT try to access final_user_to_return.chat_sessions here unless you
# intend to load them, which might be a heavy operation.
# They will be lazy-loaded when accessed within an active session later.
return final_user_to_return
except Exception as e:
app_logger.error(f"Error re-fetching created user {created_user_id}: {e}")
return None # Failed to re-fetch
return None # Fallback if user was not created or couldn't be re-fetched
def authenticate_user(username: str, password: str) -> Optional[User]:
"""
Authenticates a user.
Returns the User object with basic attributes loaded.
"""
try:
with get_session_context() as db:
user = get_user_by_username(db, username)
if not user:
return None # User not found
if not verify_password(password, user.hashed_password):
return None # Invalid password
# User is authenticated. Before returning the user object,
# ensure any attributes that will be immediately accessed by the caller
# (e.g., when setting st.session_state.authenticated_user) are loaded.
# This helps avoid DetachedInstanceError if Streamlit or other parts
# of the app try to access these attributes after this session closes.
user_id_to_return = user.id # Get ID to re-fetch
# Re-fetch the user in a new session context to return a "clean" detached object
# This is a robust way to prevent issues with detached instances being used across session boundaries.
if user_id_to_return is not None:
with get_session_context() as db:
authenticated_user = db.get(User, user_id_to_return)
if authenticated_user:
# "Touch" attributes
_ = authenticated_user.id
_ = authenticated_user.username
_ = authenticated_user.email
# Do not touch relationships unless intended for eager load here.
return authenticated_user
return None # Should not happen if authenticated
except Exception as e:
app_logger.error(f"Error during authentication for user {username}: {e}")
return None