CryptoSentinel_AI / app /price_fetcher.py
mgbam's picture
Update app/price_fetcher.py
2ac36a0 verified
raw
history blame
2.98 kB
"""
A professional-grade, WebSocket-based, high-frequency price engine.
This engine establishes a persistent connection to the Binance real-time data
stream for BTC/USDT, providing millisecond-latency price updates. It uses
Pyth as a redundant, cross-checking data source.
"""
import asyncio
import json
import logging
from typing import Dict, Optional
import httpx
import websockets
logger = logging.getLogger(__name__)
class PriceFetcher:
# Primary data source: Binance WebSocket for real-time trades
BINANCE_WS_URL = "wss://stream.binance.com:9443/ws/btcusdt@trade"
# Secondary source for sanity checks
PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"
def __init__(self, client: httpx.AsyncClient):
self.client = client
self._prices: Dict[str, Optional[float]] = {"realtime_binance": None, "on_chain_pyth": None}
self._lock = asyncio.Lock()
self.last_binance_price: Optional[float] = None
async def get_current_prices(self) -> Dict[str, Optional[float]]:
"""Returns a copy of the latest prices."""
# Fetch a fresh Pyth price for the most up-to-date comparison
pyth_price = await self._fetch_pyth()
async with self._lock:
self._prices["on_chain_pyth"] = pyth_price
# Use the last price received from the continuously running websocket
self._prices["realtime_binance"] = self.last_binance_price
return self._prices.copy()
async def _fetch_pyth(self) -> Optional[float]:
try:
resp = await self.client.get(self.PYTH_URL, timeout=5)
resp.raise_for_status()
price_data = resp.json()['parsed'][0]['price']
return int(price_data['price']) / (10 ** abs(int(price_data['expo'])))
except Exception as e:
logger.error(f"❌ Failed to fetch from Pyth: {e}")
return None
async def run_websocket_listener(self):
"""
Connects to the Binance WebSocket and continuously updates the latest price.
This function runs forever in the background.
"""
while True:
try:
async with websockets.connect(self.BINANCE_WS_URL) as websocket:
logger.info("βœ… Connected to Binance WebSocket stream.")
while True:
message = await websocket.recv()
data = json.loads(message)
# The 'p' key in the trade stream is the price
if 'p' in data:
price = float(data['p'])
async with self._lock:
self.last_binance_price = price
except Exception as e:
logger.error(f"❌ WebSocket connection error: {e}. Reconnecting in 5 seconds...")
await asyncio.sleep(5)