Spaces:
Sleeping
Sleeping
""" | |
Sentinel Arbitrage Engine - v5.0 FINAL (Anti-Geoblock)
Detects on-chain vs. off-chain price discrepancies and provides AI-powered analysis.
""" | |
import asyncio | |
import os | |
from contextlib import asynccontextmanager | |
from datetime import datetime, timezone | |
import json | |
import httpx | |
from fastapi import FastAPI, Request | |
from fastapi.responses import HTMLResponse, StreamingResponse | |
from fastapi.templating import Jinja2Templates | |
from .price_fetcher import PriceFetcher | |
from .arbitrage_analyzer import ArbitrageAnalyzer | |
# Minimum relative spread, |on_chain - off_chain| / off_chain, that must be
# strictly exceeded before a signal is raised; 0.001 is a 0.1% price difference.
OPPORTUNITY_THRESHOLD = 0.001
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage startup and shutdown of the engine's shared resources.

    On startup a single ``httpx.AsyncClient`` is opened and shared by the
    ``PriceFetcher`` and ``ArbitrageAnalyzer`` stored on ``app.state``, the
    signal queue is created, and the background detector task is started
    with a 10-second polling interval. On shutdown the detector task is
    cancelled and awaited before the HTTP client is closed.

    Args:
        app: The FastAPI application whose ``state`` receives the resources.

    Yields:
        None. Control returns to the framework while the application runs.
    """
    async with httpx.AsyncClient() as client:
        app.state.price_fetcher = PriceFetcher(client=client)
        app.state.arbitrage_analyzer = ArbitrageAnalyzer(client=client)
        app.state.signal_queue = asyncio.Queue()
        arbitrage_task = asyncio.create_task(run_arbitrage_detector(app, 10))
        print("π Sentinel Arbitrage Engine v5.0 started successfully.")
        try:
            yield
        finally:
            # Runs even when the application exits with an error, so the
            # background task never outlives the HTTP client it depends on.
            print("β³ Shutting down engine...")
            arbitrage_task.cancel()
            try:
                await arbitrage_task
            except asyncio.CancelledError:
                print("Engine shut down.")
async def run_arbitrage_detector(app: FastAPI, interval_seconds: int):
    """Poll prices forever and enqueue a signal for each large discrepancy.

    Each cycle refreshes prices through ``app.state.price_fetcher``, compares
    the ``on_chain_pyth`` and ``off_chain_agg`` quotes and, when their
    relative spread exceeds ``OPPORTUNITY_THRESHOLD``, asks
    ``app.state.arbitrage_analyzer`` for a briefing. A truthy briefing is
    merged over the opportunity, stamped with the current UTC time and put
    on ``app.state.signal_queue``.

    A cycle that raises is reported and skipped so that one failed fetch or
    analysis does not end the detector; cancellation is always propagated.

    Args:
        app: Application whose ``state`` holds the fetcher, analyzer and queue.
        interval_seconds: Pause between polling cycles, in seconds.
    """
    while True:
        try:
            await app.state.price_fetcher.update_prices_async()
            prices = app.state.price_fetcher.get_current_prices()
            on_chain = prices.get("on_chain_pyth")
            off_chain = prices.get("off_chain_agg")
            # Missing or zero quotes are skipped; this also guards the division.
            if on_chain and off_chain:
                spread = abs(on_chain - off_chain) / off_chain
                if spread > OPPORTUNITY_THRESHOLD:
                    opportunity = {
                        "id": f"{int(datetime.now().timestamp())}",
                        "on_chain_price": on_chain,
                        "off_chain_price": off_chain,
                        "spread_pct": spread * 100,
                    }
                    print(f"β‘οΈ Discrepancy Detected: {opportunity['spread_pct']:.3f}%")
                    briefing = await app.state.arbitrage_analyzer.get_alpha_briefing(opportunity)
                    if briefing:
                        signal = {
                            **opportunity,
                            **briefing,
                            "timestamp": datetime.now(timezone.utc).isoformat(),
                        }
                        await app.state.signal_queue.put(signal)
        except asyncio.CancelledError:
            # Shutdown request: never swallow it, whatever the Python version.
            raise
        except Exception as exc:
            # Without this, a single failing cycle would terminate the task
            # and the engine would stop producing signals until restarted.
            print(f"Arbitrage detector cycle failed: {exc!r}")
        await asyncio.sleep(interval_seconds)
# Application object; startup and shutdown are delegated to ``lifespan``.
app = FastAPI(
    title="Sentinel Arbitrage Engine",
    lifespan=lifespan,
)
# Template loader rooted at the relative "templates" directory.
templates = Jinja2Templates(directory="templates")
def render_signal_card(payload: dict) -> str:
    """Render one signal as an HTML fragment carrying ``hx-swap-oob`` targets.

    The fragment holds a table row for ``#opportunities-table`` and a
    replacement for ``#last-update-time``. The venue with the lower price is
    given the ``buy`` CSS class and the other one ``sell``.

    Text fields (``id``, ``risk``, ``rationale``, ``strategy``) are
    HTML-escaped before interpolation: they can originate from the analyzer
    briefing merged into the signal, so they must not be trusted as markup.

    Args:
        payload: Signal dict. Requires ``id``, ``timestamp`` (ISO 8601),
            ``on_chain_price``, ``off_chain_price`` and ``spread_pct``;
            ``risk``, ``rationale`` and ``strategy`` are optional.

    Returns:
        The HTML fragment as a string.

    Raises:
        KeyError: If a required key is missing.
        ValueError: If ``timestamp`` is not a valid ISO 8601 string.
    """
    from html import escape  # local import keeps this block self-contained

    on_chain_price = payload['on_chain_price']
    off_chain_price = payload['off_chain_price']
    time_str = datetime.fromisoformat(payload['timestamp']).strftime('%H:%M:%S UTC')

    on_chain_is_cheaper = on_chain_price < off_chain_price
    on_chain_class = "buy" if on_chain_is_cheaper else "sell"
    off_chain_class = "sell" if on_chain_is_cheaper else "buy"

    # str() first so a non-string value cannot crash .lower() or escape().
    row_id = escape(str(payload['id']))
    risk_class = escape(str(payload.get('risk', 'low')).lower())
    risk_label = escape(str(payload.get('risk', 'N/A')))
    rationale = escape(str(payload.get('rationale', 'N/A')))
    strategy = escape(str(payload.get('strategy', 'N/A')))

    return f"""
    <tr id="trade-row-{row_id}" hx-swap-oob="afterbegin:#opportunities-table">
    <td><strong>BTC/USD</strong></td>
    <td><span class="{on_chain_class}">On-Chain (Pyth)</span><br>${on_chain_price:,.2f}</td>
    <td><span class="{off_chain_class}">Off-Chain (Agg)</span><br>${off_chain_price:,.2f}</td>
    <td><strong>{payload['spread_pct']:.3f}%</strong></td>
    <td><span class="risk-{risk_class}">{risk_label}</span></td>
    <td>{rationale}</td>
    <td><button class="trade-btn">{strategy}</button></td>
    </tr>
    <div id="last-update-time" hx-swap-oob="true">{time_str}</div>
    """
async def serve_dashboard(request: Request):
    """Render the dashboard page from the ``index.html`` template.

    Args:
        request: The incoming request, passed to the template context.

    Returns:
        The template response produced by ``templates.TemplateResponse``.
    """
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
async def signal_stream(request: Request):
    """Stream rendered signal cards to the client as Server-Sent Events.

    Each item taken from ``request.app.state.signal_queue`` is rendered with
    ``render_signal_card``, flattened to one line and sent as a ``message``
    event.

    NOTE: every connection reads from the same queue and ``Queue.get``
    removes the item, so a given signal is delivered to only one open stream.

    Args:
        request: The incoming request; used to reach the application state.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    queue: asyncio.Queue = request.app.state.signal_queue

    async def event_generator():
        while True:
            payload = await queue.get()
            html_card = render_signal_card(payload)
            # An SSE line ends at CRLF, a bare CR or a bare LF, so all three
            # must be flattened or the data field is cut short at that point.
            data_payload = (
                html_card.replace('\r\n', ' ')
                .replace('\r', ' ')
                .replace('\n', ' ')
                .strip()
            )
            yield f"event: message\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")