|
import os |
|
import aiohttp |
|
import hashlib |
|
from fastapi import APIRouter, HTTPException, Header |
|
from pydantic import BaseModel |
|
from google.oauth2 import service_account |
|
from google.auth.transport.requests import Request |
|
|
|
# Router on which the notification endpoint in this module is registered.
router = APIRouter()


# --- Supabase configuration -------------------------------------------------

SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"

# Both keys come from the environment; neither has a fallback value.
SUPABASE_KEY = os.environ.get("SUPA_KEY")
SUPABASE_ROLE_KEY = os.environ.get("SUPA_SERVICE_KEY")

# Refuse to import the module when either key is missing or empty.
if not (SUPABASE_KEY and SUPABASE_ROLE_KEY):
    raise ValueError("❌ SUPA_KEY or SUPA_SERVICE_KEY not set in environment!")

# Headers built from SUPA_KEY.
SUPABASE_HEADERS = {
    "apikey": SUPABASE_KEY,
    "Authorization": "Bearer " + SUPABASE_KEY,
    "Content-Type": "application/json",
}

# Headers built from SUPA_SERVICE_KEY; these are the default headers of
# fetch_supabase().
SUPABASE_ROLE_HEADERS = {
    "apikey": SUPABASE_ROLE_KEY,
    "Authorization": "Bearer " + SUPABASE_ROLE_KEY,
    "Content-Type": "application/json",
    "Prefer": "return=representation",
}


# --- Firebase Cloud Messaging configuration ---------------------------------

# Service-account key file read by get_access_token(). The path is relative,
# so it is resolved against the working directory of the running process.
SERVICE_ACCOUNT_FILE = './closetcoach-2d50b-firebase-adminsdk-fbsvc-7fcccbacb1.json'

# Project id interpolated into the FCM "messages:send" URL.
FCM_PROJECT_ID = "closetcoach-2d50b"
|
|
|
class NotificationRequest(BaseModel):
    """Request body accepted by the ``/send-notification`` endpoint."""

    # Notification type. send_notification() accepts "follow", "like",
    # "subscriber", "newmessage" and "changeprice".
    keyword: str

    # Recipient id. send_notification() reads it for every keyword except
    # "like" and "newmessage", where the recipient is looked up instead.
    target_user_id: str = ""

    # Feed id when keyword is "like", chat id when keyword is "newmessage".
    reference: str = ""
|
|
|
def short_collapse_key(keyword: str, sender_id: str, receiver_id: str) -> str:
    """Derive a short, deterministic key from a notification's type and parties.

    The three values are joined with ``:``, hashed with SHA-1, and the first
    20 hexadecimal characters of the digest are returned. Identical inputs
    always produce the identical key.
    """
    seed = "{}:{}:{}".format(keyword, sender_id, receiver_id)
    hasher = hashlib.sha1()
    hasher.update(seed.encode())
    return hasher.hexdigest()[:20]
|
|
|
async def verify_user_token(user_token: str) -> str:
    """Resolve a user's bearer token to that user's id via Supabase Auth.

    Sends the token to the ``/auth/v1/user`` endpoint of the configured
    Supabase project and reads the ``id`` field of the JSON reply.

    Args:
        user_token: Token forwarded as ``Authorization: Bearer <token>``.

    Returns:
        The ``id`` value from the reply.

    Raises:
        HTTPException: 401 when the endpoint answers with anything other than
            HTTP 200; 400 when the reply carries no ``id``.
    """
    auth_endpoint = f"{SUPABASE_URL}/auth/v1/user"
    request_headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json",
    }

    async with aiohttp.ClientSession() as http:
        async with http.get(auth_endpoint, headers=request_headers) as reply:
            if reply.status != 200:
                raise HTTPException(status_code=401, detail="Invalid or expired token")
            # Decode while the response is still open.
            profile = await reply.json()

    uid = profile.get("id")
    if uid:
        return uid
    raise HTTPException(status_code=400, detail="User ID not found")
|
|
|
async def fetch_supabase(table: str, select: str, filters: dict, headers=SUPABASE_ROLE_HEADERS):
    """Read rows from a Supabase REST table using equality filters.

    Issues ``GET {SUPABASE_URL}/rest/v1/{table}`` with a ``select`` clause,
    one ``column=eq.value`` filter per entry of *filters*, and
    ``order=created_at.desc`` (newest rows first).

    Args:
        table: Table name, used as the last path segment of the URL.
        select: Comma-separated column list for the ``select`` parameter.
        filters: Mapping of column name to the value it must equal.
        headers: Request headers; defaults to the service-role headers.

    Returns:
        The decoded JSON body. Callers in this file treat it as a list of
        row dicts.

    Raises:
        HTTPException: 500, carrying the response text, when Supabase answers
            with anything other than HTTP 200.
    """
    url = f"{SUPABASE_URL}/rest/v1/{table}"

    # Hand the query to aiohttp as ``params`` instead of splicing it into the
    # URL. Filter values can originate from request bodies; percent-encoding
    # them stops a value such as "x&select=*" from smuggling extra query
    # parameters into a request made with the service-role key. It also
    # avoids the stray "&&" the string-built URL had when *filters* was empty.
    params = [("select", select)]
    params.extend((column, f"eq.{value}") for column, value in filters.items())
    params.append(("order", "created_at.desc"))

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers, params=params) as resp:
            if resp.status != 200:
                detail = await resp.text()
                raise HTTPException(status_code=500, detail=f"Supabase error: {detail}")
            return await resp.json()
|
|
|
def format_name(full_name: str) -> str:
    """Shorten a full name to ``"First L."`` form.

    Args:
        full_name: Name made of whitespace-separated words.

    Returns:
        The first word followed by the upper-cased initial of the second word
        and a period (``"jane doe smith"`` -> ``"jane D."``); the single word
        itself when there is only one; an empty string when *full_name* is
        ``None``, empty, or only whitespace.
    """
    if not full_name:
        return ""

    # split() with no argument already discards leading/trailing whitespace.
    parts = full_name.split()
    if not parts:
        # Whitespace-only input: previously this fell through to parts[1]
        # and raised IndexError.
        return ""
    if len(parts) == 1:
        return parts[0]
    return f"{parts[0]} {parts[1][0].upper()}."
|
|
|
async def get_user_info(user_id: str):
    """Load the ``name``, ``token_fcm`` and ``notifications`` columns of a user.

    Queries the ``User`` table for rows whose ``id`` equals *user_id*.

    Returns:
        The first matching row, or ``None`` when the query returns no rows.
    """
    rows = await fetch_supabase("User", "name,token_fcm,notifications", {"id": user_id})
    return rows[0] if rows else None
|
|
|
async def check_follow_exists(follower_id: str, following_id: str) -> bool:
    """Report whether the ``followers`` table links the two given users.

    Returns:
        ``True`` when at least one row has both the given ``follower_id`` and
        the given ``following_id``; ``False`` otherwise.
    """
    edge_filter = {"follower_id": follower_id, "following_id": following_id}
    matches = await fetch_supabase("followers", "id", edge_filter)
    return len(matches) != 0
|
|
|
async def check_subscription_exists(customer_id: str, stylist_id: str) -> bool:
    """Report whether an active subscription row exists for the given pair.

    Queries the ``Subscriptions`` table for rows matching *customer_id* and
    *stylist_id* whose ``active`` column equals ``true``.

    Returns:
        ``True`` when at least one such row is returned; ``False`` otherwise.
    """
    criteria = {
        "customer_id": customer_id,
        "stylist_id": stylist_id,
        "active": "true",
    }
    matches = await fetch_supabase("Subscriptions", "id", criteria)
    return len(matches) != 0
|
|
|
async def get_post_info(feed_id: str):
    """Collect the description, first portfolio image and owner of a feed post.

    Reads the post from the ``Feeds`` table. When its ``portfolios`` value is
    non-empty, the first entry is looked up in the ``Portfolio`` table to
    obtain an image URL.

    Returns:
        A dict with the keys ``description``, ``image_url`` (``None`` when no
        image could be found) and ``user_id``.

    Raises:
        HTTPException: 404 when no ``Feeds`` row has the given id.
    """
    rows = await fetch_supabase("Feeds", "description,portfolios,user_id", {"id": feed_id})
    if not rows:
        raise HTTPException(status_code=404, detail="Post not found")

    post = rows[0]
    linked_portfolios = post.get("portfolios") or []

    cover_image = None
    if linked_portfolios:
        # Only the first linked portfolio is consulted for an image.
        portfolio_rows = await fetch_supabase(
            "Portfolio", "image_url", {"id": linked_portfolios[0]}
        )
        if portfolio_rows:
            cover_image = portfolio_rows[0].get("image_url")

    return {
        "description": post.get("description", ""),
        "image_url": cover_image,
        "user_id": post.get("user_id"),
    }
|
|
|
# Scoped service-account credentials, created on first use and then reused so
# that the OAuth token they hold is shared between calls.
_fcm_credentials = None


def get_access_token():
    """Return an OAuth 2.0 access token for the Firebase Cloud Messaging API.

    The credentials are loaded from ``SERVICE_ACCOUNT_FILE`` once and kept in
    a module-level variable. A new token is requested only when the cached
    credentials report that they are not valid (no token yet, or the token
    has expired), instead of re-reading the key file and requesting a fresh
    token on every call.

    This function performs blocking file and network I/O.

    Returns:
        The current access token string.
    """
    global _fcm_credentials

    if _fcm_credentials is None:
        _fcm_credentials = service_account.Credentials.from_service_account_file(
            SERVICE_ACCOUNT_FILE,
            scopes=['https://www.googleapis.com/auth/firebase.messaging'],
        )

    if not _fcm_credentials.valid:
        _fcm_credentials.refresh(Request())

    return _fcm_credentials.token
|
|
|
# Notification types accepted by send_notification().
_SUPPORTED_KEYWORDS = ("follow", "like", "subscriber", "newmessage", "changeprice")

# Path extensions for which a chat attachment is announced as a photo.
_IMAGE_SUFFIXES = (".jpg", ".jpeg", ".png", ".gif", ".webp")


def _is_image_url(file_url: str) -> bool:
    """Tell whether *file_url* names an image, judging by its extension."""
    # Drop any query string / fragment first so that a URL such as
    # ".../photo.jpg?token=..." is still recognised as an image.
    path = file_url.split("?", 1)[0].split("#", 1)[0]
    return path.lower().endswith(_IMAGE_SUFFIXES)


def _compose_notification(keyword, actor_name, post_info=None, content="", file_url=None):
    """Return the ``(title, body, image_url)`` shown to the recipient for *keyword*."""
    image_url = None

    if keyword == "follow":
        title = "🎉 New Follower!"
        body = f"{actor_name} started following you."

    elif keyword == "like":
        desc = post_info["description"]
        title = "❤️ New Like!"
        body = f"{actor_name} liked your post" + (f": \"{desc}\"" if desc else ".")
        image_url = post_info["image_url"]

    elif keyword == "changeprice":
        title = "⚠️ Subscription Price Changed"
        body = (
            f"{actor_name} changed your subscription price. "
            "Your subscription was automatically canceled. "
            f"Please check the chat with {actor_name} for reactivation options and more info."
        )

    elif keyword == "subscriber":
        title = "💼 New Subscriber!"
        body = f"{actor_name} just subscribed to your styling services."

    elif keyword == "newmessage":
        title = "💬 New Message"
        if content:
            body = f"{actor_name}: {content}"
        elif file_url:
            if _is_image_url(file_url):
                body = f"{actor_name} sent you a photo"
                image_url = file_url
            else:
                body = f"{actor_name} sent you a file"
        else:
            body = f"{actor_name} sent you a message"

    else:
        raise HTTPException(status_code=400, detail="Unsupported keyword")

    return title, body, image_url


def _build_fcm_message(token, title, body, collapse_id, image_url=None):
    """Assemble the FCM ``message`` object addressed to one device token."""
    message = {
        "notification": {
            "title": title,
            "body": body,
        },
        "token": token,
        "android": {
            "collapse_key": collapse_id,
            "notification": {
                "tag": collapse_id
            }
        },
        "apns": {
            "headers": {
                "apns-collapse-id": collapse_id
            }
        }
    }

    if image_url:
        message["notification"]["image"] = image_url
        message["android"]["notification"]["image"] = image_url

    return message


@router.post("/send-notification")
async def send_notification(
    data: NotificationRequest,
    user_token: str = Header(..., alias="User-key")
):
    """Send a push notification describing an action of the authenticated user.

    The caller is identified from the ``User-key`` header. The recipient is
    derived from the request: the post owner for "like", the other chat
    participant for "newmessage", and ``data.target_user_id`` for "follow",
    "subscriber" and "changeprice".

    Args:
        data: Notification type plus the target user id or reference id.
        user_token: The caller's Supabase token, read from ``User-key``.

    Returns:
        A dict with a ``detail`` message. On success it also carries
        ``fcm_response``. A missing active subscription ("subscriber") or a
        recipient with notifications disabled yields a ``detail``-only dict
        and nothing is sent.

    Raises:
        HTTPException:
            400 for an unsupported keyword or a missing ``reference`` /
            ``target_user_id``; 401/400 from token verification; 403 when the
            follow relationship does not exist or the caller is not a
            participant of the referenced chat; 404 when the post, chat,
            messages, recipient, recipient's FCM token or actor cannot be
            found; 500 on Supabase errors; the FCM status code when FCM
            rejects the message.
    """
    # Local import: only needed to run the blocking token fetch off the loop.
    import asyncio

    sender_id = await verify_user_token(user_token)

    if data.keyword not in _SUPPORTED_KEYWORDS:
        raise HTTPException(status_code=400, detail="Unsupported keyword")

    post_info = None
    content = ""
    file_url = None

    # --- Work out who receives the notification ---------------------------
    if data.keyword == "like":
        if not data.reference:
            raise HTTPException(status_code=400, detail="reference is required")
        post_info = await get_post_info(data.reference)
        target_user_id = post_info["user_id"]

    elif data.keyword == "newmessage":
        if not data.reference:
            raise HTTPException(status_code=400, detail="reference is required")

        chats = await fetch_supabase("chats", "client_id,stylist_id", {
            "id": data.reference
        })
        if not chats:
            raise HTTPException(status_code=404, detail="Chat not found")
        chat = chats[0]

        # Only a participant may trigger notifications for a chat; without
        # this check any authenticated user could do so for any chat id.
        if sender_id not in (chat["client_id"], chat["stylist_id"]):
            raise HTTPException(status_code=403, detail="Not a participant of this chat")

        # fetch_supabase orders by created_at descending, so the first row is
        # the most recent message.
        messages = await fetch_supabase("messages", "sender_id,content,file", {
            "chat_id": data.reference
        })
        if not messages:
            raise HTTPException(status_code=404, detail="No messages found in chat")

        last_message = messages[0]
        # The notification is attributed to whoever wrote the latest message.
        sender_id = last_message["sender_id"]
        content = (last_message.get("content") or "").strip()
        file_url = last_message.get("file")

        target_user_id = chat["stylist_id"] if sender_id == chat["client_id"] else chat["client_id"]

    else:
        target_user_id = data.target_user_id
        if not target_user_id:
            raise HTTPException(status_code=400, detail="target_user_id is required")

    # --- Check the relationship the notification claims -------------------
    # NOTE(review): "changeprice" is not checked against any relationship
    # between the caller and the target — confirm whether one should be.
    if data.keyword == "follow":
        if not await check_follow_exists(sender_id, target_user_id):
            raise HTTPException(status_code=403, detail="Follow relationship does not exist")

    if data.keyword == "subscriber":
        subscription_exists = await check_subscription_exists(
            customer_id=sender_id,
            stylist_id=target_user_id
        )
        if not subscription_exists:
            return {"detail": "No active subscription found, notification not sent"}

    # --- Load recipient and actor -----------------------------------------
    target_user = await get_user_info(target_user_id)
    if not target_user or not target_user.get("token_fcm"):
        raise HTTPException(status_code=404, detail="Target user or FCM token not found")

    if not target_user.get("notifications", True):
        return {"detail": "Target user has notifications disabled, notification not sent"}

    actor_info = await get_user_info(sender_id)
    # A whitespace-only name is treated like a missing one.
    if not actor_info or not (actor_info.get("name") or "").strip():
        raise HTTPException(status_code=404, detail="User not found")
    actor_name = format_name(actor_info["name"])

    # --- Build the FCM payload --------------------------------------------
    collapse_id = short_collapse_key(data.keyword, sender_id, target_user_id)
    title, body, image_url = _compose_notification(
        data.keyword, actor_name, post_info, content, file_url
    )
    payload = {
        "message": _build_fcm_message(
            target_user["token_fcm"], title, body, collapse_id, image_url
        )
    }

    # get_access_token() does blocking I/O; run it in the default executor so
    # the event loop keeps serving other requests meanwhile.
    loop = asyncio.get_running_loop()
    access_token = await loop.run_in_executor(None, get_access_token)

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    url = f"https://fcm.googleapis.com/v1/projects/{FCM_PROJECT_ID}/messages:send"

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=payload) as resp:
            resp_text = await resp.text()
            if resp.status != 200:
                raise HTTPException(status_code=resp.status, detail=f"FCM error: {resp_text}")
            fcm_response = await resp.json()

    return {"detail": "Notification sent successfully", "fcm_response": fcm_response}