mgbam's picture
Update app/main.py
0323e8e verified
"""
Sentinel Arbitrage Engine - v20.1 FINAL (Syntax Fix)
This version uses a file-based log and a raw, unstoppable streaming
endpoint to guarantee signal delivery. This version corrects the async
generator syntax.
"""
import asyncio
import os
import json
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
# --- RELATIVE IMPORTS ---
# This assumes your project is structured with an 'app' package.
from .price_fetcher import PriceFetcher
from .arbitrage_analyzer import ArbitrageAnalyzer
OPPORTUNITY_THRESHOLD = 0.0015
SIGNALS_FILE = "signals.log"
@asynccontextmanager
async def lifespan(app: FastAPI):
if os.path.exists(SIGNALS_FILE):
os.remove(SIGNALS_FILE)
async with httpx.AsyncClient() as client:
price_fetcher = PriceFetcher(client)
arbitrage_analyzer = ArbitrageAnalyzer(client)
app.state.engine_task = asyncio.create_task(
run_arbitrage_detector(price_fetcher, arbitrage_analyzer)
)
print("πŸš€ Sentinel Engine v20.1 (Syntax Fix) is ONLINE.")
yield
app.state.engine_task.cancel()
print("βœ… Engine shut down.")
async def run_arbitrage_detector(price_fetcher, analyzer):
"""The core engine loop; writes signals to a log file."""
while True:
try:
await price_fetcher.update_prices_async()
all_prices = price_fetcher.get_all_prices()
for asset, prices in all_prices.items():
pyth_price = prices.get("pyth")
chainlink_price = prices.get("chainlink_agg")
if pyth_price and chainlink_price and pyth_price > 0:
spread = abs(pyth_price - chainlink_price) / chainlink_price
if spread > OPPORTUNITY_THRESHOLD:
current_time = time.time()
if not hasattr(analyzer, 'last_call') or current_time - analyzer.last_call.get(asset, 0) > 60:
analyzer.last_call = getattr(analyzer, 'last_call', {})
analyzer.last_call[asset] = current_time
opportunity = {"asset": asset, "pyth_price": pyth_price, "chainlink_price": chainlink_price, "spread_pct": spread * 100}
briefing = await analyzer.get_alpha_briefing(asset, opportunity)
if briefing:
signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()}
with open(SIGNALS_FILE, "a") as f:
f.write(json.dumps(signal) + "\n")
print(f"βœ… Signal LOGGED for {asset}")
except Exception as e:
print(f"❌ ERROR in engine loop: {e}")
await asyncio.sleep(15)
# --- FastAPI App Setup ---
app = FastAPI(lifespan=lifespan)
# --- API Endpoints ---
@app.get("/api/signals/stream")
async def signal_stream():
"""This endpoint streams the content of the signals.log file line by line."""
async def event_generator():
try:
with open(SIGNALS_FILE, 'r') as f:
# Go to the end of the file
f.seek(0, 2)
while True:
line = f.readline()
if not line:
await asyncio.sleep(0.5) # Wait for new lines
continue
# Stream the line to the client
yield line
except FileNotFoundError:
print("signals.log not created yet, client connected early.")
# Keep the connection open and wait for the file to be created
while not os.path.exists(SIGNALS_FILE):
await asyncio.sleep(1)
# ====================================================================
# THE CRITICAL FIX IS HERE
# ====================================================================
# Use 'async for' to correctly delegate to the async generator
async for line in event_generator():
yield line
# ====================================================================
return StreamingResponse(event_generator(), media_type="text/plain")
# Mount static files to serve index.html at the root
app.mount("/", StaticFiles(directory="static", html=True), name="static")