|
from passlib.context import CryptContext |
|
from sqlmodel import Session, select |
|
from typing import Optional |
|
|
|
from models.user import User, UserCreate |
|
from models.db import get_session_context |
|
from services.logger import app_logger |
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") |
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool: |
|
return pwd_context.verify(plain_password, hashed_password) |
|
|
|
def get_password_hash(password: str) -> str: |
|
return pwd_context.hash(password) |
|
|
|
def get_user(db: Session, username: str) -> Optional[User]: |
|
statement = select(User).where(User.username == username) |
|
user = db.exec(statement).first() |
|
return user |
|
|
|
def create_user_in_db(user_data: UserCreate) -> Optional[User]: |
|
hashed_password = get_password_hash(user_data.password) |
|
db_user = User( |
|
username=user_data.username, |
|
hashed_password=hashed_password, |
|
email=user_data.email, |
|
full_name=user_data.full_name |
|
) |
|
try: |
|
with get_session_context() as db: |
|
|
|
existing_user = get_user(db, user_data.username) |
|
if existing_user: |
|
app_logger.warning(f"User {user_data.username} already exists.") |
|
return None |
|
|
|
db.add(db_user) |
|
db.commit() |
|
db.refresh(db_user) |
|
return db_user |
|
except Exception as e: |
|
app_logger.error(f"Error creating user {user_data.username}: {e}") |
|
return None |
|
|
|
def authenticate_user(username: str, password: str) -> Optional[User]: |
|
with get_session_context() as db: |
|
user = get_user(db, username) |
|
if not user: |
|
return None |
|
if not verify_password(password, user.hashed_password): |
|
return None |
|
return user |