Spaces:
Runtime error
Runtime error
# ORM-based CRUD operations.
# All functions are synchronous and expect a SQLAlchemy Session.
from sqlalchemy.orm import Session as SQLAlchemySessionType | |
from sqlalchemy import func, and_ # Add other SQLAlchemy functions as needed | |
from datetime import datetime, timedelta | |
import secrets | |
import bcrypt | |
from fastapi import HTTPException, status | |
from typing import List, Optional, Union # Added Union | |
from src import models # ORM models and Pydantic models | |
from src.database import initialize_student_clearance_statuses_orm # ORM based init | |
# --- Password Hashing ---
def hash_password(password: str) -> str:
    """Return the bcrypt hash of ``password`` as a UTF-8 decoded string.

    A new salt is obtained from ``bcrypt.gensalt()`` on every call.
    """
    raw_password = password.encode('utf-8')
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(raw_password, salt)
    return digest.decode('utf-8')
def verify_password(plain_password: str, hashed_password_str: str) -> bool:
    """Check a plain-text password against a stored bcrypt hash.

    Returns:
        True when the password matches the hash. False when either argument is
        empty, when the password does not match, or when bcrypt rejects the
        stored value as malformed.
    """
    if not plain_password or not hashed_password_str:
        return False
    try:
        return bcrypt.checkpw(
            plain_password.encode('utf-8'),
            hashed_password_str.encode('utf-8'),
        )
    except ValueError:
        # bcrypt raises ValueError (e.g. "Invalid salt") when the stored value
        # is not a well-formed hash; report that as a failed verification
        # instead of letting the error escape to the caller.
        return False
# --- Tag Uniqueness Checks (ORM) ---
def is_tag_id_unique_for_student(
    db: SQLAlchemySessionType,
    tag_id: str,
    exclude_student_pk: Optional[int] = None
) -> bool:
    """Report whether no student row currently holds ``tag_id``.

    Args:
        db: Session used to run the lookup.
        tag_id: Tag identifier to search for in ``models.Student.tag_id``.
        exclude_student_pk: Primary key of a student to leave out of the
            search, or None to consider every student.

    Returns:
        True when no (non-excluded) student has this tag, False otherwise.
    """
    query = db.query(models.Student).filter(models.Student.tag_id == tag_id)
    # Compare against None explicitly so that a primary key of 0 is still
    # excluded instead of being treated as "no exclusion".
    if exclude_student_pk is not None:
        query = query.filter(models.Student.id != exclude_student_pk)
    return query.first() is None
def is_tag_id_unique_for_user(
    db: SQLAlchemySessionType,
    tag_id: str,
    exclude_user_pk: Optional[int] = None
) -> bool:
    """Report whether no user row currently holds ``tag_id``.

    Args:
        db: Session used to run the lookup.
        tag_id: Tag identifier to search for in ``models.User.tag_id``.
        exclude_user_pk: Primary key of a user to leave out of the search,
            or None to consider every user.

    Returns:
        True when no (non-excluded) user has this tag, False otherwise.
    """
    query = db.query(models.User).filter(models.User.tag_id == tag_id)
    # Compare against None explicitly so that a primary key of 0 is still
    # excluded instead of being treated as "no exclusion".
    if exclude_user_pk is not None:
        query = query.filter(models.User.id != exclude_user_pk)
    return query.first() is None
def check_tag_id_globally_unique_for_target(
    db: SQLAlchemySessionType,
    tag_id: str,
    target_type: models.TargetUserType,  # Expecting the enum member
    target_pk: Optional[int] = None
) -> None:
    """Ensure ``tag_id`` is not held by any other student or user.

    The row identified by ``target_pk`` (within the table matching
    ``target_type``) is left out of the check for its own table; the other
    table is always checked in full. A ``target_type`` that is neither
    STUDENT nor STAFF_ADMIN performs no check.

    Raises:
        HTTPException: 409 CONFLICT when the tag is already assigned.
    """
    def _conflict(detail: str) -> HTTPException:
        return HTTPException(status_code=status.HTTP_409_CONFLICT, detail=detail)

    if target_type == models.TargetUserType.STUDENT:
        # Own table first (excluding the target), then the users table.
        free_among_students = is_tag_id_unique_for_student(db, tag_id, exclude_student_pk=target_pk)
        if not free_among_students:
            raise _conflict(f"Tag ID '{tag_id}' is already assigned to another student.")
        free_among_users = is_tag_id_unique_for_user(db, tag_id)
        if not free_among_users:
            raise _conflict(f"Tag ID '{tag_id}' is already assigned to a user.")
        return

    if target_type == models.TargetUserType.STAFF_ADMIN:
        # Own table first (excluding the target), then the students table.
        free_among_users = is_tag_id_unique_for_user(db, tag_id, exclude_user_pk=target_pk)
        if not free_among_users:
            raise _conflict(f"Tag ID '{tag_id}' is already assigned to another user.")
        free_among_students = is_tag_id_unique_for_student(db, tag_id)
        if not free_among_students:
            raise _conflict(f"Tag ID '{tag_id}' is already assigned to a student.")
# --- Student CRUD (ORM) ---
def create_student(db: SQLAlchemySessionType, student_data: models.StudentCreate) -> models.Student:
    """Insert a new student row and initialise its clearance statuses.

    Args:
        db: Session used for the lookups and the insert.
        student_data: Incoming student fields.

    Returns:
        The persisted and refreshed ``models.Student``.

    Raises:
        HTTPException: 400 when the student ID or email is already
            registered; 409 (from the tag uniqueness check) when the tag is
            already assigned.
    """
    existing_student_by_id = (
        db.query(models.Student)
        .filter(models.Student.student_id == student_data.student_id)
        .first()
    )
    if existing_student_by_id:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Student ID already registered")

    # Email and tag are only checked when a value was actually supplied.
    if student_data.email:
        existing_student_by_email = (
            db.query(models.Student)
            .filter(models.Student.email == student_data.email)
            .first()
        )
        if existing_student_by_email:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
    if student_data.tag_id:
        check_tag_id_globally_unique_for_target(db, student_data.tag_id, models.TargetUserType.STUDENT)

    # One timestamp for both columns so a new row has created_at == updated_at.
    now = datetime.utcnow()
    db_student = models.Student(
        student_id=student_data.student_id,
        name=student_data.name,
        email=student_data.email,
        department=student_data.department,
        tag_id=student_data.tag_id,
        created_at=now,
        updated_at=now,
    )
    db.add(db_student)
    db.commit()
    db.refresh(db_student)
    # Runs after the commit, using the student's business ID (not the PK).
    initialize_student_clearance_statuses_orm(db, db_student.student_id)
    return db_student
def get_all_students(db: SQLAlchemySessionType, skip: int = 0, limit: int = 100) -> List[models.Student]:
    """Return one page of students.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).
    """
    # OFFSET/LIMIT without ORDER BY gives no guarantee that consecutive pages
    # are disjoint, so order by primary key to make pagination stable.
    return (
        db.query(models.Student)
        .order_by(models.Student.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
def get_student_by_pk(db: SQLAlchemySessionType, student_pk: int) -> Optional[models.Student]:
    """Fetch the student whose primary key equals ``student_pk``, or None."""
    pk_matches = models.Student.id == student_pk
    lookup = db.query(models.Student).filter(pk_matches)
    return lookup.first()
def get_student_by_student_id(db: SQLAlchemySessionType, student_id: str) -> Optional[models.Student]:
    """Fetch the student whose ``student_id`` column equals ``student_id``, or None."""
    id_matches = models.Student.student_id == student_id
    lookup = db.query(models.Student).filter(id_matches)
    return lookup.first()
def get_student_by_tag_id(db: SQLAlchemySessionType, tag_id: str) -> Optional[models.Student]:
    """Fetch the student holding ``tag_id``, or None when no student has it."""
    tag_matches = models.Student.tag_id == tag_id
    lookup = db.query(models.Student).filter(tag_matches)
    return lookup.first()
def update_student_tag_id(db: SQLAlchemySessionType, student_id_str: str, new_tag_id: str) -> models.Student:
    """Assign ``new_tag_id`` to the student identified by ``student_id_str``.

    When the student already holds that tag the row is returned untouched and
    nothing is committed.

    Raises:
        HTTPException: 404 when the student does not exist; 409 (from the
            uniqueness check) when the tag belongs to someone else.
    """
    student = get_student_by_student_id(db, student_id_str)
    if not student:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Student not found")

    tag_changed = student.tag_id != new_tag_id
    if tag_changed:
        # Exclude this student's own row from the uniqueness check.
        check_tag_id_globally_unique_for_target(
            db,
            new_tag_id,
            models.TargetUserType.STUDENT,
            target_pk=student.id,
        )
        student.tag_id = new_tag_id
        student.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(student)
    return student
# --- User (Staff/Admin) CRUD (ORM) ---
def create_user(db: SQLAlchemySessionType, user_data: models.UserCreate) -> models.User:
    """Insert a new staff/admin user with a bcrypt-hashed password.

    Args:
        db: Session used for the lookup and the insert.
        user_data: Incoming user fields, including the plain-text password.

    Returns:
        The persisted and refreshed ``models.User``.

    Raises:
        HTTPException: 400 when the username is already registered; 409
            (from the tag uniqueness check) when the tag is already assigned.
    """
    existing_user = db.query(models.User).filter(models.User.username == user_data.username).first()
    if existing_user:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")
    # The tag is only checked when one was supplied.
    if user_data.tag_id:
        check_tag_id_globally_unique_for_target(db, user_data.tag_id, models.TargetUserType.STAFF_ADMIN)

    hashed_pass = hash_password(user_data.password)
    # One timestamp for both columns so a new row has created_at == updated_at.
    now = datetime.utcnow()
    db_user = models.User(
        username=user_data.username,
        hashed_password=hashed_pass,
        role=user_data.role,  # Pass the enum member directly
        department=user_data.department,  # Pass the enum member directly (or None)
        tag_id=user_data.tag_id,
        # An unspecified is_active (None) defaults to an active account.
        is_active=user_data.is_active if user_data.is_active is not None else True,
        created_at=now,
        updated_at=now,
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
def get_users(db: SQLAlchemySessionType, skip: int = 0, limit: int = 100) -> List[models.User]:
    """Return one page of staff/admin users.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).
    """
    # OFFSET/LIMIT without ORDER BY gives no guarantee that consecutive pages
    # are disjoint, so order by primary key to make pagination stable.
    return (
        db.query(models.User)
        .order_by(models.User.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
def get_user_by_pk(db: SQLAlchemySessionType, user_pk: int) -> Optional[models.User]:
    """Fetch the user whose primary key equals ``user_pk``, or None."""
    pk_matches = models.User.id == user_pk
    lookup = db.query(models.User).filter(pk_matches)
    return lookup.first()
def get_user_by_username(db: SQLAlchemySessionType, username: str) -> Optional[models.User]:
    """Fetch the user with the given ``username``, or None."""
    name_matches = models.User.username == username
    lookup = db.query(models.User).filter(name_matches)
    return lookup.first()
def get_user_by_tag_id(db: SQLAlchemySessionType, tag_id: str) -> Optional[models.User]:
    """Fetch the active user holding ``tag_id``, or None.

    Inactive users are filtered out, so a tag held only by an inactive user
    yields None.
    """
    tag_matches = models.User.tag_id == tag_id
    # "== True" builds a SQL expression here; it is not a Python truth test.
    only_active = models.User.is_active == True  # noqa: E712
    lookup = db.query(models.User).filter(tag_matches, only_active)
    return lookup.first()
def update_user_tag_id(db: SQLAlchemySessionType, username_str: str, new_tag_id: str) -> models.User:
    """Assign ``new_tag_id`` to the active user named ``username_str``.

    When the user already holds that tag the row is returned untouched and
    nothing is committed.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the user is
            inactive; 409 (from the uniqueness check) when the tag belongs to
            someone else.
    """
    user = get_user_by_username(db, username_str)
    if not user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot update tag for an inactive user.",
        )

    tag_changed = user.tag_id != new_tag_id
    if tag_changed:
        # Exclude this user's own row from the uniqueness check.
        check_tag_id_globally_unique_for_target(
            db,
            new_tag_id,
            models.TargetUserType.STAFF_ADMIN,
            target_pk=user.id,
        )
        user.tag_id = new_tag_id
        user.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(user)
    return user
# --- Clearance Status CRUD (ORM) ---
def create_or_update_clearance_status(
    db: SQLAlchemySessionType,
    status_data: models.ClearanceStatusCreate,  # Pydantic model with enum members
    cleared_by_user_pk: Optional[int] = None
) -> models.ClearanceStatus:
    """Upsert the clearance status for one (student, department) pair.

    An existing row has its status, remarks, ``cleared_by`` and ``updated_at``
    overwritten; otherwise a new row is added. The change is committed and the
    refreshed row is returned.
    """
    same_student = models.ClearanceStatus.student_id == status_data.student_id
    same_department = models.ClearanceStatus.department == status_data.department
    record = db.query(models.ClearanceStatus).filter(same_student, same_department).first()

    now = datetime.utcnow()
    if not record:
        # No row yet for this student/department: create one.
        record = models.ClearanceStatus(
            student_id=status_data.student_id,
            department=status_data.department,
            status=status_data.status,
            remarks=status_data.remarks,
            cleared_by=cleared_by_user_pk,
            created_at=now,
            updated_at=now
        )
        db.add(record)
    else:
        # Row exists: overwrite the mutable fields in place.
        record.status = status_data.status
        record.remarks = status_data.remarks
        record.cleared_by = cleared_by_user_pk
        record.updated_at = now

    db.commit()
    db.refresh(record)
    return record
def get_clearance_statuses_by_student_id(db: SQLAlchemySessionType, student_id_str: str) -> List[models.ClearanceStatus]:
    """Return every clearance status row whose ``student_id`` equals ``student_id_str``."""
    belongs_to_student = models.ClearanceStatus.student_id == student_id_str
    lookup = db.query(models.ClearanceStatus).filter(belongs_to_student)
    return lookup.all()
# --- Device CRUD (ORM) ---
def create_device(db: SQLAlchemySessionType, device_data: models.DeviceCreateAdmin) -> models.Device:
    """Insert a new active device with a freshly generated API key.

    Args:
        db: Session used for the lookup and the insert.
        device_data: Incoming device fields; ``device_id`` (the hardware ID)
            is only checked for duplicates when it is set.

    Returns:
        The persisted and refreshed ``models.Device``.

    Raises:
        HTTPException: 400 when a device with the same hardware ID exists.
    """
    if device_data.device_id:
        existing_device_hw_id = (
            db.query(models.Device)
            .filter(models.Device.device_id == device_data.device_id)
            .first()
        )
        if existing_device_hw_id:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Device with hardware ID '{device_data.device_id}' already exists.")

    api_key = secrets.token_urlsafe(32)
    # One timestamp for both columns so a new row has created_at == updated_at.
    now = datetime.utcnow()
    db_device = models.Device(
        name=device_data.name,
        description=device_data.description,
        api_key=api_key,
        device_id=device_data.device_id,
        location=device_data.location,
        is_active=True,
        created_at=now,
        updated_at=now,
    )
    db.add(db_device)
    db.commit()
    db.refresh(db_device)
    return db_device
def register_device_esp(db: SQLAlchemySessionType, device_data: models.DeviceRegister) -> models.Device:
    """Register a device by hardware ID, or re-register an existing one.

    A new API key is generated on every call. For an existing device the
    location, API key, ``last_seen`` and ``updated_at`` are overwritten and the
    device is marked active; otherwise a new active device row is added.
    """
    fresh_key = secrets.token_urlsafe(32)
    now = datetime.utcnow()

    hw_id_matches = models.Device.device_id == device_data.device_id
    device = db.query(models.Device).filter(hw_id_matches).first()

    if not device:
        # First registration for this hardware ID.
        device = models.Device(
            device_id=device_data.device_id,
            location=device_data.location,
            api_key=fresh_key,
            is_active=True,
            last_seen=now,
            created_at=now,
            updated_at=now
        )
        db.add(device)
    else:
        # Known hardware ID: replace the key and reactivate.
        device.location = device_data.location
        device.api_key = fresh_key
        device.last_seen = now
        device.updated_at = now
        device.is_active = True

    db.commit()
    db.refresh(device)
    return device
def get_device_by_pk(db: SQLAlchemySessionType, device_pk: int) -> Optional[models.Device]:
    """Fetch the device whose primary key equals ``device_pk``, or None."""
    pk_matches = models.Device.id == device_pk
    lookup = db.query(models.Device).filter(pk_matches)
    return lookup.first()
def get_device_by_api_key(db: SQLAlchemySessionType, api_key: str) -> Optional[models.Device]:
    """Fetch the device whose ``api_key`` equals the given key, or None."""
    key_matches = models.Device.api_key == api_key
    lookup = db.query(models.Device).filter(key_matches)
    return lookup.first()
def get_device_by_hardware_id(db: SQLAlchemySessionType, hardware_id: str) -> Optional[models.Device]:
    """Fetch the device whose ``device_id`` column equals ``hardware_id``, or None."""
    hw_id_matches = models.Device.device_id == hardware_id
    lookup = db.query(models.Device).filter(hw_id_matches)
    return lookup.first()
def get_all_devices(db: SQLAlchemySessionType, skip: int = 0, limit: int = 100) -> List[models.Device]:
    """Return one page of devices.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).
    """
    # OFFSET/LIMIT without ORDER BY gives no guarantee that consecutive pages
    # are disjoint, so order by primary key to make pagination stable.
    return (
        db.query(models.Device)
        .order_by(models.Device.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
def update_device_last_seen(db: SQLAlchemySessionType, device_pk: int) -> None:
    """Stamp the device's ``last_seen`` and ``updated_at`` with the current UTC time.

    Does nothing (and commits nothing) when no device has primary key
    ``device_pk``.
    """
    device = get_device_by_pk(db, device_pk)
    if not device:
        return
    # One timestamp for both columns so they cannot drift apart.
    now = datetime.utcnow()
    device.last_seen = now
    device.updated_at = now
    db.commit()
# --- Device Log CRUD (ORM) ---
def create_device_log(
    db: SQLAlchemySessionType,
    device_pk: Optional[int],
    action: str,
    scanned_tag_id: Optional[str] = None,
    user_type: Optional[str] = "unknown",  # This remains a string as UserRole doesn't cover 'unknown'
    actual_device_id_str: Optional[str] = None
):
    """Record one device event as a ``models.DeviceLog`` row and commit it.

    The row is timestamped with the current UTC time. The returned object is
    not refreshed after the commit.
    """
    log_fields = dict(
        device_fk_id=device_pk,
        actual_device_id_str=actual_device_id_str,
        tag_id_scanned=scanned_tag_id,
        user_type=user_type,
        action=action,
        timestamp=datetime.utcnow(),
    )
    entry = models.DeviceLog(**log_fields)
    db.add(entry)
    db.commit()
    return entry
# --- Pending Tag Link CRUD (ORM) ---
def create_pending_tag_link(
    db: SQLAlchemySessionType,
    device_pk: int,
    target_user_type: models.TargetUserType,  # Pydantic validated enum member
    target_identifier: str,
    initiated_by_user_pk: int,
    expires_in_minutes: int = 5
) -> models.PendingTagLink:
    """Create a pending request for a device to link its next scanned tag.

    Args:
        db: Session used for the lookups and the insert.
        device_pk: Primary key of the device that will perform the scan.
        target_user_type: Whether the target is a student or a staff/admin user.
        target_identifier: Student ID (for STUDENT) or username (for
            STAFF_ADMIN) of the target.
        initiated_by_user_pk: Primary key of the user starting the request.
        expires_in_minutes: Lifetime of the pending link.

    Returns:
        The persisted and refreshed ``models.PendingTagLink``.

    Raises:
        HTTPException: 404 when the device or target is missing; 400 when the
            device or target user is inactive; 409 when the target already has
            a tag or the device already has an unexpired pending link.
    """
    device = get_device_by_pk(db, device_pk)
    if not device:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Device with PK {device_pk} not found.")
    if not device.is_active:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Device '{device.name or device.device_id}' is not active.")

    if target_user_type == models.TargetUserType.STUDENT:
        student_target = get_student_by_student_id(db, target_identifier)
        if not student_target:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Target student with ID '{target_identifier}' not found.")
        if student_target.tag_id:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"Student '{target_identifier}' already has a tag linked.")
    elif target_user_type == models.TargetUserType.STAFF_ADMIN:
        user_target = get_user_by_username(db, target_identifier)
        if not user_target:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Target user with username '{target_identifier}' not found.")
        if not user_target.is_active:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Target user '{target_identifier}' is not active.")
        if user_target.tag_id:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"User '{target_identifier}' already has a tag linked.")

    # Capture the clock once: the expiry check, created_at and expires_at all
    # use the same instant, so expires_at - created_at is exactly the
    # requested lifetime.
    now = datetime.utcnow()

    # Only unexpired links block a new request for this device.
    existing_link = db.query(models.PendingTagLink).filter(
        models.PendingTagLink.device_id_fk == device_pk,
        models.PendingTagLink.expires_at > now
    ).first()
    if existing_link:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"Device '{device.name or device.device_id}' is already awaiting a tag scan.")

    db_pending_link = models.PendingTagLink(
        device_id_fk=device_pk,
        target_user_type=target_user_type,  # Assign Pydantic enum member
        target_identifier=target_identifier,
        initiated_by_user_id=initiated_by_user_pk,
        expires_at=now + timedelta(minutes=expires_in_minutes),
        created_at=now,
    )
    db.add(db_pending_link)
    db.commit()
    db.refresh(db_pending_link)
    return db_pending_link
def get_active_pending_tag_link_by_device_pk(db: SQLAlchemySessionType, device_pk: int) -> Optional[models.PendingTagLink]:
    """Fetch the newest unexpired pending tag link for a device, or None.

    "Unexpired" means ``expires_at`` is later than the current UTC time;
    "newest" is decided by ``created_at`` descending.
    """
    for_this_device = models.PendingTagLink.device_id_fk == device_pk
    not_expired = models.PendingTagLink.expires_at > datetime.utcnow()
    newest_first = models.PendingTagLink.created_at.desc()
    lookup = db.query(models.PendingTagLink).filter(for_this_device, not_expired)
    return lookup.order_by(newest_first).first()
def delete_pending_tag_link(db: SQLAlchemySessionType, pending_link_pk: int):
    """Delete the pending tag link with primary key ``pending_link_pk`` and commit.

    Does nothing when no such row exists.
    """
    pk_matches = models.PendingTagLink.id == pending_link_pk
    link = db.query(models.PendingTagLink).filter(pk_matches).first()
    if not link:
        return
    db.delete(link)
    db.commit()