mgbam commited on
Commit
faf277f
Β·
verified Β·
1 Parent(s): e5df665

Update app/app.py

Browse files
Files changed (1) hide show
  1. app/app.py +30 -23
app/app.py CHANGED
@@ -1,8 +1,8 @@
1
  """
2
- Sentinel Arbitrage Engine - v8.0 FINAL (Multi-Oracle)
3
 
4
- Detects and analyzes price dislocations between major decentralized oracles.
5
- This architecture is robust and immune to CEX geoblocking.
6
  """
7
  import asyncio
8
  import os
@@ -14,28 +14,32 @@ from fastapi import FastAPI, Request
14
  from fastapi.responses import HTMLResponse, StreamingResponse
15
  from fastapi.templating import Jinja2Templates
16
 
 
17
  from .price_fetcher import PriceFetcher
18
  from .arbitrage_analyzer import ArbitrageAnalyzer
 
19
 
20
- OPPORTUNITY_THRESHOLD = 0.001 # 0.1% price difference
21
 
22
  @asynccontextmanager
23
  async def lifespan(app: FastAPI):
24
- async with httpx.AsyncClient() as client:
25
- app.state.price_fetcher = PriceFetcher(client=client)
26
- app.state.arbitrage_analyzer = ArbitrageAnalyzer(client=client)
27
- app.state.signal_queue: asyncio.Queue = asyncio.Queue()
28
- arbitrage_task = asyncio.create_task(run_arbitrage_detector(app, 15)) # Check every 15 seconds
29
-
30
- print("πŸš€ Sentinel Arbitrage Engine v8.0 (Multi-Oracle) started.")
31
- yield
32
-
33
- print("⏳ Shutting down engine...")
34
- arbitrage_task.cancel()
35
- try: await arbitrage_task
36
- except asyncio.CancelledError: print("Engine shut down.")
 
37
 
38
- async def run_arbitrage_detector(app: FastAPI, interval_seconds: int):
 
39
  while True:
40
  await app.state.price_fetcher.update_prices_async()
41
  prices = app.state.price_fetcher.get_current_prices()
@@ -51,18 +55,20 @@ async def run_arbitrage_detector(app: FastAPI, interval_seconds: int):
51
  "pyth_price": pyth_price, "chainlink_price": chainlink_price,
52
  "spread_pct": spread * 100
53
  }
54
- print(f"⚑️ Oracle Dislocation Detected: {opportunity['spread_pct']:.3f}%")
55
  briefing = await app.state.arbitrage_analyzer.get_alpha_briefing(opportunity)
56
  if briefing:
57
  signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()}
58
- await app.state.signal_queue.put(signal)
 
 
59
 
60
- await asyncio.sleep(interval_seconds)
61
 
62
  app = FastAPI(title="Sentinel Arbitrage Engine", lifespan=lifespan)
63
  templates = Jinja2Templates(directory="templates")
64
 
65
  def render_signal_card(payload: dict) -> str:
 
66
  s = payload
67
  time_str = datetime.fromisoformat(s['timestamp']).strftime('%H:%M:%S UTC')
68
  pyth_class = "buy" if s['pyth_price'] < s['chainlink_price'] else "sell"
@@ -87,10 +93,11 @@ async def serve_dashboard(request: Request):
87
 
88
  @app.get("/api/signals/stream")
89
  async def signal_stream(request: Request):
90
- queue: asyncio.Queue = request.app.state.signal_queue
91
  async def event_generator():
92
  while True:
93
- payload = await queue.get()
 
94
  html_card = render_signal_card(payload)
95
  data_payload = html_card.replace('\n', ' ').strip()
96
  yield f"event: message\ndata: {data_payload}\n\n"
 
1
  """
2
+ Sentinel Arbitrage Engine - v9.0 FINAL (Singleton Broker)
3
 
4
+ This version uses a singleton message broker to ensure robust, stateful
5
+ communication between the background engine and the API endpoints.
6
  """
7
  import asyncio
8
  import os
 
14
  from fastapi.responses import HTMLResponse, StreamingResponse
15
  from fastapi.templating import Jinja2Templates
16
 
17
+ # Import our local modules AND the new singleton broker
18
  from .price_fetcher import PriceFetcher
19
  from .arbitrage_analyzer import ArbitrageAnalyzer
20
+ from .broker import signal_broker
21
 
22
+ OPPORTUNITY_THRESHOLD = 0.001
23
 
24
  @asynccontextmanager
25
  async def lifespan(app: FastAPI):
26
+ # We no longer need to manage the queue in the app state.
27
+ # The broker exists independently.
28
+ app.state.price_fetcher = PriceFetcher(httpx.AsyncClient())
29
+ app.state.arbitrage_analyzer = ArbitrageAnalyzer(httpx.AsyncClient())
30
+
31
+ arbitrage_task = asyncio.create_task(run_arbitrage_detector(app))
32
+
33
+ print("πŸš€ Sentinel Arbitrage Engine v9.0 (Singleton Broker) started.")
34
+ yield
35
+
36
+ print("⏳ Shutting down engine...")
37
+ arbitrage_task.cancel()
38
+ try: await arbitrage_task
39
+ except asyncio.CancelledError: print("Engine shut down.")
40
 
41
+ async def run_arbitrage_detector(app: FastAPI):
42
+ # Now this function uses the globally imported signal_broker
43
  while True:
44
  await app.state.price_fetcher.update_prices_async()
45
  prices = app.state.price_fetcher.get_current_prices()
 
55
  "pyth_price": pyth_price, "chainlink_price": chainlink_price,
56
  "spread_pct": spread * 100
57
  }
 
58
  briefing = await app.state.arbitrage_analyzer.get_alpha_briefing(opportunity)
59
  if briefing:
60
  signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()}
61
+ # Put the signal into the singleton broker's queue
62
+ await signal_broker.queue.put(signal)
63
+ print(f"βœ… Signal Queued: {signal['spread_pct']:.3f}% Discrepancy")
64
 
65
+ await asyncio.sleep(15) # Check every 15 seconds
66
 
67
  app = FastAPI(title="Sentinel Arbitrage Engine", lifespan=lifespan)
68
  templates = Jinja2Templates(directory="templates")
69
 
70
  def render_signal_card(payload: dict) -> str:
71
+ # This rendering function remains the same
72
  s = payload
73
  time_str = datetime.fromisoformat(s['timestamp']).strftime('%H:%M:%S UTC')
74
  pyth_class = "buy" if s['pyth_price'] < s['chainlink_price'] else "sell"
 
93
 
94
  @app.get("/api/signals/stream")
95
  async def signal_stream(request: Request):
96
+ # This endpoint now also imports and uses the singleton broker
97
  async def event_generator():
98
  while True:
99
+ # Get the signal from the singleton broker's queue
100
+ payload = await signal_broker.queue.get()
101
  html_card = render_signal_card(payload)
102
  data_payload = html_card.replace('\n', ' ').strip()
103
  yield f"event: message\ndata: {data_payload}\n\n"