# Scraped page header (not part of the program): mgbam · "Update app/app.py" · a94a20c (verified) · 3.57 kB
"""
Sentinel Arbitrage Engine - v10.0 OMEGA
A complete architectural overhaul using FastAPI-SocketIO for guaranteed,
real-time signal delivery. This is the definitive money-spinning engine.
"""
import asyncio
import os
from contextlib import asynccontextmanager
import json
import time
from datetime import datetime, timezone
import httpx
import socketio
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
# --- RELATIVE IMPORTS FOR PACKAGE STRUCTURE ---
from .price_fetcher import PriceFetcher
from .arbitrage_analyzer import ArbitrageAnalyzer
from .broker import signal_broker
# Minimum relative spread, expressed as a fraction of the Chainlink price
# (0.001 == 0.1%), that run_arbitrage_detector treats as an opportunity.
OPPORTUNITY_THRESHOLD = 0.001
# --- Socket.IO Server Setup ---
# Async Socket.IO server running in ASGI mode; the engine loop emits
# 'new_signal' events through it.
# NOTE(review): cors_allowed_origins='*' accepts connections from any origin --
# confirm this is intended outside of development.
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
# ASGI wrapper around the server; this is the object mounted on the FastAPI app.
socket_app = socketio.ASGIApp(sio)
# --- Application Lifespan ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the arbitrage detector as a background task while the app is alive.

    Startup: opens one shared ``httpx.AsyncClient``, builds a ``PriceFetcher``
    and an ``ArbitrageAnalyzer`` on top of it, stores both on ``app.state``
    and schedules ``run_arbitrage_detector`` as an asyncio task.

    Shutdown: cancels the task and awaits it while the HTTP client is still
    open, then lets the ``async with`` block close the client.

    Args:
        app: The FastAPI application whose ``state`` receives the
            ``price_fetcher`` and ``arbitrage_analyzer`` attributes.
    """
    print("πŸš€ Initializing Sentinel Arbitrage Engine v10.0...")
    async with httpx.AsyncClient() as client:
        app.state.price_fetcher = PriceFetcher(client)
        app.state.arbitrage_analyzer = ArbitrageAnalyzer(client)
        arbitrage_task = asyncio.create_task(
            run_arbitrage_detector(app.state.price_fetcher, app.state.arbitrage_analyzer)
        )
        print("βœ… Engine is online and hunting for opportunities.")
        try:
            yield
        finally:
            # The finally clause also runs when an exception is thrown into
            # the generator at the yield, so the detector task is never left
            # running against a client that is about to be closed.
            print("⏳ Shutting down engine...")
            arbitrage_task.cancel()
            try:
                await arbitrage_task
            except asyncio.CancelledError:
                print("Engine shut down gracefully.")
async def run_arbitrage_detector(price_fetcher, analyzer):
    """The core engine loop. Directly emits events via Socket.IO.

    Each iteration refreshes prices, computes the relative spread between the
    "pyth" and "chainlink_agg" quotes (as a fraction of the Chainlink price)
    and, when it exceeds ``OPPORTUNITY_THRESHOLD``, asks the analyzer for a
    briefing and emits the merged result as a ``'new_signal'`` event. The
    loop then sleeps 15 seconds, whether or not the iteration failed.

    Args:
        price_fetcher: Object providing an awaitable ``update_prices_async()``
            and ``get_current_prices()``, which returns a mapping read via
            the "pyth" and "chainlink_agg" keys.
        analyzer: Object providing an awaitable
            ``get_alpha_briefing(opportunity)``. A falsy result suppresses
            the signal; otherwise the result is merged into the signal dict.

    This coroutine never returns on its own; stop it by cancelling its task.
    """
    throttle_seconds = 30  # minimum gap between analyzer (Gemini) calls
    poll_seconds = 15      # pause between price refreshes

    # None means "no opportunity handled yet", so the first one always fires.
    # The gap is measured on the monotonic clock so that wall-clock
    # adjustments can neither suppress nor burst alerts.
    last_opportunity_time = None

    while True:
        try:
            await price_fetcher.update_prices_async()
            prices = price_fetcher.get_current_prices()
            pyth_price = prices.get("pyth")
            chainlink_price = prices.get("chainlink_agg")

            # The truthiness check skips missing quotes and also guards the
            # division against a zero Chainlink price.
            if pyth_price and chainlink_price:
                spread = abs(pyth_price - chainlink_price) / chainlink_price
                if spread > OPPORTUNITY_THRESHOLD:
                    current_time = time.monotonic()
                    throttle_elapsed = (
                        last_opportunity_time is None
                        or current_time - last_opportunity_time > throttle_seconds
                    )
                    if throttle_elapsed:  # Throttle Gemini calls
                        # Recorded before the analyzer call so that a failing
                        # call is throttled as well.
                        last_opportunity_time = current_time
                        opportunity = {
                            "pyth_price": pyth_price,
                            "chainlink_price": chainlink_price,
                            "spread_pct": spread * 100,
                        }
                        print(f"⚑️ Discrepancy Detected: {opportunity['spread_pct']:.3f}%")
                        briefing = await analyzer.get_alpha_briefing(opportunity)
                        if briefing:
                            signal = {
                                **opportunity,
                                **briefing,
                                "timestamp": datetime.now(timezone.utc).isoformat(),
                            }
                            await sio.emit('new_signal', signal)
                            # .get: a briefing without 'strategy' must not turn
                            # a successful emit into a logged engine error.
                            print(f"βœ… Signal Emitted: {signal.get('strategy')}")
        except Exception as e:
            # Top-level boundary of the loop: log and keep the engine alive.
            # On Python 3.8+ asyncio.CancelledError derives from BaseException,
            # so task cancellation is not swallowed here.
            print(f"❌ ERROR in engine loop: {e}")
        await asyncio.sleep(poll_seconds)
# --- FastAPI App Setup ---
# The lifespan handler starts the detector task on startup and cancels it on
# shutdown.
app = FastAPI(lifespan=lifespan)
@sio.event
async def connect(sid, environ):
    """Log the session id of a client that has just connected.

    ``environ`` is accepted to satisfy the handler signature and is not used.
    """
    message = f"βœ… Client connected: {sid}"
    print(message)
@sio.event
async def disconnect(sid):
    """Log the session id of a client that has just disconnected."""
    message = f"πŸ”₯ Client disconnected: {sid}"
    print(message)
# Mount the Socket.IO app on a specific path
# NOTE(review): socket_app is built with python-socketio's default
# socketio_path; confirm that it combines with this mount prefix into the URL
# the browser client actually connects to for the installed library versions.
app.mount('/socket.io', socket_app)
# Serve the static files (like index.html) from the 'static' directory
# Registered last on purpose: routes are tried in registration order, so this
# catch-all "/" mount must come after the '/socket.io' mount. The directory is
# a relative path, so it is resolved against the process's working directory.
app.mount("/", StaticFiles(directory="static", html=True), name="static")