# NOTE: web file-viewer residue (status banner, file size, commit-hash gutter, line-number gutter) removed from the top of this module.
"""
Sentinel Arbitrage Engine - v10.0 OMEGA
A complete architectural overhaul that uses FastAPI together with
python-socketio (Socket.IO) for real-time signal delivery. The engine
compares Pyth and aggregated Chainlink prices and emits a signal to
connected clients when the two diverge.
"""
import asyncio
import os
from contextlib import asynccontextmanager
import json
import time
from datetime import datetime, timezone
import httpx
import socketio
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
# --- RELATIVE IMPORTS FOR PACKAGE STRUCTURE ---
from .price_fetcher import PriceFetcher
from .arbitrage_analyzer import ArbitrageAnalyzer
from .broker import signal_broker
# Minimum relative spread, |pyth - chainlink| / chainlink, that the engine loop
# treats as an opportunity (0.001 == 0.1%).
OPPORTUNITY_THRESHOLD = 0.001
# --- Socket.IO Server Setup ---
# Async Socket.IO server running in ASGI mode.
# NOTE(review): cors_allowed_origins='*' accepts connections from any origin —
# confirm this is intended outside development.
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
# ASGI application wrapping the server so it can be mounted on the FastAPI app.
socket_app = socketio.ASGIApp(sio)
# --- Application Lifespan ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage engine startup and shutdown for the FastAPI application.

    On startup this opens one shared ``httpx.AsyncClient``, builds the
    ``PriceFetcher`` and ``ArbitrageAnalyzer`` around it (both stored on
    ``app.state``) and launches ``run_arbitrage_detector`` as a background
    task. On shutdown it cancels that task and waits for it to finish
    before the HTTP client is closed.

    Args:
        app: The FastAPI application whose ``state`` receives the services.

    Yields:
        None. Control returns to the framework while the app is serving.
    """
    # NOTE(review): the leading emoji was reconstructed from mis-decoded bytes;
    # confirm it against the committed file.
    print("🚀 Initializing Sentinel Arbitrage Engine v10.0...")
    async with httpx.AsyncClient() as client:
        app.state.price_fetcher = PriceFetcher(client)
        app.state.arbitrage_analyzer = ArbitrageAnalyzer(client)
        arbitrage_task = asyncio.create_task(
            run_arbitrage_detector(app.state.price_fetcher, app.state.arbitrage_analyzer)
        )
        print("✅ Engine is online and hunting for opportunities.")
        try:
            yield
        finally:
            # Runs even when the serving phase raised, so the detector task is
            # always stopped before the shared HTTP client is closed.
            print("⏳ Shutting down engine...")
            arbitrage_task.cancel()
            try:
                await arbitrage_task
            except asyncio.CancelledError:
                print("Engine shut down gracefully.")
def _relative_spread(pyth_price, chainlink_price):
    """Return |pyth - chainlink| / chainlink, or None if either price is missing or zero."""
    if not pyth_price or not chainlink_price:
        return None
    return abs(pyth_price - chainlink_price) / chainlink_price


async def run_arbitrage_detector(price_fetcher, analyzer, throttle_seconds=30, poll_interval=15):
    """The core engine loop. Directly emits events via Socket.IO.

    Each iteration refreshes prices, compares the "pyth" and "chainlink_agg"
    quotes and, when their relative spread exceeds ``OPPORTUNITY_THRESHOLD``,
    asks the analyzer for a briefing and emits a ``new_signal`` event on
    ``sio``. The loop never returns; it ends only when the task is cancelled.

    Args:
        price_fetcher: Object providing ``update_prices_async()`` (awaitable)
            and ``get_current_prices()`` (returns a mapping of prices).
        analyzer: Object providing the awaitable
            ``get_alpha_briefing(opportunity)``; a falsy result means no
            signal is emitted.
        throttle_seconds: Minimum number of seconds between analyzer calls.
        poll_interval: Seconds to sleep between iterations.
    """
    # None means "no analyzer call yet", so the first opportunity is never throttled.
    last_opportunity_time = None
    while True:
        try:
            await price_fetcher.update_prices_async()
            prices = price_fetcher.get_current_prices()
            pyth_price = prices.get("pyth")
            chainlink_price = prices.get("chainlink_agg")
            spread = _relative_spread(pyth_price, chainlink_price)
            if spread is not None and spread > OPPORTUNITY_THRESHOLD:
                # Monotonic clock: the throttle must not be affected by
                # wall-clock adjustments (NTP steps, manual changes).
                current_time = time.monotonic()
                throttled = (
                    last_opportunity_time is not None
                    and current_time - last_opportunity_time <= throttle_seconds
                )
                if not throttled:  # Throttle Gemini calls
                    # Stamped before the analyzer call so a failed or empty
                    # briefing still counts against the throttle window.
                    last_opportunity_time = current_time
                    opportunity = {
                        "pyth_price": pyth_price,
                        "chainlink_price": chainlink_price,
                        "spread_pct": spread * 100,
                    }
                    print(f"⚡️ Discrepancy Detected: {opportunity['spread_pct']:.3f}%")
                    briefing = await analyzer.get_alpha_briefing(opportunity)
                    if briefing:
                        signal = {
                            **opportunity,
                            **briefing,
                            "timestamp": datetime.now(timezone.utc).isoformat(),
                        }
                        await sio.emit('new_signal', signal)
                        # .get: a briefing without a "strategy" key must not
                        # be reported as an engine error after a successful emit.
                        print(f"✅ Signal Emitted: {signal.get('strategy')}")
        except Exception as e:
            # Top-level boundary of a long-running task: report and keep polling.
            print(f"❌ ERROR in engine loop: {e}")
        await asyncio.sleep(poll_interval)
# --- FastAPI App Setup ---
# The lifespan context manager starts the arbitrage detector task on startup
# and cancels it on shutdown.
app = FastAPI(lifespan=lifespan)
@sio.event
async def connect(sid, environ):
    """Log a new Socket.IO client connection.

    Args:
        sid: Session id of the connecting client.
        environ: Connection environment supplied by the server; unused here.
    """
    print(f"✅ Client connected: {sid}")
@sio.event
async def disconnect(sid):
    """Log a Socket.IO client disconnect.

    Args:
        sid: Session id of the client that disconnected.
    """
    print(f"🔥 Client disconnected: {sid}")
# Mount the Socket.IO app on a specific path
# NOTE(review): how a mounted sub-app sees its request path depends on the
# Starlette version; confirm clients can reach the Socket.IO endpoint under
# this mount with ASGIApp's default socketio_path.
app.mount('/socket.io', socket_app)
# Serve the static files (like index.html) from the 'static' directory.
# Registered last: a mount at "/" matches every remaining path, so anything
# added after it would be unreachable.
# NOTE(review): "static" is a relative path — presumably resolved against the
# process working directory; verify for the deployment.
app.mount("/", StaticFiles(directory="static", html=True), name="static")