mgbam's picture
Update app/app.py
b50b626 verified
raw
history blame
4.25 kB
"""
Sentinel Arbitrage Engine - v8.0 FINAL (Multi-Oracle)
Detects and analyzes price dislocations between major decentralized oracles.
This architecture is robust and immune to CEX geoblocking.
"""
import asyncio
import os
from contextlib import asynccontextmanager
from datetime import datetime, timezone
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from .price_fetcher import PriceFetcher
from .arbitrage_analyzer import ArbitrageAnalyzer
# Minimum relative spread between the two oracle prices that is treated as an
# opportunity. It is compared against abs(pyth - chainlink) / chainlink, i.e. a
# fraction (0.001), not a percentage.
OPPORTUNITY_THRESHOLD = 0.001 # 0.1% price difference
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the engine's shared resources for the lifetime of the app.

    On startup this opens one ``httpx.AsyncClient`` that is shared by the
    ``PriceFetcher`` and the ``ArbitrageAnalyzer``, creates the signal queue
    on ``app.state`` and starts the background detector task (polling every
    15 seconds). On shutdown the detector task is cancelled and awaited
    before the HTTP client is closed.

    Args:
        app: The FastAPI application whose ``state`` receives
            ``price_fetcher``, ``arbitrage_analyzer`` and ``signal_queue``.
    """
    async with httpx.AsyncClient() as client:
        app.state.price_fetcher = PriceFetcher(client=client)
        app.state.arbitrage_analyzer = ArbitrageAnalyzer(client=client)
        app.state.signal_queue = asyncio.Queue()
        # Check every 15 seconds.
        arbitrage_task = asyncio.create_task(run_arbitrage_detector(app, 15))
        print("🚀 Sentinel Arbitrage Engine v8.0 (Multi-Oracle) started.")
        try:
            yield
        finally:
            # Runs even if an exception is thrown into the generator at the
            # ``yield``; otherwise the detector task would keep running
            # against an HTTP client that is about to be closed.
            print("⏳ Shutting down engine...")
            arbitrage_task.cancel()
            try:
                await arbitrage_task
            except asyncio.CancelledError:
                print("Engine shut down.")
async def run_arbitrage_detector(app: FastAPI, interval_seconds: int):
    """Poll the oracle prices forever and queue a signal for each dislocation.

    Each iteration refreshes prices through ``app.state.price_fetcher``,
    reads the ``"pyth"`` and ``"chainlink_agg"`` entries and computes their
    relative spread against the Chainlink price. When the spread exceeds
    ``OPPORTUNITY_THRESHOLD`` the opportunity is passed to
    ``app.state.arbitrage_analyzer.get_alpha_briefing``; a truthy briefing is
    merged with the opportunity and a UTC ISO-8601 ``timestamp`` and put on
    ``app.state.signal_queue``.

    Args:
        app: Application whose ``state`` holds the fetcher, analyzer and queue.
        interval_seconds: Pause between polling iterations, in seconds.

    The coroutine never returns; it ends only when its task is cancelled.
    """
    while True:
        try:
            await app.state.price_fetcher.update_prices_async()
            prices = app.state.price_fetcher.get_current_prices()
            pyth_price = prices.get("pyth")
            chainlink_price = prices.get("chainlink_agg")

            # Truthiness skips missing (None) prices and also a zero price,
            # which would otherwise divide by zero below.
            if pyth_price and chainlink_price:
                spread = abs(pyth_price - chainlink_price) / chainlink_price
                if spread > OPPORTUNITY_THRESHOLD:
                    opportunity = {
                        # Epoch seconds; two detections in the same second
                        # share an id.
                        "id": f"{int(datetime.now(timezone.utc).timestamp())}",
                        "pyth_price": pyth_price,
                        "chainlink_price": chainlink_price,
                        "spread_pct": spread * 100,
                    }
                    print(f"⚡️ Oracle Dislocation Detected: {opportunity['spread_pct']:.3f}%")
                    briefing = await app.state.arbitrage_analyzer.get_alpha_briefing(opportunity)
                    if briefing:
                        # Assumes the briefing is a mapping (it is unpacked
                        # with **); its keys override same-named opportunity keys.
                        signal = {
                            **opportunity,
                            **briefing,
                            "timestamp": datetime.now(timezone.utc).isoformat(),
                        }
                        await app.state.signal_queue.put(signal)
        except asyncio.CancelledError:
            # Shutdown request: never swallow cancellation.
            raise
        except Exception as exc:
            # A transient fetch/analysis failure must not kill the background
            # task permanently; report it and retry on the next tick.
            print(f"Arbitrage detector iteration failed: {exc!r}")
        await asyncio.sleep(interval_seconds)
# ASGI application object; startup and shutdown are delegated to `lifespan`.
app = FastAPI(title="Sentinel Arbitrage Engine", lifespan=lifespan)
# Template loader for the dashboard page. "templates" is a relative path —
# NOTE(review): presumably resolved against the process working directory; confirm.
templates = Jinja2Templates(directory="templates")
def render_signal_card(payload: dict) -> str:
    """Render one signal as an HTML fragment for out-of-band swapping.

    The fragment contains a table row (prepended to ``#opportunities-table``
    via ``hx-swap-oob``) and a replacement for ``#last-update-time``.

    Args:
        payload: Signal mapping. Requires ``id``, ``timestamp`` (ISO-8601
            string), ``pyth_price``, ``chainlink_price`` and ``spread_pct``.
            ``risk``, ``rationale`` and ``strategy`` are optional and fall
            back to placeholders when missing or ``None``.

    Returns:
        The HTML fragment as a string.

    Raises:
        KeyError: If a required key is missing.
        ValueError: If ``timestamp`` is not a valid ISO-8601 string.
    """
    # Local import keeps this block self-contained.
    from html import escape

    s = payload

    parsed = datetime.fromisoformat(s['timestamp'])
    if parsed.tzinfo is not None:
        # The label below says "UTC", so normalise aware timestamps to UTC
        # instead of printing whatever offset they happen to carry.
        parsed = parsed.astimezone(timezone.utc)
    time_str = parsed.strftime('%H:%M:%S UTC')

    # The cheaper venue is the one to buy on; the other is the sell side.
    pyth_is_cheaper = s['pyth_price'] < s['chainlink_price']
    pyth_class = "buy" if pyth_is_cheaper else "sell"
    chainlink_class = "sell" if pyth_is_cheaper else "buy"

    def _text(key: str, default: str) -> str:
        """Return the payload value for *key* (or *default*) as a string."""
        value = s.get(key)
        return str(default if value is None else value)

    # These fields come from the analyzer's briefing and are free-form text,
    # so they are escaped before being placed into markup or attributes.
    row_id = escape(str(s['id']))
    risk_class = escape(_text('risk', 'low').lower())
    risk_label = escape(_text('risk', 'N/A'))
    rationale = escape(_text('rationale', 'N/A'))
    strategy = escape(_text('strategy', 'N/A'))

    return f"""
    <tr id="trade-row-{row_id}" hx-swap-oob="afterbegin:#opportunities-table">
        <td><strong>BTC/USD</strong></td>
        <td><span class="{pyth_class}">Pyth Network</span><br>${s['pyth_price']:,.2f}</td>
        <td><span class="{chainlink_class}">Chainlink Agg.</span><br>${s['chainlink_price']:,.2f}</td>
        <td><strong>{s['spread_pct']:.3f}%</strong></td>
        <td><span class="risk-{risk_class}">{risk_label}</span></td>
        <td>{rationale}</td>
        <td><button class="trade-btn">{strategy}</button></td>
    </tr>
    <div id="last-update-time" hx-swap-oob="true">{time_str}</div>
    """
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    """Serve the dashboard page rendered from the ``index.html`` template."""
    # The incoming request is passed to the template under the "request" key.
    template_context = {"request": request}
    return templates.TemplateResponse("index.html", template_context)
@app.get("/api/signals/stream")
async def signal_stream(request: Request):
    """Stream rendered signal cards to the client as Server-Sent Events.

    Each item taken from ``app.state.signal_queue`` is rendered with
    ``render_signal_card`` and emitted as a single ``message`` event.

    NOTE(review): the queue is shared by every request and ``queue.get()``
    removes the item, so with several concurrent clients each signal reaches
    only one of them. Confirm whether a broadcast is intended.

    Args:
        request: Incoming request; used to reach the application state.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    queue: asyncio.Queue = request.app.state.signal_queue

    async def event_generator():
        """Yield one SSE frame per queued signal, forever."""
        while True:
            payload = await queue.get()
            html_card = render_signal_card(payload)
            # An SSE "data:" field ends at the first line terminator, and the
            # format treats CR, LF and CRLF alike. Flatten every kind of line
            # break (not just "\n") so the card stays in one data line.
            data_payload = " ".join(html_card.splitlines()).strip()
            yield f"event: message\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")