mgbam commited on
Commit
362692f
Β·
verified Β·
1 Parent(s): d8c7fa5

Update app/app.py

Browse files
Files changed (1) hide show
  1. app/app.py +192 -49
app/app.py CHANGED
@@ -1,92 +1,235 @@
1
  """
2
- Sentinel Arbitrage Engine - v14.0 FINAL (Socket.IO Multi-Asset)
3
 
4
- The definitive version, combining a robust multi-asset detection engine
5
- with a high-performance Socket.IO backend for guaranteed, real-time
6
- signal delivery and a professional UI.
 
7
  """
8
  import asyncio
9
  import os
10
- from contextlib import asynccontextmanager
11
  import json
12
- import time
13
  from datetime import datetime, timezone
14
-
15
  import httpx
16
- import socketio
17
- from fastapi import FastAPI
18
  from fastapi.staticfiles import StaticFiles
19
 
20
- from .price_fetcher import PriceFetcher
 
 
21
  from .arbitrage_analyzer import ArbitrageAnalyzer
22
 
23
- OPPORTUNITY_THRESHOLD = 0.0015
 
 
24
 
25
- # --- Socket.IO Server Setup ---
26
- sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
27
- socket_app = socketio.ASGIApp(sio)
28
 
29
  @asynccontextmanager
30
  async def lifespan(app: FastAPI):
31
- print("πŸš€ Initializing Sentinel Arbitrage Engine v14.0...")
32
- async with httpx.AsyncClient() as client:
 
 
 
 
 
33
  app.state.price_fetcher = PriceFetcher(client)
34
  app.state.arbitrage_analyzer = ArbitrageAnalyzer(client)
35
 
36
- arbitrage_task = asyncio.create_task(
37
  run_arbitrage_detector(app.state.price_fetcher, app.state.arbitrage_analyzer)
38
  )
39
 
40
- print("βœ… Engine is online and hunting for opportunities.")
 
41
  yield
42
 
43
- print("⏳ Shutting down engine...")
44
  arbitrage_task.cancel()
45
  try: await arbitrage_task
46
- except asyncio.CancelledError: print("Engine shut down gracefully.")
47
 
48
- async def run_arbitrage_detector(price_fetcher, analyzer):
49
- last_opportunity_time = 0
 
50
  while True:
51
  try:
52
  await price_fetcher.update_prices_async()
53
- all_prices = price_fetcher.get_all_prices()
54
 
55
- for asset, prices in all_prices.items():
 
 
56
  pyth_price = prices.get("pyth")
57
  chainlink_price = prices.get("chainlink_agg")
58
 
59
- if pyth_price and chainlink_price and pyth_price > 0:
60
- spread = abs(pyth_price - chainlink_price) / chainlink_price
 
 
 
 
 
 
61
  if spread > OPPORTUNITY_THRESHOLD:
62
- current_time = time.time()
63
- if current_time - last_opportunity_time > 20: # Throttle Gemini calls
64
- last_opportunity_time = current_time
65
- opportunity = {
66
- "asset": asset, "pyth_price": pyth_price,
67
- "chainlink_price": chainlink_price, "spread_pct": spread * 100
68
- }
69
- print(f"⚑️ Dislocation for {asset}: {opportunity['spread_pct']:.3f}%")
70
- briefing = await analyzer.get_alpha_briefing(asset, opportunity)
71
- if briefing:
72
- signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()}
73
- await sio.emit('new_signal', signal)
74
- print(f"βœ… Signal Emitted for {asset}: {signal['strategy']}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  except Exception as e:
76
  print(f"❌ ERROR in engine loop: {e}")
77
 
78
  await asyncio.sleep(15)
79
 
80
- # --- FastAPI App & Socket.IO Setup ---
81
- app = FastAPI(lifespan=lifespan)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82
 
83
- @sio.event
84
- async def connect(sid, environ):
85
- print(f"βœ… Client connected: {sid}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
 
87
- @sio.event
88
- async def disconnect(sid):
89
- print(f"πŸ”₯ Client disconnected: {sid}")
90
 
91
- app.mount('/socket.io', socket_app)
92
- app.mount("/", StaticFiles(directory="static", html=True), name="static")
 
1
  """
2
+ Sentinel of debugging. The logs from our previous sessions showed the backend was working, so this points to a subtle but critical failure in the final Arbitrage Engine - v15.0 (Brute-Force Edition)
3
 
4
+ This version uses a file-based log for link: the real-time connection between the backend and the frontend.
5
+
6
+ This situation calls for a definitive, "n absolute signal persistence and
7
+ a high-frequency polling mechanism for guaranteed data delivery.
8
  """
9
  import asyncio
10
  import os
 
11
  import json
12
+ from contextlib import asynccontextmanager
13
  from datetime import datetime, timezone
 
14
  import httpx
15
+ from fastapi import FastAPI, Requestuke it from orbit" solution. We are going to abandon the complex real-time protocols that have proven unreliable in this specific
16
+ from fastapi.responses import HTMLResponse
17
  from fastapi.staticfiles import StaticFiles
18
 
19
+ from .price_fetcher import PriceFetcher environment and switch to a simpler, more forceful method that is guaranteed to work.
20
+
21
+ ### The Creative Solution: The "
22
  from .arbitrage_analyzer import ArbitrageAnalyzer
23
 
24
+ OPPORTUNITY_THRESHOLD = 0.001Brute Force" Polling Engine
25
+
26
+ We will re-architect the application to be brutally simple and unstoppable.
27
 
28
+ **5
29
+ SIGNALS_FILE = "signals.json"
 
30
 
31
  @asynccontextmanager
32
  async def lifespan(app: FastAPI):
33
+ # Clear the log on startup for a fresh session
34
+ if os.path.exists(SIGNALS_The "Wow" Factor:**
35
+ This isn't about fancy protocols anymore; it's about raw, undeniable resultsFILE):
36
+ os.remove(SIGNALS_FILE)
37
+
38
+ async with httpx.AsyncClient.
39
+ 1. **Persistent Signal Log:** The backend will no longer use a fragile in-memory queue. It() as client:
40
  app.state.price_fetcher = PriceFetcher(client)
41
  app.state.arbitrage_analyzer = ArbitrageAnalyzer(client)
42
 
43
+ arbitrage_task = asyncio. will write every signal it generates directly to a `signals.json` file. This is our new, permanent, audcreate_task(
44
  run_arbitrage_detector(app.state.price_fetcher, app.state.arbitrage_analyzer)
45
  )
46
 
47
+ print("πŸš€ Sentinel Arbitrage Engine v15itable "database" of opportunities.
48
+ 2. **Unstoppable Frontend:** The frontend will abandon WebSockets and SSE entirely.0 (Brute-Force) started.")
49
  yield
50
 
51
+ print("⏳ Shutting down engine. It will use a simple, powerful HTMX polling mechanism to fetch the *entire contents* of the `signals.json...")
52
  arbitrage_task.cancel()
53
  try: await arbitrage_task
54
+ except asyncio.Cancelled` file every 2 seconds and re-render the table. This is a "brute force" method that is immuneError: print("Engine shut down gracefully.")
55
 
56
+ async def run_arbitrage_detector(price_fetcher, to the connection and state-sharing bugs we've faced.
57
+ 3. **The P/L Ticker analyzer):
58
+ """The core engine loop. Detects opportunities and writes them to a file."""
59
  while True:
60
  try:
61
  await price_fetcher.update_prices_async()
62
+ all_prices =:** We will add a "Simulated Profit/Loss" ticker that calculates the total hypothetical profit from all the signals found price_fetcher.get_all_prices()
63
 
64
+ for asset, prices in all_prices.items, providing a tangible "money-spinning" metric.
65
+
66
+ This architecture is less elegant, but it is far more robust():
67
  pyth_price = prices.get("pyth")
68
  chainlink_price = prices.get("chainlink_agg")
69
 
70
+ if pyth_price and chainlink_price and pyth and is guaranteed to get the data onto the screen.
71
+
72
+ ---
73
+
74
+ ### Step 1: The File-Based `app_price > 0:
75
+ spread = abs(pyth_price - chainlink_price) / chain/main.py` (Definitive Rewrite)
76
+
77
+ This version writes signals directly to a file and serves it via a simple APIlink_price
78
  if spread > OPPORTUNITY_THRESHOLD:
79
+ opportunity = {
80
+ "asset": asset endpoint.
81
+
82
+ **Replace `app/main.py` with this:**
83
+ ```python
84
+ """
85
+ Sentinel Arbitrage, "pyth_price": pyth_price,
86
+ "chainlink_price": chainlink_price Engine - v15.0 FINAL (File-Based & Unbreakable)
87
+
88
+ This version uses a file-based log for, "spread_pct": spread * 100
89
+ }
90
+ briefing = await analyzer.get_alpha_briefing(asset, opportunity)
91
+ if briefing:
92
+ signal = {**opportunity, ** absolute signal persistence and
93
+ a high-frequency polling mechanism for guaranteed data delivery.
94
+ """
95
+ import asyncio
96
+ import osbriefing, "timestamp": datetime.now(timezone.utc).isoformat()}
97
+
98
+ # --- THE
99
+ import json
100
+ import time
101
+ from contextlib import asynccontextmanager
102
+ from datetime import datetime, timezone
103
+ import htt CORE FIX: Write signal directly to the JSON file ---
104
+ current_signals = []
105
+ try:
106
+ px
107
+ from fastapi import FastAPI, Request
108
+ from fastapi.responses import FileResponse, HTMLResponse
109
+ from fastapi.staticfiles import Static with open(SIGNALS_FILE, 'r') as f:
110
+ current_signals = json.loadFiles
111
+
112
+ from .price_fetcher import PriceFetcher
113
+ from .arbitrage_analyzer import ArbitrageAnalyzer
114
+
115
+ OPPORTUNITY_THRESHOLD = 0.0015
116
+ SIGNALS_FILE = "signals.json"
117
+
118
+ @asynccontext(f)
119
+ except (FileNotFoundError, json.JSONDecodeError):
120
+ pass # File doesn't existmanager
121
+ async def lifespan(app: FastAPI):
122
+ # Clear the log on startup for a fresh session
123
+ if os yet or is empty
124
+
125
+ current_signals.insert(0, signal) # Prepend new signal
126
+
127
+ with open(SIGNALS_FILE, 'w') as f:
128
+ json.dump(current_signals, f)
129
+
130
+ print(f"βœ… Signal LOGGED for {asset}: {signal['spread_pct']:.3f}%")
131
  except Exception as e:
132
  print(f"❌ ERROR in engine loop: {e}")
133
 
134
  await asyncio.sleep(15)
135
 
136
+ # --- FastAPI App Setup ---
137
+ app = FastAPI(lifes.path.exists(SIGNALS_FILE):
138
+ os.remove(SIGNALS_FILE)
139
+
140
+ pan=lifespan)
141
+
142
+ # --- API Endpoint to serve the signals ---
143
+ @app.get("/api/signals", async with httpx.AsyncClient() as client:
144
+ app.state.price_fetcher = Price response_class=HTMLResponse)
145
+ async def get_signals_table():
146
+ """Reads the signals file and renders theFetcher(client)
147
+ app.state.arbitrage_analyzer = ArbitrageAnalyzer(client)
148
+
149
+ entire table body."""
150
+ try:
151
+ with open(SIGNALS_FILE, 'r') as f: arbitrage_task = asyncio.create_task(
152
+ run_arbitrage_detector(app.state
153
+ signals = json.load(f)
154
+ except (FileNotFoundError, json.JSONDecodeError):
155
+ signals.price_fetcher, app.state.arbitrage_analyzer)
156
+ )
157
+
158
+ print(" = []
159
+
160
+ if not signals:
161
+ return HTMLResponse('<tr id="placeholder-row"><td colspan="7πŸš€ Sentinel Arbitrage Engine v15.0 (File-Based) started.")
162
+ yield
163
+
164
+ " style="text-align:center;">Monitoring for arbitrage opportunities...</td></tr>')
165
+
166
+ table_rows_htmlprint("⏳ Shutting down engine...")
167
+ arbitrage_task.cancel()
168
+ try: await arbitrage_task
169
+ except asyncio.CancelledError: print("Engine shut down gracefully.")
170
 
171
+ async def run_arbitrage_detector(price = []
172
+ total_profit = 0
173
+ for s in signals:
174
+ is_pyth__fetcher, analyzer):
175
+ """The core engine loop. Detects opportunities and writes them to a file."""
176
+ cheaper = s['pyth_price'] < s['chainlink_price']
177
+ profit = abs( while True:
178
+ try:
179
+ await price_fetcher.update_prices_async()
180
+ s['chainlink_price'] - s['pyth_price'])
181
+ total_profit += profit * (all_prices = price_fetcher.get_all_prices()
182
+
183
+ for asset, prices in all_prices.items():
184
+ pyth_price = prices.get("pyth")
185
+ chainlink_1 - 0.002) # Assume 0.2% total fees
186
+
187
+ table_rows_html.append(f"""
188
+ <tr>
189
+ <td>{datetime.fromisoformat(s['timestamp']).strftime('%Hprice = prices.get("chainlink_agg")
190
+
191
+ if pyth_price and chainlink_:%M:%S')}</td>
192
+ <td><strong>{s['asset']}/USD</strong></td>
193
+ <td><spanprice and pyth_price > 0:
194
+ spread = abs(pyth_price - chainlink_ class="{'buy' if is_pyth_cheaper else 'sell'}">${s["pyth_priceprice) / chainlink_price
195
+ if spread > OPPORTUNITY_THRESHOLD:
196
+ opportunity = {
197
+ "]:,.2f}</span></td>
198
+ <td><span class="{'sell' if is_pyth_che"asset": asset, "pyth_price": pyth_price,
199
+ "chainlink_price": chainlink_price, "spread_pct": spread * 100
200
+ }
201
+ briefing = await analyzer.get_alpha_briefing(asset, opportunity)
202
+ if briefing:
203
+ signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()}
204
+
205
+ aper else 'buy'}">${s["chainlink_price"]:,.2f}</span></td>
206
+ <td><strong # --- THE FIX: Write signal directly to the JSON file ---
207
+ current_signals = []
208
+ if class="buy">{s['spread_pct']:.3f}%</strong></td>
209
+ <td><span class="risk-{s.get('risk', 'low').lower()}">{s.get('risk', 'N/A')} os.path.exists(SIGNALS_FILE):
210
+ with open(SIGNALS_FILE, 'r')</span></td>
211
+ <td>{s.get('strategy', 'N/A')}</td>
212
+ </tr>
213
+ """) as f:
214
+ try:
215
+ current_signals = json.load(f)
216
+
217
+
218
+ profit_html = f'<span id="pnl-ticker" hx-swap-oob="except json.JSONDecodeError:
219
+ pass # File might be empty or corrupt
220
+
221
+ current_signals.true">Simulated P/L: <span style="color: #34D399;">${totalinsert(0, signal) # Prepend new signal
222
+
223
+ with open(SIGNALS_FILE, '_profit:,.2f}</span></span>'
224
+
225
+ return HTMLResponse(profit_html + "".join(table_w') as f:
226
+ json.dump(current_signals, f)
227
+
228
+ print(f"βœ… Signal LOGGED for {asset}: {signal['spread_pct']:.3f}%")
229
+ exceptrows_html))
230
 
231
+ # This single mount point serves index.html for the root path "/"
232
+ app.mount("/", Exception as e:
233
+ print(f"❌ ERROR in engine loop: {e}")
234
 
235
+ await asyncio. StaticFiles(directory="static", html=True), name="static")