mgbam commited on
Commit
69513b3
·
verified ·
1 Parent(s): 0b00485

Update app/app.py

Browse files
Files changed (1) hide show
  1. app/app.py +35 -28
app/app.py CHANGED
@@ -1,8 +1,8 @@
1
  """
2
- Sentinel Arbitrage Engine - v6.0 FINAL (Rate-Limit Proof)
3
 
4
- Detects on-chain vs. off-chain price discrepancies using a fault-tolerant,
5
- multi-source data aggregator.
6
  """
7
  import asyncio
8
  import os
@@ -25,59 +25,64 @@ async def lifespan(app: FastAPI):
25
  app.state.price_fetcher = PriceFetcher(client=client)
26
  app.state.arbitrage_analyzer = ArbitrageAnalyzer(client=client)
27
  app.state.signal_queue: asyncio.Queue = asyncio.Queue()
28
- # Slowing down the loop slightly to be respectful to APIs
29
- arbitrage_task = asyncio.create_task(run_arbitrage_detector(app, 10))
30
 
31
- print("🚀 Sentinel Arbitrage Engine v6.0 started successfully.")
 
 
 
 
 
32
  yield
33
 
34
  print("⏳ Shutting down engine...")
 
35
  arbitrage_task.cancel()
36
- try: await arbitrage_task
37
  except asyncio.CancelledError: print("Engine shut down.")
38
 
39
  async def run_arbitrage_detector(app: FastAPI, interval_seconds: int):
40
- # This loop logic remains the same, but it's now fed by a more reliable PriceFetcher
41
  while True:
42
- await app.state.price_fetcher.update_prices_async()
43
- prices = app.state.price_fetcher.get_current_prices()
44
 
45
  on_chain = prices.get("on_chain_pyth")
46
- off_chain = prices.get("off_chain_agg")
47
 
48
  if on_chain and off_chain:
49
  spread = abs(on_chain - off_chain) / off_chain
50
  if spread > OPPORTUNITY_THRESHOLD:
51
- opportunity = {
52
- "id": f"{int(datetime.now().timestamp())}",
53
- "on_chain_price": on_chain,
54
- "off_chain_price": off_chain,
55
- "spread_pct": spread * 100
56
- }
57
- print(f"⚡️ Discrepancy Detected: {opportunity['spread_pct']:.3f}%")
58
- briefing = await app.state.arbitrage_analyzer.get_alpha_briefing(opportunity)
59
- if briefing:
60
- signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()}
61
- await app.state.signal_queue.put(signal)
 
 
 
62
 
63
  await asyncio.sleep(interval_seconds)
64
 
65
  app = FastAPI(title="Sentinel Arbitrage Engine", lifespan=lifespan)
66
  templates = Jinja2Templates(directory="templates")
67
 
68
- # No changes needed to render_signal_card, GET /, or the SSE stream endpoint.
69
- # The code below this line is the same as the previous version.
70
  def render_signal_card(payload: dict) -> str:
71
  s = payload
72
  time_str = datetime.fromisoformat(s['timestamp']).strftime('%H:%M:%S UTC')
73
  on_chain_class = "buy" if s['on_chain_price'] < s['off_chain_price'] else "sell"
74
  off_chain_class = "sell" if s['on_chain_price'] < s['off_chain_price'] else "buy"
75
-
76
  return f"""
77
  <tr id="trade-row-{s['id']}" hx-swap-oob="afterbegin:#opportunities-table">
78
  <td><strong>BTC/USD</strong></td>
79
- <td><span class="{on_chain_class}">On-Chain (Pyth)</span><br>${s['on_chain_price']:,.2f}</td>
80
- <td><span class="{off_chain_class}">Off-Chain (Agg)</span><br>${s['off_chain_price']:,.2f}</td>
81
  <td><strong>{s['spread_pct']:.3f}%</strong></td>
82
  <td><span class="risk-{s.get('risk', 'low').lower()}">{s.get('risk', 'N/A')}</span></td>
83
  <td>{s.get('rationale', 'N/A')}</td>
@@ -99,4 +104,6 @@ async def signal_stream(request: Request):
99
  html_card = render_signal_card(payload)
100
  data_payload = html_card.replace('\n', ' ').strip()
101
  yield f"event: message\ndata: {data_payload}\n\n"
102
- return StreamingResponse(event_generator(), media_type="text/event-stream")
 
 
 
1
  """
2
+ Sentinel Arbitrage Engine - v7.0 FINAL (WebSocket-Powered)
3
 
4
+ Utilizes a real-time WebSocket data stream for high-frequency discrepancy
5
+ detection between CeFi and DeFi price sources.
6
  """
7
  import asyncio
8
  import os
 
25
  app.state.price_fetcher = PriceFetcher(client=client)
26
  app.state.arbitrage_analyzer = ArbitrageAnalyzer(client=client)
27
  app.state.signal_queue: asyncio.Queue = asyncio.Queue()
 
 
28
 
29
+ # Launch the persistent WebSocket listener
30
+ websocket_task = asyncio.create_task(app.state.price_fetcher.run_websocket_listener())
31
+ # Launch the discrepancy detector
32
+ arbitrage_task = asyncio.create_task(run_arbitrage_detector(app, 2)) # Can run much faster now!
33
+
34
+ print("🚀 Sentinel Arbitrage Engine v7.0 (WebSocket) started successfully.")
35
  yield
36
 
37
  print("⏳ Shutting down engine...")
38
+ websocket_task.cancel()
39
  arbitrage_task.cancel()
40
+ try: await asyncio.gather(websocket_task, arbitrage_task)
41
  except asyncio.CancelledError: print("Engine shut down.")
42
 
43
  async def run_arbitrage_detector(app: FastAPI, interval_seconds: int):
44
+ last_opportunity_time = 0
45
  while True:
46
+ # This now gets the latest price from the websocket and a fresh price from Pyth
47
+ prices = await app.state.price_fetcher.get_current_prices()
48
 
49
  on_chain = prices.get("on_chain_pyth")
50
+ off_chain = prices.get("realtime_binance")
51
 
52
  if on_chain and off_chain:
53
  spread = abs(on_chain - off_chain) / off_chain
54
  if spread > OPPORTUNITY_THRESHOLD:
55
+ # Throttle Gemini calls to once every 30 seconds max
56
+ current_time = time.time()
57
+ if current_time - last_opportunity_time > 30:
58
+ last_opportunity_time = current_time
59
+ opportunity = {
60
+ "id": f"{int(current_time)}",
61
+ "on_chain_price": on_chain, "off_chain_price": off_chain,
62
+ "spread_pct": spread * 100
63
+ }
64
+ print(f"⚡️ Discrepancy Detected: {opportunity['spread_pct']:.3f}%")
65
+ briefing = await app.state.arbitrage_analyzer.get_alpha_briefing(opportunity)
66
+ if briefing:
67
+ signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()}
68
+ await app.state.signal_queue.put(signal)
69
 
70
  await asyncio.sleep(interval_seconds)
71
 
72
  app = FastAPI(title="Sentinel Arbitrage Engine", lifespan=lifespan)
73
  templates = Jinja2Templates(directory="templates")
74
 
 
 
75
  def render_signal_card(payload: dict) -> str:
76
  s = payload
77
  time_str = datetime.fromisoformat(s['timestamp']).strftime('%H:%M:%S UTC')
78
  on_chain_class = "buy" if s['on_chain_price'] < s['off_chain_price'] else "sell"
79
  off_chain_class = "sell" if s['on_chain_price'] < s['off_chain_price'] else "buy"
80
+
81
  return f"""
82
  <tr id="trade-row-{s['id']}" hx-swap-oob="afterbegin:#opportunities-table">
83
  <td><strong>BTC/USD</strong></td>
84
+ <td><span class="{off_chain_class}">CEX (Binance)</span><br>${s['off_chain_price']:,.2f}</td>
85
+ <td><span class="{on_chain_class}">DEX (Pyth)</span><br>${s['on_chain_price']:,.2f}</td>
86
  <td><strong>{s['spread_pct']:.3f}%</strong></td>
87
  <td><span class="risk-{s.get('risk', 'low').lower()}">{s.get('risk', 'N/A')}</span></td>
88
  <td>{s.get('rationale', 'N/A')}</td>
 
104
  html_card = render_signal_card(payload)
105
  data_payload = html_card.replace('\n', ' ').strip()
106
  yield f"event: message\ndata: {data_payload}\n\n"
107
+ return StreamingResponse(event_generator(), media_type="text/event-stream")
108
+
109
+ # Need to add 'import time' at the top of main.py