Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3 | |
""" | |
Atlas Unified Bridge Server | |
This server connects all Atlas platform components together | |
""" | |
import os | |
import sys | |
import json | |
import time | |
import logging | |
import requests | |
from fastapi import FastAPI, HTTPException, Request | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import JSONResponse | |
import uvicorn | |
from pydantic import BaseModel | |
from typing import Dict, List, Any, Optional | |
# Setup logging
# Root logger configuration: INFO and above, one timestamped line per record,
# sent both to the console stream and to a "bridge_server.log" file opened
# relative to the current working directory.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(), logging.FileHandler("bridge_server.log")],
    level=logging.INFO,
)

# Named logger used by the rest of this module.
logger = logging.getLogger("atlas_bridge")
# Component endpoints
# Each base URL defaults to the local development port and can be overridden
# by setting an environment variable with the same name as the constant,
# e.g. CYPHER_ENDPOINT=http://cypher.internal:9000.
def _endpoint_from_env(variable: str, default: str) -> str:
    """Return the base URL stored in env var *variable*, else *default*.

    An unset or empty variable falls back to *default*. A trailing slash is
    stripped so that paths such as "/health" can be appended without
    producing a double slash.
    """
    return (os.environ.get(variable) or default).rstrip("/")


OPENMANUS_ENDPOINT = _endpoint_from_env(
    "OPENMANUS_ENDPOINT", "http://localhost:50505"
)
CASIBASE_ENDPOINT = _endpoint_from_env(
    "CASIBASE_ENDPOINT", "http://localhost:7777"
)
CYPHER_ENDPOINT = _endpoint_from_env(
    "CYPHER_ENDPOINT", "http://localhost:5000"
)
QUANTUMVISION_ENDPOINT = _endpoint_from_env(
    "QUANTUMVISION_ENDPOINT", "http://localhost:8000"
)
CONVERSATION_ENDPOINT = _endpoint_from_env(
    "CONVERSATION_ENDPOINT", "http://localhost:8001"
)
# Initialize FastAPI app
app = FastAPI(title="Atlas Unified Bridge")

# Add CORS middleware
# Every origin, method and header is accepted and credentials are allowed.
# NOTE(review): the FastAPI CORS documentation warns against combining
# wildcard origins with allow_credentials=True -- confirm the intended policy
# and list explicit origins before deploying to production.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],  # Adjust in production
)
# Health check endpoint
@app.get("/health")
async def health_check():
    """Report the status of the bridge and of every downstream component.

    Registered as ``GET /health``, the same path this server probes on each
    component. The five component probes are blocking HTTP calls, so they are
    dispatched to the default thread-pool executor and awaited together; this
    keeps the event loop responsive and bounds the total latency by the
    slowest probe instead of the sum of all of them.

    Returns:
        A dict with ``"bridge"`` (always ``"operational"``), ``"components"``
        (component name -> ``"operational"`` / ``"unavailable"``) and
        ``"timestamp"`` (``time.time()`` at response build time).
    """
    # Local import so this block does not depend on the file-level imports.
    import asyncio

    component_endpoints = {
        "openmanus": OPENMANUS_ENDPOINT,
        "casibase": CASIBASE_ENDPOINT,
        "cypher": CYPHER_ENDPOINT,
        "quantumvision": QUANTUMVISION_ENDPOINT,
        "conversation": CONVERSATION_ENDPOINT,
    }

    loop = asyncio.get_running_loop()
    # gather() preserves argument order, so results line up with the dict keys.
    results = await asyncio.gather(
        *(
            loop.run_in_executor(None, check_component_health, url)
            for url in component_endpoints.values()
        )
    )

    return {
        "bridge": "operational",
        "components": dict(zip(component_endpoints, results)),
        "timestamp": time.time(),
    }
def check_component_health(endpoint: str) -> str:
    """Probe a component's ``/health`` route and report its status.

    Args:
        endpoint: Base URL of the component, e.g. ``http://localhost:8000``.

    Returns:
        ``"operational"`` if ``GET {endpoint}/health`` answers with HTTP 200
        within the 3-second timeout; ``"unavailable"`` for any other status
        code or for any ``requests`` error (connection refused, timeout,
        invalid URL, ...).
    """
    try:
        response = requests.get(f"{endpoint}/health", timeout=3)
    except requests.RequestException as exc:
        # Only transport-level failures mean "unavailable"; a bare ``except``
        # would also swallow KeyboardInterrupt/SystemExit and hide real bugs.
        logger.debug("Health probe for %s failed: %s", endpoint, exc)
        return "unavailable"
    return "operational" if response.status_code == 200 else "unavailable"
# Main entry point
# When executed as a script, log the start-up and hand control to uvicorn.
# The application is passed as the import string "bridge_server:app" and
# auto-reload is switched on; the server binds to all interfaces on port 8080.
if __name__ == "__main__":
    logger.info("Starting Atlas Unified Bridge Server")
    uvicorn.run(
        "bridge_server:app",
        host="0.0.0.0",
        port=8080,
        reload=True,
    )