File size: 1,961 Bytes
78dfc96 5fb2d40 78dfc96 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
from passlib.context import CryptContext
from sqlmodel import Session, select
from typing import Optional
from models.user import User, UserCreate
from models.db import get_session_context # Using context manager for direct use
from services.logger import app_logger
# Module-level passlib CryptContext configured with bcrypt as its only scheme;
# the password helpers in this module hash and verify through it.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns its verdict.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the module-level ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest
def get_user(db: Session, username: str) -> Optional[User]:
    """Look up a user by exact username using the given session.

    Returns the first row of the ``User`` query filtered on ``username``,
    as produced by ``.first()`` on the executed statement.
    """
    return db.exec(select(User).where(User.username == username)).first()
def create_user_in_db(user_data: UserCreate) -> Optional[User]:
    """Persist a new user built from ``user_data``.

    The password is hashed before the database is touched. Inside a session
    obtained from ``get_session_context`` the username is checked first; if
    it is already taken a warning is logged and nothing is written.

    Returns:
        The refreshed ``User`` on success; ``None`` when the username already
        exists or when any exception is raised while working with the
        session (the error is logged, not re-raised).
    """
    candidate = User(
        username=user_data.username,
        hashed_password=get_password_hash(user_data.password),
        email=user_data.email,
        full_name=user_data.full_name,
    )

    created: Optional[User] = None
    try:
        with get_session_context() as session:
            if get_user(session, user_data.username):
                # Username already taken: report it and leave the table alone.
                app_logger.warning(f"User {user_data.username} already exists.")
            else:
                session.add(candidate)
                # Explicit commit so the row exists before refresh reloads it.
                session.commit()
                session.refresh(candidate)
                created = candidate
    except Exception as e:
        app_logger.error(f"Error creating user {user_data.username}: {e}")
        return None
    return created
def authenticate_user(username: str, password: str) -> Optional[User]:
    """Return the user matching the given credentials, or ``None``.

    Args:
        username: Username to look up.
        password: Plaintext password to check against the stored hash.

    Returns:
        The ``User`` when the username exists and the password verifies
        against its stored hash; otherwise ``None``.
    """
    with get_session_context() as db:
        user = get_user(db, username)
        if not user:
            # Spend about as long as a real hash check would, so response
            # timing does not reveal whether the username exists.
            pwd_context.dummy_verify()
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user