""" Sentinel Arbitrage Engine - v20.0 FINAL (Raw Stream) This version uses a file-based log and a raw, unstoppable streaming endpoint to guarantee signal delivery. This is the final architecture. """ import asyncio import os import json import time from contextlib import asynccontextmanager from datetime import datetime, timezone import httpx from fastapi import FastAPI from fastapi.staticfiles import StaticFiles from fastapi.responses import StreamingResponse from .price_fetcher import PriceFetcher from .arbitrage_analyzer import ArbitrageAnalyzer OPPORTUNITY_THRESHOLD = 0.0015 SIGNALS_FILE = "signals.log" @asynccontextmanager async def lifespan(app: FastAPI): if os.path.exists(SIGNALS_FILE): os.remove(SIGNALS_FILE) async with httpx.AsyncClient() as client: price_fetcher = PriceFetcher(client) arbitrage_analyzer = ArbitrageAnalyzer(client) app.state.engine_task = asyncio.create_task( run_arbitrage_detector(price_fetcher, arbitrage_analyzer) ) print("🚀 Sentinel Engine v20.0 (Raw Stream) is ONLINE.") yield app.state.engine_task.cancel() print("✅ Engine shut down.") async def run_arbitrage_detector(price_fetcher, analyzer): """The core engine loop; writes signals to a log file.""" while True: try: await price_fetcher.update_prices_async() all_prices = price_fetcher.get_all_prices() for asset, prices in all_prices.items(): pyth_price = prices.get("pyth") chainlink_price = prices.get("chainlink_agg") if pyth_price and chainlink_price and pyth_price > 0: spread = abs(pyth_price - chainlink_price) / chainlink_price if spread > OPPORTUNITY_THRESHOLD: current_time = time.time() if not hasattr(analyzer, 'last_call') or current_time - analyzer.last_call.get(asset, 0) > 60: analyzer.last_call = getattr(analyzer, 'last_call', {}) analyzer.last_call[asset] = current_time opportunity = {"asset": asset, "pyth_price": pyth_price, "chainlink_price": chainlink_price, "spread_pct": spread * 100} briefing = await analyzer.get_alpha_briefing(asset, opportunity) if briefing: signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()} # --- THE FIX: Append as a single line to a log file --- with open(SIGNALS_FILE, "a") as f: f.write(json.dumps(signal) + "\n") print(f"✅ Signal LOGGED for {asset}") except Exception as e: print(f"❌ ERROR in engine loop: {e}") await asyncio.sleep(15) # --- FastAPI App Setup --- app = FastAPI(lifespan=lifespan) @app.get("/api/signals/stream") async def signal_stream(): """This endpoint streams the content of the signals.log file line by line.""" async def event_generator(): # Use an async-compatible way to tail the file try: with open(SIGNALS_FILE, 'r') as f: f.seek(0, 2) # Go to the end of the file while True: line = f.readline() if not line: await asyncio.sleep(0.5) # Wait for new lines continue # Stream the line to the client yield line except FileNotFoundError: print("signals.log not created yet, client connected early.") # Keep the connection open and wait for the file to be created while not os.path.exists(SIGNALS_FILE): await asyncio.sleep(1) # Re-run the generator now that the file exists yield from event_generator() return StreamingResponse(event_generator(), media_type="text/plain") app.mount("/", StaticFiles(directory="static", html=True), name="static")