File size: 10,416 Bytes
779e761
78dfc96
5fcb473
 
 
78dfc96
5fcb473
 
78dfc96
5fcb473
779e761
5fcb473
779e761
 
 
 
5fcb473
 
 
779e761
5fcb473
 
 
78dfc96
 
5fcb473
 
779e761
 
 
5fcb473
 
 
779e761
5fcb473
779e761
78dfc96
5fcb473
779e761
5fcb473
 
 
 
 
 
 
 
78dfc96
5fcb473
 
 
 
 
779e761
 
 
 
5fcb473
 
acf2fb5
5fcb473
779e761
5fcb473
779e761
 
 
 
 
 
5fcb473
 
779e761
 
5fcb473
 
 
acf2fb5
5fcb473
 
779e761
 
 
5fcb473
 
 
779e761
 
 
5fcb473
 
779e761
5fcb473
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e53907
5fcb473
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
779e761
5fcb473
 
779e761
 
3e53907
5fcb473
779e761
5fcb473
779e761
acf2fb5
779e761
 
 
 
acf2fb5
779e761
 
acf2fb5
5fcb473
779e761
 
 
 
 
 
acf2fb5
779e761
5fcb473
acf2fb5
 
779e761
acf2fb5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# /home/user/app/services/auth.py
from typing import Optional
import bcrypt
from sqlmodel import select, Session # Assuming Session is also imported if used for type hints
from sqlalchemy.exc import IntegrityError # For catching DB unique constraint violations

from models import User, UserCreate, get_session_context
from services.logger import app_logger

# --- Password Hashing Utilities (ensure these are robust) ---
def hash_password(password: str) -> str:
    app_logger.debug("Attempting to hash password.")
    try:
        password_bytes = password.encode('utf-8')
        salt = bcrypt.gensalt()
        hashed_bytes = bcrypt.hashpw(password_bytes, salt)
        hashed_password_str = hashed_bytes.decode('utf-8')
        app_logger.debug("Password hashed successfully.")
        return hashed_password_str
    except Exception as e:
        app_logger.error(f"CRITICAL: Password hashing failed: {e}", exc_info=True)
        # This exception should be caught by the caller (create_user_in_db)
        raise ValueError("Password hashing process failed") from e

def verify_password(plain_password: str, hashed_password: str) -> bool:
    # (This function is for login, ensure it's also robust)
    app_logger.debug("Attempting to verify password.")
    try:
        plain_password_bytes = plain_password.encode('utf-8')
        hashed_password_bytes = hashed_password.encode('utf-8')
        is_valid = bcrypt.checkpw(plain_password_bytes, hashed_password_bytes)
        app_logger.debug(f"Password verification result: {is_valid}")
        return is_valid
    except Exception as e:
        app_logger.error(f"CRITICAL: Password verification failed: {e}", exc_info=True)
        return False

# --- User Creation with Enhanced Error Handling & Logging ---
def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
    app_logger.info(f"Attempting to create user in DB: Username='{user_create_data.username}', Email='{user_create_data.email}'")
    
    # Pre-validation (optional, but good practice)
    if not user_create_data.username or not user_create_data.password:
        app_logger.warning("Signup attempt with empty username or password.")
        # This should ideally be caught by frontend validation, but good to have a server-side check.
        return None # Or raise a specific validation error

    try:
        with get_session_context() as session: # `session` is a SQLModel Session
            app_logger.debug("Database session obtained for user creation.")

            # 1. Check if username already exists
            app_logger.debug(f"Checking for existing username: {user_create_data.username}")
            statement_username = select(User).where(User.username == user_create_data.username)
            existing_user_by_username = session.exec(statement_username).first()
            if existing_user_by_username:
                app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
                # Consider returning a more specific error indicator or raising a custom exception
                return None 

            # 2. Check if email already exists (if provided and should be unique)
            if user_create_data.email:
                app_logger.debug(f"Checking for existing email: {user_create_data.email}")
                statement_email = select(User).where(User.email == user_create_data.email)
                existing_user_by_email = session.exec(statement_email).first()
                if existing_user_by_email:
                    app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
                    return None
            
            # 3. Hash the password
            app_logger.debug("Attempting to hash password for new user.")
            try:
                hashed_pw = hash_password(user_create_data.password)
            except ValueError as e_hash: # Catch specific hashing failure
                app_logger.error(f"Password hashing failed for user '{user_create_data.username}': {e_hash}", exc_info=True)
                return None # Fail signup if password cannot be hashed

            # 4. Create the new user object
            app_logger.debug("Creating User ORM object.")
            db_user = User(
                username=user_create_data.username,
                email=user_create_data.email,
                # Ensure UserCreate and User models have these fields if you use them
                full_name=getattr(user_create_data, 'full_name', None),
                disabled=getattr(user_create_data, 'disabled', False),
                hashed_password=hashed_pw
            )
            
            # 5. Add to session and attempt commit (via context manager)
            app_logger.debug(f"Adding new user ORM object to session for username: {db_user.username}")
            session.add(db_user)
            
            # The commit will be attempted when the 'get_session_context' exits.
            # If IntegrityError (like unique constraint) occurs, it will be caught by the outer try-except.
            # We need to refresh to get DB-generated values like ID.
            # This refresh should happen *before* the commit if the ID is needed immediately
            # by the caller, but in SQLModel, the refresh is often done after a successful commit
            # to get all DB state. Since the context manager handles the commit,
            # we can try to refresh here, anticipating the commit.
            # However, if the commit fails, this refresh might also be problematic.
            # A common pattern is: add, then let context manager commit, then if successful, re-fetch or use passed-in data.
            # For now, let's assume the context manager handles commit on exit.
            # If the user object is needed with its ID *immediately after this function returns successfully*,
            # and before another session, then a refresh after commit is essential.
            # The current design: returns User obj, app.py uses primitive data from form.

            # For now, let's log before the context manager attempts commit.
            app_logger.info(f"User object for '{db_user.username}' added to session. Commit will be attempted by context manager.")
            # If you need the ID right away, you'd commit explicitly and refresh here:
            # session.commit()
            # session.refresh(db_user)

            # The `get_session_context` will handle the commit. If it fails (e.g. IntegrityError),
            # it will rollback and raise the exception, caught by the outer `except` block.
            # If successful, we need to ensure the returned object has its ID.
            # A common way to ensure the ID is populated is to flush and then refresh,
            # or ensure the session is configured to load IDs after insert.
            # SQLModel typically handles this well if primary_key=True, default=None.
            # Let's rely on the session returning the object with its ID after add and successful commit.
            # To be absolutely sure the ID is available if the commit in context manager works:
            # One strategy is to commit here then refresh.
            # Another is to let the context handle commit, then if this function must return the ID,
            # it should re-fetch, or the calling code should handle that if it needs a "live" object later.
            # Since `app.py` uses the input username for the success message, this is less critical *immediately*.

            # The object `db_user` should have its ID populated after the context manager successfully commits.
            # If `create_user_in_db` is expected to return a fully usable object with ID,
            # an explicit commit and refresh *within* the `with` block (before returning) is safest.
            session.flush() # Flushes to DB, assigns ID if auto-increment
            session.refresh(db_user) # Refreshes from DB state, ensuring ID and other defaults are loaded

            app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) prepared for commit. Returning object.")
            return db_user # This object will be committed by the context manager if no errors.

    except IntegrityError as ie: # Catch specific database integrity errors (like unique constraints not caught above)
        app_logger.error(f"Database IntegrityError during user creation for '{user_create_data.username}': {ie}", exc_info=True)
        # session.rollback() is handled by get_session_context
        return None
    except ValueError as ve: # Catch value errors, e.g. from hashing if not caught internally
        app_logger.error(f"ValueError during user creation for '{user_create_data.username}': {ve}", exc_info=True)
        return None
    except Exception as e:
        # This is a catch-all for any other unexpected errors.
        app_logger.error(f"CRITICAL UNEXPECTED error during user creation for '{user_create_data.username}': {e}", exc_info=True)
        # session.rollback() is handled by get_session_context
        return None

# --- User Authentication (ensure this is robust too) ---
def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
    # (Keep the robust version of authenticate_user from previous response)
    app_logger.info(f"Attempting to authenticate user: {username_in}")
    try:
        with get_session_context() as session:
            statement = select(User).where(User.username == username_in)
            user = session.exec(statement).first()

            if not user:
                app_logger.warning(f"Authentication failed: User '{username_in}' not found.")
                return None
            
            if hasattr(user, 'disabled') and user.disabled:
                app_logger.warning(f"Authentication failed: User '{username_in}' is disabled.")
                return None

            if not verify_password(password_in, user.hashed_password):
                app_logger.warning(f"Authentication failed: Invalid password for user '{username_in}'.")
                return None
            
            app_logger.info(f"User '{user.username}' (ID: {user.id}) authenticated successfully.")
            return user

    except Exception as e:
        app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {e}", exc_info=True)
        return None