Spaces:
Sleeping
Sleeping
File size: 2,981 Bytes
97873ec 2ac36a0 97873ec fd31489 2ac36a0 476cee0 2ac36a0 fd31489 2ac36a0 fd31489 19f368e fd31489 2ac36a0 3bb8a2f 2ac36a0 97873ec 19f368e fd31489 2ac36a0 19f368e 2ac36a0 476cee0 2aa11ed 476cee0 2aa11ed 19f368e 3bb8a2f 19f368e 2ac36a0 19f368e 2ac36a0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
"""
A professional-grade, WebSocket-based, high-frequency price engine.
This engine establishes a persistent connection to the Binance real-time data
stream for BTC/USDT, providing millisecond-latency price updates. It uses
Pyth as a redundant, cross-checking data source.
"""
import asyncio
import json
import logging
from typing import Dict, Optional
import httpx
import websockets
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PriceFetcher:
    """Collect BTC/USDT prices from a Binance WebSocket stream and a Pyth HTTP endpoint.

    ``run_websocket_listener`` stores the most recent trade price it receives
    in ``last_binance_price``. ``get_current_prices`` fetches the Pyth price
    on every call and returns both values in one dictionary.
    """

    # Primary data source: Binance WebSocket for real-time trades
    BINANCE_WS_URL = "wss://stream.binance.com:9443/ws/btcusdt@trade"
    # Secondary source for sanity checks
    PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"

    def __init__(self, client: "httpx.AsyncClient"):
        """Store the HTTP client and initialise the price state.

        Args:
            client: Async HTTP client used for the Pyth request. The
                annotation is a string so it is never evaluated at
                definition time; it is a type hint only.
        """
        self.client = client
        # Snapshot returned (as a copy) by get_current_prices().
        self._prices: Dict[str, Optional[float]] = {
            "realtime_binance": None,
            "on_chain_pyth": None,
        }
        # Guards the snapshot and last_binance_price between coroutines.
        self._lock = asyncio.Lock()
        # Last trade price seen by the listener; None until a trade arrives.
        self.last_binance_price: Optional[float] = None

    async def get_current_prices(self) -> Dict[str, Optional[float]]:
        """Return a copy of the latest prices.

        Returns:
            A new dict with the keys ``"realtime_binance"`` and
            ``"on_chain_pyth"``. A value is ``None`` when no Binance trade
            has been stored yet or when the Pyth request failed.
        """
        # Fetch a fresh Pyth price for the most up-to-date comparison.
        # This happens outside the lock so the listener is not blocked
        # while the HTTP request is in flight.
        pyth_price = await self._fetch_pyth()
        async with self._lock:
            self._prices["on_chain_pyth"] = pyth_price
            # Use the last price received from the continuously running websocket
            self._prices["realtime_binance"] = self.last_binance_price
            return self._prices.copy()

    @staticmethod
    def _scale_pyth_price(mantissa, exponent) -> float:
        """Return ``mantissa * 10 ** exponent`` as a float.

        Both arguments are passed through ``int()``, so integers and numeric
        strings are accepted. Negative exponents divide by a power of ten
        (same arithmetic as before); non-negative exponents multiply, which
        the previous ``abs()``-based formula got wrong.
        """
        mantissa = int(mantissa)
        exponent = int(exponent)
        if exponent < 0:
            return mantissa / (10 ** -exponent)
        return float(mantissa * (10 ** exponent))

    async def _fetch_pyth(self) -> Optional[float]:
        """Fetch the price from ``PYTH_URL``.

        Returns:
            The scaled price, or ``None`` if the request, the status check,
            or the parsing of the response raised an exception (the error
            is logged).
        """
        try:
            resp = await self.client.get(self.PYTH_URL, timeout=5)
            resp.raise_for_status()
            price_data = resp.json()['parsed'][0]['price']
            return self._scale_pyth_price(price_data['price'], price_data['expo'])
        except Exception as e:
            logger.error("❌ Failed to fetch from Pyth: %s", e)
            return None

    async def run_websocket_listener(self):
        """
        Connects to the Binance WebSocket and continuously updates the latest price.
        This function runs forever in the background.

        Any ``Exception`` raised while connecting, receiving or parsing a
        message is logged, and a new connection is attempted after a
        5-second pause.
        """
        while True:
            try:
                async with websockets.connect(self.BINANCE_WS_URL) as websocket:
                    logger.info("✅ Connected to Binance WebSocket stream.")
                    while True:
                        message = await websocket.recv()
                        data = json.loads(message)
                        # The 'p' key in the trade stream is the price
                        if 'p' in data:
                            price = float(data['p'])
                            async with self._lock:
                                self.last_binance_price = price
            except Exception as e:
                logger.error(
                    "❌ WebSocket connection error: %s. Reconnecting in 5 seconds...",
                    e,
                )
                await asyncio.sleep(5)