MedQA / services /auth.py
mgbam's picture
Update services/auth.py
78dfc96 verified
raw
history blame
1.96 kB
from passlib.context import CryptContext
from sqlmodel import Session, select
from typing import Optional
from models.user import User, UserCreate
from models.db import get_session_context # Using context manager for direct use
from services.logger import app_logger
# Module-wide passlib context shared by the password helpers in this file.
# bcrypt is the only configured scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: The password as supplied by the user.
        hashed_password: The previously stored hash to compare against.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair (truthy when
        the password matches the hash).
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the module's ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def get_user(db: Session, username: str) -> Optional[User]:
    """Look up a user by username.

    Args:
        db: An open session used to run the query.
        username: The username to match against ``User.username``.

    Returns:
        The first matching ``User`` row, or ``None`` when the query yields
        no rows.
    """
    return db.exec(select(User).where(User.username == username)).first()
def create_user_in_db(user_data: UserCreate) -> Optional[User]:
    """Persist a new user built from ``user_data``.

    The password is hashed and the ``User`` object is built before any
    database work starts, so errors raised by those steps propagate to the
    caller. Everything from opening the session onwards is guarded: any
    exception there is logged and reported as ``None``.

    Args:
        user_data: Supplies ``username``, ``password``, ``email`` and
            ``full_name`` for the new record.

    Returns:
        The refreshed ``User`` on success; ``None`` if a user with the same
        username already exists or if the database work raised.
    """
    password_hash = get_password_hash(user_data.password)
    new_user = User(
        username=user_data.username,
        hashed_password=password_hash,
        email=user_data.email,
        full_name=user_data.full_name,
    )

    try:
        with get_session_context() as session:
            # Refuse to create a second account under an existing username.
            if get_user(session, user_data.username):
                app_logger.warning(f"User {user_data.username} already exists.")
                return None

            session.add(new_user)
            # Commit explicitly, then refresh, so the returned object
            # carries the values stored for the new row.
            session.commit()
            session.refresh(new_user)
            return new_user
    except Exception as e:
        app_logger.error(f"Error creating user {user_data.username}: {e}")
        return None
def authenticate_user(username: str, password: str) -> Optional[User]:
    """Return the user for ``username`` if ``password`` matches its hash.

    Args:
        username: The username to look up.
        password: The plaintext password supplied for that username.

    Returns:
        The matching ``User`` when the password verifies; ``None`` when no
        such user exists or the password does not match. The value is
        returned from inside the session context, so that context has
        already exited by the time the caller receives it.
    """
    with get_session_context() as db:
        user = get_user(db, username)
        if not user:
            # Spend about as long as a real verification would, so response
            # time does not reveal whether the username exists.
            # NOTE(review): dummy_verify() needs passlib >= 1.7 - confirm
            # the pinned version.
            pwd_context.dummy_verify()
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user