|
import os |
|
import stripe |
|
import requests |
|
import logging |
|
import pytz |
|
from fastapi import APIRouter, HTTPException, Header, Query |
|
from datetime import datetime, timedelta |
|
from dateutil.relativedelta import relativedelta |
|
from typing import List, Dict, Any |
|
|
|
# FastAPI router that the dashboard endpoint in this module is registered on.
router = APIRouter()

# Stripe client configuration: secret key from the environment, pinned API version.
stripe.api_key = os.environ.get("STRIPE_KEY")
stripe.api_version = "2023-10-16"

# Supabase project endpoint and the key read from the SUPA_KEY environment variable.
SUPABASE_URL = "https://ussxqnifefkgkaumjann.supabase.co"
SUPABASE_KEY = os.environ.get("SUPA_KEY")

# Fail fast at import time when either credential is missing or empty.
if not (stripe.api_key and SUPABASE_KEY):
    raise ValueError("❌ STRIPE_KEY ou SUPA_KEY não foram definidos no ambiente!")

# Headers sent with every Supabase REST request made with the module's own key.
SUPABASE_HEADERS = {
    "apikey": SUPABASE_KEY,
    "Authorization": "Bearer " + SUPABASE_KEY,
    "Content-Type": "application/json",
}

# Module logger; the root logger is configured at INFO level on import.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
|
|
|
def verify_token(user_token: str) -> str:
    """Validate a Supabase access token and return the ID of its user.

    Args:
        user_token: Access token supplied by the client.

    Returns:
        The ``id`` field of the user object returned by ``/auth/v1/user``.

    Raises:
        HTTPException: 401 when the token is missing or Supabase answers with
            anything other than 200; 400 when the response carries no ``id``.
    """
    # The dashboard route declares this header with a default of None, so the
    # token can be absent; reject it here instead of sending "Bearer None".
    if not user_token:
        raise HTTPException(status_code=401, detail="Invalid or expired token")

    headers = {
        "Authorization": f"Bearer {user_token}",
        "apikey": SUPABASE_KEY,
        "Content-Type": "application/json",
    }
    # Without a timeout a stalled auth service would block this worker forever.
    response = requests.get(
        f"{SUPABASE_URL}/auth/v1/user", headers=headers, timeout=10
    )
    if response.status_code != 200:
        raise HTTPException(status_code=401, detail="Invalid or expired token")

    user_id = response.json().get("id")
    if not user_id:
        raise HTTPException(
            status_code=400, detail="Invalid token: User ID not found"
        )
    return user_id
|
|
|
def get_account_balance(account_id: str) -> Dict[str, Any]:
    """Return the BRL available and pending balances of a Stripe account.

    Args:
        account_id: ID of the connected Stripe account to query.

    Returns:
        A dict with ``available_balance``, ``pending_balance`` (amounts as
        reported by Stripe for the BRL entries, 0 when there is none) and
        ``currency`` (always ``"BRL"``). All-zero values are returned when
        ``account_id`` is empty or when the Stripe call fails; failures are
        logged rather than raised.
    """
    # With no account ID the request would not be scoped to a connected
    # account, and Stripe would answer with the platform's own balance.
    if not account_id:
        return {"available_balance": 0, "pending_balance": 0, "currency": "BRL"}

    try:
        balance = stripe.Balance.retrieve(stripe_account=account_id)
        available_balance = next(
            (b.amount for b in balance.available if b.currency.upper() == "BRL"),
            0,
        )
        pending_balance = next(
            (b.amount for b in balance.pending if b.currency.upper() == "BRL"),
            0,
        )
        return {
            "available_balance": available_balance,
            "pending_balance": pending_balance,
            "currency": "BRL",
        }
    except Exception as e:
        # Best effort: the dashboard still renders with a zero balance.
        logger.error(f"❌ Error getting account balance: {str(e)}")
        return {"available_balance": 0, "pending_balance": 0, "currency": "BRL"}
|
|
|
def get_monthly_revenue(account_id: str) -> List[Dict[str, Any]]:
    """Summarise transfers received by a Stripe account over the last 6 months.

    Transfers are bucketed by calendar month in the America/New_York timezone.
    Each month's growth compares its total with the total of the same calendar
    month one year earlier.

    Args:
        account_id: ID of the Stripe account the transfers were sent to.

    Returns:
        Six dicts, current month first, each shaped as
        ``{"month", "name", "current", "amount", "growth"}`` where ``growth``
        is ``{"status", "percentage", "formatted"}``. When ``account_id`` is
        empty the buckets are returned with zero amounts and default growth.
        When the Stripe call fails the error is logged and the buckets are
        returned as accumulated so far.
    """
    ny_timezone = pytz.timezone("America/New_York")
    now_ny = datetime.now(ny_timezone)

    result = []  # returned entries, newest month first
    monthly_data = {}  # "YYYY-M" -> entry in `result`
    baseline_amounts = {}  # "YYYY-M" one year earlier -> total (never returned)
    baseline_key_for = {}  # month key -> its baseline key

    for i in range(6):
        target_date = now_ny - relativedelta(months=i)
        month_key = f"{target_date.year}-{target_date.month}"
        baseline_key = f"{target_date.year - 1}-{target_date.month}"
        entry = {
            "month": target_date.month,
            "name": target_date.strftime("%b"),
            "current": i == 0,
            "amount": 0,
            "growth": {"status": "", "percentage": 0, "formatted": "0.0%"},
        }
        result.append(entry)
        monthly_data[month_key] = entry
        baseline_amounts[baseline_key] = 0
        baseline_key_for[month_key] = baseline_key

    # Without a destination the listing would not be filtered by account and
    # would include every transfer made by the platform.
    if not account_id:
        return result

    # The oldest baseline month is 5 + 12 months back; fetch from its first day
    # so that every baseline month is covered in full.
    oldest_baseline = now_ny - relativedelta(months=17)
    start_date = ny_timezone.localize(
        datetime(oldest_baseline.year, oldest_baseline.month, 1)
    )
    start_timestamp = int(start_date.timestamp())

    try:
        transfers = stripe.Transfer.list(
            destination=account_id,
            created={"gte": start_timestamp},
            limit=100,
        )
        # auto_paging_iter walks every page instead of only the first 100 items.
        for transfer in transfers.auto_paging_iter():
            transfer_date = datetime.fromtimestamp(transfer.created, ny_timezone)
            month_key = f"{transfer_date.year}-{transfer_date.month}"
            if month_key in monthly_data:
                monthly_data[month_key]["amount"] += transfer.amount
            elif month_key in baseline_amounts:
                baseline_amounts[month_key] += transfer.amount

        for month_key, entry in monthly_data.items():
            previous_amount = baseline_amounts[baseline_key_for[month_key]]
            current_amount = entry["amount"]

            if previous_amount > 0:
                growth_percentage = (
                    (current_amount - previous_amount) / previous_amount
                ) * 100
            else:
                # No baseline: any revenue counts as 100% growth.
                growth_percentage = 100 if current_amount > 0 else 0

            entry["growth"] = {
                "status": "up" if growth_percentage > 0 else "down",
                "percentage": round(growth_percentage, 1),
                "formatted": f"{round(growth_percentage, 1)}%",
            }

        return result
    except Exception as e:
        # Best effort: the dashboard still renders with whatever was collected.
        logger.error(f"❌ Error getting monthly revenue: {str(e)}")
        return result
|
|
|
def get_active_subscribers(user_id: str, page: int) -> Dict[str, Any]:
    """Return one page of users holding an active subscription to a stylist.

    Args:
        user_id: Value matched against ``Subscriptions.stylist_id``.
        page: Zero-based page index; each page holds up to 30 subscriptions.

    Returns:
        ``{"subscribers": [...], "has_next_page": bool}``. Each subscriber is a
        dict with ``id``, ``name``, ``avatar`` and ``blurhash`` taken from the
        ``User`` row whose ``id`` equals the subscription's ``customer_id``.
        Subscriptions whose user row cannot be loaded are skipped. An empty
        page is returned when the subscriptions query does not answer 200.
    """
    limit = 30
    offset = page * limit

    # NOTE(review): no explicit ordering is requested, so page contents rely on
    # the API's default row order — confirm that is stable enough for paging.
    response = requests.get(
        f"{SUPABASE_URL}/rest/v1/Subscriptions",
        params={
            "stylist_id": f"eq.{user_id}",
            "active": "eq.true",
            "limit": limit,
            "offset": offset,
        },
        headers=SUPABASE_HEADERS,
        timeout=10,
    )
    if response.status_code != 200:
        return {"subscribers": [], "has_next_page": False}

    subscribers = response.json()
    customer_ids = [
        str(sub.get("customer_id"))
        for sub in subscribers
        if sub.get("customer_id") is not None
    ]

    # Load every referenced user with a single `in.(...)` filter instead of
    # issuing one request per subscription.
    users_by_id = {}
    unique_ids = list(dict.fromkeys(customer_ids))
    if unique_ids:
        user_response = requests.get(
            f"{SUPABASE_URL}/rest/v1/User",
            params={"id": f"in.({','.join(unique_ids)})"},
            headers=SUPABASE_HEADERS,
            timeout=10,
        )
        if user_response.status_code == 200:
            for user_info in user_response.json():
                # Keep the first row per id, as the per-user lookup did.
                users_by_id.setdefault(str(user_info.get("id")), user_info)

    # Walk the subscriptions (not the user rows) to keep their order.
    subscriber_list = []
    for customer_id in customer_ids:
        user_info = users_by_id.get(customer_id)
        if user_info:
            subscriber_list.append({
                "id": user_info.get("id"),
                "name": user_info.get("name"),
                "avatar": user_info.get("avatar"),
                "blurhash": user_info.get("blurhash"),
            })

    # A full page suggests there may be more rows after it.
    has_next_page = len(subscribers) == limit
    return {"subscribers": subscriber_list, "has_next_page": has_next_page}
|
|
|
def get_total_followers(user_id: str) -> int:
    """Return how many ``followers`` rows have ``following_id`` == ``user_id``.

    Args:
        user_id: ID of the user whose followers are counted.

    Returns:
        The exact row count reported by the API, or 0 when the request does
        not succeed or the count cannot be read from the response.
    """
    # Ask the API for the exact total instead of downloading every row:
    # counting the rows of a response body undercounts as soon as the API's
    # row limit truncates the result.
    response = requests.head(
        f"{SUPABASE_URL}/rest/v1/followers?following_id=eq.{user_id}",
        headers={**SUPABASE_HEADERS, "Prefer": "count=exact"},
        timeout=10,
    )
    # 206 is returned when the matching rows do not fit in a single response.
    if response.status_code not in (200, 206):
        return 0

    # Content-Range looks like "0-24/57" or "*/0"; the total follows the slash.
    total = response.headers.get("Content-Range", "").rpartition("/")[2]
    return int(total) if total.isdigit() else 0
|
|
|
def get_total_subscribers(user_id: str) -> int:
    """Return how many active ``Subscriptions`` rows belong to a stylist.

    Args:
        user_id: Value matched against ``Subscriptions.stylist_id``.

    Returns:
        The exact row count reported by the API, or 0 when the request does
        not succeed or the count cannot be read from the response.
    """
    # Ask the API for the exact total instead of downloading every row:
    # counting the rows of a response body undercounts as soon as the API's
    # row limit truncates the result.
    response = requests.head(
        f"{SUPABASE_URL}/rest/v1/Subscriptions"
        f"?stylist_id=eq.{user_id}&active=eq.true",
        headers={**SUPABASE_HEADERS, "Prefer": "count=exact"},
        timeout=10,
    )
    # 206 is returned when the matching rows do not fit in a single response.
    if response.status_code not in (200, 206):
        return 0

    # Content-Range looks like "0-24/57" or "*/0"; the total follows the slash.
    total = response.headers.get("Content-Range", "").rpartition("/")[2]
    return int(total) if total.isdigit() else 0
|
|
|
@router.get("/dashboard")
def get_dashboard(
    user_token: str = Header(None, alias="User-key"),
    page: int = Query(0, ge=0),
):
    """Return the dashboard payload for the user identified by ``User-key``.

    Args:
        user_token: Access token read from the ``User-key`` request header.
        page: Zero-based page of active subscribers to include.

    Returns:
        A dict with ``stripe_id``, ``available_balance`` (the balance dict),
        ``monthly_revenue``, ``total_followers``, ``total_subscribers`` and
        the ``subscribers`` / ``has_next_page`` keys of the subscriber page.

    Raises:
        HTTPException: whatever ``verify_token`` raises (401/400), 404 when the
            user has no ``User`` row, and 500 for any other failure.
    """
    try:
        user_id = verify_token(user_token)

        response = requests.get(
            f"{SUPABASE_URL}/rest/v1/User?id=eq.{user_id}",
            headers=SUPABASE_HEADERS,
            timeout=10,
        )
        # Turn an upstream error into an exception (reported as 500 below)
        # rather than indexing into an error payload.
        response.raise_for_status()
        rows = response.json()
        if not rows:
            raise HTTPException(status_code=404, detail="User not found")
        stripe_id = rows[0].get("stripe_id")

        return {
            "stripe_id": stripe_id,
            "available_balance": get_account_balance(stripe_id),
            "monthly_revenue": get_monthly_revenue(stripe_id),
            "total_followers": get_total_followers(user_id),
            "total_subscribers": get_total_subscribers(user_id),
            **get_active_subscribers(user_id, page),
        }
    except HTTPException:
        # Keep deliberate statuses (401/400/404) instead of masking them as 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error: {str(e)}")
        # NOTE(review): the detail exposes the raw exception text to clients.
        raise HTTPException(status_code=500, detail=str(e)) from e