Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, Depends, HTTPException, status, Response | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
from firebase_admin import auth, credentials, firestore | |
import firebase_admin | |
import os | |
from dotenv import load_dotenv | |
# Charger les variables d'environnement depuis le fichier .env | |
load_dotenv() | |
# Récupérer les variables d'environnement pour initialiser Firebase | |
firebase_credentials = os.getenv("FIREBASE_CREDENTIALS") | |
# Vérification de la présence de la clé | |
if not firebase_credentials: | |
raise ValueError("La variable d'environnement FIREBASE_CREDENTIALS n'est pas définie.") | |
# Charger les informations de la clé Firebase | |
import json | |
try: | |
firebase_credentials_dict = json.loads(firebase_credentials) | |
if not isinstance(firebase_credentials_dict, dict): | |
raise ValueError("FIREBASE_CREDENTIALS n'est pas un JSON valide.") | |
except json.JSONDecodeError as e: | |
raise ValueError("FIREBASE_CREDENTIALS n'est pas un JSON valide.") from e | |
# Initialisation Firebase Admin | |
cred = credentials.Certificate(firebase_credentials_dict) | |
firebase_admin.initialize_app(cred) | |
db = firestore.client() | |
app = FastAPI() | |
# Configuration CORS | |
allowed_origins = [ | |
"*" | |
] | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=allowed_origins, | |
allow_credentials=True, | |
allow_methods=["POST", "GET"], | |
allow_headers=["*"] | |
) | |
# Fonction pour vérifier le token Firebase et récupérer le rôle depuis Firestore | |
def get_user(res: Response, | |
cred: HTTPAuthorizationCredentials = Depends(HTTPBearer(auto_error=False))): | |
if cred is None: | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Bearer authentication required", | |
headers={'WWW-Authenticate': 'Bearer realm="auth_required"'}, | |
) | |
try: | |
# Vérification et décodage du token Firebase | |
decoded_token = auth.verify_id_token(cred.credentials) | |
user_id = decoded_token['uid'] | |
# Récupération du rôle de l'utilisateur depuis Firestore | |
user_doc = db.collection('users').document(user_id).get() | |
if not user_doc.exists: | |
raise HTTPException(status_code=401, detail="Utilisateur non trouvé dans Firestore") | |
# Extraction du rôle et ajout aux informations utilisateur | |
user_data = user_doc.to_dict() | |
user_role = user_data.get('role', 'user_extern') # Par défaut à 'user_extern' si le rôle n'existe pas | |
decoded_token['role'] = user_role | |
res.headers['WWW-Authenticate'] = 'Bearer realm="auth_required"' | |
return decoded_token | |
except Exception as err: | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail=f"Invalid authentication credentials. {err}", | |
headers={'WWW-Authenticate': 'Bearer error="invalid_token"'}, | |
) | |
# Fonction de dépendance pour vérifier le rôle | |
def require_role(allowed_roles): | |
def role_checker(user_info=Depends(get_user)): | |
if user_info['role'] not in allowed_roles: | |
raise HTTPException(status_code=403, detail="Accès non autorisé") | |
return user_info | |
return role_checker | |
# Route publique | |
async def root(): | |
return {"message": "This is the root."} | |
async def protected_user_route(user_info=Depends(get_user)): | |
return {"message": "Protected user route accessed successfully", "user_info": user_info} | |
async def protected_admin_route(user_info=Depends(require_role(["admin"]))): | |
return {"message": "Protected admin route accessed successfully", "user_info": user_info} | |