|
from passlib.context import CryptContext |
|
from sqlmodel import Session, select |
|
from typing import Optional |
|
|
|
from models.user import User, UserCreate |
|
from models.db import get_session_context |
|
from services.logger import app_logger |
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") |
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool: |
|
return pwd_context.verify(plain_password, hashed_password) |
|
|
|
def get_password_hash(password: str) -> str: |
|
return pwd_context.hash(password) |
|
|
|
def get_user_by_username(db: Session, username: str) -> Optional[User]: |
|
"""Fetches a user by username.""" |
|
statement = select(User).where(User.username == username) |
|
user = db.exec(statement).first() |
|
return user |
|
|
|
def create_user_in_db(user_data: UserCreate) -> Optional[User]: |
|
""" |
|
Creates a new user in the database. |
|
Returns the User object with basic attributes loaded, suitable for immediate |
|
use even after the session creating it has closed. |
|
""" |
|
hashed_password = get_password_hash(user_data.password) |
|
|
|
|
|
|
|
|
|
new_user_instance = User( |
|
username=user_data.username, |
|
hashed_password=hashed_password, |
|
email=user_data.email, |
|
full_name=user_data.full_name, |
|
chat_sessions=[] |
|
) |
|
|
|
created_user_id: Optional[int] = None |
|
|
|
try: |
|
with get_session_context() as db: |
|
|
|
existing_user = get_user_by_username(db, user_data.username) |
|
if existing_user: |
|
app_logger.warning(f"User {user_data.username} already exists.") |
|
return None |
|
|
|
db.add(new_user_instance) |
|
db.commit() |
|
db.refresh(new_user_instance) |
|
|
|
|
|
|
|
created_user_id = new_user_instance.id |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e: |
|
app_logger.error(f"Error during database operation for user {user_data.username}: {e}") |
|
return None |
|
|
|
|
|
|
|
if created_user_id is not None: |
|
try: |
|
with get_session_context() as db: |
|
|
|
final_user_to_return = db.get(User, created_user_id) |
|
if final_user_to_return: |
|
|
|
|
|
_ = final_user_to_return.id |
|
_ = final_user_to_return.username |
|
_ = final_user_to_return.email |
|
_ = final_user_to_return.full_name |
|
|
|
|
|
|
|
return final_user_to_return |
|
except Exception as e: |
|
app_logger.error(f"Error re-fetching created user {created_user_id}: {e}") |
|
return None |
|
|
|
return None |
|
|
|
|
|
def authenticate_user(username: str, password: str) -> Optional[User]: |
|
""" |
|
Authenticates a user. |
|
Returns the User object with basic attributes loaded. |
|
""" |
|
try: |
|
with get_session_context() as db: |
|
user = get_user_by_username(db, username) |
|
if not user: |
|
return None |
|
if not verify_password(password, user.hashed_password): |
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
user_id_to_return = user.id |
|
|
|
|
|
|
|
if user_id_to_return is not None: |
|
with get_session_context() as db: |
|
authenticated_user = db.get(User, user_id_to_return) |
|
if authenticated_user: |
|
|
|
_ = authenticated_user.id |
|
_ = authenticated_user.username |
|
_ = authenticated_user.email |
|
|
|
return authenticated_user |
|
return None |
|
|
|
except Exception as e: |
|
app_logger.error(f"Error during authentication for user {username}: {e}") |
|
return None |