Testys commited on
Commit
71a3948
·
1 Parent(s): 83767ea

FEAT: Backend code completed done

Browse files
src/auth.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import Depends, HTTPException, status
2
+ from fastapi.security import OAuth2PasswordBearer, APIKeyHeader
3
+ from jose import JWTError, jwt
4
+ from passlib.context import CryptContext
5
+ from datetime import datetime, timedelta, timezone
6
+ from typing import Optional
7
+ from sqlmodel import Session, select
8
+
9
+ from src.database import get_session
10
+ from src.models import User
11
+ from src.config import settings
12
+
13
+ # --- Security Configuration ---
14
+
15
+ # Password hashing context using bcrypt
16
+ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
17
+
18
+ # OAuth2 scheme for token-based authentication
19
+ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
20
+
21
+ # API key header for device authentication
22
+ api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True)
23
+
24
+ # --- Password Utilities ---
25
+
26
+ def verify_password(plain_password: str, hashed_password: str) -> bool:
27
+ """Verifies a plain password against a hashed password."""
28
+ return pwd_context.verify(plain_password, hashed_password)
29
+
30
+ def hash_password(password: str) -> str:
31
+ """Hashes a plain password."""
32
+ return pwd_context.hash(password)
33
+
34
+ # --- JWT Token Utilities ---
35
+
36
+ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
37
+ """
38
+ Generates a new JWT access token.
39
+ """
40
+ to_encode = data.copy()
41
+ if expires_delta:
42
+ expire = datetime.now(timezone.utc) + expires_delta
43
+ else:
44
+ # Default expiration time: 15 minutes
45
+ expire = datetime.now(timezone.utc) + timedelta(minutes=15)
46
+
47
+ to_encode.update({"exp": expire})
48
+ encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
49
+ return encoded_jwt
50
+
51
+ # --- User Authentication and Authorization ---
52
+
53
+ async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_session)) -> User:
54
+ """
55
+ Decodes the JWT token to get the current user.
56
+ Raises an exception if the token is invalid or the user doesn't exist.
57
+ """
58
+ credentials_exception = HTTPException(
59
+ status_code=status.HTTP_401_UNAUTHORIZED,
60
+ detail="Could not validate credentials",
61
+ headers={"WWW-Authenticate": "Bearer"},
62
+ )
63
+
64
+ try:
65
+ payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
66
+ username: Optional[str] = payload.get("sub")
67
+ if username is None:
68
+ raise credentials_exception
69
+ except JWTError:
70
+ raise credentials_exception
71
+
72
+ user = db.exec(select(User).where(User.username == username)).first()
73
+ if user is None:
74
+ raise credentials_exception
75
+
76
+ return user
77
+
78
+ async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
79
+ """
80
+ Ensures the user retrieved from the token is active.
81
+ """
82
+ if not current_user.is_active:
83
+ raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
84
+ return current_user
85
+
src/config.py ADDED
@@ -0,0 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pydantic_settings import BaseSettings
2
+ import os
3
+ from dotenv import load_dotenv
4
+
5
+ load_dotenv()
6
+
7
+ class Settings(BaseSettings):
8
+ POSTGRES_URI: str
9
+ JWT_SECRET_KEY: str = os.getenv("JWT_SECRET_KEY", "default_secret_key")
10
+ ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
11
+
12
+ class Config:
13
+ env_file = ".env"
14
+
15
+ settings = Settings()
src/crud/__init__.py ADDED
@@ -0,0 +1,83 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ CRUD Package Initializer
3
+
4
+ This file makes the 'crud' directory a Python package and imports all the
5
+ public CRUD functions from the submodules. This allows you to import any
6
+ CRUD function directly from `src.crud` instead of the specific submodule,
7
+ keeping the router imports clean.
8
+ """
9
+
10
+ from .students import (
11
+ create_student,
12
+ get_all_students,
13
+ get_student_by_student_id,
14
+ get_student_by_tag_id,
15
+ update_student_tag_id,
16
+ delete_student,
17
+ )
18
+ from .users import (
19
+ create_user,
20
+ get_user_by_username,
21
+ get_user_by_tag_id,
22
+ update_user_tag_id,
23
+ get_user_by_id,
24
+ delete_user,
25
+ hash_password, # Import hash_password from users module
26
+ get_all_users
27
+ )
28
+ from .devices import (
29
+ get_device_by_id_str,
30
+ get_device_by_api_key,
31
+ create_device_log,
32
+ update_device_last_seen,
33
+ delete_device,
34
+ )
35
+ from .clearance import (
36
+ get_clearance_statuses_by_student_id,
37
+ update_clearance_status,
38
+ delete_clearance_status,
39
+ get_all_clearance_status,
40
+ get_student_clearance_status
41
+ )
42
+ from .tag_linking import (
43
+ create_pending_tag_link,
44
+ get_pending_link_by_id,
45
+ delete_pending_link_by_device_id,
46
+ get_pending_links,
47
+ )
48
+
49
+ # Export all functions
50
+ __all__ = [
51
+ # Users
52
+ 'create_user',
53
+ 'get_user_by_username',
54
+ 'get_user_by_tag_id',
55
+ 'update_user_tag_id',
56
+ 'get_user_by_id',
57
+ 'delete_user',
58
+ 'hash_password',
59
+ 'get_all_users',
60
+ # Students
61
+ 'create_student',
62
+ 'get_all_students',
63
+ 'get_student_by_student_id',
64
+ 'get_student_by_tag_id',
65
+ 'update_student_tag_id',
66
+ 'delete_student',
67
+ # Devices
68
+ 'get_device_by_id_str',
69
+ 'get_device_by_api_key',
70
+ 'create_device_log',
71
+ 'update_device_last_seen',
72
+ 'delete_device',
73
+ # Clearance
74
+ 'get_clearance_statuses_by_student_id',
75
+ 'update_clearance_status',
76
+ 'delete_clearance_status',
77
+ # Tag Linking
78
+ 'create_pending_tag_link',
79
+ 'get_pending_link_by_device_id',
80
+ 'get_pending_link_by_token',
81
+ 'delete_pending_link_by_id',
82
+ 'get_all_pending_links',
83
+ ]
src/crud/clearance.py ADDED
@@ -0,0 +1,58 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlmodel import Session, select
2
+ from typing import List, Optional
3
+
4
+ from src.models import Student, ClearanceStatus, ClearanceUpdate, Department, ClearanceProcess
5
+
6
+ def get_clearance_status_for_student(db: Session, student: Student) -> List[ClearanceStatus]:
7
+ """
8
+ Retrieves all clearance statuses for a given student object.
9
+ """
10
+ return student.clearance_statuses
11
+
12
+ def update_clearance_status(db: Session, clearance_update: ClearanceUpdate) -> Optional[ClearanceStatus]:
13
+ """
14
+ Updates the clearance status for a student in a specific department.
15
+
16
+ This function performs a direct lookup on the ClearanceStatus table, which is
17
+ more efficient than fetching the student and iterating through their statuses.
18
+ """
19
+ # First, find the student to ensure they exist.
20
+ student = db.exec(select(Student).where(Student.matric_no == clearance_update.matric_no)).first()
21
+ if not student:
22
+ return None # Student not found
23
+
24
+ # Directly query for the specific clearance status record.
25
+ statement = select(ClearanceStatus).where(
26
+ ClearanceStatus.student_id == student.id,
27
+ ClearanceStatus.department == clearance_update.department
28
+ )
29
+ status_to_update = db.exec(statement).first()
30
+
31
+ if not status_to_update:
32
+ # This case should theoretically not happen if students are created correctly,
33
+ # but it's a good safeguard.
34
+ return None
35
+
36
+ # Update the status and commit the change.
37
+ status_to_update.status = clearance_update.status
38
+ db.add(status_to_update)
39
+ db.commit()
40
+ db.refresh(status_to_update)
41
+
42
+ return status_to_update
43
+
44
+ def is_student_fully_cleared(db: Session, matric_no: str) -> bool:
45
+ """
46
+ Checks if a student has been approved by all required departments.
47
+ """
48
+ student = db.exec(select(Student).where(Student.matric_no == matric_no)).first()
49
+ if not student:
50
+ return False # Or raise an error, depending on desired behavior
51
+
52
+ # Check if any of the student's clearance statuses are NOT 'approved'.
53
+ for status in student.clearance_statuses:
54
+ if status.status != ClearanceProcess.APPROVED:
55
+ return False
56
+
57
+ # If the loop completes without returning, all statuses are approved.
58
+ return True
src/crud/devices.py ADDED
@@ -0,0 +1,74 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlmodel import Session, select
2
+ import secrets
3
+ from typing import List, Optional
4
+
5
+ from src.models import Device, DeviceCreate, Department
6
+
7
+ def create_device(db: Session, device: DeviceCreate) -> Optional[Device]:
8
+ """
9
+ Creates a new device for a department.
10
+ Generates a unique API key for authentication.
11
+ Returns None if a device with the same name already exists.
12
+ """
13
+ # Check for existing device with the same name to prevent duplicates
14
+ existing_device = db.exec(select(Device).where(Device.device_name == device.device_name)).first()
15
+ if existing_device:
16
+ return None
17
+
18
+ # Generate a secure, URL-safe API key
19
+ api_key = secrets.token_urlsafe(32)
20
+
21
+ db_device = Device(
22
+ device_name=device.device_name,
23
+ department=device.department,
24
+ api_key=api_key,
25
+ is_active=True
26
+ )
27
+
28
+ db.add(db_device)
29
+ db.commit()
30
+ db.refresh(db_device)
31
+ return db_device
32
+
33
+ def get_device_by_api_key(db: Session, api_key: str) -> Optional[Device]:
34
+ """Retrieves an active device by its API key."""
35
+ statement = select(Device).where(Device.api_key == api_key, Device.is_active == True)
36
+ return db.exec(statement).first()
37
+
38
+ def get_device_by_name(db: Session, device_name: str) -> Optional[Device]:
39
+ """Retrieves a device by its unique name."""
40
+ return db.exec(select(Device).where(Device.device_name == device_name)).first()
41
+
42
+ def get_all_devices(db: Session, skip: int = 0, limit: int = 100) -> List[Device]:
43
+ """Retrieves a list of all devices."""
44
+ return db.exec(select(Device).offset(skip).limit(limit)).all()
45
+
46
+ def update_device(db: Session, device_id: int, device_update: dict) -> Optional[Device]:
47
+ """
48
+ Updates a device's mutable properties (e.g., name, active status).
49
+ The API key is immutable and cannot be changed here.
50
+ """
51
+ db_device = db.get(Device, device_id)
52
+ if not db_device:
53
+ return None
54
+
55
+ # Exclude API key from updates for security
56
+ device_update.pop("api_key", None)
57
+
58
+ for key, value in device_update.items():
59
+ setattr(db_device, key, value)
60
+
61
+ db.add(db_device)
62
+ db.commit()
63
+ db.refresh(db_device)
64
+ return db_device
65
+
66
+ def delete_device(db: Session, device_id: int) -> Optional[Device]:
67
+ """Deletes a device from the database."""
68
+ db_device = db.get(Device, device_id)
69
+ if not db_device:
70
+ return None
71
+
72
+ db.delete(db_device)
73
+ db.commit()
74
+ return db_device
src/crud/students.py ADDED
@@ -0,0 +1,98 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlmodel import Session, select
2
+ from typing import List, Optional
3
+
4
+ from src.models import Student, StudentCreate, StudentUpdate, RFIDTag, ClearanceStatus, Department, ClearanceProcess
5
+ from src.auth import hash_password
6
+
7
+ # --- Read Operations ---
8
+
9
+ def get_student_by_id(db: Session, student_id: int) -> Optional[Student]:
10
+ """Retrieves a student by their primary key ID."""
11
+ return db.get(Student, student_id)
12
+
13
+ def get_student_by_matric_no(db: Session, matric_no: str) -> Optional[Student]:
14
+ """Retrieves a student by their unique matriculation number."""
15
+ return db.exec(select(Student).where(Student.matric_no == matric_no)).first()
16
+
17
+ def get_student_by_tag_id(db: Session, tag_id: str) -> Optional[Student]:
18
+ """Retrieves a student by their linked RFID tag ID."""
19
+ statement = select(Student).join(RFIDTag).where(RFIDTag.tag_id == tag_id)
20
+ return db.exec(statement).first()
21
+
22
+ def get_all_students(db: Session, skip: int = 0, limit: int = 100) -> List[Student]:
23
+ """Retrieves a paginated list of all students."""
24
+ return db.exec(select(Student).offset(skip).limit(limit)).all()
25
+
26
+ # --- Write Operations ---
27
+
28
+ def create_student(db: Session, student: StudentCreate) -> Student:
29
+ """
30
+ Creates a new student and initializes their clearance statuses.
31
+
32
+ This is a critical business logic function. When a student is created,
33
+ this function automatically creates a 'pending' clearance record for every
34
+ department defined in the `Department` enum.
35
+ """
36
+ hashed_pass = hash_password(student.password)
37
+ db_student = Student(
38
+ matric_no=student.matric_no,
39
+ full_name=student.full_name,
40
+ email=student.email,
41
+ hashed_password=hashed_pass,
42
+ # Department will be set from the StudentCreate model
43
+ department=student.department
44
+ )
45
+
46
+ # --- Auto-populate clearance statuses ---
47
+ initial_statuses = []
48
+ for dept in Department:
49
+ status = ClearanceStatus(
50
+ department=dept,
51
+ status=ClearanceProcess.PENDING
52
+ )
53
+ initial_statuses.append(status)
54
+
55
+ db_student.clearance_statuses = initial_statuses
56
+ # --- End of auto-population ---
57
+
58
+ db.add(db_student)
59
+ db.commit()
60
+ db.refresh(db_student)
61
+ return db_student
62
+
63
+ def update_student(db: Session, student_id: int, student_update: StudentUpdate) -> Optional[Student]:
64
+ """
65
+ Updates a student's information.
66
+ If a new password is provided, it will be hashed.
67
+ """
68
+ db_student = db.get(Student, student_id)
69
+ if not db_student:
70
+ return None
71
+
72
+ update_data = student_update.model_dump(exclude_unset=True)
73
+
74
+ if "password" in update_data:
75
+ update_data["hashed_password"] = hash_password(update_data.pop("password"))
76
+
77
+ for key, value in update_data.items():
78
+ setattr(db_student, key, value)
79
+
80
+ db.add(db_student)
81
+ db.commit()
82
+ db.refresh(db_student)
83
+ return db_student
84
+
85
+ def delete_student(db: Session, student_id: int) -> Optional[Student]:
86
+ """
87
+
88
+ Deletes a student from the database.
89
+ All associated clearance statuses and the linked RFID tag will also be
90
+ deleted automatically due to the cascade settings in the data models.
91
+ """
92
+ db_student = db.get(Student, student_id)
93
+ if not db_student:
94
+ return None
95
+
96
+ db.delete(db_student)
97
+ db.commit()
98
+ return db_student
src/crud/tag_linking.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlmodel import Session, select
2
+ from typing import Optional, Union
3
+
4
+ from src.models import RFIDTag, User, Student, TagLink
5
+
6
+ def link_tag(db: Session, link_data: TagLink) -> Optional[RFIDTag]:
7
+ """
8
+ Links an RFID tag to a user or student.
9
+
10
+ This function performs several crucial validation checks:
11
+ 1. Ensures the tag is not already linked to another person.
12
+ 2. Ensures the target user/student does not already have a tag.
13
+ 3. Ensures either a matric_no or username is provided.
14
+
15
+ Returns the new RFIDTag object on success, None on failure.
16
+ The calling router is responsible for raising the appropriate HTTP exception.
17
+ """
18
+ # 1. Check if the tag is already in use
19
+ existing_tag = db.exec(select(RFIDTag).where(RFIDTag.tag_id == link_data.tag_id)).first()
20
+ if existing_tag:
21
+ return None # Failure: Tag already exists
22
+
23
+ target_person: Optional[Union[User, Student]] = None
24
+
25
+ if link_data.matric_no:
26
+ target_person = db.exec(select(Student).where(Student.matric_no == link_data.matric_no)).first()
27
+ elif link_data.username:
28
+ target_person = db.exec(select(User).where(User.username == link_data.username)).first()
29
+ else:
30
+ return None # Failure: No identifier provided
31
+
32
+ if not target_person:
33
+ return None # Failure: Target person not found
34
+
35
+ # 2. Check if the person already has a tag linked
36
+ if target_person.rfid_tag:
37
+ return None # Failure: Person already has a tag
38
+
39
+ # Create and link the new tag
40
+ new_tag = RFIDTag(tag_id=link_data.tag_id)
41
+ if isinstance(target_person, Student):
42
+ new_tag.student_id = target_person.id
43
+ else:
44
+ new_tag.user_id = target_person.id
45
+
46
+ db.add(new_tag)
47
+ db.commit()
48
+ db.refresh(new_tag)
49
+
50
+ return new_tag
51
+
52
+ def unlink_tag(db: Session, tag_id: str) -> Optional[RFIDTag]:
53
+ """
54
+ Unlinks an RFID tag, making it available for re-assignment.
55
+ Returns the deleted tag object on success, None if the tag doesn't exist.
56
+ """
57
+ tag_to_delete = db.exec(select(RFIDTag).where(RFIDTag.tag_id == tag_id)).first()
58
+
59
+ if not tag_to_delete:
60
+ return None # Tag not found
61
+
62
+ db.delete(tag_to_delete)
63
+ db.commit()
64
+
65
+ return tag_to_delete
src/crud/users.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlmodel import Session, select
2
+ from typing import List, Optional
3
+
4
+ from src.models import User, UserCreate, UserUpdate, RFIDTag
5
+ from src.auth import hash_password
6
+
7
+ # --- Read Operations ---
8
+
9
+ def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
10
+ """Retrieves a user by their primary key ID."""
11
+ return db.get(User, user_id)
12
+
13
+ def get_user_by_username(db: Session, username: str) -> Optional[User]:
14
+ """Retrieves a user by their unique username."""
15
+ return db.exec(select(User).where(User.username == username)).first()
16
+
17
+ def get_user_by_email(db: Session, email: str) -> Optional[User]:
18
+ """Retrieves a user by their unique email."""
19
+ return db.exec(select(User).where(User.email == email)).first()
20
+
21
+ def get_user_by_tag_id(db: Session, tag_id: str) -> Optional[User]:
22
+ """Retrieves a user by their linked RFID tag ID."""
23
+ statement = select(User).join(RFIDTag).where(RFIDTag.tag_id == tag_id)
24
+ return db.exec(statement).first()
25
+
26
+ def get_all_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
27
+ """Retrieves a paginated list of all users."""
28
+ return db.exec(select(User).offset(skip).limit(limit)).all()
29
+
30
+ # --- Write Operations ---
31
+
32
+ def create_user(db: Session, user: UserCreate) -> User:
33
+ """
34
+ Creates a new user.
35
+ - Hashes the password before saving.
36
+ - The router should handle checks for existing username/email to provide clean HTTP errors.
37
+ """
38
+ hashed_pass = hash_password(user.password)
39
+ db_user = User(
40
+ username=user.username,
41
+ email=user.email,
42
+ full_name=user.full_name,
43
+ hashed_password=hashed_pass,
44
+ role=user.role
45
+ )
46
+ db.add(db_user)
47
+ db.commit()
48
+ db.refresh(db_user)
49
+ return db_user
50
+
51
+ def update_user(db: Session, user_id: int, user_update: UserUpdate) -> Optional[User]:
52
+ """
53
+ Updates a user's information.
54
+ - If a new password is provided, it will be hashed.
55
+ """
56
+ db_user = db.get(User, user_id)
57
+ if not db_user:
58
+ return None
59
+
60
+ update_data = user_update.model_dump(exclude_unset=True)
61
+
62
+ # Hash password if it's being updated
63
+ if "password" in update_data:
64
+ update_data["hashed_password"] = hash_password(update_data.pop("password"))
65
+
66
+ for key, value in update_data.items():
67
+ setattr(db_user, key, value)
68
+
69
+ db.add(db_user)
70
+ db.commit()
71
+ db.refresh(db_user)
72
+ return db_user
73
+
74
+ def delete_user(db: Session, user_id: int) -> Optional[User]:
75
+ """
76
+ Deletes a user from the database.
77
+ The linked RFID tag will also be deleted due to cascade settings.
78
+ """
79
+ db_user = db.get(User, user_id)
80
+ if not db_user:
81
+ return None
82
+
83
+ db.delete(db_user)
84
+ db.commit()
85
+ return db_user
src/crud/utils.py ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Utility functions for CRUD operations.
3
+ """
4
+ import bcrypt
5
+
6
+ def hash_password(password: str) -> str:
7
+ """Hashes a password using bcrypt."""
8
+ return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
9
+
10
+ def verify_password(plain_password: str, hashed_password_str: str) -> bool:
11
+ """Verifies a plain password against a hashed password."""
12
+ if not plain_password or not hashed_password_str:
13
+ return False
14
+ return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password_str.encode('utf-8'))
src/database.py ADDED
@@ -0,0 +1,35 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlmodel import create_engine, Session, SQLModel
2
+ from src.config import settings
3
+
4
+ # --- Database Engine Setup ---
5
+
6
+ # The database URL is constructed from the application settings.
7
+ # This makes it easy to switch between different database environments (e.g., dev, test, prod).
8
+ DATABASE_URL = settings.POSTGRES_URI
9
+ engine = create_engine(DATABASE_URL, echo=True) # echo=True logs SQL queries, useful for debugging
10
+
11
+ # --- Database Initialization ---
12
+
13
+ def create_db_and_tables():
14
+ """
15
+ Creates all database tables defined by SQLModel metadata.
16
+ This function is called once at application startup.
17
+ """
18
+ print("Initializing database...")
19
+ SQLModel.metadata.create_all(engine)
20
+ print("Database tables created successfully (if they didn't exist).")
21
+
22
+ # --- Database Session Management ---
23
+
24
+ def get_session():
25
+ """
26
+ A FastAPI dependency that provides a database session for each request.
27
+ It ensures that the session is always closed after the request is finished,
28
+ even if an error occurs.
29
+ """
30
+ with Session(engine) as session:
31
+ try:
32
+ yield session
33
+ finally:
34
+ session.close()
35
+
src/models.py ADDED
@@ -0,0 +1,130 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlmodel import Field, Relationship, SQLModel
2
+ from typing import List, Optional
3
+ from enum import Enum as PyEnum
4
+
5
+ # --- Enums for Controlled Vocabularies ---
6
+ # Using enums ensures data consistency for categorical fields.
7
+
8
+ class UserRole(str, PyEnum):
9
+ STUDENT = "student"
10
+ STAFF = "staff"
11
+ ADMIN = "admin"
12
+
13
+ class Department(str, PyEnum):
14
+ LIBRARY = "library"
15
+ BURSARY = "bursary"
16
+ ALUMNI = "alumni"
17
+ DEPARTMENTAL = "departmental"
18
+
19
+ class ClearanceProcess(str, PyEnum):
20
+ PENDING = "pending"
21
+ APPROVED = "approved"
22
+ REJECTED = "rejected"
23
+
24
+ # --- Database Table Models ---
25
+
26
+ # Represents a User (Staff or Admin)
27
+ class User(SQLModel, table=True):
28
+ id: Optional[int] = Field(default=None, primary_key=True)
29
+ username: str = Field(index=True, unique=True)
30
+ email: str = Field(unique=True)
31
+ full_name: Optional[str] = None
32
+ hashed_password: str
33
+ role: UserRole = Field(default=UserRole.STAFF)
34
+ is_active: bool = Field(default=True)
35
+
36
+ # One-to-one relationship with an RFID tag
37
+ rfid_tag: Optional["RFIDTag"] = Relationship(back_populates="user")
38
+
39
+ # Represents a Student
40
+ class Student(SQLModel, table=True):
41
+ id: Optional[int] = Field(default=None, primary_key=True)
42
+ matric_no: str = Field(index=True, unique=True)
43
+ full_name: str
44
+ email: str = Field(unique=True)
45
+ department: Department
46
+ hashed_password: str
47
+
48
+ # One-to-many relationship with clearance statuses
49
+ clearance_statuses: List["ClearanceStatus"] = Relationship(
50
+ back_populates="student", sa_relationship_kwargs={"cascade": "all, delete-orphan"}
51
+ )
52
+ # One-to-one relationship with an RFID tag
53
+ rfid_tag: Optional["RFIDTag"] = Relationship(back_populates="student")
54
+
55
+ # Represents an RFID tag, linking it to either a User or a Student
56
+ class RFIDTag(SQLModel, table=True):
57
+ id: Optional[int] = Field(default=None, primary_key=True)
58
+ tag_id: str = Field(index=True, unique=True, description="The unique ID from the RFID chip")
59
+
60
+ # Foreign keys to link to User or Student (only one should be set)
61
+ user_id: Optional[int] = Field(default=None, foreign_key="user.id")
62
+ student_id: Optional[int] = Field(default=None, foreign_key="student.id")
63
+
64
+ # Relationships back to the owner of the tag
65
+ user: Optional[User] = Relationship(back_populates="rfid_tag")
66
+ student: Optional[Student] = Relationship(back_populates="rfid_tag")
67
+
68
+ # Represents a single clearance status for a student in a specific department
69
+ class ClearanceStatus(SQLModel, table=True):
70
+ id: Optional[int] = Field(default=None, primary_key=True)
71
+ department: Department
72
+ status: ClearanceProcess = Field(default=ClearanceProcess.PENDING)
73
+
74
+ student_id: int = Field(foreign_key="student.id")
75
+ student: Student = Relationship(back_populates="clearance_statuses")
76
+
77
+ # Represents a physical ESP32 device
78
+ class Device(SQLModel, table=True):
79
+ id: Optional[int] = Field(default=None, primary_key=True)
80
+ device_name: str = Field(index=True, unique=True)
81
+ api_key: str = Field(unique=True)
82
+ is_active: bool = Field(default=True)
83
+ department: Department
84
+
85
+ # --- Pydantic Models for API Operations ---
86
+ # These models define the shape of data for creating and updating records via the API.
87
+
88
+ # For Users
89
+ class UserCreate(SQLModel):
90
+ username: str
91
+ email: str
92
+ password: str
93
+ full_name: Optional[str] = None
94
+ role: UserRole = UserRole.STAFF
95
+
96
+ class UserUpdate(SQLModel):
97
+ email: Optional[str] = None
98
+ full_name: Optional[str] = None
99
+ password: Optional[str] = None
100
+ is_active: Optional[bool] = None
101
+
102
+ # For Students
103
+ class StudentCreate(SQLModel):
104
+ matric_no: str
105
+ full_name: str
106
+ email: str
107
+ password: str
108
+
109
+ class StudentUpdate(SQLModel):
110
+ full_name: Optional[str] = None
111
+ email: Optional[str] = None
112
+ password: Optional[str] = None
113
+
114
+ # For Devices
115
+ class DeviceCreate(SQLModel):
116
+ device_name: str
117
+ department: Department
118
+
119
+ # For Tag Linking
120
+ class TagLink(SQLModel):
121
+ tag_id: str
122
+ matric_no: Optional[str] = None
123
+ username: Optional[str] = None
124
+
125
+ # For Clearance Updates
126
+ class ClearanceUpdate(SQLModel):
127
+ matric_no: str
128
+ department: Department
129
+ status: ClearanceProcess
130
+
src/routers/__init__.py ADDED
File without changes
src/routers/admin.py ADDED
@@ -0,0 +1,225 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Depends, HTTPException, status, Query
2
+ from sqlmodel import Session
3
+ from typing import List, Optional, Dict
4
+
5
+ from src.database import get_session
6
+ from src.auth import get_current_active_user
7
+ from src.models import (
8
+ User, UserCreate, UserRead, UserUpdate, Role,
9
+ Student, StudentCreate, StudentRead, StudentUpdate, StudentReadWithClearance,
10
+ TagLink, RFIDTagRead, TagScan,
11
+ Device, DeviceCreate, DeviceRead
12
+ )
13
+ from src.crud import users as user_crud
14
+ from src.crud import students as student_crud
15
+ from src.crud import tag_linking as tag_crud
16
+ from src.crud import devices as device_crud
17
+
18
+ # In-memory storage for last scanned tags by an admin.
19
+ # Key: admin user ID, Value: last scanned tag ID.
20
+ # This is simple and effective for a non-distributed system.
21
+ admin_scanned_tags: Dict[int, str] = {}
22
+
23
+ # Define the router.
24
+ # It's accessible to both Admins and Staff, providing a unified management panel.
25
+ router = APIRouter(
26
+ prefix="/admin",
27
+ tags=["Administration"],
28
+ dependencies=[Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF]))],
29
+ )
30
+
31
+ # --- Admin Tag Scanning for Web UI ---
32
+
33
+ @router.post("/tags/scan", status_code=status.HTTP_204_NO_CONTENT)
34
+ def report_scanned_tag(
35
+ scan_data: TagScan,
36
+ current_user: User = Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF]))
37
+ ):
38
+ """
39
+ (Admin & Staff) Endpoint for the admin's desk RFID reader to report a scanned tag.
40
+ The backend stores this tag ID temporarily against the admin's user ID.
41
+ """
42
+ admin_scanned_tags[current_user.id] = scan_data.tag_id
43
+ return
44
+
45
+ @router.get("/tags/scan", response_model=TagScan)
46
+ def retrieve_scanned_tag(
47
+ current_user: User = Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF]))
48
+ ):
49
+ """
50
+ (Admin & Staff) Endpoint for the admin's web portal to retrieve the last scanned tag.
51
+ The tag is removed after being retrieved to prevent re-use.
52
+ """
53
+ tag_id = admin_scanned_tags.pop(current_user.id, None)
54
+ if not tag_id:
55
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No tag has been scanned recently.")
56
+ return TagScan(tag_id=tag_id)
57
+
58
+
59
+ # --- Student Management (Admin + Staff) ---
60
+
61
+ @router.post("/students/", response_model=StudentReadWithClearance, status_code=status.HTTP_201_CREATED)
62
+ def create_student(student: StudentCreate, db: Session = Depends(get_session)):
63
+ """(Admin & Staff) Creates a new student and initializes their clearance status."""
64
+ db_student = student_crud.get_student_by_matric_no(db, matric_no=student.matric_no)
65
+ if db_student:
66
+ raise HTTPException(status_code=400, detail="Matriculation number already registered")
67
+ return student_crud.create_student(db=db, student=student)
68
+
69
+ @router.get("/students/", response_model=List[StudentReadWithClearance])
70
+ def read_all_students(skip: int = 0, limit: int = 100, db: Session = Depends(get_session)):
71
+ """(Admin & Staff) Retrieves a list of all student records."""
72
+ return student_crud.get_all_students(db, skip=skip, limit=limit)
73
+
74
+ @router.get("/students/lookup", response_model=StudentReadWithClearance)
75
+ def lookup_student(
76
+ matric_no: Optional[str] = Query(None, description="Matriculation number of the student."),
77
+ tag_id: Optional[str] = Query(None, description="RFID tag ID linked to the student."),
78
+ db: Session = Depends(get_session)
79
+ ):
80
+ """(Admin & Staff) Looks up a single student by Matric Number OR Tag ID."""
81
+ if not matric_no and not tag_id:
82
+ raise HTTPException(status_code=400, detail="A matric_no or tag_id must be provided.")
83
+ if matric_no and tag_id:
84
+ raise HTTPException(status_code=400, detail="Provide either matric_no or tag_id, not both.")
85
+
86
+ db_student = None
87
+ if matric_no:
88
+ db_student = student_crud.get_student_by_matric_no(db, matric_no=matric_no)
89
+ elif tag_id:
90
+ db_student = student_crud.get_student_by_tag_id(db, tag_id=tag_id)
91
+
92
+ if not db_student:
93
+ raise HTTPException(status_code=404, detail="Student not found with the provided identifier.")
94
+ return db_student
95
+
96
+
97
+ @router.get("/students/{student_id}", response_model=StudentReadWithClearance)
98
+ def read_single_student(student_id: int, db: Session = Depends(get_session)):
99
+ """(Admin & Staff) Retrieves a single student's complete record by their internal ID."""
100
+ db_student = student_crud.get_student_by_id(db, student_id=student_id)
101
+ if not db_student:
102
+ raise HTTPException(status_code=404, detail="Student not found")
103
+ return db_student
104
+
105
+ @router.put("/students/{student_id}", response_model=StudentReadWithClearance)
106
+ def update_student_details(student_id: int, student: StudentUpdate, db: Session = Depends(get_session)):
107
+ """(Admin & Staff) Updates a student's information."""
108
+ updated_student = student_crud.update_student(db, student_id=student_id, student_update=student)
109
+ if not updated_student:
110
+ raise HTTPException(status_code=404, detail="Student not found")
111
+ return updated_student
112
+
113
+ # --- Tag Management (Admin + Staff) ---
114
+
115
+ @router.post("/tags/link", response_model=RFIDTagRead)
116
+ def link_rfid_tag(link_data: TagLink, db: Session = Depends(get_session)):
117
+ """(Admin & Staff) Links an RFID tag to a student or user."""
118
+ new_tag = tag_crud.link_tag(db, link_data)
119
+ if not new_tag:
120
+ raise HTTPException(
121
+ status_code=status.HTTP_409_CONFLICT,
122
+ detail="Could not link tag. The tag may already be in use, or the user/student already has a tag."
123
+ )
124
+ return new_tag
125
+
126
+ @router.delete("/tags/{tag_id}/unlink", response_model=RFIDTagRead)
127
+ def unlink_rfid_tag(tag_id: str, db: Session = Depends(get_session)):
128
+ """(Admin & Staff) Unlinks an RFID tag, making it available again."""
129
+ deleted_tag = tag_crud.unlink_tag(db, tag_id)
130
+ if not deleted_tag:
131
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="RFID Tag not found.")
132
+ return deleted_tag
133
+
134
+
135
+ # --- Super Admin Only Functions ---
136
+
137
+ def require_super_admin(current_user: User = Depends(get_current_active_user())):
138
+ """Dependency to ensure a user has the ADMIN role."""
139
+ if current_user.role != Role.ADMIN:
140
+ raise HTTPException(
141
+ status_code=status.HTTP_403_FORBIDDEN,
142
+ detail="This action requires Super Admin privileges."
143
+ )
144
+
145
+ @router.post("/users/", response_model=UserRead, status_code=status.HTTP_201_CREATED, dependencies=[Depends(require_super_admin)])
146
+ def create_user(user: UserCreate, db: Session = Depends(get_session)):
147
+ """(Super Admin Only) Creates a new user (Staff or Admin)."""
148
+ db_user = user_crud.get_user_by_username(db, username=user.username)
149
+ if db_user:
150
+ raise HTTPException(status_code=400, detail="Username already registered")
151
+ return user_crud.create_user(db=db, user=user)
152
+
153
+ @router.get("/users/", response_model=List[UserRead], dependencies=[Depends(require_super_admin)])
154
+ def read_all_users(db: Session = Depends(get_session)):
155
+ """(Super Admin Only) Retrieves a list of all users."""
156
+ return user_crud.get_all_users(db)
157
+
158
+ @router.get("/users/lookup", response_model=UserRead, dependencies=[Depends(require_super_admin)])
159
+ def lookup_user(
160
+ username: Optional[str] = Query(None, description="Username of the user."),
161
+ tag_id: Optional[str] = Query(None, description="RFID tag ID linked to the user."),
162
+ db: Session = Depends(get_session)
163
+ ):
164
+ """(Super Admin Only) Looks up a single user by Username OR Tag ID."""
165
+ if not username and not tag_id:
166
+ raise HTTPException(status_code=400, detail="A username or tag_id must be provided.")
167
+ if username and tag_id:
168
+ raise HTTPException(status_code=400, detail="Provide either username or tag_id, not both.")
169
+
170
+ db_user = None
171
+ if username:
172
+ db_user = user_crud.get_user_by_username(db, username=username)
173
+ elif tag_id:
174
+ db_user = user_crud.get_user_by_tag_id(db, tag_id=tag_id)
175
+
176
+ if not db_user:
177
+ raise HTTPException(status_code=404, detail="User not found with the provided identifier.")
178
+ return db_user
179
+
180
+ @router.put("/users/{user_id}", response_model=UserRead, dependencies=[Depends(require_super_admin)])
181
+ def update_user_details(user_id: int, user: UserUpdate, db: Session = Depends(get_session)):
182
+ """(Super Admin Only) Updates a user's details (e.g., role)."""
183
+ updated_user = user_crud.update_user(db, user_id=user_id, user_update=user)
184
+ if not updated_user:
185
+ raise HTTPException(status_code=404, detail="User not found")
186
+ return updated_user
187
+
188
+ @router.delete("/users/{user_id}", response_model=UserRead, dependencies=[Depends(require_super_admin)])
189
+ def delete_user_account(user_id: int, db: Session = Depends(get_session), current_user: User = Depends(get_current_active_user())):
190
+ """(Super Admin Only) Deletes a user account."""
191
+ if current_user.id == user_id:
192
+ raise HTTPException(status_code=400, detail="Cannot delete your own account.")
193
+ deleted_user = user_crud.delete_user(db, user_id=user_id)
194
+ if not deleted_user:
195
+ raise HTTPException(status_code=404, detail="User not found")
196
+ return deleted_user
197
+
198
+ @router.delete("/students/{student_id}", response_model=StudentRead, dependencies=[Depends(require_super_admin)])
199
+ def delete_student_record(student_id: int, db: Session = Depends(get_session)):
200
+ """(Super Admin Only) Deletes a student record and all associated data."""
201
+ deleted_student = student_crud.delete_student(db, student_id=student_id)
202
+ if not deleted_student:
203
+ raise HTTPException(status_code=404, detail="Student not found")
204
+ return deleted_student
205
+
206
+ @router.post("/devices/", response_model=DeviceRead, status_code=status.HTTP_201_CREATED, dependencies=[Depends(require_super_admin)])
207
+ def create_device(device: DeviceCreate, db: Session = Depends(get_session)):
208
+ """(Super Admin Only) Registers a new RFID hardware device."""
209
+ db_device = device_crud.get_device_by_location(db, location=device.location)
210
+ if db_device:
211
+ raise HTTPException(status_code=400, detail=f"A device at location '{device.location}' already exists.")
212
+ return device_crud.create_device(db=db, device=device)
213
+
214
+ @router.get("/devices/", response_model=List[DeviceRead], dependencies=[Depends(require_super_admin)])
215
+ def read_all_devices(db: Session = Depends(get_session)):
216
+ """(Super Admin Only) Retrieves a list of all registered devices."""
217
+ return device_crud.get_all_devices(db)
218
+
219
+ @router.delete("/devices/{device_id}", response_model=DeviceRead, dependencies=[Depends(require_super_admin)])
220
+ def delete_device_registration(device_id: int, db: Session = Depends(get_session)):
221
+ """(Super Admin Only) De-authorizes a hardware device."""
222
+ deleted_device = device_crud.delete_device(db, device_id=device_id)
223
+ if not deleted_device:
224
+ raise HTTPException(status_code=404, detail="Device not found")
225
+ return deleted_device
src/routers/clearance.py ADDED
@@ -0,0 +1,38 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Depends, HTTPException, status
2
+ from sqlmodel import Session
3
+
4
+ from src.database import get_session
5
+ from src.auth import get_current_active_user
6
+ from src.models import User, Role, ClearanceStatus, ClearanceUpdate, ClearanceStatusRead
7
+ from src.crud import clearance as clearance_crud
8
+
9
+ router = APIRouter(
10
+ prefix="/clearance",
11
+ tags=["Clearance"],
12
+ dependencies=[Depends(get_current_active_user(required_roles=[Role.STAFF, Role.ADMIN]))],
13
+ )
14
+
15
+ @router.put("/update", response_model=ClearanceStatusRead)
16
+ def update_student_clearance_status(
17
+ clearance_update: ClearanceUpdate,
18
+ db: Session = Depends(get_session),
19
+ # The current_user object is injected by the dependency
20
+ current_user: User = Depends(get_current_active_user(required_roles=[Role.STAFF, Role.ADMIN]))
21
+ ):
22
+ """
23
+ Endpoint for staff to update a student's clearance status.
24
+ A staff member can only approve for their own department.
25
+ (Future enhancement could enforce this rule more strictly).
26
+ """
27
+ # A potential security check: ensure staff's department matches clearance_update.department
28
+ # For now, we trust the role.
29
+
30
+ updated_status = clearance_crud.update_clearance_status(db, clearance_update)
31
+
32
+ if not updated_status:
33
+ raise HTTPException(
34
+ status_code=status.HTTP_404_NOT_FOUND,
35
+ detail=f"No clearance record found for student {clearance_update.matric_no} in department {clearance_update.department}"
36
+ )
37
+
38
+ return updated_status
src/routers/devices.py ADDED
@@ -0,0 +1,68 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Depends, HTTPException, status
2
+ from sqlmodel import Session
3
+ from typing import List
4
+
5
+ from src.database import get_session
6
+ from src.auth import get_current_active_user
7
+ from src.models import Role, Device, DeviceCreate, DeviceRead
8
+ from src.crud import devices as device_crud
9
+
10
+ # Define the router with admin-only access
11
+ router = APIRouter(
12
+ prefix="/devices",
13
+ tags=["Devices"],
14
+ dependencies=[Depends(get_current_active_user(required_roles=[Role.ADMIN]))],
15
+ )
16
+
17
+ @router.post("/", response_model=DeviceRead, status_code=status.HTTP_201_CREATED)
18
+ def create_device(
19
+ device: DeviceCreate,
20
+ db: Session = Depends(get_session)
21
+ ):
22
+ """
23
+ Admin endpoint to register a new RFID hardware device.
24
+
25
+ This generates a unique API key that the device must use to authenticate.
26
+ A device's location must be unique.
27
+ """
28
+ # Check if a device with the same location already exists
29
+ db_device = device_crud.get_device_by_location(db, location=device.location)
30
+ if db_device:
31
+ raise HTTPException(
32
+ status_code=status.HTTP_400_BAD_REQUEST,
33
+ detail=f"A device at location '{device.location}' already exists."
34
+ )
35
+
36
+ # Create the new device and its API key
37
+ return device_crud.create_device(db=db, device=device)
38
+
39
+
40
+ @router.get("/", response_model=List[DeviceRead])
41
+ def read_all_devices(
42
+ skip: int = 0,
43
+ limit: int = 100,
44
+ db: Session = Depends(get_session)
45
+ ):
46
+ """
47
+ Admin endpoint to retrieve a list of all registered hardware devices.
48
+ """
49
+ return device_crud.get_all_devices(db, skip=skip, limit=limit)
50
+
51
+
52
+ @router.delete("/{device_id}", response_model=DeviceRead)
53
+ def delete_device(
54
+ device_id: int,
55
+ db: Session = Depends(get_session)
56
+ ):
57
+ """
58
+ Admin endpoint to delete/de-authorize a hardware device.
59
+
60
+ This will render the device's API key invalid.
61
+ """
62
+ db_device = device_crud.delete_device(db, device_id=device_id)
63
+ if not db_device:
64
+ raise HTTPException(
65
+ status_code=status.HTTP_404_NOT_FOUND,
66
+ detail="Device not found."
67
+ )
68
+ return db_device
src/routers/rfid.py ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Depends, HTTPException, status, Security
2
+ from fastapi.security import APIKeyHeader
3
+ from sqlmodel import Session
4
+
5
+ from src.database import get_session
6
+ from src.auth import get_api_key
7
+ from src.models import RFIDStatusResponse, RFIDScanRequest
8
+ from src.crud import students as student_crud
9
+ from src.crud import users as user_crud
10
+
11
+ # Define the router and the API key security scheme
12
+ router = APIRouter(prefix="/rfid", tags=["RFID"])
13
+ api_key_header = APIKeyHeader(name="x-api-key", auto_error=False)
14
+
15
+ @router.post("/check-status", response_model=RFIDStatusResponse)
16
+ def check_rfid_status(
17
+ scan_data: RFIDScanRequest,
18
+ db: Session = Depends(get_session),
19
+ # This dependency ensures the request comes from a valid, registered device
20
+ api_key: str = Security(get_api_key),
21
+ ):
22
+ """
23
+ Public endpoint for hardware devices to check the status of a scanned RFID tag.
24
+ The device must provide a valid API key in the 'x-api-key' header.
25
+ """
26
+ tag_id = scan_data.tag_id
27
+
28
+ # 1. Check if the tag belongs to a student
29
+ student = student_crud.get_student_by_tag_id(db, tag_id=tag_id)
30
+ if student:
31
+ # Check overall clearance status
32
+ is_cleared = all(
33
+ status.status == "approved" for status in student.clearance_statuses
34
+ )
35
+ clearance_status_str = "Fully Cleared" if is_cleared else "Pending Clearance"
36
+
37
+ return RFIDStatusResponse(
38
+ status="found",
39
+ full_name=student.full_name,
40
+ entity_type="Student",
41
+ clearance_status=clearance_status_str,
42
+ )
43
+
44
+ # 2. If not a student, check if it belongs to a user (staff/admin)
45
+ user = user_crud.get_user_by_tag_id(db, tag_id=tag_id)
46
+ if user:
47
+ return RFIDStatusResponse(
48
+ status="found",
49
+ full_name=user.full_name,
50
+ entity_type=user.role.value, # e.g. "Admin" or "Staff"
51
+ clearance_status="N/A",
52
+ )
53
+
54
+ # 3. If the tag is not linked to anyone
55
+ return RFIDStatusResponse(
56
+ status="unregistered",
57
+ full_name=None,
58
+ entity_type=None,
59
+ clearance_status=None,
60
+ )
src/routers/students.py ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Depends, HTTPException, status
2
+ from sqlmodel import Session
3
+
4
+ from src.database import get_session
5
+ from src.auth import get_current_active_student
6
+ from src.models import Student, StudentReadWithClearance
7
+
8
+ router = APIRouter(
9
+ prefix="/students",
10
+ tags=["Students"],
11
+ )
12
+
13
+ @router.get("/me", response_model=StudentReadWithClearance)
14
+ def read_student_me(
15
+ # This dependency ensures the user is an authenticated student
16
+ # and injects their database object into the 'current_student' parameter.
17
+ current_student: Student = Depends(get_current_active_student)
18
+ ):
19
+ """
20
+ Endpoint for a logged-in student to retrieve their own profile
21
+ and clearance information. The user is identified via their JWT token.
22
+ """
23
+ # Because the dependency returns the full student object, we can just return it.
24
+ # No need for another database call.
25
+ if not current_student:
26
+ # This should not happen if the dependency is set up correctly
27
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Student not found")
28
+ return current_student
29
+
src/routers/token.py ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Depends, HTTPException, status
2
+ from fastapi.security import OAuth2PasswordRequestForm
3
+ from sqlmodel import Session
4
+ from datetime import timedelta
5
+
6
+ from src.database import get_session
7
+ from src.auth import authenticate_user, create_access_token
8
+ from src.models import Token
9
+ from src.config import settings
10
+
11
+ router = APIRouter(tags=["Authentication"])
12
+
13
+ @router.post("/token", response_model=Token)
14
+ async def login_for_access_token(
15
+ form_data: OAuth2PasswordRequestForm = Depends(),
16
+ db: Session = Depends(get_session)
17
+ ):
18
+ """
19
+ Provides a JWT access token for a valid user (student or staff).
20
+
21
+ This is the primary login endpoint. It uses the standard OAuth2
22
+ password flow. The client sends 'username' and 'password' in a
23
+ form-data body.
24
+ """
25
+ # The authenticate_user function will check both Student and User tables
26
+ user = authenticate_user(db, form_data.username, form_data.password)
27
+
28
+ if not user:
29
+ raise HTTPException(
30
+ status_code=status.HTTP_401_UNAUTHORIZED,
31
+ detail="Incorrect username or password",
32
+ headers={"WWW-Authenticate": "Bearer"},
33
+ )
34
+
35
+ # Create the JWT token
36
+ access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
37
+ access_token = create_access_token(
38
+ data={"sub": user.email}, expires_delta=access_token_expires
39
+ )
40
+
41
+ return {"access_token": access_token, "token_type": "bearer"}
src/routers/users.py ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Depends
2
+ from sqlmodel import Session
3
+
4
+ from src.database import get_session
5
+ from src.auth import get_current_active_user
6
+ from src.models import User, UserRead, Role
7
+
8
+ # Define the router
9
+ router = APIRouter(
10
+ prefix="/users",
11
+ tags=["Users"],
12
+ )
13
+
14
+ @router.get("/me", response_model=UserRead)
15
+ def read_user_me(
16
+ # This dependency ensures the user is an authenticated Admin or Staff
17
+ # and injects their database object into the 'current_user' parameter.
18
+ current_user: User = Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF]))
19
+ ):
20
+ """
21
+ Endpoint for a logged-in user (Admin or Staff) to retrieve their own profile.
22
+ The user is identified via their JWT token.
23
+ """
24
+ # The dependency handles fetching the user, so we just return it.
25
+ return current_user
src/utils.py ADDED
@@ -0,0 +1,51 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Dict, Any
2
+ from sqlalchemy.orm import Session as SQLAlchemySessionType
3
+ from fastapi.concurrency import run_in_threadpool
4
+
5
+ from src import crud, models
6
+
7
+ async def format_student_clearance_details(
8
+ db: SQLAlchemySessionType,
9
+ student_orm: models.Student
10
+ ) -> models.ClearanceDetail:
11
+ """
12
+ Formats clearance details for a student.
13
+
14
+ Args:
15
+ db (SQLAlchemySessionType): Database session.
16
+ student_orm (models.Student): ORM model of the student.
17
+
18
+ Returns:
19
+ models.ClearanceDetail: Formatted clearance details.
20
+ """
21
+ statuses_orm_list = await run_in_threadpool(crud.get_clearance_statuses_by_student_id, db, student_orm.student_id)
22
+
23
+ clearance_items_models: List[models.ClearanceStatusItem] = []
24
+ overall_status_val = models.OverallClearanceStatusEnum.COMPLETED
25
+
26
+
27
+ if not statuses_orm_list:
28
+ overall_status_val = models.OverallClearanceStatusEnum.PENDING
29
+
30
+ for status_orm in statuses_orm_list:
31
+ item = models.ClearanceStatusItem(
32
+ department=status_orm.department,
33
+ status=status_orm.status,
34
+ remarks=status_orm.remarks,
35
+ updated_at=status_orm.updated_at
36
+ )
37
+ clearance_items_models.append(item)
38
+ if item.status != models.ClearanceStatusEnum.COMPLETED:
39
+ overall_status_val = models.OverallClearanceStatusEnum.PENDING
40
+
41
+ if not statuses_orm_list and overall_status_val == models.OverallClearanceStatusEnum.COMPLETED:
42
+ overall_status_val = models.OverallClearanceStatusEnum.PENDING
43
+
44
+ return models.ClearanceDetail(
45
+ student_id=student_orm.student_id,
46
+ name=student_orm.name,
47
+ department=student_orm.department,
48
+ clearance_items=clearance_items_models,
49
+ overall_status=overall_status_val
50
+ )
51
+