mgbam commited on
Commit
5fcb473
·
verified ·
1 Parent(s): 779e761

Update services/auth.py

Browse files
Files changed (1) hide show
  1. services/auth.py +104 -51
services/auth.py CHANGED
@@ -1,94 +1,153 @@
1
  # /home/user/app/services/auth.py
2
  from typing import Optional
3
- import bcrypt # For password hashing
4
- from sqlmodel import select
 
5
 
6
- from models import User, UserCreate, get_session_context # Your SQLModel User and session
7
- from services.logger import app_logger # Your application logger
8
 
9
- # --- Password Hashing Utilities ---
10
  def hash_password(password: str) -> str:
11
- """Hashes a plain-text password using bcrypt."""
12
  try:
13
  password_bytes = password.encode('utf-8')
14
  salt = bcrypt.gensalt()
15
  hashed_bytes = bcrypt.hashpw(password_bytes, salt)
16
- return hashed_bytes.decode('utf-8')
 
 
17
  except Exception as e:
18
- app_logger.error(f"Error during password hashing: {e}", exc_info=True)
19
- # Re-raise or return a specific error indicator if preferred,
20
- # but for security, failing to hash should prevent account creation.
21
- raise ValueError("Password hashing failed") from e
22
 
23
  def verify_password(plain_password: str, hashed_password: str) -> bool:
24
- """Verifies a plain-text password against a stored bcrypt hash."""
 
25
  try:
26
  plain_password_bytes = plain_password.encode('utf-8')
27
  hashed_password_bytes = hashed_password.encode('utf-8')
28
- return bcrypt.checkpw(plain_password_bytes, hashed_password_bytes)
 
 
29
  except Exception as e:
30
- app_logger.error(f"Error during password verification: {e}", exc_info=True)
31
- # If bcrypt itself is broken, verification will fail.
32
  return False
33
 
34
- # --- User Creation ---
35
  def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
36
- """
37
- Creates a new user in the database with a hashed password.
38
- Returns the User object if successful, None otherwise.
39
- """
40
- app_logger.info(f"Attempting to create user: {user_create_data.username}")
 
 
 
41
  try:
42
- with get_session_context() as session:
43
- # Check if username already exists
 
 
 
44
  statement_username = select(User).where(User.username == user_create_data.username)
45
  existing_user_by_username = session.exec(statement_username).first()
46
  if existing_user_by_username:
47
  app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
48
- return None
 
49
 
50
- # Check if email already exists (if provided and should be unique)
51
  if user_create_data.email:
 
52
  statement_email = select(User).where(User.email == user_create_data.email)
53
  existing_user_by_email = session.exec(statement_email).first()
54
  if existing_user_by_email:
55
  app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
56
  return None
57
 
 
 
58
  try:
59
  hashed_pw = hash_password(user_create_data.password)
60
- except ValueError: # Catch hashing specific error
61
- app_logger.error(f"Could not hash password for user {user_create_data.username} during signup.")
62
- return None
63
-
64
 
65
- # Create the new user object
 
66
  db_user = User(
67
  username=user_create_data.username,
68
  email=user_create_data.email,
69
- full_name=user_create_data.full_name, # Assuming UserCreate and User models have this
70
- disabled=user_create_data.disabled if hasattr(user_create_data, 'disabled') else False,
 
71
  hashed_password=hashed_pw
72
  )
73
 
 
 
74
  session.add(db_user)
75
- # The commit is handled by the get_session_context manager upon successful exit.
76
- # We need to refresh to get DB-generated values like ID before the session closes.
77
- session.refresh(db_user)
78
- app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) created successfully in DB.")
79
- return db_user
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
  except Exception as e:
82
- app_logger.error(f"Database error during user creation for '{user_create_data.username}': {e}", exc_info=True)
 
83
  # session.rollback() is handled by get_session_context
84
  return None
85
 
86
- # --- User Authentication ---
87
  def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
88
- """
89
- Authenticates a user by username and password.
90
- Returns the User object if authentication is successful, None otherwise.
91
- """
92
  app_logger.info(f"Attempting to authenticate user: {username_in}")
93
  try:
94
  with get_session_context() as session:
@@ -99,7 +158,7 @@ def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
99
  app_logger.warning(f"Authentication failed: User '{username_in}' not found.")
100
  return None
101
 
102
- if user.disabled: # Assuming your User model has a 'disabled' field
103
  app_logger.warning(f"Authentication failed: User '{username_in}' is disabled.")
104
  return None
105
 
@@ -107,14 +166,8 @@ def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
107
  app_logger.warning(f"Authentication failed: Invalid password for user '{username_in}'.")
108
  return None
109
 
110
- # If login is successful, you might want to update a last_login timestamp here
111
- # user.last_login_at = datetime.utcnow()
112
- # session.add(user)
113
- # session.commit() # (handled by context manager)
114
- # session.refresh(user)
115
-
116
  app_logger.info(f"User '{user.username}' (ID: {user.id}) authenticated successfully.")
117
- return user # Return the ORM object
118
 
119
  except Exception as e:
120
  app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {e}", exc_info=True)
 
1
  # /home/user/app/services/auth.py
2
  from typing import Optional
3
+ import bcrypt
4
+ from sqlmodel import select, Session # Assuming Session is also imported if used for type hints
5
+ from sqlalchemy.exc import IntegrityError # For catching DB unique constraint violations
6
 
7
+ from models import User, UserCreate, get_session_context
8
+ from services.logger import app_logger
9
 
10
+ # --- Password Hashing Utilities (ensure these are robust) ---
11
  def hash_password(password: str) -> str:
12
+ app_logger.debug("Attempting to hash password.")
13
  try:
14
  password_bytes = password.encode('utf-8')
15
  salt = bcrypt.gensalt()
16
  hashed_bytes = bcrypt.hashpw(password_bytes, salt)
17
+ hashed_password_str = hashed_bytes.decode('utf-8')
18
+ app_logger.debug("Password hashed successfully.")
19
+ return hashed_password_str
20
  except Exception as e:
21
+ app_logger.error(f"CRITICAL: Password hashing failed: {e}", exc_info=True)
22
+ # This exception should be caught by the caller (create_user_in_db)
23
+ raise ValueError("Password hashing process failed") from e
 
24
 
25
  def verify_password(plain_password: str, hashed_password: str) -> bool:
26
+ # (This function is for login, ensure it's also robust)
27
+ app_logger.debug("Attempting to verify password.")
28
  try:
29
  plain_password_bytes = plain_password.encode('utf-8')
30
  hashed_password_bytes = hashed_password.encode('utf-8')
31
+ is_valid = bcrypt.checkpw(plain_password_bytes, hashed_password_bytes)
32
+ app_logger.debug(f"Password verification result: {is_valid}")
33
+ return is_valid
34
  except Exception as e:
35
+ app_logger.error(f"CRITICAL: Password verification failed: {e}", exc_info=True)
 
36
  return False
37
 
38
+ # --- User Creation with Enhanced Error Handling & Logging ---
39
  def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
40
+ app_logger.info(f"Attempting to create user in DB: Username='{user_create_data.username}', Email='{user_create_data.email}'")
41
+
42
+ # Pre-validation (optional, but good practice)
43
+ if not user_create_data.username or not user_create_data.password:
44
+ app_logger.warning("Signup attempt with empty username or password.")
45
+ # This should ideally be caught by frontend validation, but good to have a server-side check.
46
+ return None # Or raise a specific validation error
47
+
48
  try:
49
+ with get_session_context() as session: # `session` is a SQLModel Session
50
+ app_logger.debug("Database session obtained for user creation.")
51
+
52
+ # 1. Check if username already exists
53
+ app_logger.debug(f"Checking for existing username: {user_create_data.username}")
54
  statement_username = select(User).where(User.username == user_create_data.username)
55
  existing_user_by_username = session.exec(statement_username).first()
56
  if existing_user_by_username:
57
  app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
58
+ # Consider returning a more specific error indicator or raising a custom exception
59
+ return None
60
 
61
+ # 2. Check if email already exists (if provided and should be unique)
62
  if user_create_data.email:
63
+ app_logger.debug(f"Checking for existing email: {user_create_data.email}")
64
  statement_email = select(User).where(User.email == user_create_data.email)
65
  existing_user_by_email = session.exec(statement_email).first()
66
  if existing_user_by_email:
67
  app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
68
  return None
69
 
70
+ # 3. Hash the password
71
+ app_logger.debug("Attempting to hash password for new user.")
72
  try:
73
  hashed_pw = hash_password(user_create_data.password)
74
+ except ValueError as e_hash: # Catch specific hashing failure
75
+ app_logger.error(f"Password hashing failed for user '{user_create_data.username}': {e_hash}", exc_info=True)
76
+ return None # Fail signup if password cannot be hashed
 
77
 
78
+ # 4. Create the new user object
79
+ app_logger.debug("Creating User ORM object.")
80
  db_user = User(
81
  username=user_create_data.username,
82
  email=user_create_data.email,
83
+ # Ensure UserCreate and User models have these fields if you use them
84
+ full_name=getattr(user_create_data, 'full_name', None),
85
+ disabled=getattr(user_create_data, 'disabled', False),
86
  hashed_password=hashed_pw
87
  )
88
 
89
+ # 5. Add to session and attempt commit (via context manager)
90
+ app_logger.debug(f"Adding new user ORM object to session for username: {db_user.username}")
91
  session.add(db_user)
92
+
93
+ # The commit will be attempted when the 'get_session_context' exits.
94
+ # If IntegrityError (like unique constraint) occurs, it will be caught by the outer try-except.
95
+ # We need to refresh to get DB-generated values like ID.
96
+ # This refresh should happen *before* the commit if the ID is needed immediately
97
+ # by the caller, but in SQLModel, the refresh is often done after a successful commit
98
+ # to get all DB state. Since the context manager handles the commit,
99
+ # we can try to refresh here, anticipating the commit.
100
+ # However, if the commit fails, this refresh might also be problematic.
101
+ # A common pattern is: add, then let context manager commit, then if successful, re-fetch or use passed-in data.
102
+ # For now, let's assume the context manager handles commit on exit.
103
+ # If the user object is needed with its ID *immediately after this function returns successfully*,
104
+ # and before another session, then a refresh after commit is essential.
105
+ # The current design: returns User obj, app.py uses primitive data from form.
106
+
107
+ # For now, let's log before the context manager attempts commit.
108
+ app_logger.info(f"User object for '{db_user.username}' added to session. Commit will be attempted by context manager.")
109
+ # If you need the ID right away, you'd commit explicitly and refresh here:
110
+ # session.commit()
111
+ # session.refresh(db_user)
112
 
113
+ # The `get_session_context` will handle the commit. If it fails (e.g. IntegrityError),
114
+ # it will rollback and raise the exception, caught by the outer `except` block.
115
+ # If successful, we need to ensure the returned object has its ID.
116
+ # A common way to ensure the ID is populated is to flush and then refresh,
117
+ # or ensure the session is configured to load IDs after insert.
118
+ # SQLModel typically handles this well if primary_key=True, default=None.
119
+ # Let's rely on the session returning the object with its ID after add and successful commit.
120
+ # To be absolutely sure the ID is available if the commit in context manager works:
121
+ # One strategy is to commit here then refresh.
122
+ # Another is to let the context handle commit, then if this function must return the ID,
123
+ # it should re-fetch, or the calling code should handle that if it needs a "live" object later.
124
+ # Since `app.py` uses the input username for the success message, this is less critical *immediately*.
125
+
126
+ # The object `db_user` should have its ID populated after the context manager successfully commits.
127
+ # If `create_user_in_db` is expected to return a fully usable object with ID,
128
+ # an explicit commit and refresh *within* the `with` block (before returning) is safest.
129
+ session.flush() # Flushes to DB, assigns ID if auto-increment
130
+ session.refresh(db_user) # Refreshes from DB state, ensuring ID and other defaults are loaded
131
+
132
+ app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) prepared for commit. Returning object.")
133
+ return db_user # This object will be committed by the context manager if no errors.
134
+
135
+ except IntegrityError as ie: # Catch specific database integrity errors (like unique constraints not caught above)
136
+ app_logger.error(f"Database IntegrityError during user creation for '{user_create_data.username}': {ie}", exc_info=True)
137
+ # session.rollback() is handled by get_session_context
138
+ return None
139
+ except ValueError as ve: # Catch value errors, e.g. from hashing if not caught internally
140
+ app_logger.error(f"ValueError during user creation for '{user_create_data.username}': {ve}", exc_info=True)
141
+ return None
142
  except Exception as e:
143
+ # This is a catch-all for any other unexpected errors.
144
+ app_logger.error(f"CRITICAL UNEXPECTED error during user creation for '{user_create_data.username}': {e}", exc_info=True)
145
  # session.rollback() is handled by get_session_context
146
  return None
147
 
148
+ # --- User Authentication (ensure this is robust too) ---
149
  def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
150
+ # (Keep the robust version of authenticate_user from previous response)
 
 
 
151
  app_logger.info(f"Attempting to authenticate user: {username_in}")
152
  try:
153
  with get_session_context() as session:
 
158
  app_logger.warning(f"Authentication failed: User '{username_in}' not found.")
159
  return None
160
 
161
+ if hasattr(user, 'disabled') and user.disabled:
162
  app_logger.warning(f"Authentication failed: User '{username_in}' is disabled.")
163
  return None
164
 
 
166
  app_logger.warning(f"Authentication failed: Invalid password for user '{username_in}'.")
167
  return None
168
 
 
 
 
 
 
 
169
  app_logger.info(f"User '{user.username}' (ID: {user.id}) authenticated successfully.")
170
+ return user
171
 
172
  except Exception as e:
173
  app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {e}", exc_info=True)