Spaces:
Runtime error
Runtime error
File size: 6,607 Bytes
b7ed26f 2e8a105 b7ed26f 2e8a105 b7ed26f 37c6354 b7ed26f 37c6354 b7ed26f 2e8a105 1771391 2e8a105 37c6354 b7ed26f 37c6354 b7ed26f 37c6354 2e8a105 37c6354 2e8a105 37c6354 b7ed26f 37c6354 b7ed26f 37c6354 b7ed26f 37c6354 b7ed26f 37c6354 b7ed26f 37c6354 b7ed26f 37c6354 2e8a105 37c6354 b7ed26f 37c6354 b7ed26f 2e8a105 b7ed26f 37c6354 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
"""
CRUD operations for Users (Admins, Staff).
"""
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
import bcrypt
from src import models
def hash_password(password: str) -> str:
"""Hashes a password using bcrypt."""
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def get_user_by_username(db: Session, username: str) -> models.User | None:
"""
Retrieves a user by their username.
"""
return db.query(models.User).filter(models.User.username == username).first()
def get_user_by_id(db: Session, user_id: int) -> models.User | None:
"""
Retrieves a user by their ID.
"""
return db.query(models.User).filter(models.User.id == user_id).first()
def get_all_users(db: Session, skip: int = 0, limit: int = 100) -> list[models.User]:
"""
Retrieves all users with pagination.
"""
return db.query(models.User).offset(skip).limit(limit).all()
def get_user_by_tag_id(db: Session, tag_id: str) -> models.User | None:
"""
Retrieves a user by their TAG_ID.
"""
return db.query(models.User).filter(models.User.tag_id == tag_id).first()
def update_user_tag_id(db: Session, username: str, new_tag_id: str) -> models.User:
"""
Updates a user's tag_id.
"""
# Check if the new tag_id is already in use
existing_user_with_tag = get_user_by_tag_id(db, new_tag_id)
if existing_user_with_tag:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Tag ID '{new_tag_id}' is already assigned to another user."
)
# Get the user to update
user = get_user_by_username(db, username)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User '{username}' not found."
)
# Update the tag_id
user.tag_id = new_tag_id
db.commit()
db.refresh(user)
return user
def create_user(db: Session, user_data: models.UserCreate) -> models.User:
"""
Creates a new user account.
"""
# Check if username already exists
existing_user = get_user_by_username(db, user_data.username)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Username '{user_data.username}' is already registered."
)
# Check if tag_id is already in use (if provided)
if user_data.tag_id:
existing_tag_user = get_user_by_tag_id(db, user_data.tag_id)
if existing_tag_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Tag ID '{user_data.tag_id}' is already assigned to another user."
)
# Hash the password
hashed_password = hash_password(user_data.password)
# Create new user
db_user = models.User(
username=user_data.username,
name=user_data.name,
hashed_password=hashed_password,
role=user_data.role,
department=user_data.department,
tag_id=user_data.tag_id,
is_active=user_data.is_active if user_data.is_active is not None else True
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def update_user(db: Session, user_id: int, user_update: models.UserCreate) -> models.User:
"""
Updates an existing user.
"""
db_user = get_user_by_id(db, user_id)
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with ID {user_id} not found."
)
# Check if new username is already taken (if changed)
if user_update.username != db_user.username:
existing_user = get_user_by_username(db, user_update.username)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Username '{user_update.username}' is already taken."
)
# Check if new tag_id is already in use (if changed and provided)
if user_update.tag_id and user_update.tag_id != db_user.tag_id:
existing_tag_user = get_user_by_tag_id(db, user_update.tag_id)
if existing_tag_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Tag ID '{user_update.tag_id}' is already assigned to another user."
)
# Update fields
db_user.username = user_update.username
db_user.name = user_update.name
if user_update.password: # Only update password if provided
db_user.hashed_password = hash_password(user_update.password)
db_user.role = user_update.role
db_user.department = user_update.department
db_user.tag_id = user_update.tag_id
db_user.is_active = user_update.is_active if user_update.is_active is not None else True
db.commit()
db.refresh(db_user)
return db_user
def delete_user(db: Session, username_to_delete: str, current_admin: models.User) -> models.User:
"""
Deletes a user, ensuring an admin cannot delete themselves or the last admin.
"""
if username_to_delete == current_admin.username:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admins cannot delete their own account."
)
user_to_delete = get_user_by_username(db, username_to_delete)
if not user_to_delete:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User '{username_to_delete}' not found."
)
# Prevent deleting the last admin account
if user_to_delete.role == models.UserRole.ADMIN:
admin_count = db.query(models.User).filter(models.User.role == models.UserRole.ADMIN).count()
if admin_count <= 1:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Cannot delete the last remaining admin account."
)
# Handle dependencies: set foreign keys to NULL where a user is referenced
db.query(models.ClearanceStatus).filter(models.ClearanceStatus.cleared_by == user_to_delete.id).update({"cleared_by": None})
# Check if PendingTagLink model exists before trying to delete
try:
db.query(models.PendingTagLink).filter(models.PendingTagLink.initiated_by_user_id == user_to_delete.id).delete()
except AttributeError:
# PendingTagLink model doesn't exist, skip this cleanup
pass
db.delete(user_to_delete)
db.commit()
return user_to_delete |