MedQA / services /auth.py
mgbam's picture
Update services/auth.py
779e761 verified
raw
history blame
5.75 kB
# /home/user/app/services/auth.py
from typing import Optional
import bcrypt # For password hashing
from sqlmodel import select
from models import User, UserCreate, get_session_context # Your SQLModel User and session
from services.logger import app_logger # Your application logger
# --- Password Hashing Utilities ---
def hash_password(password: str) -> str:
    """
    Produce a bcrypt hash of a plain-text password.

    The password is UTF-8 encoded, hashed with a freshly generated salt
    (bcrypt.gensalt()), and the resulting hash is returned decoded as UTF-8 text.

    Raises:
        ValueError: If any step of the hashing fails. The failure is logged
            first and the original exception is chained as the cause.
    """
    try:
        return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
    except Exception as e:
        app_logger.error(f"Error during password hashing: {e}", exc_info=True)
        # A failed hash must surface as an error so the caller cannot go on
        # to store an account without a usable password hash.
        raise ValueError("Password hashing failed") from e
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Check a plain-text password against a stored bcrypt hash.

    Both arguments are UTF-8 encoded and handed to bcrypt.checkpw().

    Returns:
        The result of bcrypt.checkpw(). False is also returned when any
        exception is raised during the check; that failure is logged and
        never propagated to the caller.
    """
    try:
        is_match = bcrypt.checkpw(
            plain_password.encode('utf-8'),
            hashed_password.encode('utf-8'),
        )
    except Exception as e:
        app_logger.error(f"Error during password verification: {e}", exc_info=True)
        # Any failure while checking is treated as "password does not match".
        return False
    return is_match
# --- User Creation ---
def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
    """
    Creates a new user in the database with a hashed password.

    The username, and the email when one is supplied, must not already be
    present on an existing User row; otherwise nothing is inserted.

    Args:
        user_create_data: Signup payload. Its username, email, full_name and
            password attributes are read, plus disabled when it is present.

    Returns:
        The newly added User, or None when the username or email is already
        taken, the password could not be hashed, or any exception is raised
        while talking to the database (the exception is logged, not re-raised).
    """
    app_logger.info(f"Attempting to create user: {user_create_data.username}")
    try:
        with get_session_context() as session:
            # Reject duplicate usernames.
            statement_username = select(User).where(User.username == user_create_data.username)
            if session.exec(statement_username).first():
                app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
                return None

            # Reject duplicate emails; skipped entirely when no email was given.
            if user_create_data.email:
                statement_email = select(User).where(User.email == user_create_data.email)
                if session.exec(statement_email).first():
                    app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
                    return None

            try:
                hashed_pw = hash_password(user_create_data.password)
            except ValueError:  # hash_password signals failure with ValueError
                app_logger.error(f"Could not hash password for user {user_create_data.username} during signup.")
                return None

            db_user = User(
                username=user_create_data.username,
                email=user_create_data.email,
                full_name=user_create_data.full_name,
                # Payloads without a 'disabled' attribute create an enabled user.
                disabled=getattr(user_create_data, 'disabled', False),
                hashed_password=hashed_pw,
            )
            session.add(db_user)
            # Flush first: this emits the INSERT and makes the instance
            # persistent. Session.refresh() rejects an object that is still
            # pending, which previously made every signup fall into the
            # except branch below and return None.
            session.flush()
            # Load DB-generated values (such as the ID) while the session is open.
            session.refresh(db_user)
            # The commit itself is expected to be performed by
            # get_session_context on successful exit (not visible here).
            # NOTE(review): if that session expires attributes on commit, the
            # returned object may be unusable once detached -- confirm.
            app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) created successfully in DB.")
            return db_user
    except Exception as e:
        app_logger.error(f"Database error during user creation for '{user_create_data.username}': {e}", exc_info=True)
        # Rollback is expected to be handled by get_session_context.
        return None
# --- User Authentication ---
def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
    """
    Look up a user by username and check the supplied password.

    Checks are applied in this order: the user must exist, must not be
    flagged as disabled, and the password must pass verify_password().

    Returns:
        The matching User object when every check passes. None when a check
        fails (a warning naming the reason is logged) or when any exception
        is raised during the lookup (logged as an error).
    """
    app_logger.info(f"Attempting to authenticate user: {username_in}")
    try:
        with get_session_context() as session:
            user = session.exec(select(User).where(User.username == username_in)).first()

            # Pick the first failing check, if any; order matters because the
            # later checks read attributes of the user row.
            if not user:
                rejection = f"Authentication failed: User '{username_in}' not found."
            elif user.disabled:
                rejection = f"Authentication failed: User '{username_in}' is disabled."
            elif not verify_password(password_in, user.hashed_password):
                rejection = f"Authentication failed: Invalid password for user '{username_in}'."
            else:
                rejection = None

            if rejection is not None:
                app_logger.warning(rejection)
                return None

            # A last-login timestamp could be recorded here on success.
            app_logger.info(f"User '{user.username}' (ID: {user.id}) authenticated successfully.")
            return user
    except Exception as e:
        app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {e}", exc_info=True)
        return None