File size: 5,753 Bytes
779e761
78dfc96
779e761
 
78dfc96
779e761
 
78dfc96
779e761
 
 
 
 
 
 
 
 
 
 
 
 
78dfc96
 
779e761
 
 
 
 
 
 
 
 
78dfc96
779e761
 
acf2fb5
779e761
 
acf2fb5
779e761
78dfc96
779e761
 
 
 
 
 
 
acf2fb5
779e761
 
 
 
 
 
 
 
 
 
 
 
 
3e53907
acf2fb5
779e761
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e53907
779e761
 
 
 
3e53907
779e761
 
acf2fb5
779e761
 
acf2fb5
779e761
acf2fb5
779e761
 
 
 
acf2fb5
779e761
 
acf2fb5
779e761
 
 
 
 
 
 
acf2fb5
779e761
 
 
 
 
acf2fb5
779e761
 
acf2fb5
 
779e761
acf2fb5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# /home/user/app/services/auth.py
from typing import Optional
import bcrypt # For password hashing
from sqlmodel import select

from models import User, UserCreate, get_session_context # Your SQLModel User and session
from services.logger import app_logger # Your application logger

# --- Password Hashing Utilities ---
def hash_password(password: str) -> str:
    """Hashes a plain-text password using bcrypt."""
    try:
        password_bytes = password.encode('utf-8')
        salt = bcrypt.gensalt()
        hashed_bytes = bcrypt.hashpw(password_bytes, salt)
        return hashed_bytes.decode('utf-8')
    except Exception as e:
        app_logger.error(f"Error during password hashing: {e}", exc_info=True)
        # Re-raise or return a specific error indicator if preferred,
        # but for security, failing to hash should prevent account creation.
        raise ValueError("Password hashing failed") from e

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verifies a plain-text password against a stored bcrypt hash."""
    try:
        plain_password_bytes = plain_password.encode('utf-8')
        hashed_password_bytes = hashed_password.encode('utf-8')
        return bcrypt.checkpw(plain_password_bytes, hashed_password_bytes)
    except Exception as e:
        app_logger.error(f"Error during password verification: {e}", exc_info=True)
        # If bcrypt itself is broken, verification will fail.
        return False

# --- User Creation ---
def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
    """
    Creates a new user in the database with a hashed password.
    Returns the User object if successful, None otherwise.
    """
    app_logger.info(f"Attempting to create user: {user_create_data.username}")
    try:
        with get_session_context() as session:
            # Check if username already exists
            statement_username = select(User).where(User.username == user_create_data.username)
            existing_user_by_username = session.exec(statement_username).first()
            if existing_user_by_username:
                app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
                return None

            # Check if email already exists (if provided and should be unique)
            if user_create_data.email:
                statement_email = select(User).where(User.email == user_create_data.email)
                existing_user_by_email = session.exec(statement_email).first()
                if existing_user_by_email:
                    app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
                    return None
            
            try:
                hashed_pw = hash_password(user_create_data.password)
            except ValueError: # Catch hashing specific error
                app_logger.error(f"Could not hash password for user {user_create_data.username} during signup.")
                return None


            # Create the new user object
            db_user = User(
                username=user_create_data.username,
                email=user_create_data.email,
                full_name=user_create_data.full_name, # Assuming UserCreate and User models have this
                disabled=user_create_data.disabled if hasattr(user_create_data, 'disabled') else False,
                hashed_password=hashed_pw
            )
            
            session.add(db_user)
            # The commit is handled by the get_session_context manager upon successful exit.
            # We need to refresh to get DB-generated values like ID before the session closes.
            session.refresh(db_user)
            app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) created successfully in DB.")
            return db_user

    except Exception as e:
        app_logger.error(f"Database error during user creation for '{user_create_data.username}': {e}", exc_info=True)
        # session.rollback() is handled by get_session_context
        return None

# --- User Authentication ---
def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
    """
    Authenticates a user by username and password.
    Returns the User object if authentication is successful, None otherwise.
    """
    app_logger.info(f"Attempting to authenticate user: {username_in}")
    try:
        with get_session_context() as session:
            statement = select(User).where(User.username == username_in)
            user = session.exec(statement).first()

            if not user:
                app_logger.warning(f"Authentication failed: User '{username_in}' not found.")
                return None
            
            if user.disabled: # Assuming your User model has a 'disabled' field
                app_logger.warning(f"Authentication failed: User '{username_in}' is disabled.")
                return None

            if not verify_password(password_in, user.hashed_password):
                app_logger.warning(f"Authentication failed: Invalid password for user '{username_in}'.")
                return None
            
            # If login is successful, you might want to update a last_login timestamp here
            # user.last_login_at = datetime.utcnow()
            # session.add(user)
            # session.commit() # (handled by context manager)
            # session.refresh(user)

            app_logger.info(f"User '{user.username}' (ID: {user.id}) authenticated successfully.")
            return user # Return the ORM object

    except Exception as e:
        app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {e}", exc_info=True)
        return None