Yadav122's picture
fix docker
abe9dd4
raw
history blame
14.2 kB
import datetime
from app import router
from fastapi import FastAPI, Depends, HTTPException, status, Request, APIRouter
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import firebase_admin
from firebase_admin import credentials, auth
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.api.user import UserCreate, UserSignIn, PasswordReset, TokenVerify, UserResponse,EmailVerifyRequest
from app.models.database.DBUser import DBUser
from app.core.database.session_manager import get_db_session as get_db
import os
router=APIRouter(prefix="/FirebaseAuth", tags=["Firebase Auth"])
# Initialize OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/FirebaseAuth/token")
async def verify_firebase_token(token: str = Depends(oauth2_scheme)) -> dict:
"""Verify Firebase token and return user info"""
try:
# Verify the token
decoded_token = auth.verify_id_token(token)
user_id = decoded_token["uid"]
# Get user from Firebase
firebase_user = auth.get_user(user_id)
return {
"firebase_uid": user_id,
"email": firebase_user.email,
"email_verified": firebase_user.email_verified,
"display_name": firebase_user.display_name
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication token",
headers={"WWW-Authenticate": "Bearer"},
)
# Initialize Firebase Admin SDK with better error handling
try:
current_dir = os.path.dirname(os.path.abspath(__file__))
# Try multiple possible paths for the service account file
service_account_paths = [
"/opt/MailPilot/MailPilot_ai_agents/app/serviceAccountKey/mailpoilt-firebase-adminsdk-fbsvc-26bb455f79.json",
os.path.join(current_dir, "../serviceAccountKey/mailpoilt-firebase-adminsdk-fbsvc-26bb455f79.json"),
os.path.join(current_dir, "../../serviceAccountKey/mailpoilt-firebase-adminsdk-fbsvc-26bb455f79.json")
]
cred = None
for path in service_account_paths:
if os.path.exists(path):
cred = credentials.Certificate(path)
break
if cred is None:
raise FileNotFoundError("Firebase service account key not found")
if not firebase_admin._apps:
firebase_admin.initialize_app(cred)
except Exception as e:
print(f"Firebase initialization error: {str(e)}")
# Continue without crashing, but auth functions will fail
@router.post("/signup", response_model=dict)
async def create_user(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
"""Create a new user with email and password and store in database"""
try:
# Check if user already exists
try:
existing_user = auth.get_user_by_email(user_data.email)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"User with email {user_data.email} already exists"
)
except auth.UserNotFoundError:
# This is what we want - user doesn't exist yet
pass
# Create Firebase user
firebase_user = auth.create_user(
email=user_data.email,
password=user_data.password,
display_name=user_data.display_name,
email_verified=False # Explicitly set to false
)
# Generate email verification link
action_code_settings = auth.ActionCodeSettings(
url=f"https://mailpoilt.web.app/verify-email?email={user_data.email}",
handle_code_in_app=True
)
verification_link = auth.generate_email_verification_link(
user_data.email,
action_code_settings
)
# Firebase will handle sending the verification email automatically
current_time = datetime.datetime.utcnow()
db_user = DBUser(
firebase_uid=firebase_user.uid,
email=user_data.email,
display_name=user_data.display_name,
is_active=True,
created_at=current_time,
last_login=current_time,
provider="email"
)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return {
"message": "User created successfully. Please check your email to verify your account.",
"verification_link": verification_link, # In production, you might not return this
"user": {
"firebase_uid": db_user.firebase_uid,
"email": db_user.email,
"display_name": db_user.display_name,
"is_active": db_user.is_active,
"created_at": db_user.created_at.isoformat() if db_user.created_at else None,
"last_login": db_user.last_login.isoformat() if db_user.last_login else None,
"provider": db_user.provider,
"email_verified": firebase_user.email_verified
}
}
except Exception as e:
await db.rollback()
try:
if 'firebase_user' in locals():
auth.delete_user(firebase_user.uid)
except:
pass
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Error creating user: {str(e)}"
)
@router.post("/signin", response_model=dict)
async def signin_user(user_data: UserSignIn, db: AsyncSession = Depends(get_db)):
"""Sign in user and return token"""
try:
# Verify user exists in Firebase
firebase_user = auth.get_user_by_email(user_data.email)
# Generate custom token
custom_token = auth.create_custom_token(firebase_user.uid)
# Update last login in database
result = await db.execute(
select(DBUser).filter(DBUser.firebase_uid == firebase_user.uid)
)
db_user = result.scalar_one_or_none()
if db_user:
db_user.last_login = datetime.datetime.utcnow()
await db.commit()
return {
"access_token": custom_token.decode() if isinstance(custom_token, bytes) else custom_token,
"token_type": "bearer",
"user": {
"firebase_uid": firebase_user.uid,
"email": firebase_user.email,
"email_verified": firebase_user.email_verified,
"display_name": firebase_user.display_name
}
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Authentication failed: {str(e)}"
)
# Example of a protected endpoint
@router.get("/me", response_model=dict)
async def get_current_user(
current_user: dict = Depends(verify_firebase_token),
db: AsyncSession = Depends(get_db)
):
"""Get current user information"""
try:
result = await db.execute(
select(DBUser).filter(DBUser.firebase_uid == current_user["firebase_uid"])
)
db_user = result.scalar_one_or_none()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found in database"
)
return {
"message": "User authenticated",
"user": {
"firebase_uid": db_user.firebase_uid,
"email": db_user.email,
"display_name": db_user.display_name,
"is_active": db_user.is_active,
"email_verified": current_user["email_verified"]
}
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@router.post("/resend-verification", status_code=status.HTTP_200_OK)
async def resend_verification_email(
email_data: EmailVerifyRequest = None,
current_user: UserResponse = Depends(get_current_user)
):
"""
Resend verification email to a user
If user is logged in, uses their email.
Otherwise, uses the email provided in the request body.
"""
try:
# If email is provided in request body, use that
# Otherwise use logged in user's email
email = email_data.email if email_data else current_user.email
# Check if user exists
try:
firebase_user = auth.get_user_by_email(email)
except auth.UserNotFoundError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No user found with email: {email}"
)
# Check if email is already verified
if firebase_user.email_verified:
return {"message": "Email is already verified"}
# Generate a new verification link
action_code_settings = auth.ActionCodeSettings(
url=f"https://mailpoilt.web.app/verify-email?email={email}",
handle_code_in_app=True
)
verification_link = auth.generate_email_verification_link(
email,
action_code_settings
)
return {
"message": "Verification email sent successfully",
"verification_link": verification_link
}
except Exception as e:
if isinstance(e, HTTPException):
raise e
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Failed to resend verification email: {str(e)}"
)
email: EmailStr
@router.post("/check-email-verified")
async def check_email_verified(email_data: EmailVerifyRequest):
"""Check if a user's email is verified"""
try:
# Check if user exists
try:
firebase_user = auth.get_user_by_email(email_data.email)
except auth.UserNotFoundError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No user found with email: {email_data.email}"
)
return {
"email": email_data.email,
"email_verified": firebase_user.email_verified
}
except Exception as e:
if isinstance(e, HTTPException):
raise e
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Failed to check email verification status: {str(e)}"
)
@router.post("/verify-token", response_model=UserResponse)
async def verify_token(token_data: TokenVerify, db: AsyncSession = Depends(get_db)):
"""Verify a Firebase ID token or UID and return user data"""
try:
# First try to verify as an ID token
try:
decoded_token = auth.verify_id_token(token_data.token)
user_id = decoded_token["uid"]
except:
# If that fails, treat it as a UID
user_id = token_data.token
try:
firebase_user = auth.get_user(user_id)
except auth.UserNotFoundError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
result = await db.execute(select(DBUser).filter(DBUser.firebase_uid == user_id))
db_user = result.scalar_one_or_none()
if not db_user:
# Create DB user if it doesn't exist
db_user = DBUser(
firebase_uid=user_id,
email=firebase_user.email,
display_name=firebase_user.display_name or firebase_user.email.split('@')[0],
is_active=True,
created_at=datetime.datetime.utcnow(),
last_login=datetime.datetime.utcnow(),
provider="firebase"
)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
else:
# Update last_login time
db_user.last_login = datetime.datetime.utcnow()
await db.commit()
await db.refresh(db_user)
return UserResponse(
firebase_uid=db_user.firebase_uid,
email=db_user.email,
display_name=db_user.display_name,
is_active=db_user.is_active,
created_at=db_user.created_at,
last_login=db_user.last_login,
provider=db_user.provider,
email_verified=firebase_user.email_verified
)
except Exception as e:
if isinstance(e, HTTPException):
raise e
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token verification failed: {str(e)}"
)
@router.post("/token")
async def get_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""Get access token"""
try:
# Get Firebase user
firebase_user = auth.get_user_by_email(form_data.username)
# Create custom token
custom_token = auth.create_custom_token(firebase_user.uid)
return {
"access_token": custom_token.decode() if isinstance(custom_token, bytes) else custom_token,
"token_type": "bearer"
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials"
)
# Function to verify token
async def verify_token(token: str = Depends(oauth2_scheme)):
try:
# Verify the token
decoded_token = auth.verify_id_token(token)
return decoded_token
except:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
# Example protected route
@router.get("/protected")
async def protected_route(token: str = Depends(verify_token)):
return {"message": "You have access!", "token_info": token}