# Source: app/main.py ("Update app/main.py", revision c1c0c08, verified, 4.18 kB), author mgbam.
"""
Sentinel Arbitrage Engine - v20.0 FINAL (Raw Stream)
This version uses a file-based log and a raw, unstoppable streaming
endpoint to guarantee signal delivery. This is the final architecture.
"""
import asyncio
import os
import json
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
import httpx
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import StreamingResponse
from .price_fetcher import PriceFetcher
from .arbitrage_analyzer import ArbitrageAnalyzer
# Minimum relative spread between the two price feeds (0.0015 == 0.15%) that
# the engine loop treats as an opportunity.
OPPORTUNITY_THRESHOLD = 0.0015
# Log file the engine appends signals to (one JSON object per line) and the
# streaming endpoint tails. The path is relative to the working directory.
SIGNALS_FILE = "signals.log"
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the arbitrage engine for the lifetime of the application.

    On startup any signal log left over from a previous run is deleted, a
    shared ``httpx.AsyncClient`` is opened, and the detector loop is started
    as a background task (exposed as ``app.state.engine_task``).

    On shutdown the task is cancelled *and awaited* before the HTTP client
    is closed, so the engine can never touch a closed client. The cleanup
    runs in a ``finally`` block so it also happens when the application
    exits with an error.

    Args:
        app: The FastAPI application whose state holds the engine task.
    """
    # EAFP: avoids the check-then-remove race of os.path.exists + os.remove.
    try:
        os.remove(SIGNALS_FILE)
    except FileNotFoundError:
        pass

    async with httpx.AsyncClient() as client:
        price_fetcher = PriceFetcher(client)
        arbitrage_analyzer = ArbitrageAnalyzer(client)
        engine_task = asyncio.create_task(
            run_arbitrage_detector(price_fetcher, arbitrage_analyzer)
        )
        app.state.engine_task = engine_task
        print("πŸš€ Sentinel Engine v20.0 (Raw Stream) is ONLINE.")
        try:
            yield
        finally:
            engine_task.cancel()
            # Wait for the cancellation to actually finish while the client
            # is still open; return_exceptions=True absorbs the task's
            # CancelledError (or any error it died with) instead of raising.
            await asyncio.gather(engine_task, return_exceptions=True)
            print("βœ… Engine shut down.")
async def _evaluate_asset(analyzer, asset, prices):
    """Append a signal for one asset if its spread is actionable right now."""
    pyth_price = prices.get("pyth")
    chainlink_price = prices.get("chainlink_agg")
    # Truthiness first so a missing (None) price never reaches the comparison.
    if not pyth_price or not chainlink_price:
        return
    # Both quotes must be positive: chainlink_price is the divisor below.
    if pyth_price <= 0 or chainlink_price <= 0:
        return

    spread = abs(pyth_price - chainlink_price) / chainlink_price
    if spread <= OPPORTUNITY_THRESHOLD:
        return

    # Per-asset cooldown: at most one briefing request every 60 seconds.
    # The timestamps live on the analyzer object, created on first use.
    now = time.time()
    last_call = getattr(analyzer, "last_call", None)
    if last_call is None:
        last_call = analyzer.last_call = {}
    if now - last_call.get(asset, 0) <= 60:
        return
    # Recorded before the request, so a failed briefing also starts the cooldown.
    last_call[asset] = now

    opportunity = {
        "asset": asset,
        "pyth_price": pyth_price,
        "chainlink_price": chainlink_price,
        "spread_pct": spread * 100,
    }
    briefing = await analyzer.get_alpha_briefing(asset, opportunity)
    if not briefing:
        return

    signal = {
        **opportunity,
        **briefing,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # One JSON document per line so the stream endpoint can tail the file.
    with open(SIGNALS_FILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(signal) + "\n")
    print(f"βœ… Signal LOGGED for {asset}")


async def run_arbitrage_detector(price_fetcher, analyzer):
    """The core engine loop; writes signals to a log file.

    Every 15 seconds the loop refreshes prices and compares each asset's
    ``pyth`` and ``chainlink_agg`` quotes. When the relative spread exceeds
    ``OPPORTUNITY_THRESHOLD`` (and the asset is not in its 60-second
    cooldown) the analyzer is asked for a briefing, and the merged result is
    appended to ``SIGNALS_FILE`` as a single JSON line.

    Errors are reported and the loop keeps running. A failure while handling
    one asset does not prevent the remaining assets from being evaluated in
    the same cycle.

    Args:
        price_fetcher: Provides awaitable ``update_prices_async()`` and
            ``get_all_prices()``, which returns a mapping of asset name to a
            mapping of price-source name to price.
        analyzer: Provides awaitable ``get_alpha_briefing(asset, opportunity)``
            returning a mapping (or a falsy value for "no briefing").

    This coroutine never returns; it stops only when its task is cancelled.
    """
    while True:
        try:
            await price_fetcher.update_prices_async()
            all_prices = price_fetcher.get_all_prices()
            for asset, prices in all_prices.items():
                try:
                    await _evaluate_asset(analyzer, asset, prices)
                except Exception as e:
                    # Isolate the failure to this asset; keep going.
                    print(f"❌ ERROR in engine loop: {e}")
        except Exception as e:
            print(f"❌ ERROR in engine loop: {e}")
        await asyncio.sleep(15)
# --- FastAPI App Setup ---
# The lifespan handler starts the background engine task on startup and
# cancels it on shutdown.
app = FastAPI(lifespan=lifespan)
@app.get("/api/signals/stream")
async def signal_stream():
    """Stream new lines of the signals log to the client as they are written.

    A client that connects while the log exists receives only lines appended
    after it connected. A client that connects before the log has been
    created is kept waiting and then receives the log from its first line,
    since everything in it is newer than the connection.

    Returns:
        A ``text/plain`` ``StreamingResponse`` that yields one log line
        (one JSON document) at a time and never ends on its own.
    """
    async def event_generator():
        # 'yield from' is not allowed inside an async generator, so waiting
        # for the file is done with a retry loop instead of recursion.
        connected_early = False
        while True:
            try:
                f = open(SIGNALS_FILE, "r", encoding="utf-8")
                break
            except FileNotFoundError:
                if not connected_early:
                    print("signals.log not created yet, client connected early.")
                    connected_early = True
                await asyncio.sleep(1)

        with f:
            if not connected_early:
                f.seek(0, 2)  # Go to the end of the file
            pending = ""
            while True:
                chunk = f.readline()
                if not chunk:
                    await asyncio.sleep(0.5)  # Wait for new lines
                    continue
                pending += chunk
                # Hold back a partially written line until its newline arrives.
                if not pending.endswith("\n"):
                    continue
                yield pending
                pending = ""

    return StreamingResponse(event_generator(), media_type="text/plain")
# Serve the front-end from the "static" directory at the site root. This is
# registered after the API route above; a mount at "/" matches every path, so
# routes must be added before it to stay reachable.
app.mount("/", StaticFiles(directory="static", html=True), name="static")