Yadav122's picture
add firebase
c5c9b05
import os
import datetime
import requests
from app import router
from fastapi import FastAPI, Depends, HTTPException, status, Request, APIRouter
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import firebase_admin
from firebase_admin import credentials, auth
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.api.user import UserCreate, UserSignIn, PasswordReset, TokenVerify, UserResponse, EmailVerifyRequest
from app.models.database.DBUser import DBUser
from app.core.database.session_manager import get_db_session as get_db
router = APIRouter(prefix="/FirebaseAuth", tags=["Firebase Auth"])
# Initialize OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/FirebaseAuth/token")
async def verify_firebase_token(token: str = Depends(oauth2_scheme)) -> dict:
"""Verify Firebase token and return user info"""
try:
# Verify the token
decoded_token = auth.verify_id_token(token)
user_id = decoded_token["uid"]
# Get user from Firebase
firebase_user = auth.get_user(user_id)
return {
"firebase_uid": user_id,
"email": firebase_user.email,
"email_verified": firebase_user.email_verified,
"display_name": firebase_user.display_name
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication token",
headers={"WWW-Authenticate": "Bearer"},
)
# Initialize Firebase Admin SDK with better error handling
try:
current_dir = os.path.dirname(os.path.abspath(__file__))
# Try multiple possible paths for the service account file
service_account_paths = [
"/opt/MailPilot/MailPilot_ai_agents/app/serviceAccountKey/mailpoilt-firebase-adminsdk-fbsvc-26bb455f79.json",
os.path.join(current_dir, "../serviceAccountKey/mailpoilt-firebase-adminsdk-fbsvc-26bb455f79.json"),
os.path.join(current_dir, "../../serviceAccountKey/mailpoilt-firebase-adminsdk-fbsvc-26bb455f79.json")
]
cred = None
for path in service_account_paths:
if os.path.exists(path):
cred = credentials.Certificate(path)
break
if cred is None:
raise FileNotFoundError("Firebase service account key not found")
if not firebase_admin._apps:
firebase_admin.initialize_app(cred)
except Exception as e:
print(f"Firebase initialization error: {str(e)}")
# Continue without crashing, but auth functions will fail
@router.post("/signup", response_model=dict)
async def create_user(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
"""Create a new user with email and password and store in database"""
try:
# Check if user already exists
try:
existing_user = auth.get_user_by_email(user_data.email)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"User with email {user_data.email} already exists"
)
except auth.UserNotFoundError:
# This is what we want - user doesn't exist yet
pass
# Create Firebase user
firebase_user = auth.create_user(
email=user_data.email,
password=user_data.password,
display_name=user_data.display_name,
email_verified=False # Explicitly set to false
)
# Generate email verification link
action_code_settings = auth.ActionCodeSettings(
url=f"https://mailpoilt.web.app/verify-email?email={user_data.email}",
handle_code_in_app=True
)
verification_link = auth.generate_email_verification_link(
user_data.email,
action_code_settings
)
# Firebase will handle sending the verification email automatically
current_time = datetime.datetime.utcnow()
db_user = DBUser(
firebase_uid=firebase_user.uid,
email=user_data.email,
display_name=user_data.display_name,
is_active=True,
created_at=current_time,
last_login=current_time,
provider="email"
)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return {
"message": "User created successfully. Please check your email to verify your account.",
"verification_link": verification_link, # In production, you might not return this
"user": {
"firebase_uid": db_user.firebase_uid,
"email": db_user.email,
"display_name": db_user.display_name,
"is_active": db_user.is_active,
"created_at": db_user.created_at.isoformat() if db_user.created_at else None,
"last_login": db_user.last_login.isoformat() if db_user.last_login else None,
"provider": db_user.provider,
"email_verified": firebase_user.email_verified
}
}
except Exception as e:
await db.rollback()
try:
if 'firebase_user' in locals():
auth.delete_user(firebase_user.uid)
except:
pass
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Error creating user: {str(e)}"
)
@router.post("/signin", response_model=dict)
async def signin_user(user_data: UserSignIn, db: AsyncSession = Depends(get_db)):
"""Sign in user and return token"""
try:
# Get Firebase API key from environment variable
api_key = os.getenv("FIREBASE_API_KEY")
if not api_key:
raise ValueError("FIREBASE_API_KEY environment variable is not set")
# Call Firebase Auth REST API to sign in with email/password
response = requests.post(
f"https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key={api_key}",
json={
"email": user_data.email,
"password": user_data.password,
"returnSecureToken": True
}
)
if response.status_code != 200:
error_data = response.json()
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Authentication failed: {error_data.get('error', {}).get('message', 'Invalid credentials')}"
)
auth_data = response.json()
id_token = auth_data["idToken"]
# Verify the ID token
decoded_token = auth.verify_id_token(id_token)
user_id = decoded_token["uid"]
# Get user from Firebase
firebase_user = auth.get_user(user_id)
# Update last login in database
result = await db.execute(
select(DBUser).filter(DBUser.firebase_uid == user_id)
)
db_user = result.scalar_one_or_none()
if db_user:
db_user.last_login = datetime.datetime.utcnow()
await db.commit()
return {
"access_token": id_token,
"token_type": "bearer",
"user": {
"firebase_uid": firebase_user.uid,
"email": firebase_user.email,
"email_verified": firebase_user.email_verified,
"display_name": firebase_user.display_name
}
}
except Exception as e:
if isinstance(e, HTTPException):
raise e
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Authentication failed: {str(e)}"
)
# Example of a protected endpoint
@router.get("/me", response_model=dict)
async def get_current_user(
current_user: dict = Depends(verify_firebase_token),
db: AsyncSession = Depends(get_db)
):
"""Get current user information"""
try:
result = await db.execute(
select(DBUser).filter(DBUser.firebase_uid == current_user["firebase_uid"])
)
db_user = result.scalar_one_or_none()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found in database"
)
return {
"message": "User authenticated",
"user": {
"firebase_uid": db_user.firebase_uid,
"email": db_user.email,
"display_name": db_user.display_name,
"is_active": db_user.is_active,
"email_verified": current_user["email_verified"]
}
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@router.post("/resend-verification", status_code=status.HTTP_200_OK)
async def resend_verification_email(
email_data: EmailVerifyRequest = None,
current_user: dict = Depends(verify_firebase_token)
):
"""
Resend verification email to a user
If user is logged in, uses their email.
Otherwise, uses the email provided in the request body.
"""
try:
# If email is provided in request body, use that
# Otherwise use logged in user's email
email = email_data.email if email_data and email_data.email else current_user["email"]
# Check if user exists
try:
firebase_user = auth.get_user_by_email(email)
except auth.UserNotFoundError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No user found with email: {email}"
)
# Check if email is already verified
if firebase_user.email_verified:
return {"message": "Email is already verified"}
# Generate a new verification link
action_code_settings = auth.ActionCodeSettings(
url=f"https://mailpoilt.web.app/verify-email?email={email}",
handle_code_in_app=True
)
verification_link = auth.generate_email_verification_link(
email,
action_code_settings
)
return {
"message": "Verification email sent successfully",
"verification_link": verification_link
}
except Exception as e:
if isinstance(e, HTTPException):
raise e
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Failed to resend verification email: {str(e)}"
)
@router.post("/check-email-verified")
async def check_email_verified(email_data: EmailVerifyRequest):
"""Check if a user's email is verified"""
try:
# Check if user exists
try:
firebase_user = auth.get_user_by_email(email_data.email)
except auth.UserNotFoundError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No user found with email: {email_data.email}"
)
return {
"email": email_data.email,
"email_verified": firebase_user.email_verified
}
except Exception as e:
if isinstance(e, HTTPException):
raise e
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Failed to check email verification status: {str(e)}"
)
@router.post("/verify-token", response_model=UserResponse)
async def verify_token(token_data: TokenVerify, db: AsyncSession = Depends(get_db)):
"""Verify a Firebase ID token or UID and return user data"""
try:
# First try to verify as an ID token
try:
decoded_token = auth.verify_id_token(token_data.token)
user_id = decoded_token["uid"]
except:
# If that fails, treat it as a UID
user_id = token_data.token
try:
firebase_user = auth.get_user(user_id)
except auth.UserNotFoundError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
result = await db.execute(select(DBUser).filter(DBUser.firebase_uid == user_id))
db_user = result.scalar_one_or_none()
if not db_user:
# Create DB user if it doesn't exist
db_user = DBUser(
firebase_uid=user_id,
email=firebase_user.email,
display_name=firebase_user.display_name or firebase_user.email.split('@')[0],
is_active=True,
created_at=datetime.datetime.utcnow(),
last_login=datetime.datetime.utcnow(),
provider="firebase"
)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
else:
# Update last_login time
db_user.last_login = datetime.datetime.utcnow()
await db.commit()
await db.refresh(db_user)
return UserResponse(
firebase_uid=db_user.firebase_uid,
email=db_user.email,
display_name=db_user.display_name,
is_active=db_user.is_active,
created_at=db_user.created_at,
last_login=db_user.last_login,
provider=db_user.provider,
email_verified=firebase_user.email_verified
)
except Exception as e:
if isinstance(e, HTTPException):
raise e
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token verification failed: {str(e)}"
)
@router.post("/token")
async def get_token(form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db)):
"""Get access token using OAuth2 form"""
try:
# Get Firebase API key from environment variable
api_key = os.getenv("FIREBASE_API_KEY")
if not api_key:
raise ValueError("FIREBASE_API_KEY environment variable is not set")
# Call Firebase Auth REST API to sign in with email/password
response = requests.post(
f"https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key={api_key}",
json={
"email": form_data.username, # Username field contains email
"password": form_data.password,
"returnSecureToken": True
}
)
if response.status_code != 200:
error_data = response.json()
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Authentication failed: {error_data.get('error', {}).get('message', 'Invalid credentials')}"
)
auth_data = response.json()
id_token = auth_data["idToken"]
# Verify the ID token
decoded_token = auth.verify_id_token(id_token)
user_id = decoded_token["uid"]
# Update last login in database
result = await db.execute(
select(DBUser).filter(DBUser.firebase_uid == user_id)
)
db_user = result.scalar_one_or_none()
if db_user:
db_user.last_login = datetime.datetime.utcnow()
await db.commit()
return {
"access_token": id_token,
"token_type": "bearer"
}
except Exception as e:
if isinstance(e, HTTPException):
raise e
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Authentication failed: {str(e)}"
)
# Function to verify token
async def verify_token(token: str = Depends(oauth2_scheme)):
try:
# Verify the token
decoded_token = auth.verify_id_token(token)
return decoded_token
except:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
# Example protected route
@router.get("/protected")
async def protected_route(token: dict = Depends(verify_token)):
return {"message": "You have access!", "token_info": token}