|
|
|
from typing import Optional |
|
import bcrypt |
|
from sqlmodel import select |
|
|
|
from models import User, UserCreate, get_session_context |
|
from services.logger import app_logger |
|
|
|
|
|
def hash_password(password: str) -> str:
    """Return a bcrypt hash of ``password`` as a UTF-8 string.

    A fresh salt is generated with ``bcrypt.gensalt()`` on every call.

    Args:
        password: The plain-text password to hash.

    Returns:
        The bcrypt hash, decoded from bytes to ``str``.

    Raises:
        ValueError: If any step of the hashing fails. The underlying
            exception is logged and chained as the cause.
    """
    try:
        digest = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        return digest.decode('utf-8')
    except Exception as exc:
        app_logger.error(f"Error during password hashing: {exc}", exc_info=True)
        # Callers only need to handle one failure type, whatever went wrong.
        raise ValueError("Password hashing failed") from exc
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored bcrypt hash.

    Args:
        plain_password: The candidate password supplied by the caller.
        hashed_password: The stored bcrypt hash, as a string.

    Returns:
        The result of ``bcrypt.checkpw`` for the two UTF-8 encoded values,
        or ``False`` if anything raises while encoding or checking (the
        error is logged rather than propagated).
    """
    try:
        is_match = bcrypt.checkpw(
            plain_password.encode('utf-8'),
            hashed_password.encode('utf-8'),
        )
    except Exception as exc:
        app_logger.error(f"Error during password verification: {exc}", exc_info=True)
        # Any failure is treated as a non-match.
        return False
    else:
        return is_match
|
|
|
|
|
def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
    """Create a new user in the database with a hashed password.

    The username, and the email when one is supplied, must not already
    belong to an existing user. The plain-text password is hashed with
    ``hash_password`` before the row is stored.

    Args:
        user_create_data: The signup payload. ``username``, ``email``,
            ``full_name`` and ``password`` are read from it; ``disabled``
            is read if present and defaults to ``False`` otherwise.

    Returns:
        The persisted ``User`` on success. ``None`` if the username or email
        is already taken, if the password could not be hashed, or if any
        other exception is raised (the error is logged, not propagated).
    """
    app_logger.info(f"Attempting to create user: {user_create_data.username}")
    try:
        with get_session_context() as session:
            # Reject duplicate usernames.
            statement_username = select(User).where(User.username == user_create_data.username)
            existing_user_by_username = session.exec(statement_username).first()
            if existing_user_by_username:
                app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
                return None

            # Only check the email for uniqueness when one was supplied.
            if user_create_data.email:
                statement_email = select(User).where(User.email == user_create_data.email)
                existing_user_by_email = session.exec(statement_email).first()
                if existing_user_by_email:
                    app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
                    return None

            try:
                hashed_pw = hash_password(user_create_data.password)
            except ValueError:
                app_logger.error(f"Could not hash password for user {user_create_data.username} during signup.")
                return None

            db_user = User(
                username=user_create_data.username,
                email=user_create_data.email,
                full_name=user_create_data.full_name,
                disabled=getattr(user_create_data, 'disabled', False),
                hashed_password=hashed_pw,
            )

            session.add(db_user)
            # The INSERT must be sent before refresh(): Session.refresh()
            # rejects an instance that is still pending, which previously
            # sent every call into the except branch below.
            # NOTE(review): get_session_context is defined elsewhere; if it
            # also commits on exit, that later commit has nothing left to
            # write - confirm.
            session.commit()

            # Reload the row so database-generated values such as the id
            # are populated on the returned object.
            session.refresh(db_user)
            app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) created successfully in DB.")
            return db_user

    except Exception as e:
        app_logger.error(f"Database error during user creation for '{user_create_data.username}': {e}", exc_info=True)
        return None
|
|
|
|
|
def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
    """Authenticate a user by username and password.

    Args:
        username_in: The username to look up.
        password_in: The plain-text password to verify against the stored
            hash.

    Returns:
        The matching ``User`` when the account exists, is not disabled and
        the password verifies. ``None`` in every other case, including when
        an exception is raised (the error is logged, not propagated).
    """
    app_logger.info(f"Attempting to authenticate user: {username_in}")
    try:
        with get_session_context() as session:
            lookup = select(User).where(User.username == username_in)
            account = session.exec(lookup).first()

            # Checks run in the same order as before: existence, disabled
            # flag, then password. The first failing check picks the message.
            if not account:
                failure = f"Authentication failed: User '{username_in}' not found."
            elif account.disabled:
                failure = f"Authentication failed: User '{username_in}' is disabled."
            elif not verify_password(password_in, account.hashed_password):
                failure = f"Authentication failed: Invalid password for user '{username_in}'."
            else:
                failure = None

            if failure is not None:
                app_logger.warning(failure)
                return None

            app_logger.info(f"User '{account.username}' (ID: {account.id}) authenticated successfully.")
            return account

    except Exception as exc:
        app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {exc}", exc_info=True)
        return None