# Spaces: Sleeping / Sleeping
# (The line above appears to be page-status text captured along with this
# file; it is not part of the module.)
"""
A professional-grade, WebSocket-based, high-frequency price engine.

This engine establishes a persistent connection to the Binance real-time data
stream for BTC/USDT, providing millisecond-latency price updates. It uses
Pyth as a redundant, cross-checking data source.
"""
import asyncio
import json
import logging
from typing import Dict, Optional

import httpx
import websockets
# Module-level logger, named after this module; PriceFetcher's methods log
# connection status and fetch failures through it.
logger = logging.getLogger(__name__)
class PriceFetcher:
    """Track BTC/USDT prices from a Binance trade stream and a Pyth endpoint.

    ``run_websocket_listener`` keeps ``last_binance_price`` updated from the
    Binance WebSocket trade stream, while ``get_current_prices`` pairs that
    value with a Pyth price fetched over HTTP on demand.
    """

    # Primary data source: Binance WebSocket for real-time trades
    BINANCE_WS_URL = "wss://stream.binance.com:9443/ws/btcusdt@trade"
    # Secondary source for sanity checks
    PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"
    # Timeout (seconds) applied to each Pyth HTTP request.
    PYTH_TIMEOUT_SECONDS = 5
    # Pause (seconds) before reconnecting after a WebSocket failure.
    RECONNECT_DELAY_SECONDS = 5

    def __init__(self, client: "httpx.AsyncClient"):
        """Store the HTTP client and initialise the price state.

        Args:
            client: Async HTTP client used for the Pyth requests. It is only
                used here, never closed, so its lifetime stays with the caller.
        """
        self.client = client
        # Snapshot handed out (as a copy) by get_current_prices().
        self._prices: Dict[str, Optional[float]] = {"realtime_binance": None, "on_chain_pyth": None}
        # Guards last_binance_price and _prices between the listener task and
        # callers of get_current_prices().
        self._lock = asyncio.Lock()
        # Most recent trade price seen on the WebSocket; None until the first
        # trade message arrives.
        self.last_binance_price: Optional[float] = None

    async def get_current_prices(self) -> Dict[str, Optional[float]]:
        """Return a copy of the latest prices, keyed by source.

        Each call performs one Pyth HTTP request; the Binance value is the
        last price recorded by ``run_websocket_listener``.

        Returns:
            A new dict with keys ``"realtime_binance"`` and
            ``"on_chain_pyth"``. A value is ``None`` when no Binance trade has
            been received yet or when the Pyth fetch failed.
        """
        # Fetch before taking the lock so the listener is never blocked
        # behind an HTTP round-trip.
        pyth_price = await self._fetch_pyth()
        async with self._lock:
            self._prices["on_chain_pyth"] = pyth_price
            # Use the last price received from the continuously running websocket
            self._prices["realtime_binance"] = self.last_binance_price
            return self._prices.copy()

    @staticmethod
    def _scale_pyth_price(mantissa: int, exponent: int) -> float:
        """Return ``mantissa * 10 ** exponent`` as a float.

        A negative exponent divides by an exact integer power of ten; a
        non-negative exponent multiplies (the previous ``abs()``-based
        division produced a wrong result in that case).
        """
        if exponent < 0:
            return mantissa / (10 ** -exponent)
        return float(mantissa * 10 ** exponent)

    @staticmethod
    def _parse_trade_price(message) -> Optional[float]:
        """Extract the trade price from one raw WebSocket message.

        Args:
            message: JSON text (``str`` or ``bytes``) as received from the
                socket.

        Returns:
            The price as a float, or ``None`` when the decoded message is not
            a JSON object or has no ``'p'`` key.

        Raises:
            ValueError: If the message is not valid JSON or ``'p'`` is not a
                numeric string.
            TypeError: If ``'p'`` holds a value ``float()`` cannot accept.
        """
        data = json.loads(message)
        # Guard against non-object payloads: `'p' in "ping"` would be true for
        # a bare JSON string and then fail on indexing.
        if not isinstance(data, dict) or 'p' not in data:
            return None
        # The 'p' key in the trade stream is the price
        return float(data['p'])

    async def _fetch_pyth(self) -> Optional[float]:
        """Fetch the current price from ``PYTH_URL``.

        Returns:
            The price scaled by the exponent reported alongside it, or
            ``None`` if the request or the parsing of its response fails (the
            failure is logged, not raised).
        """
        try:
            resp = await self.client.get(self.PYTH_URL, timeout=self.PYTH_TIMEOUT_SECONDS)
            resp.raise_for_status()
            price_data = resp.json()['parsed'][0]['price']
            return self._scale_pyth_price(int(price_data['price']), int(price_data['expo']))
        except Exception as e:
            # Deliberately broad: Pyth is a best-effort secondary source, so
            # any failure degrades to "no price" instead of propagating.
            logger.error(f"β Failed to fetch from Pyth: {e}")
            return None

    async def run_websocket_listener(self):
        """
        Connects to the Binance WebSocket and continuously updates the latest price.
        This function runs forever in the background.

        On any connection-level error the error is logged and the connection
        is re-established after ``RECONNECT_DELAY_SECONDS``. A single
        malformed message is logged and skipped without dropping the
        connection.
        """
        while True:
            try:
                async with websockets.connect(self.BINANCE_WS_URL) as websocket:
                    logger.info("β Connected to Binance WebSocket stream.")
                    while True:
                        message = await websocket.recv()
                        try:
                            price = self._parse_trade_price(message)
                        except (TypeError, ValueError) as e:
                            # One bad frame should not cost a reconnect and a
                            # multi-second gap in price updates.
                            logger.warning(f"Skipping malformed Binance message: {e}")
                            continue
                        if price is None:
                            continue
                        async with self._lock:
                            self.last_binance_price = price
            except Exception as e:
                logger.error(
                    f"β WebSocket connection error: {e}. "
                    f"Reconnecting in {self.RECONNECT_DELAY_SECONDS} seconds..."
                )
                await asyncio.sleep(self.RECONNECT_DELAY_SECONDS)