mgbam's picture
Update app/app.py
1690b33 verified
raw
history blame
3.31 kB
"""
Sentinel Arbitrage Engine - v10.0 OMEGA
A complete architectural overhaul using FastAPI-SocketIO for guaranteed,
real-time signal delivery. This is the definitive money-spinning engine.
"""
import asyncio
import os
from contextlib import asynccontextmanager
from datetime import datetime, timezone
import httpx
import socketio
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from price_fetcher import PriceFetcher
from arbitrage_analyzer import ArbitrageAnalyzer
# Minimum relative spread between the two price feeds, expressed as a fraction
# (0.001 == 0.1%), above which run_arbitrage_detector reports an opportunity.
OPPORTUNITY_THRESHOLD: float = 0.001
# --- Socket.IO Server Setup ---
# Async Socket.IO server in ASGI mode; the engine loop emits 'new_signal'
# events through this instance.
# NOTE(review): cors_allowed_origins='*' presumably accepts connections from
# any origin - confirm this is intended outside development.
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
# ASGI application wrapping the server; mounted on the FastAPI app under
# '/socket.io' further down in this module.
socket_app = socketio.ASGIApp(sio)
# --- Application Lifespan ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the arbitrage engine for as long as the application is alive.

    Startup: open two HTTP clients, build the price fetcher and the analyzer
    on top of them, and launch ``run_arbitrage_detector`` as a background
    task. Shutdown: cancel that task, wait for it to finish, then close both
    HTTP clients.

    Args:
        app: The FastAPI application being started (not used directly).

    Yields:
        None. The server handles requests while the engine task runs.
    """
    print("πŸš€ Initializing Sentinel Arbitrage Engine v10.0...")
    # ``async with`` closes both clients on the way out so their connection
    # pools are released when the application shuts down.
    async with httpx.AsyncClient() as price_http, httpx.AsyncClient() as analyzer_http:
        price_fetcher = PriceFetcher(price_http)
        arbitrage_analyzer = ArbitrageAnalyzer(analyzer_http)
        # Launch the engine as a background task; it emits signals through
        # the module-level Socket.IO server.
        arbitrage_task = asyncio.create_task(
            run_arbitrage_detector(price_fetcher, arbitrage_analyzer)
        )
        print("βœ… Engine is online and hunting for opportunities.")
        try:
            yield
        finally:
            # Runs even if an exception is thrown into the generator, so the
            # background task never outlives the application.
            print("⏳ Shutting down engine...")
            arbitrage_task.cancel()
            try:
                await arbitrage_task
            except asyncio.CancelledError:
                print("Engine shut down gracefully.")
def _find_opportunity(prices):
    """Return an opportunity dict when the feed spread exceeds the threshold, else None."""
    pyth_price = prices.get("pyth")
    chainlink_price = prices.get("chainlink_agg")
    # A missing or zero price means no usable quote; this check also keeps
    # the division below safe.
    if not (pyth_price and chainlink_price):
        return None
    spread = abs(pyth_price - chainlink_price) / chainlink_price
    if spread > OPPORTUNITY_THRESHOLD:
        return {
            "pyth_price": pyth_price,
            "chainlink_price": chainlink_price,
            "spread_pct": spread * 100,
        }
    return None


async def run_arbitrage_detector(price_fetcher, analyzer, poll_interval: float = 15):
    """Poll prices forever and emit a Socket.IO event for each signal.

    Each cycle refreshes the prices, computes the relative spread between the
    "pyth" and "chainlink_agg" entries and, when it exceeds
    ``OPPORTUNITY_THRESHOLD``, asks the analyzer for a briefing. A truthy
    briefing is merged with the opportunity and a UTC ISO-8601 timestamp and
    emitted as a ``new_signal`` event to all connected clients.

    Args:
        price_fetcher: Object providing ``await update_prices_async()`` and
            ``get_current_prices()`` (a mapping with ``.get``).
        analyzer: Object providing ``await get_alpha_briefing(opportunity)``,
            which returns a mapping or a falsy value.
        poll_interval: Seconds to sleep between cycles.

    Returns:
        Never returns normally; the loop ends only when the task is cancelled
        (``asyncio.CancelledError`` is not caught here).
    """
    while True:
        try:
            await price_fetcher.update_prices_async()
            opportunity = _find_opportunity(price_fetcher.get_current_prices())
            if opportunity is not None:
                print(f"⚑️ Discrepancy Detected: {opportunity['spread_pct']:.3f}%")
                briefing = await analyzer.get_alpha_briefing(opportunity)
                if briefing:
                    signal = {
                        **opportunity,
                        **briefing,
                        "timestamp": datetime.now(timezone.utc).isoformat(),
                    }
                    await sio.emit('new_signal', signal)
                    # .get(): a briefing without a 'strategy' key must not
                    # turn an already-emitted signal into a reported error.
                    print(f"βœ… Signal Emitted: {signal.get('strategy', 'unknown')}")
        except Exception as e:
            # Top-level boundary of the engine: log and keep polling so one
            # failed cycle does not stop the detector.
            print(f"❌ ERROR in engine loop: {e}")
        await asyncio.sleep(poll_interval)
# --- FastAPI App Setup ---
app = FastAPI(lifespan=lifespan)


@sio.event
async def connect(sid, environ):
    """Log a new Socket.IO client connection."""
    print(f"βœ… Client connected: {sid}")


@sio.event
async def disconnect(sid):
    """Log a Socket.IO client disconnection."""
    print(f"πŸ”₯ Client disconnected: {sid}")


# Mount order matters: routes are matched in registration order and a mount
# at "/" matches every path, so the Socket.IO app must be registered before
# the catch-all static mount or it is never reached.
# NOTE(review): depending on the Starlette / python-socketio versions in use,
# the mounted ASGIApp may need a matching `socketio_path` - confirm the
# client handshake succeeds.
app.mount('/socket.io', socket_app)
# This serves the index.html and any other static files (like JS or CSS)
app.mount("/", StaticFiles(directory="static", html=True), name="static")