connect / routes /notifications.py
habulaj's picture
Update routes/notifications.py
6114a4a verified
import asyncio
import os
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlsplit

import aiohttp
from fastapi import APIRouter, HTTPException, Header
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from pydantic import BaseModel
# Router object on which this module's endpoints are registered.
router = APIRouter()

# --- Supabase configuration -------------------------------------------------
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
SUPABASE_KEY = os.getenv("SUPA_KEY")
SUPABASE_SERVICE_KEY = os.getenv("SUPA_SERVICE_KEY")

# Both keys are required; abort at import time if either is unset or empty.
if not (SUPABASE_KEY and SUPABASE_SERVICE_KEY):
    raise ValueError("❌ SUPA_KEY or SUPA_SERVICE_KEY not set in environment!")

# Headers used for REST calls authenticated with the service key.
SUPABASE_HEADERS = {
    "apikey": SUPABASE_SERVICE_KEY,
    "Authorization": f"Bearer {SUPABASE_SERVICE_KEY}",
    "Content-Type": "application/json",
}

# --- Firebase configuration -------------------------------------------------
SERVICE_ACCOUNT_FILE = './closetcoach-2d50b-firebase-adminsdk-fbsvc-7fcccbacb1.json'
FCM_PROJECT_ID = "closetcoach-2d50b"
# Request body accepted by the notification-sending endpoint.
class SimpleNotification(BaseModel):
    # Recipient: a user's email address, or the literal "all".
    target: str
    # Notification title.
    title: str
    # Notification body text.
    content: str
    # Optional image URL; the empty string (default) means "no image".
    image_url: str = ""
def get_fcm_access_token():
    """Return an access token scoped for Firebase Cloud Messaging.

    Credentials are loaded from ``SERVICE_ACCOUNT_FILE``, restricted to the
    ``firebase.messaging`` scope and refreshed through the google-auth
    ``Request`` transport. This function is synchronous.
    """
    messaging_scopes = ['https://www.googleapis.com/auth/firebase.messaging']

    base_credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE
    )
    fcm_credentials = base_credentials.with_scopes(messaging_scopes)

    # refresh() populates ``token`` on the scoped credentials.
    fcm_credentials.refresh(Request())
    return fcm_credentials.token
async def get_user_id_from_token(user_token: str) -> str:
    """Resolve a user access token to the id of the user it belongs to.

    Sends the token as a bearer credential to ``{SUPABASE_URL}/auth/v1/user``
    and returns the ``id`` field of the JSON response.

    Args:
        user_token: Access token supplied by the caller.

    Returns:
        The non-empty ``id`` value from the response body.

    Raises:
        HTTPException: 401 when the endpoint does not answer 200, or when
            the response body carries no usable ``id``.
    """
    url = f"{SUPABASE_URL}/auth/v1/user"
    headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json"
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            if resp.status != 200:
                raise HTTPException(status_code=401, detail="Invalid or expired token")
            data = await resp.json()

    # A 200 response without an id must not leak ``None`` to callers that
    # would interpolate it into a database filter.
    user_id = data.get("id") if isinstance(data, dict) else None
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return user_id
async def get_user_by_email(email: str):
    """Fetch the ``id`` and ``token_fcm`` of the user with the given email.

    Queries ``{SUPABASE_URL}/rest/v1/User`` with an ``email=eq.<email>``
    filter using the service-key headers.

    Args:
        email: Email address to look up.

    Returns:
        The first matching row as a dict, or ``None`` when no row matches.

    Raises:
        HTTPException: 500 when the endpoint does not answer 200.
    """
    url = f"{SUPABASE_URL}/rest/v1/User"
    # Pass the filter through ``params`` so the value is percent-encoded:
    # characters such as "+" or "&" in an address would otherwise be
    # reinterpreted by the server or inject additional query parameters.
    params = {
        "email": f"eq.{email}",
        "select": "id,token_fcm",
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=SUPABASE_HEADERS, params=params) as resp:
            if resp.status != 200:
                raise HTTPException(status_code=500, detail="Failed to fetch target user from database")
            data = await resp.json()
            return data[0] if data else None
async def get_user_by_id(user_id: str):
    """Fetch the ``id`` and ``manage_notifications`` flag of a user by id.

    Queries ``{SUPABASE_URL}/rest/v1/User`` with an ``id=eq.<user_id>``
    filter using the service-key headers.

    Args:
        user_id: Identifier of the user to look up.

    Returns:
        The first matching row as a dict, or ``None`` when no row matches
        or ``user_id`` is empty.

    Raises:
        HTTPException: 500 when the endpoint does not answer 200.
    """
    # An empty/None id cannot match a row; skip the request instead of
    # sending a literal "eq.None" filter.
    if not user_id:
        return None

    url = f"{SUPABASE_URL}/rest/v1/User"
    # ``params`` percent-encodes the value, so it cannot alter the query.
    params = {
        "id": f"eq.{user_id}",
        "select": "id,manage_notifications",
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=SUPABASE_HEADERS, params=params) as resp:
            if resp.status != 200:
                raise HTTPException(status_code=500, detail="Failed to fetch sender user from database")
            data = await resp.json()
            return data[0] if data else None
async def log_notification(send_by: str, title: str, content: str, target_id: Optional[str], image_url: str) -> None:
    """Record a sent notification by POSTing it to ``/rest/v1/fcm_notifications``.

    Args:
        send_by: Id of the user who sent the notification.
        title: Notification title.
        content: Notification body text.
        target_id: Id of the recipient user, or ``None`` when there is no
            single recipient.
        image_url: Image URL; an empty string is stored as ``None``.

    Raises:
        HTTPException: 500, including the response text, when the endpoint
            answers with anything other than 200 or 201.
    """
    payload = {
        "send_by": send_by,
        "title": title,
        "content": content,
        "target": target_id,
        "image_url": image_url or None,
        # Timezone-aware UTC timestamp: the ISO string carries an explicit
        # "+00:00" offset, and the deprecated datetime.utcnow() is avoided.
        "created_at": datetime.now(timezone.utc).isoformat()
    }
    url = f"{SUPABASE_URL}/rest/v1/fcm_notifications"

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=SUPABASE_HEADERS, json=payload) as resp:
            if resp.status not in (200, 201):
                detail = await resp.text()
                raise HTTPException(status_code=500, detail=f"Failed to log notification: {detail}")
def is_valid_image_url(url: str) -> bool:
    """Return True when ``url`` points at a file with an image extension.

    The check is case-insensitive and accepts ``.jpg``, ``.jpeg``, ``.png``,
    ``.gif`` and ``.webp``. Either the whole string or its URL path component
    must end with one of these extensions, so a trailing query string or
    fragment (``.../a.png?size=large``) does not cause a rejection.

    Args:
        url: The URL (or bare file name) to check.

    Returns:
        True if an accepted extension is found, False otherwise (including
        for the empty string and for strings that cannot be parsed as URLs).
    """
    valid_extensions = (".jpg", ".jpeg", ".png", ".gif", ".webp")
    lowered = url.lower()

    # Plain suffix match: keeps every input the simple check accepted.
    if lowered.endswith(valid_extensions):
        return True

    # Otherwise look at the path only, ignoring "?query" and "#fragment".
    try:
        path = urlsplit(lowered).path
    except ValueError:
        # urlsplit rejects some malformed inputs (e.g. unbalanced brackets).
        return False
    return path.endswith(valid_extensions)
@router.post("/send-global-notification")
async def send_global_notification(
    payload: SimpleNotification,
    user_token: str = Header(..., alias="User-key")
):
    """Send a push notification through FCM and record it in the database.

    The caller is identified from the ``User-key`` header and must have a
    truthy ``manage_notifications`` flag. ``payload.target`` is either
    ``"all"`` (the message is sent to the ``all`` topic) or the email of a
    user with a registered ``token_fcm``.

    Args:
        payload: Title, content, target and optional image URL.
        user_token: Access token of the sender, read from ``User-key``.

    Returns:
        ``{"detail": "Notification sent and logged successfully."}``.

    Raises:
        HTTPException: 403 if the sender is unknown or not allowed to send;
            400 if ``image_url`` fails validation or the target user has no
            FCM token; 404 if no user matches the target email; the FCM
            response status if FCM does not answer 200. Errors raised by the
            helper functions propagate unchanged.
    """
    sender_id = await get_user_id_from_token(user_token)
    sender = await get_user_by_id(sender_id)
    if not sender or not sender.get("manage_notifications"):
        raise HTTPException(status_code=403, detail="You are not authorized to send notifications.")

    # Validate image_url, if provided.
    if payload.image_url and not is_valid_image_url(payload.image_url):
        raise HTTPException(status_code=400, detail="The image_url provided is not a valid image format.")

    message = {
        "notification": {
            "title": payload.title,
            "body": payload.content
        }
    }
    if payload.image_url:
        message["notification"]["image"] = payload.image_url

    # Normalise once so " All " is treated as the broadcast target rather
    # than being looked up as an email address.
    target = payload.target.strip().lower()
    target_user_id = None

    if target == "all":
        message["topic"] = "all"
    else:
        target_user = await get_user_by_email(target)
        if not target_user:
            raise HTTPException(status_code=404, detail="The provided email does not correspond to any user.")
        if not target_user.get("token_fcm"):
            raise HTTPException(status_code=400, detail="The target user does not have a registered FCM token.")
        message["token"] = target_user["token_fcm"]
        target_user_id = target_user["id"]

    # Send to Firebase. get_fcm_access_token() is synchronous, so run it in
    # the default executor to keep the event loop free while it works.
    loop = asyncio.get_running_loop()
    access_token = await loop.run_in_executor(None, get_fcm_access_token)
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    url = f"https://fcm.googleapis.com/v1/projects/{FCM_PROJECT_ID}/messages:send"

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json={"message": message}) as resp:
            response_text = await resp.text()
            if resp.status != 200:
                raise HTTPException(status_code=resp.status, detail=f"FCM error: {response_text}")

    # Save the notification to the database.
    # NOTE(review): a failure here surfaces as an error even though the push
    # was already delivered - confirm this is the intended contract.
    await log_notification(
        send_by=sender_id,
        title=payload.title,
        content=payload.content,
        target_id=target_user_id,
        image_url=payload.image_url
    )

    return {"detail": "Notification sent and logged successfully."}