Spaces:
Running
Running
""" | |
Sentinel Arbitrage Engine - v20.1 FINAL (Syntax Fix) | |
This version uses a file-based log and a raw, unstoppable streaming | |
endpoint to guarantee signal delivery. This version corrects the async | |
generator syntax. | |
""" | |
import asyncio | |
import os | |
import json | |
import time | |
from contextlib import asynccontextmanager | |
from datetime import datetime, timezone | |
import httpx | |
from fastapi import FastAPI, Request | |
from fastapi.responses import StreamingResponse | |
from fastapi.staticfiles import StaticFiles | |
# --- RELATIVE IMPORTS --- | |
# This assumes your project is structured with an 'app' package. | |
from .price_fetcher import PriceFetcher | |
from .arbitrage_analyzer import ArbitrageAnalyzer | |
# Minimum relative spread between the Pyth and Chainlink prices that counts as
# an opportunity. It is a fraction of the Chainlink price (0.0015 == 0.15%).
OPPORTUNITY_THRESHOLD = 0.0015
# Log file the engine appends one JSON signal per line to. The streaming
# endpoint reads from it, and it is deleted when the application starts.
SIGNALS_FILE = "signals.log"
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage startup and shutdown of the background arbitrage engine.

    On startup the signal log left by a previous run is deleted, a shared
    ``httpx.AsyncClient`` is opened, and ``run_arbitrage_detector`` is started
    as a background task stored on ``app.state.engine_task``. On shutdown the
    task is cancelled and awaited before the HTTP client is closed.

    Args:
        app: The FastAPI application whose ``state`` holds the engine task.

    Yields:
        None. Control returns to FastAPI while the application is serving.
    """
    # Start every run with a fresh log. Removing directly (instead of checking
    # for existence first) avoids a check-then-remove race.
    try:
        os.remove(SIGNALS_FILE)
    except FileNotFoundError:
        pass

    async with httpx.AsyncClient() as client:
        price_fetcher = PriceFetcher(client)
        arbitrage_analyzer = ArbitrageAnalyzer(client)
        engine_task = asyncio.create_task(
            run_arbitrage_detector(price_fetcher, arbitrage_analyzer)
        )
        app.state.engine_task = engine_task
        print("π Sentinel Engine v20.1 (Syntax Fix) is ONLINE.")
        try:
            yield
        finally:
            # Runs on normal shutdown and when an error is thrown into the
            # lifespan, so the background task is never left running.
            engine_task.cancel()
            # Wait for the cancellation to land while the HTTP client the task
            # uses is still open; return_exceptions keeps the task's
            # CancelledError from propagating out of shutdown.
            await asyncio.gather(engine_task, return_exceptions=True)
            print("β Engine shut down.")
_SIGNAL_COOLDOWN_SECONDS = 60  # minimum gap between briefings for one asset
_POLL_INTERVAL_SECONDS = 15  # pause between engine cycles


async def _evaluate_asset(asset, prices, analyzer):
    """Log a signal for one asset if its Pyth/Chainlink spread qualifies."""
    pyth_price = prices.get("pyth")
    chainlink_price = prices.get("chainlink_agg")
    if not (pyth_price and chainlink_price):
        return
    # Both prices must be positive: chainlink_price is the divisor below, so
    # checking only pyth_price would leave the spread's sign unchecked.
    if not (pyth_price > 0 and chainlink_price > 0):
        return

    spread = abs(pyth_price - chainlink_price) / chainlink_price
    if not spread > OPPORTUNITY_THRESHOLD:
        return

    # Per-asset cooldown, stored on the analyzer object as ``last_call``
    # (created lazily the first time an opportunity is seen).
    now = time.time()
    last_call = getattr(analyzer, "last_call", None)
    if last_call is None:
        last_call = {}
        analyzer.last_call = last_call
    if not now - last_call.get(asset, 0) > _SIGNAL_COOLDOWN_SECONDS:
        return
    # Recorded before the briefing call, so a failed or empty briefing still
    # consumes the cooldown window for this asset.
    last_call[asset] = now

    opportunity = {
        "asset": asset,
        "pyth_price": pyth_price,
        "chainlink_price": chainlink_price,
        "spread_pct": spread * 100,
    }
    briefing = await analyzer.get_alpha_briefing(asset, opportunity)
    if not briefing:
        return

    signal = {
        **opportunity,
        **briefing,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # One JSON document per line, appended so the streaming side can tail it.
    with open(SIGNALS_FILE, "a") as f:
        f.write(json.dumps(signal) + "\n")
    print(f"β Signal LOGGED for {asset}")


async def run_arbitrage_detector(price_fetcher, analyzer):
    """Run the core engine loop, writing qualifying signals to the log file.

    Each cycle refreshes prices, evaluates every asset returned by
    ``price_fetcher.get_all_prices()``, then sleeps before the next cycle.
    The loop only ends when the task is cancelled.

    Args:
        price_fetcher: Object providing ``update_prices_async()`` and
            ``get_all_prices()``; the latter is iterated with ``.items()`` and
            each value is read with ``.get("pyth")`` / ``.get("chainlink_agg")``.
        analyzer: Object providing ``get_alpha_briefing(asset, opportunity)``;
            a ``last_call`` dict is attached to it for per-asset cooldowns.
    """
    while True:
        try:
            await price_fetcher.update_prices_async()
            for asset, prices in price_fetcher.get_all_prices().items():
                await _evaluate_asset(asset, prices, analyzer)
        except Exception as e:
            # Top-level boundary of the background task: report and keep the
            # engine alive. Cancellation is not an Exception and still
            # propagates.
            print(f"β ERROR in engine loop: {e}")
        await asyncio.sleep(_POLL_INTERVAL_SECONDS)
# --- FastAPI App Setup ---
# The lifespan handler starts the background engine task on startup and
# cancels it on shutdown.
app = FastAPI(lifespan=lifespan)
# --- API Endpoints --- | |
async def signal_stream():
    """Stream newly logged signals from the signals log as plain text.

    A client that connects while the log already exists receives only lines
    appended after it connected. A client that connects before the log exists
    waits for it to be created and then receives every line from the start, so
    the signal whose write created the file is not skipped.

    NOTE(review): no route decorator is visible on this function in this
    section of the file; confirm it is registered on ``app`` (and ahead of the
    static mount at "/").

    Returns:
        A ``StreamingResponse`` with media type ``text/plain`` that yields one
        log line at a time and never ends on its own.
    """

    async def event_generator():
        # True while the log was present at connect time; in that case the
        # existing content is old and must be skipped.
        skip_existing = True
        while True:
            try:
                log_file = open(SIGNALS_FILE, "r")
                break
            except FileNotFoundError:
                if skip_existing:
                    print("signals.log not created yet, client connected early.")
                    # Everything written from now on is new to this client.
                    skip_existing = False
                # Keep the connection open and wait for the file to be created.
                await asyncio.sleep(1)

        with log_file:
            if skip_existing:
                # Go to the end of the file: only stream future lines.
                log_file.seek(0, 2)
            while True:
                line = log_file.readline()
                if not line:
                    await asyncio.sleep(0.5)  # Wait for new lines
                    continue
                yield line

    return StreamingResponse(event_generator(), media_type="text/plain")
# Mount static files to serve index.html at the root.
# NOTE(review): a mount at "/" presumably matches every remaining path, so API
# routes need to be registered before this line — confirm.
app.mount("/", StaticFiles(directory="static", html=True), name="static")