|
|
|
from typing import Optional |
|
import bcrypt |
|
from sqlmodel import select, Session |
|
from sqlalchemy.exc import IntegrityError |
|
|
|
from models import User, UserCreate, get_session_context |
|
from services.logger import app_logger |
|
|
|
|
|
def hash_password(password: str) -> str:
    """Hash *password* with bcrypt and return the hash as text.

    The password is UTF-8 encoded, hashed with a salt freshly produced by
    ``bcrypt.gensalt()``, and the resulting bytes are decoded back to ``str``.

    Args:
        password: The plain-text password to hash.

    Returns:
        The bcrypt hash decoded as a UTF-8 string.

    Raises:
        ValueError: If any step of the hashing fails. The underlying
            exception is logged and chained as ``__cause__``.
    """
    app_logger.debug("Attempting to hash password.")
    try:
        # Arguments evaluate left to right: encode first, then generate salt.
        digest_text = bcrypt.hashpw(
            password.encode('utf-8'),
            bcrypt.gensalt(),
        ).decode('utf-8')
        app_logger.debug("Password hashed successfully.")
        return digest_text
    except Exception as e:
        # Every failure is surfaced to callers uniformly as ValueError.
        app_logger.error(f"CRITICAL: Password hashing failed: {e}", exc_info=True)
        raise ValueError("Password hashing process failed") from e
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored bcrypt hash.

    Both arguments are UTF-8 encoded and handed to ``bcrypt.checkpw``.

    Args:
        plain_password: The candidate password as entered by the user.
        hashed_password: The stored hash, as text.

    Returns:
        The result of ``bcrypt.checkpw``; ``False`` if any exception is
        raised during encoding or checking (the error is logged, not
        propagated).
    """
    app_logger.debug("Attempting to verify password.")
    try:
        is_valid = bcrypt.checkpw(
            plain_password.encode('utf-8'),
            hashed_password.encode('utf-8'),
        )
        app_logger.debug(f"Password verification result: {is_valid}")
        return is_valid
    except Exception as e:
        # Any failure is treated as a failed verification rather than
        # raised to the caller.
        app_logger.error(f"CRITICAL: Password verification failed: {e}", exc_info=True)
        return False
|
|
|
|
|
def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
    """Create a new ``User`` row from signup data.

    Steps, in order: reject empty username/password, open a session via
    ``get_session_context()``, reject a duplicate username, reject a
    duplicate email (only when an email was supplied), hash the password,
    build the ``User`` object, add it to the session, then flush and
    refresh it.

    Args:
        user_create_data: Signup payload. ``username``, ``password`` and
            ``email`` are read directly; ``full_name`` and ``disabled`` are
            read with ``getattr`` and default to ``None`` / ``False``.

    Returns:
        The flushed and refreshed ``User`` on success, or ``None`` when
        validation fails, the username/email already exists, hashing
        fails, or any exception is raised (all failures are logged).

    NOTE(review): no explicit commit happens here; the log text says the
    commit is left to the ``get_session_context`` context manager, so
    confirm that against its implementation.
    """
    app_logger.info(f"Attempting to create user in DB: Username='{user_create_data.username}', Email='{user_create_data.email}'")

    # Both fields are mandatory; bail out before touching the database.
    if not (user_create_data.username and user_create_data.password):
        app_logger.warning("Signup attempt with empty username or password.")
        return None

    try:
        with get_session_context() as session:
            app_logger.debug("Database session obtained for user creation.")

            # Username uniqueness check.
            app_logger.debug(f"Checking for existing username: {user_create_data.username}")
            username_query = select(User).where(User.username == user_create_data.username)
            username_clash = session.exec(username_query).first()
            if username_clash:
                app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
                return None

            # Email uniqueness check, skipped when no email was provided.
            if user_create_data.email:
                app_logger.debug(f"Checking for existing email: {user_create_data.email}")
                email_query = select(User).where(User.email == user_create_data.email)
                email_clash = session.exec(email_query).first()
                if email_clash:
                    app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
                    return None

            app_logger.debug("Attempting to hash password for new user.")
            try:
                password_digest = hash_password(user_create_data.password)
            except ValueError as e_hash:
                app_logger.error(f"Password hashing failed for user '{user_create_data.username}': {e_hash}", exc_info=True)
                return None

            app_logger.debug("Creating User ORM object.")
            # Optional fields fall back to defaults if the payload lacks them.
            user_fields = {
                "username": user_create_data.username,
                "email": user_create_data.email,
                "full_name": getattr(user_create_data, 'full_name', None),
                "disabled": getattr(user_create_data, 'disabled', False),
                "hashed_password": password_digest,
            }
            db_user = User(**user_fields)

            app_logger.debug(f"Adding new user ORM object to session for username: {db_user.username}")
            session.add(db_user)
            app_logger.info(f"User object for '{db_user.username}' added to session. Commit will be attempted by context manager.")

            # Flush then refresh before the ID is read for the log line
            # below and the object is returned.
            session.flush()
            session.refresh(db_user)

            app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) prepared for commit. Returning object.")
            return db_user

    except IntegrityError as ie:
        app_logger.error(f"Database IntegrityError during user creation for '{user_create_data.username}': {ie}", exc_info=True)
        return None
    except ValueError as ve:
        app_logger.error(f"ValueError during user creation for '{user_create_data.username}': {ve}", exc_info=True)
        return None
    except Exception as e:
        # Last-resort handler: every other failure is logged and reported
        # to the caller as None.
        app_logger.error(f"CRITICAL UNEXPECTED error during user creation for '{user_create_data.username}': {e}", exc_info=True)
        return None
|
|
|
|
|
def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
    """Look up a user by username and validate the supplied password.

    Args:
        username_in: Username to look up.
        password_in: Plain-text password to check against the stored hash.

    Returns:
        The matching ``User`` when the user exists, is not disabled, and
        the password verifies; otherwise ``None``. Any exception raised
        during the lookup or check is logged and also yields ``None``.
    """
    app_logger.info(f"Attempting to authenticate user: {username_in}")
    try:
        with get_session_context() as session:
            lookup = select(User).where(User.username == username_in)
            user = session.exec(lookup).first()

            # Checks run in order: existence, disabled flag, password.
            if not user:
                app_logger.warning(f"Authentication failed: User '{username_in}' not found.")
            elif getattr(user, 'disabled', False):
                # A model without a `disabled` attribute counts as enabled.
                app_logger.warning(f"Authentication failed: User '{username_in}' is disabled.")
            elif not verify_password(password_in, user.hashed_password):
                app_logger.warning(f"Authentication failed: Invalid password for user '{username_in}'.")
            else:
                app_logger.info(f"User '{user.username}' (ID: {user.id}) authenticated successfully.")
                return user

            # Every failed check falls through to here.
            return None

    except Exception as e:
        app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {e}", exc_info=True)
        return None