File size: 3,745 Bytes
faff547
6600e8c
6d8700d
6600e8c
 
6d8700d
faff547
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6d8700d
6600e8c
faff547
6600e8c
 
6d8700d
 
 
6600e8c
 
c7da95d
6600e8c
6d8700d
 
6600e8c
6d8700d
6600e8c
 
6d8700d
 
6600e8c
 
 
 
 
 
 
 
 
6d8700d
6600e8c
 
 
faff547
6600e8c
 
 
 
faff547
6600e8c
 
 
 
 
faff547
6d8700d
6600e8c
6d8700d
6600e8c
 
 
6d8700d
 
6600e8c
 
 
 
 
 
 
6d8700d
6600e8c
 
 
 
6d8700d
6600e8c
 
 
6d8700d
 
6600e8c
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

from fastapi import FastAPI, Depends, HTTPException, status, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from firebase_admin import auth, credentials, firestore
import firebase_admin
import os
from dotenv import load_dotenv

# Charger les variables d'environnement depuis le fichier .env
load_dotenv()

# Récupérer les variables d'environnement pour initialiser Firebase
firebase_credentials = os.getenv("FIREBASE_CREDENTIALS")

# Vérification de la présence de la clé
if not firebase_credentials:
    raise ValueError("La variable d'environnement FIREBASE_CREDENTIALS n'est pas définie.")

# Charger les informations de la clé Firebase
import json
try:
    firebase_credentials_dict = json.loads(firebase_credentials)
    if not isinstance(firebase_credentials_dict, dict):
        raise ValueError("FIREBASE_CREDENTIALS n'est pas un JSON valide.")
except json.JSONDecodeError as e:
    raise ValueError("FIREBASE_CREDENTIALS n'est pas un JSON valide.") from e

# Initialisation Firebase Admin
cred = credentials.Certificate(firebase_credentials_dict)
firebase_admin.initialize_app(cred)
db = firestore.client()

app = FastAPI()

# Configuration CORS
allowed_origins = [
    "*"
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["POST", "GET"],
    allow_headers=["*"]
)

# Fonction pour vérifier le token Firebase et récupérer le rôle depuis Firestore
def get_user(res: Response,
             cred: HTTPAuthorizationCredentials = Depends(HTTPBearer(auto_error=False))):
    if cred is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Bearer authentication required",
            headers={'WWW-Authenticate': 'Bearer realm="auth_required"'},
        )
    try:
        # Vérification et décodage du token Firebase
        decoded_token = auth.verify_id_token(cred.credentials)
        user_id = decoded_token['uid']

        # Récupération du rôle de l'utilisateur depuis Firestore
        user_doc = db.collection('users').document(user_id).get()
        if not user_doc.exists:
            raise HTTPException(status_code=401, detail="Utilisateur non trouvé dans Firestore")

        # Extraction du rôle et ajout aux informations utilisateur
        user_data = user_doc.to_dict()
        user_role = user_data.get('role', 'user_extern')  # Par défaut à 'user_extern' si le rôle n'existe pas
        decoded_token['role'] = user_role
        res.headers['WWW-Authenticate'] = 'Bearer realm="auth_required"'

        return decoded_token
    except Exception as err:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid authentication credentials. {err}",
            headers={'WWW-Authenticate': 'Bearer error="invalid_token"'},
        )

# Fonction de dépendance pour vérifier le rôle
def require_role(allowed_roles):
    def role_checker(user_info=Depends(get_user)):
        if user_info['role'] not in allowed_roles:
            raise HTTPException(status_code=403, detail="Accès non autorisé")
        return user_info
    return role_checker

# Route publique
@app.get("/")
async def root():
    return {"message": "This is the root."}

@app.get("/api/protected/user")
async def protected_user_route(user_info=Depends(get_user)):
    return {"message": "Protected user route accessed successfully", "user_info": user_info}

@app.get("/api/protected/admin")
async def protected_admin_route(user_info=Depends(require_role(["admin"]))):
    return {"message": "Protected admin route accessed successfully", "user_info": user_info}