Spaces:
Sleeping
Sleeping
""" | |
Sentinel Arbitrage Engine - v10.0 OMEGA | |
A complete architectural overhaul using FastAPI-SocketIO for guaranteed, | |
real-time signal delivery. This is the definitive money-spinning engine. | |
""" | |
import asyncio | |
import os | |
from contextlib import asynccontextmanager | |
from datetime import datetime, timezone | |
import httpx | |
import socketio | |
from fastapi import FastAPI | |
from fastapi.staticfiles import StaticFiles | |
from price_fetcher import PriceFetcher | |
from arbitrage_analyzer import ArbitrageAnalyzer | |
OPPORTUNITY_THRESHOLD = 0.001 | |
# --- Socket.IO Server Setup --- | |
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*') | |
socket_app = socketio.ASGIApp(sio) | |
# --- Application Lifespan --- | |
async def lifespan(app: FastAPI): | |
print("π Initializing Sentinel Arbitrage Engine v10.0...") | |
price_fetcher = PriceFetcher(httpx.AsyncClient()) | |
arbitrage_analyzer = ArbitrageAnalyzer(httpx.AsyncClient()) | |
# Launch the engine as a background task, passing the Socket.IO server instance | |
arbitrage_task = asyncio.create_task( | |
run_arbitrage_detector(price_fetcher, arbitrage_analyzer) | |
) | |
print("β Engine is online and hunting for opportunities.") | |
yield | |
print("β³ Shutting down engine...") | |
arbitrage_task.cancel() | |
try: await arbitrage_task | |
except asyncio.CancelledError: print("Engine shut down gracefully.") | |
async def run_arbitrage_detector(price_fetcher, analyzer): | |
"""The core engine loop. Now directly emits events via Socket.IO.""" | |
while True: | |
try: | |
await price_fetcher.update_prices_async() | |
prices = price_fetcher.get_current_prices() | |
pyth_price = prices.get("pyth") | |
chainlink_price = prices.get("chainlink_agg") | |
if pyth_price and chainlink_price: | |
spread = abs(pyth_price - chainlink_price) / chainlink_price | |
if spread > OPPORTUNITY_THRESHOLD: | |
opportunity = { | |
"pyth_price": pyth_price, | |
"chainlink_price": chainlink_price, | |
"spread_pct": spread * 100 | |
} | |
print(f"β‘οΈ Discrepancy Detected: {opportunity['spread_pct']:.3f}%") | |
briefing = await analyzer.get_alpha_briefing(opportunity) | |
if briefing: | |
signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()} | |
# --- THE FIX: Directly emit to all connected clients --- | |
await sio.emit('new_signal', signal) | |
print(f"β Signal Emitted: {signal['strategy']}") | |
except Exception as e: | |
print(f"β ERROR in engine loop: {e}") | |
await asyncio.sleep(15) | |
# --- FastAPI App Setup --- | |
app = FastAPI(lifespan=lifespan) | |
# This serves the index.html and any other static files (like JS or CSS) | |
app.mount("/", StaticFiles(directory="static", html=True), name="static") | |
async def connect(sid, environ): | |
print(f"β Client connected: {sid}") | |
async def disconnect(sid): | |
print(f"π₯ Client disconnected: {sid}") | |
# Mount the Socket.IO app on top of the FastAPI app | |
app.mount('/socket.io', socket_app) |