Spaces:
Runtime error
Runtime error
File size: 16,698 Bytes
96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c a09ee49 96b013c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
# ORM-based CRUD operations
# All functions are now synchronous and expect a SQLAlchemy Session
from sqlalchemy.orm import Session as SQLAlchemySessionType
from sqlalchemy import func, and_ # Add other SQLAlchemy functions as needed
from datetime import datetime, timedelta
import secrets
import bcrypt
from fastapi import HTTPException, status
from typing import List, Optional, Union # Added Union
from src import models # ORM models and Pydantic models
from src.database import initialize_student_clearance_statuses_orm # ORM based init
# --- Password Hashing ---
def hash_password(password: str) -> str:
"""Hashes a password using bcrypt."""
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def verify_password(plain_password: str, hashed_password_str: str) -> bool:
"""Verifies a plain password against a hashed password."""
if not plain_password or not hashed_password_str:
return False
return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password_str.encode('utf-8'))
# --- Tag Uniqueness Checks (ORM) ---
def is_tag_id_unique_for_student(db: SQLAlchemySessionType, tag_id: str, exclude_student_pk: Optional[int] = None) -> bool:
"""Checks if tag_id is unique among students, optionally excluding one by PK."""
query = db.query(models.Student).filter(models.Student.tag_id == tag_id)
if exclude_student_pk:
query = query.filter(models.Student.id != exclude_student_pk)
return query.first() is None
def is_tag_id_unique_for_user(db: SQLAlchemySessionType, tag_id: str, exclude_user_pk: Optional[int] = None) -> bool:
"""Checks if tag_id is unique among users, optionally excluding one by PK."""
query = db.query(models.User).filter(models.User.tag_id == tag_id)
if exclude_user_pk:
query = query.filter(models.User.id != exclude_user_pk)
return query.first() is None
def check_tag_id_globally_unique_for_target(
db: SQLAlchemySessionType,
tag_id: str,
target_type: models.TargetUserType, # Expecting the enum member
target_pk: Optional[int] = None
) -> None:
"""
Raises HTTPException if tag_id is not globally unique, excluding the target if provided.
"""
if target_type == models.TargetUserType.STUDENT:
if not is_tag_id_unique_for_student(db, tag_id, exclude_student_pk=target_pk):
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"Tag ID '{tag_id}' is already assigned to another student.")
if not is_tag_id_unique_for_user(db, tag_id):
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"Tag ID '{tag_id}' is already assigned to a user.")
elif target_type == models.TargetUserType.STAFF_ADMIN:
if not is_tag_id_unique_for_user(db, tag_id, exclude_user_pk=target_pk):
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"Tag ID '{tag_id}' is already assigned to another user.")
if not is_tag_id_unique_for_student(db, tag_id):
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"Tag ID '{tag_id}' is already assigned to a student.")
# --- Student CRUD (ORM) ---
def create_student(db: SQLAlchemySessionType, student_data: models.StudentCreate) -> models.Student:
existing_student_by_id = db.query(models.Student).filter(models.Student.student_id == student_data.student_id).first()
if existing_student_by_id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Student ID already registered")
if student_data.email:
existing_student_by_email = db.query(models.Student).filter(models.Student.email == student_data.email).first()
if existing_student_by_email:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
if student_data.tag_id:
check_tag_id_globally_unique_for_target(db, student_data.tag_id, models.TargetUserType.STUDENT)
db_student = models.Student(
student_id=student_data.student_id,
name=student_data.name,
email=student_data.email,
department=student_data.department,
tag_id=student_data.tag_id,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
db.add(db_student)
db.commit()
db.refresh(db_student)
initialize_student_clearance_statuses_orm(db, db_student.student_id)
return db_student
def get_all_students(db: SQLAlchemySessionType, skip: int = 0, limit: int = 100) -> List[models.Student]:
return db.query(models.Student).offset(skip).limit(limit).all()
def get_student_by_pk(db: SQLAlchemySessionType, student_pk: int) -> Optional[models.Student]:
return db.query(models.Student).filter(models.Student.id == student_pk).first()
def get_student_by_student_id(db: SQLAlchemySessionType, student_id: str) -> Optional[models.Student]:
return db.query(models.Student).filter(models.Student.student_id == student_id).first()
def get_student_by_tag_id(db: SQLAlchemySessionType, tag_id: str) -> Optional[models.Student]:
return db.query(models.Student).filter(models.Student.tag_id == tag_id).first()
def update_student_tag_id(db: SQLAlchemySessionType, student_id_str: str, new_tag_id: str) -> models.Student:
student = get_student_by_student_id(db, student_id_str)
if not student:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Student not found")
if student.tag_id == new_tag_id:
return student
check_tag_id_globally_unique_for_target(db, new_tag_id, models.TargetUserType.STUDENT, target_pk=student.id)
student.tag_id = new_tag_id
student.updated_at = datetime.utcnow()
db.commit()
db.refresh(student)
return student
# --- User (Staff/Admin) CRUD (ORM) ---
def create_user(db: SQLAlchemySessionType, user_data: models.UserCreate) -> models.User:
existing_user = db.query(models.User).filter(models.User.username == user_data.username).first()
if existing_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")
if user_data.tag_id:
check_tag_id_globally_unique_for_target(db, user_data.tag_id, models.TargetUserType.STAFF_ADMIN)
hashed_pass = hash_password(user_data.password)
db_user = models.User(
username=user_data.username,
hashed_password=hashed_pass,
role=user_data.role, # Pass the enum member directly
department=user_data.department, # Pass the enum member directly (or None)
tag_id=user_data.tag_id,
is_active=user_data.is_active if user_data.is_active is not None else True,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def get_users(db: SQLAlchemySessionType, skip: int = 0, limit: int = 100) -> List[models.User]:
"""Retrieves all staff/admin users with pagination."""
return db.query(models.User).offset(skip).limit(limit).all()
def get_user_by_pk(db: SQLAlchemySessionType, user_pk: int) -> Optional[models.User]:
return db.query(models.User).filter(models.User.id == user_pk).first()
def get_user_by_username(db: SQLAlchemySessionType, username: str) -> Optional[models.User]:
return db.query(models.User).filter(models.User.username == username).first()
def get_user_by_tag_id(db: SQLAlchemySessionType, tag_id: str) -> Optional[models.User]:
return db.query(models.User).filter(models.User.tag_id == tag_id, models.User.is_active == True).first()
def update_user_tag_id(db: SQLAlchemySessionType, username_str: str, new_tag_id: str) -> models.User:
user = get_user_by_username(db, username_str)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
if not user.is_active:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot update tag for an inactive user.")
if user.tag_id == new_tag_id:
return user
check_tag_id_globally_unique_for_target(db, new_tag_id, models.TargetUserType.STAFF_ADMIN, target_pk=user.id)
user.tag_id = new_tag_id
user.updated_at = datetime.utcnow()
db.commit()
db.refresh(user)
return user
# --- Clearance Status CRUD (ORM) ---
def create_or_update_clearance_status(
db: SQLAlchemySessionType,
status_data: models.ClearanceStatusCreate, # Pydantic model with enum members
cleared_by_user_pk: Optional[int] = None
) -> models.ClearanceStatus:
existing_status = db.query(models.ClearanceStatus).filter(
models.ClearanceStatus.student_id == status_data.student_id,
models.ClearanceStatus.department == status_data.department # Comparing enum member from Pydantic to ORM field
).first()
current_time = datetime.utcnow()
if existing_status:
existing_status.status = status_data.status # Assigning enum member, SQLAlchemy handles value
existing_status.remarks = status_data.remarks
existing_status.cleared_by = cleared_by_user_pk
existing_status.updated_at = current_time
db_model = existing_status
else:
db_model = models.ClearanceStatus(
student_id=status_data.student_id,
department=status_data.department, # Assign Pydantic enum member
status=status_data.status, # Assign Pydantic enum member
remarks=status_data.remarks,
cleared_by=cleared_by_user_pk,
created_at=current_time,
updated_at=current_time
)
db.add(db_model)
db.commit()
db.refresh(db_model)
return db_model
def get_clearance_statuses_by_student_id(db: SQLAlchemySessionType, student_id_str: str) -> List[models.ClearanceStatus]:
return db.query(models.ClearanceStatus).filter(models.ClearanceStatus.student_id == student_id_str).all()
# --- Device CRUD (ORM) ---
def create_device(db: SQLAlchemySessionType, device_data: models.DeviceCreateAdmin) -> models.Device:
if device_data.device_id:
existing_device_hw_id = db.query(models.Device).filter(models.Device.device_id == device_data.device_id).first()
if existing_device_hw_id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Device with hardware ID '{device_data.device_id}' already exists.")
api_key = secrets.token_urlsafe(32)
db_device = models.Device(
name=device_data.name,
description=device_data.description,
api_key=api_key,
device_id=device_data.device_id,
location=device_data.location,
is_active=True,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
db.add(db_device)
db.commit()
db.refresh(db_device)
return db_device
def register_device_esp(db: SQLAlchemySessionType, device_data: models.DeviceRegister) -> models.Device:
api_key = secrets.token_urlsafe(32)
current_time = datetime.utcnow()
existing_device = db.query(models.Device).filter(models.Device.device_id == device_data.device_id).first()
if existing_device:
existing_device.location = device_data.location
existing_device.api_key = api_key
existing_device.last_seen = current_time
existing_device.updated_at = current_time
existing_device.is_active = True
db_model = existing_device
else:
db_model = models.Device(
device_id=device_data.device_id,
location=device_data.location,
api_key=api_key,
is_active=True,
last_seen=current_time,
created_at=current_time,
updated_at=current_time
)
db.add(db_model)
db.commit()
db.refresh(db_model)
return db_model
def get_device_by_pk(db: SQLAlchemySessionType, device_pk: int) -> Optional[models.Device]:
return db.query(models.Device).filter(models.Device.id == device_pk).first()
def get_device_by_api_key(db: SQLAlchemySessionType, api_key: str) -> Optional[models.Device]:
return db.query(models.Device).filter(models.Device.api_key == api_key).first()
def get_device_by_hardware_id(db: SQLAlchemySessionType, hardware_id: str) -> Optional[models.Device]:
return db.query(models.Device).filter(models.Device.device_id == hardware_id).first()
def get_all_devices(db: SQLAlchemySessionType, skip: int = 0, limit: int = 100) -> List[models.Device]:
return db.query(models.Device).offset(skip).limit(limit).all()
def update_device_last_seen(db: SQLAlchemySessionType, device_pk: int):
device = get_device_by_pk(db, device_pk)
if device:
device.last_seen = datetime.utcnow()
device.updated_at = datetime.utcnow()
db.commit()
# --- Device Log CRUD (ORM) ---
def create_device_log(
db: SQLAlchemySessionType,
device_pk: Optional[int],
action: str,
scanned_tag_id: Optional[str] = None,
user_type: Optional[str] = "unknown", # This remains a string as UserRole doesn't cover 'unknown'
actual_device_id_str: Optional[str] = None
):
db_log = models.DeviceLog(
device_fk_id=device_pk,
actual_device_id_str=actual_device_id_str,
tag_id_scanned=scanned_tag_id,
user_type=user_type,
action=action,
timestamp=datetime.utcnow()
)
db.add(db_log)
db.commit()
return db_log
# --- Pending Tag Link CRUD (ORM) ---
def create_pending_tag_link(
db: SQLAlchemySessionType,
device_pk: int,
target_user_type: models.TargetUserType, # Pydantic validated enum member
target_identifier: str,
initiated_by_user_pk: int,
expires_in_minutes: int = 5
) -> models.PendingTagLink:
device = get_device_by_pk(db, device_pk)
if not device:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Device with PK {device_pk} not found.")
if not device.is_active:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Device '{device.name or device.device_id}' is not active.")
if target_user_type == models.TargetUserType.STUDENT:
student_target = get_student_by_student_id(db, target_identifier)
if not student_target:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Target student with ID '{target_identifier}' not found.")
if student_target.tag_id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"Student '{target_identifier}' already has a tag linked.")
elif target_user_type == models.TargetUserType.STAFF_ADMIN:
user_target = get_user_by_username(db, target_identifier)
if not user_target:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Target user with username '{target_identifier}' not found.")
if not user_target.is_active:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Target user '{target_identifier}' is not active.")
if user_target.tag_id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"User '{target_identifier}' already has a tag linked.")
existing_link = db.query(models.PendingTagLink).filter(
models.PendingTagLink.device_id_fk == device_pk,
models.PendingTagLink.expires_at > datetime.utcnow()
).first()
if existing_link:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"Device '{device.name or device.device_id}' is already awaiting a tag scan.")
expires_at = datetime.utcnow() + timedelta(minutes=expires_in_minutes)
db_pending_link = models.PendingTagLink(
device_id_fk=device_pk,
target_user_type=target_user_type, # Assign Pydantic enum member
target_identifier=target_identifier,
initiated_by_user_id=initiated_by_user_pk,
expires_at=expires_at,
created_at=datetime.utcnow()
)
db.add(db_pending_link)
db.commit()
db.refresh(db_pending_link)
return db_pending_link
def get_active_pending_tag_link_by_device_pk(db: SQLAlchemySessionType, device_pk: int) -> Optional[models.PendingTagLink]:
return db.query(models.PendingTagLink).filter(
models.PendingTagLink.device_id_fk == device_pk,
models.PendingTagLink.expires_at > datetime.utcnow()
).order_by(models.PendingTagLink.created_at.desc()).first()
def delete_pending_tag_link(db: SQLAlchemySessionType, pending_link_pk: int):
link = db.query(models.PendingTagLink).filter(models.PendingTagLink.id == pending_link_pk).first()
if link:
db.delete(link)
db.commit()
|