File size: 3,551 Bytes
71a3948
ec5cc84
71a3948
 
ec5cc84
 
 
71a3948
 
ec5cc84
 
 
 
 
71a3948
ec5cc84
 
 
71a3948
ec5cc84
 
71a3948
 
ec5cc84
71a3948
ec5cc84
71a3948
 
 
 
ec5cc84
 
 
71a3948
ec5cc84
 
 
 
 
 
 
 
 
 
 
71a3948
 
ec5cc84
 
 
 
 
71a3948
ec5cc84
 
71a3948
ec5cc84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71a3948
ec5cc84
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlmodel import Session, select
from typing import List, Optional
from typing import List, Optional
from datetime import datetime, timedelta


from src.config import settings
from src.database import get_session
from src.crud import users as user_crud
from src.crud import devices as device_crud
from src.models import User, Role, Device

# --- Configuration ---
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
api_key_header = APIKeyHeader(name="x-api-key", auto_error=True)

# --- JWT Token Functions ---
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return encoded_jwt

# --- Password Hashing ---
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return settings.PWD_CONTEXT.verify(plain_password, hashed_password)

def hash_password(password: str) -> str:
    return settings.PWD_CONTEXT.hash(password)

# --- User Authentication ---
def authenticate_user(db: Session, username: str, password: str):
    """Authenticate user by username and password."""
    user = user_crud.get_user_by_username(db, username=username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

# --- Dependency for API Key Authentication ---

def get_api_key(
    key: str = Depends(api_key_header), db: Session = Depends(get_session)
) -> Device:
    """
    Dependency to validate the API key from the x-api-key header.
    Ensures the device is registered in the database.
    """
    db_device = device_crud.get_device_by_api_key(db, api_key=key)
    if not db_device:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API Key",
        )
    return db_device


# --- Dependency for User Authentication and Authorization ---
def get_current_active_user(required_roles: List[Role] = None):
    def dependency(
        token: str = Depends(oauth2_scheme), db: Session = Depends(get_session)
    ) -> User:
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
        try:
            payload = jwt.decode(
                token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
            )
            username: str = payload.get("sub")
            if username is None:
                raise credentials_exception
        except JWTError:
            raise credentials_exception

        user = user_crud.get_user_by_username(db, username=username)
        if user is None:
            raise credentials_exception

        # Check for roles if required
        if required_roles:
            is_authorized = any(role == user.role for role in required_roles)
            if not is_authorized:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="The user does not have adequate privileges",
                )
        return user

    return dependency