mgbam commited on
Commit
779e761
·
verified ·
1 Parent(s): e690628

Update services/auth.py

Browse files
Files changed (1) hide show
  1. services/auth.py +97 -108
services/auth.py CHANGED
@@ -1,132 +1,121 @@
1
- from passlib.context import CryptContext
2
- from sqlmodel import Session, select
3
  from typing import Optional
 
 
4
 
5
- from models.user import User, UserCreate # Direct import
6
- from models.db import get_session_context
7
- from services.logger import app_logger
8
 
9
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 
 
 
 
 
 
 
 
 
 
 
 
10
 
11
  def verify_password(plain_password: str, hashed_password: str) -> bool:
12
- return pwd_context.verify(plain_password, hashed_password)
13
-
14
- def get_password_hash(password: str) -> str:
15
- return pwd_context.hash(password)
16
-
17
- def get_user_by_username(db: Session, username: str) -> Optional[User]:
18
- """Fetches a user by username."""
19
- statement = select(User).where(User.username == username)
20
- user = db.exec(statement).first()
21
- return user
22
 
23
- def create_user_in_db(user_data: UserCreate) -> Optional[User]:
 
24
  """
25
- Creates a new user in the database.
26
- Returns the User object with basic attributes loaded, suitable for immediate
27
- use even after the session creating it has closed.
28
  """
29
- hashed_password = get_password_hash(user_data.password)
30
-
31
- # Create the Python object instance.
32
- # Initialize relationships like chat_sessions to an empty list if appropriate,
33
- # as they won't be populated from the DB for a new user yet.
34
- new_user_instance = User(
35
- username=user_data.username,
36
- hashed_password=hashed_password,
37
- email=user_data.email,
38
- full_name=user_data.full_name,
39
- chat_sessions=[] # Good practice to initialize for new instances
40
- )
41
-
42
- created_user_id: Optional[int] = None
43
-
44
  try:
45
- with get_session_context() as db:
46
- # Check if user already exists
47
- existing_user = get_user_by_username(db, user_data.username)
48
- if existing_user:
49
- app_logger.warning(f"User {user_data.username} already exists.")
50
- return None # User already exists
51
-
52
- db.add(new_user_instance)
53
- db.commit() # Commit to save the user and allow DB to generate ID
54
- db.refresh(new_user_instance) # Refresh to get all attributes, especially the DB-generated ID
55
-
56
- # Store the ID so we can re-fetch a "clean" instance if needed,
57
- # or to ensure we are working with the persisted state.
58
- created_user_id = new_user_instance.id
59
-
60
- # To ensure the returned object is safe for use after this session closes,
61
- # especially for attributes that might be displayed immediately,
62
- # we can load them explicitly or re-fetch.
63
- # For simple attributes, refresh should be enough.
64
- # We will return a freshly fetched instance to be absolutely sure.
65
 
66
- except Exception as e:
67
- app_logger.error(f"Error during database operation for user {user_data.username}: {e}")
68
- return None # DB operation failed
 
 
 
 
 
 
 
 
 
 
69
 
70
- # If user was created, fetch a fresh instance to return.
71
- # This new instance will be detached but will have its basic attributes loaded.
72
- if created_user_id is not None:
73
- try:
74
- with get_session_context() as db:
75
- # .get() is efficient for fetching by primary key
76
- final_user_to_return = db.get(User, created_user_id)
77
- if final_user_to_return:
78
- # "Touch" attributes that will be used immediately after this function returns
79
- # to ensure they are loaded before this (new) session closes.
80
- _ = final_user_to_return.id
81
- _ = final_user_to_return.username
82
- _ = final_user_to_return.email
83
- _ = final_user_to_return.full_name
84
- # Do NOT try to access final_user_to_return.chat_sessions here unless you
85
- # intend to load them, which might be a heavy operation.
86
- # They will be lazy-loaded when accessed within an active session later.
87
- return final_user_to_return
88
- except Exception as e:
89
- app_logger.error(f"Error re-fetching created user {created_user_id}: {e}")
90
- return None # Failed to re-fetch
91
 
92
- return None # Fallback if user was not created or couldn't be re-fetched
 
 
 
 
 
 
 
 
 
 
 
 
 
 
93
 
 
 
 
 
94
 
95
- def authenticate_user(username: str, password: str) -> Optional[User]:
 
96
  """
97
- Authenticates a user.
98
- Returns the User object with basic attributes loaded.
99
  """
 
100
  try:
101
- with get_session_context() as db:
102
- user = get_user_by_username(db, username)
 
 
103
  if not user:
104
- return None # User not found
105
- if not verify_password(password, user.hashed_password):
106
- return None # Invalid password
107
 
108
- # User is authenticated. Before returning the user object,
109
- # ensure any attributes that will be immediately accessed by the caller
110
- # (e.g., when setting st.session_state.authenticated_user) are loaded.
111
- # This helps avoid DetachedInstanceError if Streamlit or other parts
112
- # of the app try to access these attributes after this session closes.
 
 
113
 
114
- user_id_to_return = user.id # Get ID to re-fetch
 
 
 
 
115
 
116
- # Re-fetch the user in a new session context to return a "clean" detached object
117
- # This is a robust way to prevent issues with detached instances being used across session boundaries.
118
- if user_id_to_return is not None:
119
- with get_session_context() as db:
120
- authenticated_user = db.get(User, user_id_to_return)
121
- if authenticated_user:
122
- # "Touch" attributes
123
- _ = authenticated_user.id
124
- _ = authenticated_user.username
125
- _ = authenticated_user.email
126
- # Do not touch relationships unless intended for eager load here.
127
- return authenticated_user
128
- return None # Should not happen if authenticated
129
 
130
  except Exception as e:
131
- app_logger.error(f"Error during authentication for user {username}: {e}")
132
  return None
 
1
+ # /home/user/app/services/auth.py
 
2
  from typing import Optional
3
+ import bcrypt # For password hashing
4
+ from sqlmodel import select
5
 
6
+ from models import User, UserCreate, get_session_context # Your SQLModel User and session
7
+ from services.logger import app_logger # Your application logger
 
8
 
9
+ # --- Password Hashing Utilities ---
10
+ def hash_password(password: str) -> str:
11
+ """Hashes a plain-text password using bcrypt."""
12
+ try:
13
+ password_bytes = password.encode('utf-8')
14
+ salt = bcrypt.gensalt()
15
+ hashed_bytes = bcrypt.hashpw(password_bytes, salt)
16
+ return hashed_bytes.decode('utf-8')
17
+ except Exception as e:
18
+ app_logger.error(f"Error during password hashing: {e}", exc_info=True)
19
+ # Re-raise or return a specific error indicator if preferred,
20
+ # but for security, failing to hash should prevent account creation.
21
+ raise ValueError("Password hashing failed") from e
22
 
23
  def verify_password(plain_password: str, hashed_password: str) -> bool:
24
+ """Verifies a plain-text password against a stored bcrypt hash."""
25
+ try:
26
+ plain_password_bytes = plain_password.encode('utf-8')
27
+ hashed_password_bytes = hashed_password.encode('utf-8')
28
+ return bcrypt.checkpw(plain_password_bytes, hashed_password_bytes)
29
+ except Exception as e:
30
+ app_logger.error(f"Error during password verification: {e}", exc_info=True)
31
+ # If bcrypt itself is broken, verification will fail.
32
+ return False
 
33
 
34
+ # --- User Creation ---
35
+ def create_user_in_db(user_create_data: UserCreate) -> Optional[User]:
36
  """
37
+ Creates a new user in the database with a hashed password.
38
+ Returns the User object if successful, None otherwise.
 
39
  """
40
+ app_logger.info(f"Attempting to create user: {user_create_data.username}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
  try:
42
+ with get_session_context() as session:
43
+ # Check if username already exists
44
+ statement_username = select(User).where(User.username == user_create_data.username)
45
+ existing_user_by_username = session.exec(statement_username).first()
46
+ if existing_user_by_username:
47
+ app_logger.warning(f"Signup failed: Username '{user_create_data.username}' already exists.")
48
+ return None
 
 
 
 
 
 
 
 
 
 
 
 
 
49
 
50
+ # Check if email already exists (if provided and should be unique)
51
+ if user_create_data.email:
52
+ statement_email = select(User).where(User.email == user_create_data.email)
53
+ existing_user_by_email = session.exec(statement_email).first()
54
+ if existing_user_by_email:
55
+ app_logger.warning(f"Signup failed: Email '{user_create_data.email}' already exists.")
56
+ return None
57
+
58
+ try:
59
+ hashed_pw = hash_password(user_create_data.password)
60
+ except ValueError: # Catch hashing specific error
61
+ app_logger.error(f"Could not hash password for user {user_create_data.username} during signup.")
62
+ return None
63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
 
65
+ # Create the new user object
66
+ db_user = User(
67
+ username=user_create_data.username,
68
+ email=user_create_data.email,
69
+ full_name=user_create_data.full_name, # Assuming UserCreate and User models have this
70
+ disabled=user_create_data.disabled if hasattr(user_create_data, 'disabled') else False,
71
+ hashed_password=hashed_pw
72
+ )
73
+
74
+ session.add(db_user)
75
+ # The commit is handled by the get_session_context manager upon successful exit.
76
+ # We need to refresh to get DB-generated values like ID before the session closes.
77
+ session.refresh(db_user)
78
+ app_logger.info(f"User '{db_user.username}' (ID: {db_user.id}) created successfully in DB.")
79
+ return db_user
80
 
81
+ except Exception as e:
82
+ app_logger.error(f"Database error during user creation for '{user_create_data.username}': {e}", exc_info=True)
83
+ # session.rollback() is handled by get_session_context
84
+ return None
85
 
86
+ # --- User Authentication ---
87
+ def authenticate_user(username_in: str, password_in: str) -> Optional[User]:
88
  """
89
+ Authenticates a user by username and password.
90
+ Returns the User object if authentication is successful, None otherwise.
91
  """
92
+ app_logger.info(f"Attempting to authenticate user: {username_in}")
93
  try:
94
+ with get_session_context() as session:
95
+ statement = select(User).where(User.username == username_in)
96
+ user = session.exec(statement).first()
97
+
98
  if not user:
99
+ app_logger.warning(f"Authentication failed: User '{username_in}' not found.")
100
+ return None
 
101
 
102
+ if user.disabled: # Assuming your User model has a 'disabled' field
103
+ app_logger.warning(f"Authentication failed: User '{username_in}' is disabled.")
104
+ return None
105
+
106
+ if not verify_password(password_in, user.hashed_password):
107
+ app_logger.warning(f"Authentication failed: Invalid password for user '{username_in}'.")
108
+ return None
109
 
110
+ # If login is successful, you might want to update a last_login timestamp here
111
+ # user.last_login_at = datetime.utcnow()
112
+ # session.add(user)
113
+ # session.commit() # (handled by context manager)
114
+ # session.refresh(user)
115
 
116
+ app_logger.info(f"User '{user.username}' (ID: {user.id}) authenticated successfully.")
117
+ return user # Return the ORM object
 
 
 
 
 
 
 
 
 
 
 
118
 
119
  except Exception as e:
120
+ app_logger.error(f"Database or unexpected error during authentication for '{username_in}': {e}", exc_info=True)
121
  return None