Spaces:
Runtime error
Runtime error
File size: 3,551 Bytes
71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 71a3948 ec5cc84 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
from datetime import datetime, timedelta, timezone
from typing import List, Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlmodel import Session, select

from src.config import settings
from src.crud import devices as device_crud
from src.crud import users as user_crud
from src.database import get_session
from src.models import User, Role, Device
# --- Configuration ---
# Bearer-token security scheme used by the user-facing dependency below;
# "token" is the URL advertised to clients for obtaining a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Security scheme that reads the device key from the "x-api-key" request header.
# NOTE(review): with auto_error=True FastAPI is expected to reject requests that
# lack the header before get_api_key runs — confirm against the FastAPI version in use.
api_key_header = APIKeyHeader(name="x-api-key", auto_error=True)
# --- JWT Token Functions ---
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is left unmodified.
        expires_delta: Lifetime of the token. When ``None`` the token expires
            15 minutes from now. An explicit zero-length delta is honoured
            instead of being mistaken for "not provided".

    Returns:
        The token produced by ``jwt.encode`` with ``settings.SECRET_KEY`` and
        ``settings.ALGORITHM``.
    """
    # Compare against None: timedelta(0) is falsy and must not select the default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    # Timezone-aware "now"; the naive datetime.utcnow() is deprecated.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
# --- Password Hashing ---
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Delegates to ``settings.PWD_CONTEXT.verify`` and returns its result.
    """
    pwd_context = settings.PWD_CONTEXT
    return pwd_context.verify(plain_password, hashed_password)
def hash_password(password: str) -> str:
    """Hash a plaintext password.

    Delegates to ``settings.PWD_CONTEXT.hash`` and returns its result.
    """
    pwd_context = settings.PWD_CONTEXT
    return pwd_context.hash(password)
# --- User Authentication ---
def authenticate_user(db: Session, username: str, password: str):
    """Look up a user by username and check the supplied password.

    Returns the user record when the lookup succeeds and the password matches
    the record's ``hashed_password``; otherwise returns ``False``.
    """
    account = user_crud.get_user_by_username(db, username=username)
    # Short-circuit evaluation: the password is only verified for a found user.
    if account and verify_password(password, account.hashed_password):
        return account
    return False
# --- Dependency for API Key Authentication ---
def get_api_key(
    key: str = Depends(api_key_header), db: Session = Depends(get_session)
) -> Device:
    """
    FastAPI dependency that resolves the x-api-key header to a device.

    Looks the key up through ``device_crud.get_device_by_api_key`` and returns
    the matching device. Raises ``HTTPException`` with status 401 when the
    lookup yields nothing.
    """
    device = device_crud.get_device_by_api_key(db, api_key=key)
    if device:
        return device

    # No device matched the presented key.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or missing API Key",
    )
# --- Dependency for User Authentication and Authorization ---
def get_current_active_user(required_roles: Optional[List[Role]] = None):
    """Build a FastAPI dependency that resolves a bearer token to a user.

    Args:
        required_roles: Roles permitted to pass. When ``None`` or empty, no
            role check is performed and any user with a valid token is accepted.

    Returns:
        A dependency callable. It returns the ``User`` named by the token's
        ``sub`` claim and raises ``HTTPException``:

        * 401 when the token cannot be decoded, has no ``sub`` claim, or names
          a user that does not exist;
        * 403 when ``required_roles`` is given and the user's role is not in it.

    NOTE(review): despite the name, no "active"/"disabled" flag on the user is
    consulted here — confirm whether that check is intended.
    """

    def dependency(
        token: str = Depends(oauth2_scheme), db: Session = Depends(get_session)
    ) -> User:
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

        # Keep the try body to the one call that can raise JWTError.
        try:
            payload = jwt.decode(
                token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
            )
        except JWTError as err:
            # Chain the cause so the decode failure stays visible in tracebacks.
            raise credentials_exception from err

        username: Optional[str] = payload.get("sub")
        if username is None:
            raise credentials_exception

        user = user_crud.get_user_by_username(db, username=username)
        if user is None:
            raise credentials_exception

        # Check for roles if required
        if required_roles and user.role not in required_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="The user does not have adequate privileges",
            )
        return user

    return dependency
|