File size: 9,111 Bytes
96b013c
 
 
 
a09ee49
1771391
 
96b013c
 
 
 
d45fbd5
d3ab314
a09ee49
96b013c
 
 
 
a09ee49
96b013c
 
d45fbd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96b013c
 
 
 
 
 
 
 
 
 
a09ee49
 
96b013c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a09ee49
 
96b013c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
394841d
 
 
 
 
 
 
 
 
 
d45fbd5
394841d
d45fbd5
 
394841d
d45fbd5
 
 
 
 
394841d
96b013c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
from fastapi import HTTPException, status, Header, Depends
from fastapi.security import OAuth2PasswordBearer
from fastapi.concurrency import run_in_threadpool # For calling sync crud in async auth
from sqlalchemy.orm import Session as SQLAlchemySessionType

from src import crud, models 

from src.database import get_db
from typing import Optional, Dict, Any # Added Any
from datetime import datetime, timedelta
from typing import Union # For type hinting
from jose import JWTError, jwt
from passlib.context import CryptContext

# JWT configuration.
# The signing key is read from models.JWT_SECRET_KEY (per the original note,
# models.py loads it from .env -- not verifiable from this file).
SECRET_KEY = models.JWT_SECRET_KEY
# Signing algorithm used for both jwt.encode and jwt.decode in this module.
ALGORITHM = "HS256"
# Default token lifetime, used by create_access_token when no expires_delta is given.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/token/login") # tokenUrl is the path of the login/token endpoint

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # Password hashing context (bcrypt), built in this module
# Used by verify_password and get_password_hash below.

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Checks whether a plain-text password matches a stored hash.

    The comparison is delegated to the module-level CryptContext
    (pwd_context); its result is returned unchanged.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches

def get_password_hash(password: str) -> str:
    """
    Produces the hash of a plain-text password.

    Hashing is delegated to the module-level CryptContext (pwd_context).
    Intended for storing a password when a user is created or updated.
    """
    hashed = pwd_context.hash(password)
    return hashed


# --- JWT Helper Functions ---
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Builds a signed JWT carrying the given claims.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. When None, the default of
            ACCESS_TOKEN_EXPIRE_MINUTES minutes is used. An explicit zero
            timedelta is honoured rather than replaced by the default.

    Returns:
        The encoded token, signed with SECRET_KEY using ALGORITHM.
    """
    # Function-scope import: only this function needs an aware UTC timestamp.
    from datetime import timezone

    # Compare against None so that a falsy-but-valid timedelta(0) is not
    # silently swapped for the 30-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Take the clock reading once so "iat" and "exp" are derived from the same
    # instant; use an aware UTC datetime instead of the deprecated utcnow().
    issued_at = datetime.now(timezone.utc)

    to_encode = data.copy()
    to_encode.update({"exp": issued_at + expires_delta, "iat": issued_at})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user_from_token(
    token: str = Depends(oauth2_scheme),
    db: SQLAlchemySessionType = Depends(get_db)
) -> models.User: # Returns the ORM User model
    """
    Resolves the request's bearer token to an ORM User.

    The token is decoded with SECRET_KEY / ALGORITHM, the username is read
    from its "sub" claim, and the user is loaded via crud.get_user_by_username.

    Raises:
        HTTPException: 401 "Token has expired" when decoding reports an
            expired signature.
        HTTPException: 401 "Could not validate credentials" when the token
            cannot be decoded, carries no "sub" claim, or no user matches.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # Keep the try body to the one call that can raise a JOSE error.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        # Must be listed before JWTError, which is its base class.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        )
    except JWTError:
        # `jwt` is jose.jwt here; it has no `PyJWTError` attribute (that name
        # belongs to the PyJWT package), so the JOSE base error is caught.
        raise credentials_exception

    username: Optional[str] = payload.get("sub")
    if username is None:
        raise credentials_exception
    # Kept from the original; NOTE(review): the instance is not used further,
    # presumably it only validates the claim -- confirm against models.TokenData.
    token_data = models.TokenData(username=username)

    # crud.get_user_by_username is synchronous, so it runs in the threadpool.
    user_orm = await run_in_threadpool(crud.get_user_by_username, db, username)
    if user_orm is None:
        raise credentials_exception
    return user_orm # Return the ORM model instance

async def get_current_active_user(
    current_user: models.User = Depends(get_current_user_from_token)
) -> models.User: # Takes and returns the ORM User
    """
    Dependency that only lets active accounts through.

    Raises HTTP 400 ("Inactive user") when current_user.is_active is falsy;
    otherwise hands back the same user object.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")

async def get_current_active_staff_user_from_token(
    current_user: models.User = Depends(get_current_active_user)
) -> models.User: # Takes and returns the ORM User
    """
    Dependency restricting access to active users whose role is STAFF or ADMIN.

    Raises HTTP 403 ("Staff or admin access required") for any other role.
    """
    permitted_roles = [models.UserRole.STAFF, models.UserRole.ADMIN]
    if current_user.role in permitted_roles:
        return current_user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Staff or admin access required")

async def get_current_active_admin_user_from_token(
    current_user: models.User = Depends(get_current_active_user)
) -> models.User: # Takes and returns the ORM User
    """
    Dependency restricting access to active users whose role is ADMIN.

    Raises HTTP 403 ("Admin access required") for any other role.
    """
    role_mismatch = current_user.role != models.UserRole.ADMIN
    if role_mismatch:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
    return current_user

# Dependency to get and verify API key from header (Device Authentication)
async def get_verified_device(
    x_api_key: str = Header(..., description="The API Key for the ESP32 device."),
    db: SQLAlchemySessionType = Depends(get_db)
) -> models.Device: # Returns the ORM Device model
    """
    Device authentication dependency based on the API key header.

    Looks the key up through crud.get_device_by_api_key and returns the
    matching ORM Device.

    Raises:
        HTTPException: 401 when the key is empty or matches no device,
            403 when the matched device is not active.
    """
    # An empty header value is rejected before touching the database.
    if not x_api_key:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing API key")

    # The crud lookup is synchronous, hence the threadpool hop.
    device = await run_in_threadpool(crud.get_device_by_api_key, db, x_api_key)
    if not device:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API Key.")

    if device.is_active:
        return device
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Device is not active.")


async def authenticate_user(
    username: str,
    password: str,
    db: SQLAlchemySessionType = Depends(get_db)
) -> Optional[models.User]: # ORM User on success, None on failure
    """
    Authenticates a user by username and password.

    Args:
        username: Login name looked up via crud.get_user_by_username.
        password: Plain-text password checked against user.hashed_password.
        db: Database session. The Depends default is only resolved when
            FastAPI injects this function as a dependency; a direct caller
            must pass the session explicitly.

    Returns:
        The ORM User when the username exists and the password matches,
        otherwise None. No exception is raised for bad credentials; the
        caller decides how to respond.
    """
    user = await run_in_threadpool(crud.get_user_by_username, db, username)
    if not user:
        return None  # Unknown username

    # Password verification is synchronous and deliberately slow (bcrypt), so
    # it is offloaded like the other sync calls to keep the event loop free.
    password_ok = await run_in_threadpool(verify_password, password, user.hashed_password)
    if not password_ok:
        return None  # Wrong password

    return user

# Tag-based authentication (User/Student Authentication via RFID tag)
async def authenticate_tag_user_or_student( # Renamed for clarity
    tag_id: str = Header(..., alias="X-User-Tag-ID", description="RFID Tag ID of the user or student"),
    db: SQLAlchemySessionType = Depends(get_db)
) -> Union[models.Student, models.User]: # Student or User ORM model
    """
    Resolves an RFID tag to the Student or User it belongs to.

    The student table is consulted first; the user lookup only happens when
    no student matches. Serves as the base for the tag-based dependencies.

    Raises:
        HTTPException: 404 when neither lookup yields a record.
    """
    # Both crud lookups are synchronous, so each one runs in the threadpool.
    entity = await run_in_threadpool(crud.get_student_by_tag_id, db, tag_id)
    if not entity:
        # Per the original note, crud.get_user_by_tag_id already filters on
        # is_active -- not verifiable from this file.
        entity = await run_in_threadpool(crud.get_user_by_tag_id, db, tag_id)

    if not entity:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Tag not registered or associated user/student is inactive.")
    return entity

# Dependency for current student via Tag ID
async def get_current_student_via_tag(
    authenticated_entity: Union[models.Student, models.User] = Depends(authenticate_tag_user_or_student)
) -> models.Student:
    """
    Tag-based dependency that only accepts students.

    Raises HTTP 403 ("Access restricted to students only.") when the tag
    resolved to anything other than a Student.
    """
    if isinstance(authenticated_entity, models.Student):
        return authenticated_entity
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access restricted to students only.")

# Dependency for current staff or admin via Tag ID
async def get_current_staff_or_admin_via_tag(
    authenticated_entity: Union[models.Student, models.User] = Depends(authenticate_tag_user_or_student)
) -> models.User:
    """
    Tag-based dependency that only accepts active staff or admin users.

    Checks run in this order: entity type, role, active flag.

    Raises:
        HTTPException: 403 when the tag belongs to a non-User (e.g. a student)
            or the user's role is neither STAFF nor ADMIN; 400 when the user
            is inactive.
    """
    is_user = isinstance(authenticated_entity, models.User)
    if not is_user:
        # The tag resolved to a student
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Staff or admin access required.")

    user = authenticated_entity
    privileged_roles = [models.UserRole.STAFF, models.UserRole.ADMIN]
    if user.role not in privileged_roles:
        # Defensive: only reachable if a tag can belong to a user with some other role.
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User role is not staff or admin.")

    # Defensive re-check; the tag lookup is said to filter inactive users already.
    if user.is_active:
        return user
    raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User is inactive.")

# Dependency for current admin via Tag ID
async def get_current_admin_via_tag(
    current_user: models.User = Depends(get_current_staff_or_admin_via_tag) # Builds on the staff-or-admin check
) -> models.User:
    """
    Tag-based dependency that only accepts admins.

    Raises HTTP 403 ("Admin access required.") when the user's role is not ADMIN.
    """
    not_admin = current_user.role != models.UserRole.ADMIN
    if not_admin:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required.")
    return current_user


# Department Access Verification (this is a utility, not a dependency itself)
def verify_department_access( # Sync on purpose: pure logic, no I/O
    user_role: models.UserRole,
    user_department: Optional[models.ClearanceDepartment],
    target_department: models.ClearanceDepartment
) -> bool:
    """
    Decides whether a role/department pair may act on a target department.

    ADMIN is always allowed. STAFF is allowed only when user_department
    equals target_department. Every other role is denied.
    """
    if user_role == models.UserRole.ADMIN:
        return True
    is_staff = user_role == models.UserRole.STAFF
    return user_department == target_department if is_staff else False