File size: 5,103 Bytes
0b9ad55 7ad8f7d 0b9ad55 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
from agentpress.thread_manager import ThreadManager
from services.supabase import DBConnection
from datetime import datetime, timezone
from dotenv import load_dotenv
from utils.config import config, EnvMode
import asyncio
from utils.logger import logger
import uuid
import time
from collections import OrderedDict
# Import the agent API module
from agent import api as agent_api
from sandbox import api as sandbox_api
from services import billing as billing_api
# Load environment variables (these will be available through config)
load_dotenv()
# Initialize managers
db = DBConnection()
thread_manager = None
instance_id = "single"
# Rate limiter state
ip_tracker = OrderedDict()
MAX_CONCURRENT_IPS = 25
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
global thread_manager
logger.info(f"Starting up FastAPI application with instance ID: {instance_id} in {config.ENV_MODE.value} mode")
try:
# Initialize database
await db.initialize()
thread_manager = ThreadManager()
# Initialize the agent API with shared resources
agent_api.initialize(
thread_manager,
db,
instance_id
)
# Initialize the sandbox API with shared resources
sandbox_api.initialize(db)
# Initialize Redis connection
from services import redis
try:
await redis.initialize_async()
logger.info("Redis connection initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Redis connection: {e}")
# Continue without Redis - the application will handle Redis failures gracefully
# Start background tasks
asyncio.create_task(agent_api.restore_running_agent_runs())
yield
# Clean up agent resources
logger.info("Cleaning up agent resources")
await agent_api.cleanup()
# Clean up Redis connection
try:
logger.info("Closing Redis connection")
await redis.close()
logger.info("Redis connection closed successfully")
except Exception as e:
logger.error(f"Error closing Redis connection: {e}")
# Clean up database connection
logger.info("Disconnecting from database")
await db.disconnect()
except Exception as e:
logger.error(f"Error during application startup: {e}")
raise
app = FastAPI(lifespan=lifespan)
@app.middleware("http")
async def log_requests_middleware(request: Request, call_next):
start_time = time.time()
client_ip = request.client.host
method = request.method
url = str(request.url)
path = request.url.path
query_params = str(request.query_params)
# Log the incoming request
logger.info(f"Request started: {method} {path} from {client_ip} | Query: {query_params}")
try:
response = await call_next(request)
process_time = time.time() - start_time
logger.debug(f"Request completed: {method} {path} | Status: {response.status_code} | Time: {process_time:.2f}s")
return response
except Exception as e:
process_time = time.time() - start_time
logger.error(f"Request failed: {method} {path} | Error: {str(e)} | Time: {process_time:.2f}s")
raise
# Define allowed origins based on environment
allowed_origins = ["https://www.suna.so","https://suna-gold.vercel.app", "https://suna.so", "https://staging.suna.so", "http://localhost:3000"]
# Add staging-specific origins
if config.ENV_MODE == EnvMode.STAGING:
allowed_origins.append("http://localhost:3006")
# Add local-specific origins
if config.ENV_MODE == EnvMode.LOCAL:
allowed_origins.append("http://localhost:3006")
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type", "Authorization"],
)
# Include the agent router with a prefix
app.include_router(agent_api.router, prefix="/api")
# Include the sandbox router with a prefix
app.include_router(sandbox_api.router, prefix="/api")
# Include the billing router with a prefix
app.include_router(billing_api.router, prefix="/api")
@app.get("/api/health")
async def health_check():
"""Health check endpoint to verify API is working."""
logger.info("Health check endpoint called")
return {
"status": "ok",
"timestamp": datetime.now(timezone.utc).isoformat(),
"instance_id": instance_id
}
if __name__ == "__main__":
import uvicorn
workers = 2
logger.info(f"Starting server on 0.0.0.0:8000 with {workers} workers")
uvicorn.run(
"api:app",
host="0.0.0.0",
port=7860,
workers=workers,
# reload=True
) |