|
import datetime
import logging
import os
from urllib.parse import quote

import firebase_admin
from fastapi import FastAPI, Depends, HTTPException, status, Request, APIRouter
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from firebase_admin import credentials, auth
from pydantic import BaseModel, EmailStr
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database.session_manager import get_db_session as get_db
from app.models.api.user import UserCreate, UserSignIn, PasswordReset, TokenVerify, UserResponse
from app.models.database.DBUser import DBUser
|
|
|
router = APIRouter(prefix="/FirebaseAuth", tags=["FirebaseAuth related APIs"])

logger = logging.getLogger(__name__)

# Initialise the default Firebase app once, at import time.
# Failure is logged rather than raised so that importing this module never
# crashes the application; Firebase calls made later will fail instead.
cred = None
try:
    current_dir = os.path.dirname(os.path.abspath(__file__))

    # Candidate key locations, tried in order: the absolute deployment path
    # first, then two locations relative to this file.
    service_account_paths = [
        "/opt/MailPilot/MailPilot_ai_agents/app/serviceAccountKey/mailpoilt-firebase-adminsdk-fbsvc-26bb455f79.json",
        os.path.join(current_dir, "../serviceAccountKey/mailpoilt-firebase-adminsdk-fbsvc-26bb455f79.json"),
        os.path.join(current_dir, "../../serviceAccountKey/mailpoilt-firebase-adminsdk-fbsvc-26bb455f79.json"),
    ]

    key_path = next(
        (candidate for candidate in service_account_paths if os.path.exists(candidate)),
        None,
    )
    if key_path is None:
        raise FileNotFoundError("Firebase service account key not found")
    cred = credentials.Certificate(key_path)

    # Use the public API instead of the private ``firebase_admin._apps``:
    # get_app() raises ValueError when the default app does not exist yet.
    # Unlike the ``_apps`` truthiness check, this still initialises the
    # default app when only a *named* app has been registered.
    try:
        firebase_admin.get_app()
    except ValueError:
        firebase_admin.initialize_app(cred)
except Exception as e:
    # Best-effort initialisation: record the failure (with traceback) and
    # let the module finish importing.
    logger.exception("Firebase initialization error: %s", e)

# NOTE(review): tokenUrl points at /FirebaseAuth/signin, while the endpoint in
# this file that accepts OAuth2PasswordRequestForm is /FirebaseAuth/token --
# confirm which one the OpenAPI "Authorize" flow is meant to use.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/FirebaseAuth/signin")
|
async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)):
    """Resolve the bearer token to the matching user.

    The token is verified with Firebase, the Firebase user record is fetched,
    and the matching ``DBUser`` row is looked up by ``firebase_uid``.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        db: Async database session.

    Returns:
        A ``UserResponse`` built from the database row, with
        ``email_verified`` taken from the Firebase user record.

    Raises:
        HTTPException: 404 when the token is valid but no database row
            exists for the user; 401 for any other failure.
    """
    try:
        decoded_token = auth.verify_id_token(token)
        user_id = decoded_token["uid"]

        firebase_user = auth.get_user(user_id)

        result = await db.execute(select(DBUser).filter(DBUser.firebase_uid == user_id))
        db_user = result.scalar_one_or_none()

        if db_user is None:
            raise HTTPException(status_code=404, detail="User not found in database")

        return UserResponse(
            firebase_uid=db_user.firebase_uid,
            email=db_user.email,
            display_name=db_user.display_name,
            is_active=db_user.is_active,
            created_at=db_user.created_at,
            last_login=db_user.last_login,
            provider=db_user.provider,
            email_verified=firebase_user.email_verified,
        )
    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) through unchanged rather
        # than re-wrapping them as a 401.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid authentication credentials: {str(e)}",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
|
|
|
@router.post("/signup", response_model=dict)
async def create_user(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
    """Create a new user with email and password and store it in the database.

    Steps: reject the request if Firebase already has a user with this email,
    create the Firebase user, generate an email-verification link, then
    persist a ``DBUser`` row. If anything fails after the Firebase user was
    created, that Firebase user is deleted again (best effort).

    Args:
        user_data: Email, password and display name of the new user.
        db: Async database session.

    Returns:
        A dict with a message, the generated ``verification_link`` and the
        stored user fields.

    Raises:
        HTTPException: 400 when the email is already registered or when any
            step of the creation fails.
    """
    # Tracked outside the try block so the cleanup path knows whether a
    # Firebase user has to be removed.
    firebase_user = None
    try:
        try:
            auth.get_user_by_email(user_data.email)
        except auth.UserNotFoundError:
            # Expected path: the email is free.
            pass
        else:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"User with email {user_data.email} already exists"
            )

        firebase_user = auth.create_user(
            email=user_data.email,
            password=user_data.password,
            display_name=user_data.display_name,
            email_verified=False
        )

        # Percent-encode the address for use in a query string; without this
        # a '+' in the local part is decoded as a space by the receiver.
        encoded_email = quote(user_data.email, safe="@")
        action_code_settings = auth.ActionCodeSettings(
            url=f"https://mailpoilt.web.app/verify-email?email={encoded_email}",
            handle_code_in_app=True
        )
        verification_link = auth.generate_email_verification_link(
            user_data.email,
            action_code_settings
        )

        current_time = datetime.datetime.utcnow()

        db_user = DBUser(
            firebase_uid=firebase_user.uid,
            email=user_data.email,
            display_name=user_data.display_name,
            is_active=True,
            created_at=current_time,
            last_login=current_time,
            provider="email"
        )

        db.add(db_user)
        await db.commit()
        await db.refresh(db_user)

        # NOTE(review): the verification link is returned to the caller, so
        # whoever signs up can verify the address without access to the
        # mailbox -- confirm this is intended.
        return {
            "message": "User created successfully. Please check your email to verify your account.",
            "verification_link": verification_link,
            "user": {
                "firebase_uid": db_user.firebase_uid,
                "email": db_user.email,
                "display_name": db_user.display_name,
                "is_active": db_user.is_active,
                "created_at": db_user.created_at.isoformat() if db_user.created_at else None,
                "last_login": db_user.last_login.isoformat() if db_user.last_login else None,
                "provider": db_user.provider,
                "email_verified": firebase_user.email_verified
            }
        }
    except HTTPException:
        # The "already exists" error is raised before any Firebase or DB
        # write, so there is nothing to undo; pass it through unchanged.
        raise
    except Exception as e:
        await db.rollback()
        if firebase_user is not None:
            # Best-effort removal of the Firebase user that now has no
            # database row; a failure here must not mask the original error.
            try:
                auth.delete_user(firebase_user.uid)
            except Exception:
                logging.getLogger(__name__).exception(
                    "Failed to delete orphaned Firebase user %s", firebase_user.uid
                )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Error creating user: {str(e)}"
        ) from e
|
|
|
@router.post("/signin", response_model=dict)
async def signin_user(user_data: UserSignIn, db: AsyncSession = Depends(get_db)):
    """Sign in a user by email and return a Firebase custom token.

    Looks the user up in Firebase by email, mints a custom token for the
    user's UID, and creates or updates the matching ``DBUser`` row
    (``last_login`` is refreshed on every call).

    SECURITY NOTE(review): ``user_data.password`` is never checked in this
    function. A custom token is issued to any caller who supplies an existing
    email address. Password verification has to be added (the Admin SDK calls
    used here do not perform it) before this endpoint can be treated as
    authentication.

    Args:
        user_data: Sign-in payload; only ``email`` is read.
        db: Async database session.

    Returns:
        A dict with a message, the user fields, the custom token and the
        Firebase ``email_verified`` flag.

    Raises:
        HTTPException: 401 when no Firebase user has this email or when any
            other step fails.
    """
    try:
        try:
            firebase_user = auth.get_user_by_email(user_data.email)
        except auth.UserNotFoundError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"No user found with email: {user_data.email}"
            )

        custom_token = auth.create_custom_token(firebase_user.uid)
        # create_custom_token may return bytes; normalise once for the JSON
        # response instead of repeating the conversion per field.
        token_text = custom_token.decode("utf-8") if isinstance(custom_token, bytes) else custom_token

        result = await db.execute(select(DBUser).filter(DBUser.firebase_uid == firebase_user.uid))
        db_user = result.scalar_one_or_none()

        # Single timestamp so created_at and last_login agree on a new row.
        now = datetime.datetime.utcnow()
        if not db_user:
            # Firebase knows the user but the database does not: backfill.
            db_user = DBUser(
                firebase_uid=firebase_user.uid,
                email=firebase_user.email,
                display_name=firebase_user.display_name or user_data.email.split('@')[0],
                is_active=True,
                created_at=now,
                last_login=now,
                provider="email"
            )
            db.add(db_user)
        else:
            db_user.last_login = now

        await db.commit()
        await db.refresh(db_user)

        user_info = {
            "firebase_uid": db_user.firebase_uid,
            "email": db_user.email,
            "display_name": db_user.display_name,
            "is_active": db_user.is_active,
            "created_at": db_user.created_at.isoformat() if db_user.created_at else None,
            "last_login": db_user.last_login.isoformat() if db_user.last_login else None,
            "provider": db_user.provider,
            "email_verified": firebase_user.email_verified,
            "custom_token": token_text
        }

        return {
            "message": "Login successful",
            "user": user_info,
            "custom_token": token_text,
            "email_verified": firebase_user.email_verified
        }
    except HTTPException:
        raise
    except Exception as e:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Authentication failed: {str(e)}"
        ) from e
|
class EmailVerifyRequest(BaseModel):
    """Request body that carries a single, validated email address."""

    # Address of the account the request refers to.
    email: EmailStr
|
|
|
@router.post("/resend-verification", status_code=status.HTTP_200_OK)
async def resend_verification_email(
    email_data: EmailVerifyRequest = None,
    current_user: UserResponse = Depends(get_current_user)
):
    """Generate a fresh email-verification link for a user.

    The caller must be authenticated (``get_current_user`` is a required
    dependency). The target address is ``email_data.email`` when a request
    body is supplied; otherwise the authenticated user's own email is used.

    This function does not send an email itself: it generates the link with
    Firebase and returns it in the response.

    NOTE(review): because a body email takes precedence, any authenticated
    caller can obtain a verification link for any registered address --
    confirm this is intended.

    Args:
        email_data: Optional body naming the address to verify.
        current_user: The authenticated user.

    Returns:
        A dict with a message and, unless the address is already verified,
        the generated ``verification_link``.

    Raises:
        HTTPException: 404 when no Firebase user has the address; 400 when
            link generation fails.
    """
    try:
        email = email_data.email if email_data else current_user.email

        try:
            firebase_user = auth.get_user_by_email(email)
        except auth.UserNotFoundError:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"No user found with email: {email}"
            )

        if firebase_user.email_verified:
            return {"message": "Email is already verified"}

        # Percent-encode the address for use in a query string; without this
        # a '+' in the local part is decoded as a space by the receiver.
        encoded_email = quote(email, safe="@")
        action_code_settings = auth.ActionCodeSettings(
            url=f"https://mailpoilt.web.app/verify-email?email={encoded_email}",
            handle_code_in_app=True
        )
        verification_link = auth.generate_email_verification_link(
            email,
            action_code_settings
        )

        return {
            "message": "Verification email sent successfully",
            "verification_link": verification_link
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Failed to resend verification email: {str(e)}"
        ) from e
|
|
|
|
|
@router.post("/check-email-verified")
async def check_email_verified(email_data: EmailVerifyRequest):
    """Report whether the Firebase user with the given email is verified.

    NOTE(review): this endpoint takes no authentication dependency and
    answers 404 for unknown addresses, so it reveals which emails are
    registered -- confirm this is acceptable.

    Args:
        email_data: Body naming the address to look up.

    Returns:
        A dict with the ``email`` and its Firebase ``email_verified`` flag.

    Raises:
        HTTPException: 404 when no Firebase user has the address; 400 for
            any other failure.
    """
    try:
        try:
            firebase_user = auth.get_user_by_email(email_data.email)
        except auth.UserNotFoundError:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"No user found with email: {email_data.email}"
            )

        return {
            "email": email_data.email,
            "email_verified": firebase_user.email_verified
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Failed to check email verification status: {str(e)}"
        ) from e
|
|
|
@router.post("/verify-token", response_model=UserResponse)
async def verify_token(token_data: TokenVerify, db: AsyncSession = Depends(get_db)):
    """Verify a Firebase ID token or UID and return user data.

    ``token_data.token`` is first verified as a Firebase ID token. If that
    fails for any reason, the same string is treated as a raw Firebase UID.
    The Firebase user is then fetched, and the matching ``DBUser`` row is
    created if missing or has its ``last_login`` refreshed.

    SECURITY NOTE(review): the UID fallback means a caller who knows a UID
    gets the same result as one holding a valid ID token -- confirm this is
    intended.

    Args:
        token_data: Body carrying the ID token or UID in ``token``.
        db: Async database session.

    Returns:
        A ``UserResponse`` built from the database row, with
        ``email_verified`` taken from the Firebase user record.

    Raises:
        HTTPException: 404 when Firebase has no such user; 401 for any
            other failure.
    """
    try:
        try:
            decoded_token = auth.verify_id_token(token_data.token)
            user_id = decoded_token["uid"]
        except Exception:
            # Deliberate fallback: not a valid ID token, so treat the value
            # as a UID. ``except Exception`` (not a bare ``except``) keeps
            # task cancellation and interpreter-exit signals propagating.
            user_id = token_data.token

        try:
            firebase_user = auth.get_user(user_id)
        except auth.UserNotFoundError:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found"
            )

        result = await db.execute(select(DBUser).filter(DBUser.firebase_uid == user_id))
        db_user = result.scalar_one_or_none()

        # Single timestamp so created_at and last_login agree on a new row.
        now = datetime.datetime.utcnow()
        if not db_user:
            # Firebase knows the user but the database does not: backfill.
            db_user = DBUser(
                firebase_uid=user_id,
                email=firebase_user.email,
                display_name=firebase_user.display_name or firebase_user.email.split('@')[0],
                is_active=True,
                created_at=now,
                last_login=now,
                provider="firebase"
            )
            db.add(db_user)
        else:
            db_user.last_login = now

        await db.commit()
        await db.refresh(db_user)

        return UserResponse(
            firebase_uid=db_user.firebase_uid,
            email=db_user.email,
            display_name=db_user.display_name,
            is_active=db_user.is_active,
            created_at=db_user.created_at,
            last_login=db_user.last_login,
            provider=db_user.provider,
            email_verified=firebase_user.email_verified
        )
    except HTTPException:
        raise
    except Exception as e:
        # Undo any pending write so the session is left clean, as the
        # sign-in endpoint does on failure.
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Token verification failed: {str(e)}"
        ) from e
|
@router.post("/token")
async def get_token(form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db)):
    """Form-based sign-in: adapt OAuth2 form fields and delegate to ``signin_user``.

    The form's ``username`` field is used as the email address. The return
    value is whatever ``signin_user`` returns.
    """
    sign_in_request = UserSignIn(email=form_data.username, password=form_data.password)
    return await signin_user(sign_in_request, db)
|
|