File size: 5,753 Bytes
779e761 78dfc96 779e761 78dfc96 779e761 78dfc96 779e761 78dfc96 779e761 78dfc96 779e761 acf2fb5 779e761 acf2fb5 779e761 78dfc96 779e761 acf2fb5 779e761 3e53907 acf2fb5 779e761 3e53907 779e761 3e53907 779e761 acf2fb5 779e761 acf2fb5 779e761 acf2fb5 779e761 acf2fb5 779e761 acf2fb5 779e761 acf2fb5 779e761 acf2fb5 779e761 acf2fb5 779e761 acf2fb5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# /home/user/app/services/auth.py
from typing import Optional
import bcrypt # For password hashing
from sqlmodel import select
from models import User, UserCreate, get_session_context # Your SQLModel User and session
from services.logger import app_logger # Your application logger
# --- Password Hashing Utilities ---
def hash_password(password: str) -> str:
"""Hashes a plain-text password using bcrypt."""
try:
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
hashed_bytes = bcrypt.hashpw(password_bytes, salt)
return hashed_bytes.decode('utf-8')
except Exception as e:
app_logger.error(f"Error during password hashing: {e}", exc_info=True)
# Re-raise or return a specific error indicator if preferred,
# but for security, failing to hash should prevent account creation.
raise ValueError("Password hashing failed") from e
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verifies a plain-text password against a stored bcrypt hash."""
try:
plain_password_bytes = plain_password.encode('utf-8')
hashed_password_bytes = hashed_password.encode('utf-8')
return bcrypt.checkpw(plain_password_bytes, hashed_password_bytes)
except Exception as e:
app_logger.error(f"Error during password verification: {e}", exc_info=True)
# If bcrypt itself is broken, verification will fail.
return False
# --- User Creation ---
def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
"""
Creates a new user in the database with a hashed password.
Returns the User object if successful, None otherwise.
"""
app_logger.info(f"Attempting to create user: {user_create_data.username}")
try:
with get_session_context() as session:
# Check if username already exists
statement_username = select(User).where(User.username == user_create_data.username)
existing_user_by_username = session.exec(statement_username).first()
if existing_user_by_username:
app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
return None
# Check if email already exists (if provided and should be unique)
if user_create_data.email:
statement_email = select(User).where(User.email == user_create_data.email)
existing_user_by_email = session.exec(statement_email).first()
if existing_user_by_email:
app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
return None
try:
hashed_pw = hash_password(user_create_data.password)
except ValueError: # Catch hashing specific error
app_logger.error(f"Could not hash password for user {user_create_data.username} during signup.")
return None
# Create the new user object
db_user = User(
username=user_create_data.username,
email=user_create_data.email,
full_name=user_create_data.full_name, # Assuming UserCreate and User models have this
disabled=user_create_data.disabled if hasattr(user_create_data, 'disabled') else False,
hashed_password=hashed_pw
)
session.add(db_user)
# The commit is handled by the get_session_context manager upon successful exit.
# We need to refresh to get DB-generated values like ID before the session closes.
session.refresh(db_user)
app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) created successfully in DB.")
return db_user
except Exception as e:
app_logger.error(f"Database error during user creation for '{user_create_data.username}': {e}", exc_info=True)
# session.rollback() is handled by get_session_context
return None
# --- User Authentication ---
def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
"""
Authenticates a user by username and password.
Returns the User object if authentication is successful, None otherwise.
"""
app_logger.info(f"Attempting to authenticate user: {username_in}")
try:
with get_session_context() as session:
statement = select(User).where(User.username == username_in)
user = session.exec(statement).first()
if not user:
app_logger.warning(f"Authentication failed: User '{username_in}' not found.")
return None
if user.disabled: # Assuming your User model has a 'disabled' field
app_logger.warning(f"Authentication failed: User '{username_in}' is disabled.")
return None
if not verify_password(password_in, user.hashed_password):
app_logger.warning(f"Authentication failed: Invalid password for user '{username_in}'.")
return None
# If login is successful, you might want to update a last_login timestamp here
# user.last_login_at = datetime.utcnow()
# session.add(user)
# session.commit() # (handled by context manager)
# session.refresh(user)
app_logger.info(f"User '{user.username}' (ID: {user.id}) authenticated successfully.")
return user # Return the ORM object
except Exception as e:
app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {e}", exc_info=True)
return None |