mgbam commited on
Commit
acb6f17
Β·
verified Β·
1 Parent(s): 362692f

Update app/app.py

Browse files
Files changed (1) hide show
  1. app/app.py +59 -204
app/app.py CHANGED
@@ -1,235 +1,90 @@
1
  """
2
- Sentinel of debugging. The logs from our previous sessions showed the backend was working, so this points to a subtle but critical failure in the final Arbitrage Engine - v15.0 (Brute-Force Edition)
3
 
4
- This version uses a file-based log for link: the real-time connection between the backend and the frontend.
5
-
6
- This situation calls for a definitive, "n absolute signal persistence and
7
- a high-frequency polling mechanism for guaranteed data delivery.
8
  """
9
  import asyncio
10
  import os
11
  import json
12
- from contextlib import asynccontextmanager
13
  from datetime import datetime, timezone
 
14
  import httpx
15
- from fastapi import FastAPI, Requestuke it from orbit" solution. We are going to abandon the complex real-time protocols that have proven unreliable in this specific
16
- from fastapi.responses import HTMLResponse
17
  from fastapi.staticfiles import StaticFiles
18
 
19
- from .price_fetcher import PriceFetcher environment and switch to a simpler, more forceful method that is guaranteed to work.
20
-
21
- ### The Creative Solution: The "
22
  from .arbitrage_analyzer import ArbitrageAnalyzer
23
 
24
- OPPORTUNITY_THRESHOLD = 0.001Brute Force" Polling Engine
25
-
26
- We will re-architect the application to be brutally simple and unstoppable.
27
 
28
- **5
29
- SIGNALS_FILE = "signals.json"
 
30
 
31
- @asynccontextmanager
32
- async def lifespan(app: FastAPI):
33
- # Clear the log on startup for a fresh session
34
- if os.path.exists(SIGNALS_The "Wow" Factor:**
35
- This isn't about fancy protocols anymore; it's about raw, undeniable resultsFILE):
36
- os.remove(SIGNALS_FILE)
37
-
38
- async with httpx.AsyncClient.
39
- 1. **Persistent Signal Log:** The backend will no longer use a fragile in-memory queue. It() as client:
40
- app.state.price_fetcher = PriceFetcher(client)
41
- app.state.arbitrage_analyzer = ArbitrageAnalyzer(client)
42
-
43
- arbitrage_task = asyncio. will write every signal it generates directly to a `signals.json` file. This is our new, permanent, audcreate_task(
44
- run_arbitrage_detector(app.state.price_fetcher, app.state.arbitrage_analyzer)
45
- )
46
-
47
- print("πŸš€ Sentinel Arbitrage Engine v15itable "database" of opportunities.
48
- 2. **Unstoppable Frontend:** The frontend will abandon WebSockets and SSE entirely.0 (Brute-Force) started.")
49
- yield
50
-
51
- print("⏳ Shutting down engine. It will use a simple, powerful HTMX polling mechanism to fetch the *entire contents* of the `signals.json...")
52
- arbitrage_task.cancel()
53
- try: await arbitrage_task
54
- except asyncio.Cancelled` file every 2 seconds and re-render the table. This is a "brute force" method that is immuneError: print("Engine shut down gracefully.")
55
 
56
- async def run_arbitrage_detector(price_fetcher, to the connection and state-sharing bugs we've faced.
57
- 3. **The P/L Ticker analyzer):
58
- """The core engine loop. Detects opportunities and writes them to a file."""
59
  while True:
60
  try:
61
  await price_fetcher.update_prices_async()
62
- all_prices =:** We will add a "Simulated Profit/Loss" ticker that calculates the total hypothetical profit from all the signals found price_fetcher.get_all_prices()
63
 
64
- for asset, prices in all_prices.items, providing a tangible "money-spinning" metric.
65
-
66
- This architecture is less elegant, but it is far more robust():
67
  pyth_price = prices.get("pyth")
68
  chainlink_price = prices.get("chainlink_agg")
69
 
70
- if pyth_price and chainlink_price and pyth and is guaranteed to get the data onto the screen.
71
-
72
- ---
73
-
74
- ### Step 1: The File-Based `app_price > 0:
75
- spread = abs(pyth_price - chainlink_price) / chain/main.py` (Definitive Rewrite)
76
-
77
- This version writes signals directly to a file and serves it via a simple APIlink_price
78
  if spread > OPPORTUNITY_THRESHOLD:
79
- opportunity = {
80
- "asset": asset endpoint.
81
-
82
- **Replace `app/main.py` with this:**
83
- ```python
84
- """
85
- Sentinel Arbitrage, "pyth_price": pyth_price,
86
- "chainlink_price": chainlink_price Engine - v15.0 FINAL (File-Based & Unbreakable)
87
-
88
- This version uses a file-based log for, "spread_pct": spread * 100
89
- }
90
- briefing = await analyzer.get_alpha_briefing(asset, opportunity)
91
- if briefing:
92
- signal = {**opportunity, ** absolute signal persistence and
93
- a high-frequency polling mechanism for guaranteed data delivery.
94
- """
95
- import asyncio
96
- import osbriefing, "timestamp": datetime.now(timezone.utc).isoformat()}
97
-
98
- # --- THE
99
- import json
100
- import time
101
- from contextlib import asynccontextmanager
102
- from datetime import datetime, timezone
103
- import htt CORE FIX: Write signal directly to the JSON file ---
104
- current_signals = []
105
- try:
106
- px
107
- from fastapi import FastAPI, Request
108
- from fastapi.responses import FileResponse, HTMLResponse
109
- from fastapi.staticfiles import Static with open(SIGNALS_FILE, 'r') as f:
110
- current_signals = json.loadFiles
111
-
112
- from .price_fetcher import PriceFetcher
113
- from .arbitrage_analyzer import ArbitrageAnalyzer
114
-
115
- OPPORTUNITY_THRESHOLD = 0.0015
116
- SIGNALS_FILE = "signals.json"
117
-
118
- @asynccontext(f)
119
- except (FileNotFoundError, json.JSONDecodeError):
120
- pass # File doesn't existmanager
121
- async def lifespan(app: FastAPI):
122
- # Clear the log on startup for a fresh session
123
- if os yet or is empty
124
-
125
- current_signals.insert(0, signal) # Prepend new signal
126
-
127
- with open(SIGNALS_FILE, 'w') as f:
128
- json.dump(current_signals, f)
129
-
130
- print(f"βœ… Signal LOGGED for {asset}: {signal['spread_pct']:.3f}%")
131
  except Exception as e:
132
  print(f"❌ ERROR in engine loop: {e}")
133
 
134
  await asyncio.sleep(15)
135
 
136
- # --- FastAPI App Setup ---
137
- app = FastAPI(lifes.path.exists(SIGNALS_FILE):
138
- os.remove(SIGNALS_FILE)
139
-
140
- pan=lifespan)
141
-
142
- # --- API Endpoint to serve the signals ---
143
- @app.get("/api/signals", async with httpx.AsyncClient() as client:
144
- app.state.price_fetcher = Price response_class=HTMLResponse)
145
- async def get_signals_table():
146
- """Reads the signals file and renders theFetcher(client)
147
- app.state.arbitrage_analyzer = ArbitrageAnalyzer(client)
148
-
149
- entire table body."""
150
- try:
151
- with open(SIGNALS_FILE, 'r') as f: arbitrage_task = asyncio.create_task(
152
- run_arbitrage_detector(app.state
153
- signals = json.load(f)
154
- except (FileNotFoundError, json.JSONDecodeError):
155
- signals.price_fetcher, app.state.arbitrage_analyzer)
156
  )
157
-
158
- print(" = []
159
-
160
- if not signals:
161
- return HTMLResponse('<tr id="placeholder-row"><td colspan="7πŸš€ Sentinel Arbitrage Engine v15.0 (File-Based) started.")
162
- yield
163
-
164
- " style="text-align:center;">Monitoring for arbitrage opportunities...</td></tr>')
165
 
166
- table_rows_htmlprint("⏳ Shutting down engine...")
167
- arbitrage_task.cancel()
168
- try: await arbitrage_task
169
- except asyncio.CancelledError: print("Engine shut down gracefully.")
170
-
171
- async def run_arbitrage_detector(price = []
172
- total_profit = 0
173
- for s in signals:
174
- is_pyth__fetcher, analyzer):
175
- """The core engine loop. Detects opportunities and writes them to a file."""
176
- cheaper = s['pyth_price'] < s['chainlink_price']
177
- profit = abs( while True:
178
- try:
179
- await price_fetcher.update_prices_async()
180
- s['chainlink_price'] - s['pyth_price'])
181
- total_profit += profit * (all_prices = price_fetcher.get_all_prices()
182
-
183
- for asset, prices in all_prices.items():
184
- pyth_price = prices.get("pyth")
185
- chainlink_1 - 0.002) # Assume 0.2% total fees
186
-
187
- table_rows_html.append(f"""
188
- <tr>
189
- <td>{datetime.fromisoformat(s['timestamp']).strftime('%Hprice = prices.get("chainlink_agg")
190
-
191
- if pyth_price and chainlink_:%M:%S')}</td>
192
- <td><strong>{s['asset']}/USD</strong></td>
193
- <td><spanprice and pyth_price > 0:
194
- spread = abs(pyth_price - chainlink_ class="{'buy' if is_pyth_cheaper else 'sell'}">${s["pyth_priceprice) / chainlink_price
195
- if spread > OPPORTUNITY_THRESHOLD:
196
- opportunity = {
197
- "]:,.2f}</span></td>
198
- <td><span class="{'sell' if is_pyth_che"asset": asset, "pyth_price": pyth_price,
199
- "chainlink_price": chainlink_price, "spread_pct": spread * 100
200
- }
201
- briefing = await analyzer.get_alpha_briefing(asset, opportunity)
202
- if briefing:
203
- signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()}
204
-
205
- aper else 'buy'}">${s["chainlink_price"]:,.2f}</span></td>
206
- <td><strong # --- THE FIX: Write signal directly to the JSON file ---
207
- current_signals = []
208
- if class="buy">{s['spread_pct']:.3f}%</strong></td>
209
- <td><span class="risk-{s.get('risk', 'low').lower()}">{s.get('risk', 'N/A')} os.path.exists(SIGNALS_FILE):
210
- with open(SIGNALS_FILE, 'r')</span></td>
211
- <td>{s.get('strategy', 'N/A')}</td>
212
- </tr>
213
- """) as f:
214
- try:
215
- current_signals = json.load(f)
216
-
217
-
218
- profit_html = f'<span id="pnl-ticker" hx-swap-oob="except json.JSONDecodeError:
219
- pass # File might be empty or corrupt
220
-
221
- current_signals.true">Simulated P/L: <span style="color: #34D399;">${totalinsert(0, signal) # Prepend new signal
222
-
223
- with open(SIGNALS_FILE, '_profit:,.2f}</span></span>'
224
-
225
- return HTMLResponse(profit_html + "".join(table_w') as f:
226
- json.dump(current_signals, f)
227
-
228
- print(f"βœ… Signal LOGGED for {asset}: {signal['spread_pct']:.3f}%")
229
- exceptrows_html))
230
-
231
- # This single mount point serves index.html for the root path "/"
232
- app.mount("/", Exception as e:
233
- print(f"❌ ERROR in engine loop: {e}")
234
 
235
- await asyncio. StaticFiles(directory="static", html=True), name="static")
 
 
 
 
1
  """
2
+ Sentinel Arbitrage Engine - v15.0 OMEGA (Socket.IO Perfected)
3
 
4
+ The definitive, money-spinning engine. This version uses a perfectly
5
+ configured Socket.IO server to push real-time, multi-asset arbitrage
6
+ signals directly to a dynamic frontend. This is the final architecture.
 
7
  """
8
  import asyncio
9
  import os
10
  import json
11
+ import time
12
  from datetime import datetime, timezone
13
+
14
  import httpx
15
+ import socketio
16
+ from fastapi import FastAPI
17
  from fastapi.staticfiles import StaticFiles
18
 
19
+ from .price_fetcher import PriceFetcher
 
 
20
  from .arbitrage_analyzer import ArbitrageAnalyzer
21
 
22
+ OPPORTUNITY_THRESHOLD = 0.0015
 
 
23
 
24
+ # --- Socket.IO Server Setup ---
25
+ # We create the server instance that will manage all real-time communication.
26
+ sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
27
 
28
+ # --- FastAPI App Setup ---
29
+ # The FastAPI app is now simpler. It only serves the static files.
30
+ app = FastAPI()
31
+ app.mount("/", StaticFiles(directory="static", html=True), name="static")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
 
33
+ # --- Background Engine ---
34
+ async def run_arbitrage_detector(price_fetcher, analyzer):
35
+ """The core engine loop. Detects opportunities and emits them via Socket.IO."""
36
  while True:
37
  try:
38
  await price_fetcher.update_prices_async()
39
+ all_prices = price_fetcher.get_all_prices()
40
 
41
+ for asset, prices in all_prices.items():
 
 
42
  pyth_price = prices.get("pyth")
43
  chainlink_price = prices.get("chainlink_agg")
44
 
45
+ if pyth_price and chainlink_price and pyth_price > 0:
46
+ spread = abs(pyth_price - chainlink_price) / chainlink_price
 
 
 
 
 
 
47
  if spread > OPPORTUNITY_THRESHOLD:
48
+ current_time = time.time()
49
+ # Simple throttle to avoid spamming Gemini for the same opportunity
50
+ if not hasattr(analyzer, 'last_call') or current_time - analyzer.last_call.get(asset, 0) > 60:
51
+ analyzer.last_call = getattr(analyzer, 'last_call', {})
52
+ analyzer.last_call[asset] = current_time
53
+
54
+ opportunity = {
55
+ "asset": asset, "pyth_price": pyth_price,
56
+ "chainlink_price": chainlink_price, "spread_pct": spread * 100
57
+ }
58
+ print(f"⚑️ Dislocation for {asset}: {opportunity['spread_pct']:.3f}%")
59
+ briefing = await analyzer.get_alpha_briefing(asset, opportunity)
60
+ if briefing:
61
+ signal = {**opportunity, **briefing, "timestamp": datetime.now(timezone.utc).isoformat()}
62
+ await sio.emit('new_signal', signal)
63
+ print(f"βœ… Signal Emitted for {asset}: {signal['strategy']}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
  except Exception as e:
65
  print(f"❌ ERROR in engine loop: {e}")
66
 
67
  await asyncio.sleep(15)
68
 
69
+ # --- Socket.IO Lifespan Events ---
70
+ # This is the CORRECT way to manage background tasks with python-socketio.
71
+ @sio.on('connect')
72
+ async def connect(sid, environ):
73
+ print(f"βœ… Client connected: {sid}")
74
+ # Start the engine only when the first user connects.
75
+ if sio.background_task is None:
76
+ print("πŸš€ First client connected. Starting Sentinel Engine...")
77
+ price_fetcher = PriceFetcher(httpx.AsyncClient())
78
+ arbitrage_analyzer = ArbitrageAnalyzer(httpx.AsyncClient())
79
+ sio.background_task = sio.start_background_task(
80
+ run_arbitrage_detector, price_fetcher, arbitrage_analyzer
 
 
 
 
 
 
 
 
81
  )
 
 
 
 
 
 
 
 
82
 
83
+ @sio.on('disconnect')
84
+ def disconnect(sid):
85
+ print(f"πŸ”₯ Client disconnected: {sid}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
 
87
+ # --- Final ASGI App ---
88
+ # We wrap the FastAPI app (for static files) and the Socket.IO app together.
89
+ # The Socket.IO server is the primary application.
90
+ combined_app = socketio.ASGIApp(sio, other_asgi_app=app)