# Source: app/app.py (Hugging Face file view; last change "Update app/app.py"
# by mgbam, commit acb6f17, verified; 3.91 kB).
"""
Sentinel Arbitrage Engine - v15.0 OMEGA (Socket.IO Perfected)
The definitive, money-spinning engine. This version uses a perfectly
configured Socket.IO server to push real-time, multi-asset arbitrage
signals directly to a dynamic frontend. This is the final architecture.
"""
import asyncio
import os
import json
import time
from datetime import datetime, timezone
import httpx
import socketio
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from .price_fetcher import PriceFetcher
from .arbitrage_analyzer import ArbitrageAnalyzer
# Relative price spread above which a dislocation is reported.
# The engine compares |pyth - chainlink| / chainlink against this value,
# so 0.0015 corresponds to a 0.15% spread.
OPPORTUNITY_THRESHOLD = 0.0015
# --- Socket.IO Server Setup ---
# Server instance that manages all real-time communication with clients.
# async_mode='asgi' makes it usable inside an ASGI application;
# cors_allowed_origins='*' accepts connections from any origin.
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
# --- FastAPI App Setup ---
# The FastAPI app only serves the static frontend files.
app = FastAPI()
# Mounted at the root path; html=True serves index.html for directory
# requests. NOTE(review): "static" is a relative path, so it presumably
# resolves against the process working directory -- confirm for deployment.
app.mount("/", StaticFiles(directory="static", html=True), name="static")
# --- Background Engine ---
async def run_arbitrage_detector(price_fetcher, analyzer, poll_interval=15, cooldown=60):
    """Run the core engine loop: detect price dislocations and emit signals.

    Each cycle refreshes prices, compares the "pyth" and "chainlink_agg"
    quotes of every asset and, when their relative spread exceeds
    OPPORTUNITY_THRESHOLD, asks the analyzer for a briefing and emits the
    merged result to all clients as a 'new_signal' Socket.IO event.

    Args:
        price_fetcher: Object providing ``update_prices_async()`` (awaitable)
            and ``get_all_prices()`` returning a mapping of asset to a dict
            of source name to price.
        analyzer: Object providing the awaitable
            ``get_alpha_briefing(asset, opportunity)``. Per-asset throttle
            timestamps are stored on it as the ``last_call`` dict.
        poll_interval: Seconds to sleep between cycles.
        cooldown: Minimum number of seconds between two briefing requests
            for the same asset.

    This coroutine never returns; it only stops when it is cancelled.
    """
    while True:
        try:
            await price_fetcher.update_prices_async()
            all_prices = price_fetcher.get_all_prices()
            for asset, prices in all_prices.items():
                pyth_price = prices.get("pyth")
                chainlink_price = prices.get("chainlink_agg")
                # Skip assets with a missing (None/0) quote.
                if not (pyth_price and chainlink_price):
                    continue
                # Both quotes must be strictly positive. The divisor is the
                # Chainlink price, so a negative value there would produce a
                # negative spread and silently hide the opportunity.
                if pyth_price <= 0 or chainlink_price <= 0:
                    continue
                spread = abs(pyth_price - chainlink_price) / chainlink_price
                if spread <= OPPORTUNITY_THRESHOLD:
                    continue
                current_time = time.time()
                # Simple throttle to avoid spamming the analyzer for the same
                # opportunity. State lives on the analyzer and is created on
                # first use.
                last_call = getattr(analyzer, 'last_call', None)
                if last_call is None:
                    last_call = analyzer.last_call = {}
                if current_time - last_call.get(asset, 0) <= cooldown:
                    continue
                # Recorded before the request so that a failing analyzer call
                # is throttled as well.
                last_call[asset] = current_time
                opportunity = {
                    "asset": asset, "pyth_price": pyth_price,
                    "chainlink_price": chainlink_price, "spread_pct": spread * 100
                }
                print(f"⚑️ Dislocation for {asset}: {opportunity['spread_pct']:.3f}%")
                briefing = await analyzer.get_alpha_briefing(asset, opportunity)
                if briefing:
                    signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()}
                    await sio.emit('new_signal', signal)
                    # .get(): a briefing without a 'strategy' key must not
                    # abort the rest of the cycle after a successful emit.
                    print(f"βœ… Signal Emitted for {asset}: {signal.get('strategy', 'n/a')}")
        except Exception as e:
            # Keep the engine alive across transient failures. Task
            # cancellation (asyncio.CancelledError) is not an Exception
            # subclass on Python 3.8+ and therefore still propagates.
            print(f"❌ ERROR in engine loop: {e}")
        await asyncio.sleep(poll_interval)
# --- Socket.IO Event Handlers ---
# The engine is started lazily from the `connect` handler using
# sio.start_background_task(), so no engine work happens until a client connects.
@sio.on('connect')
async def connect(sid, environ):
    """Handle a new client connection and lazily start the engine.

    Args:
        sid: Socket.IO session id of the connecting client.
        environ: Request environment passed by python-socketio (unused).

    The first connection creates the price fetcher and the analyzer (each
    with its own httpx.AsyncClient) and launches run_arbitrage_detector as a
    background task. Later connections only log.
    """
    print(f"βœ… Client connected: {sid}")
    # Start the engine only when the first user connects.
    # `background_task` is an attribute this module sets on the server
    # object; nothing assigns it before the first connect, so read it with a
    # default instead of risking an AttributeError.
    if getattr(sio, 'background_task', None) is None:
        print("πŸš€ First client connected. Starting Sentinel Engine...")
        # NOTE(review): these HTTP clients are never closed; they live for
        # the lifetime of the process.
        price_fetcher = PriceFetcher(httpx.AsyncClient())
        arbitrage_analyzer = ArbitrageAnalyzer(httpx.AsyncClient())
        # No await between the check above and this assignment, so two
        # concurrent connects cannot both start the engine.
        sio.background_task = sio.start_background_task(
            run_arbitrage_detector, price_fetcher, arbitrage_analyzer
        )
@sio.on('disconnect')
def disconnect(sid):
    """Log that the client identified by ``sid`` has disconnected."""
    message = f"πŸ”₯ Client disconnected: {sid}"
    print(message)
# --- Final ASGI App ---
# Wrap the FastAPI app (static files) and the Socket.IO server together.
# The Socket.IO server is the primary application: it handles Socket.IO
# traffic itself and passes everything else to `app` via other_asgi_app.
# `combined_app` is the object an ASGI server should be pointed at.
combined_app = socketio.ASGIApp(sio, other_asgi_app=app)