Spaces:
Sleeping
Sleeping
File size: 3,745 Bytes
faff547 6600e8c 6d8700d 6600e8c 6d8700d faff547 6d8700d 6600e8c faff547 6600e8c 6d8700d 6600e8c c7da95d 6600e8c 6d8700d 6600e8c 6d8700d 6600e8c 6d8700d 6600e8c 6d8700d 6600e8c faff547 6600e8c faff547 6600e8c faff547 6d8700d 6600e8c 6d8700d 6600e8c 6d8700d 6600e8c 6d8700d 6600e8c 6d8700d 6600e8c 6d8700d 6600e8c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
from fastapi import FastAPI, Depends, HTTPException, status, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from firebase_admin import auth, credentials, firestore
import firebase_admin
import os
from dotenv import load_dotenv
# Charger les variables d'environnement depuis le fichier .env
load_dotenv()
# Récupérer les variables d'environnement pour initialiser Firebase
firebase_credentials = os.getenv("FIREBASE_CREDENTIALS")
# Vérification de la présence de la clé
if not firebase_credentials:
raise ValueError("La variable d'environnement FIREBASE_CREDENTIALS n'est pas définie.")
# Charger les informations de la clé Firebase
import json
try:
firebase_credentials_dict = json.loads(firebase_credentials)
if not isinstance(firebase_credentials_dict, dict):
raise ValueError("FIREBASE_CREDENTIALS n'est pas un JSON valide.")
except json.JSONDecodeError as e:
raise ValueError("FIREBASE_CREDENTIALS n'est pas un JSON valide.") from e
# Initialisation Firebase Admin
cred = credentials.Certificate(firebase_credentials_dict)
firebase_admin.initialize_app(cred)
db = firestore.client()
app = FastAPI()
# Configuration CORS
allowed_origins = [
"*"
]
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins,
allow_credentials=True,
allow_methods=["POST", "GET"],
allow_headers=["*"]
)
# Fonction pour vérifier le token Firebase et récupérer le rôle depuis Firestore
def get_user(res: Response,
cred: HTTPAuthorizationCredentials = Depends(HTTPBearer(auto_error=False))):
if cred is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Bearer authentication required",
headers={'WWW-Authenticate': 'Bearer realm="auth_required"'},
)
try:
# Vérification et décodage du token Firebase
decoded_token = auth.verify_id_token(cred.credentials)
user_id = decoded_token['uid']
# Récupération du rôle de l'utilisateur depuis Firestore
user_doc = db.collection('users').document(user_id).get()
if not user_doc.exists:
raise HTTPException(status_code=401, detail="Utilisateur non trouvé dans Firestore")
# Extraction du rôle et ajout aux informations utilisateur
user_data = user_doc.to_dict()
user_role = user_data.get('role', 'user_extern') # Par défaut à 'user_extern' si le rôle n'existe pas
decoded_token['role'] = user_role
res.headers['WWW-Authenticate'] = 'Bearer realm="auth_required"'
return decoded_token
except Exception as err:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid authentication credentials. {err}",
headers={'WWW-Authenticate': 'Bearer error="invalid_token"'},
)
# Fonction de dépendance pour vérifier le rôle
def require_role(allowed_roles):
def role_checker(user_info=Depends(get_user)):
if user_info['role'] not in allowed_roles:
raise HTTPException(status_code=403, detail="Accès non autorisé")
return user_info
return role_checker
# Route publique
@app.get("/")
async def root():
return {"message": "This is the root."}
@app.get("/api/protected/user")
async def protected_user_route(user_info=Depends(get_user)):
return {"message": "Protected user route accessed successfully", "user_info": user_info}
@app.get("/api/protected/admin")
async def protected_admin_route(user_info=Depends(require_role(["admin"]))):
return {"message": "Protected admin route accessed successfully", "user_info": user_info}
|