File size: 10,416 Bytes
779e761 78dfc96 5fcb473 78dfc96 5fcb473 78dfc96 5fcb473 779e761 5fcb473 779e761 5fcb473 779e761 5fcb473 78dfc96 5fcb473 779e761 5fcb473 779e761 5fcb473 779e761 78dfc96 5fcb473 779e761 5fcb473 78dfc96 5fcb473 779e761 5fcb473 acf2fb5 5fcb473 779e761 5fcb473 779e761 5fcb473 779e761 5fcb473 acf2fb5 5fcb473 779e761 5fcb473 779e761 5fcb473 779e761 5fcb473 3e53907 5fcb473 779e761 5fcb473 779e761 3e53907 5fcb473 779e761 5fcb473 779e761 acf2fb5 779e761 acf2fb5 779e761 acf2fb5 5fcb473 779e761 acf2fb5 779e761 5fcb473 acf2fb5 779e761 acf2fb5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# /home/user/app/services/auth.py
from typing import Optional
import bcrypt
from sqlmodel import select, Session # Assuming Session is also imported if used for type hints
from sqlalchemy.exc import IntegrityError # For catching DB unique constraint violations
from models import User, UserCreate, get_session_context
from services.logger import app_logger
# --- Password Hashing Utilities (ensure these are robust) ---
def hash_password(password: str) -> str:
app_logger.debug("Attempting to hash password.")
try:
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
hashed_bytes = bcrypt.hashpw(password_bytes, salt)
hashed_password_str = hashed_bytes.decode('utf-8')
app_logger.debug("Password hashed successfully.")
return hashed_password_str
except Exception as e:
app_logger.error(f"CRITICAL: Password hashing failed: {e}", exc_info=True)
# This exception should be caught by the caller (create_user_in_db)
raise ValueError("Password hashing process failed") from e
def verify_password(plain_password: str, hashed_password: str) -> bool:
# (This function is for login, ensure it's also robust)
app_logger.debug("Attempting to verify password.")
try:
plain_password_bytes = plain_password.encode('utf-8')
hashed_password_bytes = hashed_password.encode('utf-8')
is_valid = bcrypt.checkpw(plain_password_bytes, hashed_password_bytes)
app_logger.debug(f"Password verification result: {is_valid}")
return is_valid
except Exception as e:
app_logger.error(f"CRITICAL: Password verification failed: {e}", exc_info=True)
return False
# --- User Creation with Enhanced Error Handling & Logging ---
def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
app_logger.info(f"Attempting to create user in DB: Username='{user_create_data.username}', Email='{user_create_data.email}'")
# Pre-validation (optional, but good practice)
if not user_create_data.username or not user_create_data.password:
app_logger.warning("Signup attempt with empty username or password.")
# This should ideally be caught by frontend validation, but good to have a server-side check.
return None # Or raise a specific validation error
try:
with get_session_context() as session: # `session` is a SQLModel Session
app_logger.debug("Database session obtained for user creation.")
# 1. Check if username already exists
app_logger.debug(f"Checking for existing username: {user_create_data.username}")
statement_username = select(User).where(User.username == user_create_data.username)
existing_user_by_username = session.exec(statement_username).first()
if existing_user_by_username:
app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
# Consider returning a more specific error indicator or raising a custom exception
return None
# 2. Check if email already exists (if provided and should be unique)
if user_create_data.email:
app_logger.debug(f"Checking for existing email: {user_create_data.email}")
statement_email = select(User).where(User.email == user_create_data.email)
existing_user_by_email = session.exec(statement_email).first()
if existing_user_by_email:
app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
return None
# 3. Hash the password
app_logger.debug("Attempting to hash password for new user.")
try:
hashed_pw = hash_password(user_create_data.password)
except ValueError as e_hash: # Catch specific hashing failure
app_logger.error(f"Password hashing failed for user '{user_create_data.username}': {e_hash}", exc_info=True)
return None # Fail signup if password cannot be hashed
# 4. Create the new user object
app_logger.debug("Creating User ORM object.")
db_user = User(
username=user_create_data.username,
email=user_create_data.email,
# Ensure UserCreate and User models have these fields if you use them
full_name=getattr(user_create_data, 'full_name', None),
disabled=getattr(user_create_data, 'disabled', False),
hashed_password=hashed_pw
)
# 5. Add to session and attempt commit (via context manager)
app_logger.debug(f"Adding new user ORM object to session for username: {db_user.username}")
session.add(db_user)
# The commit will be attempted when the 'get_session_context' exits.
# If IntegrityError (like unique constraint) occurs, it will be caught by the outer try-except.
# We need to refresh to get DB-generated values like ID.
# This refresh should happen *before* the commit if the ID is needed immediately
# by the caller, but in SQLModel, the refresh is often done after a successful commit
# to get all DB state. Since the context manager handles the commit,
# we can try to refresh here, anticipating the commit.
# However, if the commit fails, this refresh might also be problematic.
# A common pattern is: add, then let context manager commit, then if successful, re-fetch or use passed-in data.
# For now, let's assume the context manager handles commit on exit.
# If the user object is needed with its ID *immediately after this function returns successfully*,
# and before another session, then a refresh after commit is essential.
# The current design: returns User obj, app.py uses primitive data from form.
# For now, let's log before the context manager attempts commit.
app_logger.info(f"User object for '{db_user.username}' added to session. Commit will be attempted by context manager.")
# If you need the ID right away, you'd commit explicitly and refresh here:
# session.commit()
# session.refresh(db_user)
# The `get_session_context` will handle the commit. If it fails (e.g. IntegrityError),
# it will rollback and raise the exception, caught by the outer `except` block.
# If successful, we need to ensure the returned object has its ID.
# A common way to ensure the ID is populated is to flush and then refresh,
# or ensure the session is configured to load IDs after insert.
# SQLModel typically handles this well if primary_key=True, default=None.
# Let's rely on the session returning the object with its ID after add and successful commit.
# To be absolutely sure the ID is available if the commit in context manager works:
# One strategy is to commit here then refresh.
# Another is to let the context handle commit, then if this function must return the ID,
# it should re-fetch, or the calling code should handle that if it needs a "live" object later.
# Since `app.py` uses the input username for the success message, this is less critical *immediately*.
# The object `db_user` should have its ID populated after the context manager successfully commits.
# If `create_user_in_db` is expected to return a fully usable object with ID,
# an explicit commit and refresh *within* the `with` block (before returning) is safest.
session.flush() # Flushes to DB, assigns ID if auto-increment
session.refresh(db_user) # Refreshes from DB state, ensuring ID and other defaults are loaded
app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) prepared for commit. Returning object.")
return db_user # This object will be committed by the context manager if no errors.
except IntegrityError as ie: # Catch specific database integrity errors (like unique constraints not caught above)
app_logger.error(f"Database IntegrityError during user creation for '{user_create_data.username}': {ie}", exc_info=True)
# session.rollback() is handled by get_session_context
return None
except ValueError as ve: # Catch value errors, e.g. from hashing if not caught internally
app_logger.error(f"ValueError during user creation for '{user_create_data.username}': {ve}", exc_info=True)
return None
except Exception as e:
# This is a catch-all for any other unexpected errors.
app_logger.error(f"CRITICAL UNEXPECTED error during user creation for '{user_create_data.username}': {e}", exc_info=True)
# session.rollback() is handled by get_session_context
return None
# --- User Authentication (ensure this is robust too) ---
def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
# (Keep the robust version of authenticate_user from previous response)
app_logger.info(f"Attempting to authenticate user: {username_in}")
try:
with get_session_context() as session:
statement = select(User).where(User.username == username_in)
user = session.exec(statement).first()
if not user:
app_logger.warning(f"Authentication failed: User '{username_in}' not found.")
return None
if hasattr(user, 'disabled') and user.disabled:
app_logger.warning(f"Authentication failed: User '{username_in}' is disabled.")
return None
if not verify_password(password_in, user.hashed_password):
app_logger.warning(f"Authentication failed: Invalid password for user '{username_in}'.")
return None
app_logger.info(f"User '{user.username}' (ID: {user.id}) authenticated successfully.")
return user
except Exception as e:
app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {e}", exc_info=True)
return None |