File size: 6,607 Bytes
b7ed26f
 
 
 
 
2e8a105
b7ed26f
 
2e8a105
 
 
 
b7ed26f
 
37c6354
 
 
b7ed26f
 
37c6354
 
 
 
 
 
 
 
 
 
 
b7ed26f
2e8a105
1771391
 
 
 
 
2e8a105
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37c6354
 
 
 
 
 
 
b7ed26f
37c6354
 
b7ed26f
37c6354
2e8a105
 
 
 
 
 
 
 
 
37c6354
2e8a105
37c6354
 
 
 
 
b7ed26f
37c6354
 
 
 
b7ed26f
37c6354
 
b7ed26f
37c6354
 
 
b7ed26f
37c6354
 
 
 
 
b7ed26f
37c6354
 
 
b7ed26f
37c6354
2e8a105
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37c6354
 
 
 
 
 
 
 
 
 
b7ed26f
 
37c6354
b7ed26f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2e8a105
 
 
 
 
 
 
b7ed26f
 
 
 
37c6354
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""
CRUD operations for Users (Admins, Staff).
"""
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
import bcrypt

from src import models

def hash_password(password: str) -> str:
    """
    Produces a bcrypt hash of a plaintext password.

    The password is encoded as UTF-8, hashed with a freshly generated salt,
    and the resulting hash bytes are decoded back into a UTF-8 string.
    """
    password_bytes = password.encode('utf-8')
    salt = bcrypt.gensalt()
    hashed_bytes = bcrypt.hashpw(password_bytes, salt)
    return hashed_bytes.decode('utf-8')

def get_user_by_username(db: Session, username: str) -> models.User | None:
    """
    Looks up a single user by username.

    Returns the first user whose username equals the given value, or None
    when no such user exists.
    """
    matching_users = db.query(models.User).filter(
        models.User.username == username
    )
    return matching_users.first()

def get_user_by_id(db: Session, user_id: int) -> models.User | None:
    """
    Looks up a single user by ID.

    Returns the first user whose id equals user_id, or None when no such
    user exists.
    """
    matching_users = db.query(models.User).filter(
        models.User.id == user_id
    )
    return matching_users.first()

def get_all_users(db: Session, skip: int = 0, limit: int = 100) -> list[models.User]:
    """
    Retrieves a page of users.

    Args:
        db: Database session used for the query.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.

    Returns:
        Up to `limit` users, ordered by user ID.
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could repeat or skip users. Ordering by ID makes
    # the pages stable.
    return (
        db.query(models.User)
        .order_by(models.User.id)
        .offset(skip)
        .limit(limit)
        .all()
    )

def get_user_by_tag_id(db: Session, tag_id: str) -> models.User | None:
    """
    Looks up a single user by TAG_ID.

    Returns the first user whose tag_id equals the given value, or None
    when no such user exists.
    """
    matching_users = db.query(models.User).filter(
        models.User.tag_id == tag_id
    )
    return matching_users.first()

def update_user_tag_id(db: Session, username: str, new_tag_id: str) -> models.User:
    """
    Updates a user's tag_id.

    Args:
        db: Database session used for the lookups and the commit.
        username: Username of the user whose tag is being changed.
        new_tag_id: Tag ID to assign to that user.

    Returns:
        The updated user, refreshed from the database.

    Raises:
        HTTPException: 400 if the tag ID is already assigned to a different
            user; 404 if no user with the given username exists.
    """
    # The tag only conflicts when a *different* user holds it. Re-submitting
    # a user's current tag must not be rejected as "assigned to another user".
    tag_holder = get_user_by_tag_id(db, new_tag_id)
    if tag_holder and tag_holder.username != username:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Tag ID '{new_tag_id}' is already assigned to another user."
        )

    # Get the user to update
    user = get_user_by_username(db, username)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User '{username}' not found."
        )

    # Update the tag_id and reload the persisted state
    user.tag_id = new_tag_id
    db.commit()
    db.refresh(user)

    return user

def create_user(db: Session, user_data: models.UserCreate) -> models.User:
    """
    Registers a new user account.

    Rejects the request with a 400 HTTPException when the username is already
    registered or when a provided tag_id is already held by a user. Otherwise
    the password is hashed, the user row is added and committed, and the
    refreshed user is returned.
    """
    # Guard: the username must not be registered yet
    if get_user_by_username(db, user_data.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Username '{user_data.username}' is already registered."
        )

    # Guard: a provided tag_id must not be held by any user
    if user_data.tag_id and get_user_by_tag_id(db, user_data.tag_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Tag ID '{user_data.tag_id}' is already assigned to another user."
        )

    password_hash = hash_password(user_data.password)

    # An unspecified is_active means the account starts out active
    if user_data.is_active is None:
        active_flag = True
    else:
        active_flag = user_data.is_active

    new_user = models.User(
        username=user_data.username,
        name=user_data.name,
        hashed_password=password_hash,
        role=user_data.role,
        department=user_data.department,
        tag_id=user_data.tag_id,
        is_active=active_flag,
    )

    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user

def update_user(db: Session, user_id: int, user_update: models.UserCreate) -> models.User:
    """
    Updates an existing user.

    Args:
        db: Database session used for the lookups and the commit.
        user_id: ID of the user to update.
        user_update: New field values. A falsy password leaves the stored
            hash untouched, and an is_active of None leaves the stored
            active flag untouched.

    Returns:
        The updated user, refreshed from the database.

    Raises:
        HTTPException: 404 if the user does not exist; 400 if the new
            username is already taken or the new tag ID is already assigned
            to another user.
    """
    db_user = get_user_by_id(db, user_id)
    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found."
        )

    # Check if new username is already taken (if changed)
    if user_update.username != db_user.username:
        existing_user = get_user_by_username(db, user_update.username)
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Username '{user_update.username}' is already taken."
            )

    # Check if new tag_id is already in use (if changed and provided)
    if user_update.tag_id and user_update.tag_id != db_user.tag_id:
        existing_tag_user = get_user_by_tag_id(db, user_update.tag_id)
        if existing_tag_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Tag ID '{user_update.tag_id}' is already assigned to another user."
            )

    # Update fields
    db_user.username = user_update.username
    db_user.name = user_update.name
    if user_update.password:  # Only update password if provided
        db_user.hashed_password = hash_password(user_update.password)
    db_user.role = user_update.role
    db_user.department = user_update.department
    # tag_id is overwritten with whatever was supplied, including a falsy value
    db_user.tag_id = user_update.tag_id
    # Only change the active flag when one was supplied. Defaulting to True
    # here would silently re-activate a deactivated account on any update
    # that leaves is_active unset.
    if user_update.is_active is not None:
        db_user.is_active = user_update.is_active

    db.commit()
    db.refresh(db_user)

    return db_user

def delete_user(db: Session, username_to_delete: str, current_admin: models.User) -> models.User:
    """
    Deletes a user, ensuring an admin cannot delete themselves or the last admin.

    Args:
        db: Database session used for the lookups, cleanup and commit.
        username_to_delete: Username of the account to remove.
        current_admin: The user performing the deletion.

    Returns:
        The user object that was deleted.

    Raises:
        HTTPException: 403 if the target is the caller's own account or the
            last remaining admin; 404 if the target user does not exist.
    """
    if username_to_delete == current_admin.username:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admins cannot delete their own account."
        )

    user_to_delete = get_user_by_username(db, username_to_delete)
    if not user_to_delete:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User '{username_to_delete}' not found."
        )

    # Prevent deleting the last admin account
    if user_to_delete.role == models.UserRole.ADMIN:
        admin_count = db.query(models.User).filter(models.User.role == models.UserRole.ADMIN).count()
        if admin_count <= 1:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot delete the last remaining admin account."
            )

    # Handle dependencies: set foreign keys to NULL where a user is referenced
    db.query(models.ClearanceStatus).filter(models.ClearanceStatus.cleared_by == user_to_delete.id).update({"cleared_by": None})

    # The PendingTagLink model may be absent from `models`. Check for it
    # explicitly rather than catching AttributeError around the whole query,
    # which would also hide unrelated AttributeErrors raised while building
    # or running the delete.
    pending_link_model = getattr(models, "PendingTagLink", None)
    if pending_link_model is not None:
        db.query(pending_link_model).filter(
            pending_link_model.initiated_by_user_id == user_to_delete.id
        ).delete()

    db.delete(user_to_delete)
    db.commit()

    return user_to_delete