""" CRUD operations for Users (Admins, Staff). """ from sqlalchemy.orm import Session from fastapi import HTTPException, status import bcrypt from src import models def hash_password(password: str) -> str: """Hashes a password using bcrypt.""" return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') def get_user_by_username(db: Session, username: str) -> models.User | None: """ Retrieves a user by their username. """ return db.query(models.User).filter(models.User.username == username).first() def get_user_by_id(db: Session, user_id: int) -> models.User | None: """ Retrieves a user by their ID. """ return db.query(models.User).filter(models.User.id == user_id).first() def get_all_users(db: Session, skip: int = 0, limit: int = 100) -> list[models.User]: """ Retrieves all users with pagination. """ return db.query(models.User).offset(skip).limit(limit).all() def get_user_by_tag_id(db: Session, tag_id: str) -> models.User | None: """ Retrieves a user by their TAG_ID. """ return db.query(models.User).filter(models.User.tag_id == tag_id).first() def update_user_tag_id(db: Session, username: str, new_tag_id: str) -> models.User: """ Updates a user's tag_id. """ # Check if the new tag_id is already in use existing_user_with_tag = get_user_by_tag_id(db, new_tag_id) if existing_user_with_tag: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Tag ID '{new_tag_id}' is already assigned to another user." ) # Get the user to update user = get_user_by_username(db, username) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User '{username}' not found." ) # Update the tag_id user.tag_id = new_tag_id db.commit() db.refresh(user) return user def create_user(db: Session, user_data: models.UserCreate) -> models.User: """ Creates a new user account. """ # Check if username already exists existing_user = get_user_by_username(db, user_data.username) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Username '{user_data.username}' is already registered." ) # Check if tag_id is already in use (if provided) if user_data.tag_id: existing_tag_user = get_user_by_tag_id(db, user_data.tag_id) if existing_tag_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Tag ID '{user_data.tag_id}' is already assigned to another user." ) # Hash the password hashed_password = hash_password(user_data.password) # Create new user db_user = models.User( username=user_data.username, name=user_data.name, hashed_password=hashed_password, role=user_data.role, department=user_data.department, tag_id=user_data.tag_id, is_active=user_data.is_active if user_data.is_active is not None else True ) db.add(db_user) db.commit() db.refresh(db_user) return db_user def update_user(db: Session, user_id: int, user_update: models.UserCreate) -> models.User: """ Updates an existing user. """ db_user = get_user_by_id(db, user_id) if not db_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with ID {user_id} not found." ) # Check if new username is already taken (if changed) if user_update.username != db_user.username: existing_user = get_user_by_username(db, user_update.username) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Username '{user_update.username}' is already taken." ) # Check if new tag_id is already in use (if changed and provided) if user_update.tag_id and user_update.tag_id != db_user.tag_id: existing_tag_user = get_user_by_tag_id(db, user_update.tag_id) if existing_tag_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Tag ID '{user_update.tag_id}' is already assigned to another user." ) # Update fields db_user.username = user_update.username db_user.name = user_update.name if user_update.password: # Only update password if provided db_user.hashed_password = hash_password(user_update.password) db_user.role = user_update.role db_user.department = user_update.department db_user.tag_id = user_update.tag_id db_user.is_active = user_update.is_active if user_update.is_active is not None else True db.commit() db.refresh(db_user) return db_user def delete_user(db: Session, username_to_delete: str, current_admin: models.User) -> models.User: """ Deletes a user, ensuring an admin cannot delete themselves or the last admin. """ if username_to_delete == current_admin.username: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admins cannot delete their own account." ) user_to_delete = get_user_by_username(db, username_to_delete) if not user_to_delete: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User '{username_to_delete}' not found." ) # Prevent deleting the last admin account if user_to_delete.role == models.UserRole.ADMIN: admin_count = db.query(models.User).filter(models.User.role == models.UserRole.ADMIN).count() if admin_count <= 1: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot delete the last remaining admin account." ) # Handle dependencies: set foreign keys to NULL where a user is referenced db.query(models.ClearanceStatus).filter(models.ClearanceStatus.cleared_by == user_to_delete.id).update({"cleared_by": None}) # Check if PendingTagLink model exists before trying to delete try: db.query(models.PendingTagLink).filter(models.PendingTagLink.initiated_by_user_id == user_to_delete.id).delete() except AttributeError: # PendingTagLink model doesn't exist, skip this cleanup pass db.delete(user_to_delete) db.commit() return user_to_delete