File size: 2,981 Bytes
97873ec
2ac36a0
 
 
 
 
97873ec
fd31489
2ac36a0
476cee0
2ac36a0
fd31489
2ac36a0
fd31489
19f368e
fd31489
 
2ac36a0
 
3bb8a2f
2ac36a0
 
97873ec
19f368e
fd31489
2ac36a0
19f368e
2ac36a0
 
 
 
 
 
 
 
 
 
 
476cee0
2aa11ed
476cee0
2aa11ed
19f368e
3bb8a2f
 
19f368e
2ac36a0
 
19f368e
2ac36a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
A professional-grade, WebSocket-based, high-frequency price engine.

This engine establishes a persistent connection to the Binance real-time data
stream for BTC/USDT, providing millisecond-latency price updates. It uses
Pyth as a redundant, cross-checking data source.
"""
import asyncio
import json
import logging
from typing import Dict, Optional
import httpx
import websockets

logger = logging.getLogger(__name__)

class PriceFetcher:
    """Track the latest BTC price from Binance (WebSocket) and Pyth (HTTP).

    ``run_websocket_listener`` is a long-running coroutine that stores the
    most recent Binance trade price on the instance. ``get_current_prices``
    pairs that stored value with a Pyth price fetched on demand through the
    HTTP client supplied at construction time.
    """

    # Primary data source: Binance WebSocket for real-time trades
    BINANCE_WS_URL = "wss://stream.binance.com:9443/ws/btcusdt@trade"

    # Secondary source for sanity checks
    PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"

    def __init__(self, client: "httpx.AsyncClient"):
        """Store the HTTP client and initialise empty price state.

        Args:
            client: Async HTTP client used for Pyth requests. This class
                never closes it.
        """
        self.client = client
        self._prices: Dict[str, Optional[float]] = {"realtime_binance": None, "on_chain_pyth": None}
        # Guards both ``_prices`` and ``last_binance_price``.
        self._lock = asyncio.Lock()
        # Most recent trade price seen by the WebSocket listener, if any.
        self.last_binance_price: Optional[float] = None

    async def get_current_prices(self) -> Dict[str, Optional[float]]:
        """Return a copy of the latest prices.

        A fresh Pyth price is requested on every call; the Binance value is
        whatever the WebSocket listener stored last.

        Returns:
            A new dict with keys ``"realtime_binance"`` and
            ``"on_chain_pyth"``. Either value is ``None`` when that source
            has not produced a price (or the Pyth request failed).
        """
        # Fetch before taking the lock so the HTTP round-trip never blocks
        # the WebSocket listener from recording new trades.
        pyth_price = await self._fetch_pyth()
        async with self._lock:
            self._prices["on_chain_pyth"] = pyth_price
            # Use the last price received from the continuously running websocket
            self._prices["realtime_binance"] = self.last_binance_price
            return self._prices.copy()

    @staticmethod
    def _scale_price(mantissa: int, exponent: int) -> float:
        """Return ``mantissa * 10 ** exponent`` as a float."""
        if exponent < 0:
            # Divide by an exact integer power of ten rather than multiplying
            # by an inexact float such as 1e-08.
            return mantissa / (10 ** -exponent)
        return float(mantissa * (10 ** exponent))

    async def _fetch_pyth(self) -> Optional[float]:
        """Fetch the current price from ``PYTH_URL``.

        The response JSON is read at ``['parsed'][0]['price']``, whose
        ``price`` and ``expo`` fields are combined as ``price * 10 ** expo``.

        Returns:
            The scaled price, or ``None`` if the request or parsing fails
            (the failure is logged, never raised).
        """
        try:
            resp = await self.client.get(self.PYTH_URL, timeout=5)
            resp.raise_for_status()
            price_data = resp.json()['parsed'][0]['price']
            return self._scale_price(int(price_data['price']), int(price_data['expo']))
        except Exception as e:
            # Best-effort secondary source: report the failure and let the
            # caller see ``None`` instead of propagating.
            logger.error("❌ Failed to fetch from Pyth: %s", e)
            return None

    @staticmethod
    def _parse_trade_price(message) -> Optional[float]:
        """Extract the trade price from one raw WebSocket message.

        Returns:
            The value of the ``'p'`` field as a float, or ``None`` when the
            decoded message is not an object carrying a ``'p'`` key.

        Raises:
            ValueError: The message is not valid JSON or ``'p'`` is not
                numeric text.
            TypeError: The message or the ``'p'`` value has an unsupported
                type.
        """
        data = json.loads(message)
        if not isinstance(data, dict) or 'p' not in data:
            return None
        # The 'p' key in the trade stream is the price
        return float(data['p'])

    async def run_websocket_listener(self):
        """
        Connects to the Binance WebSocket and continuously updates the latest price.
        This function runs forever in the background.

        Connection-level failures are logged and followed by a reconnect
        after 5 seconds. A single malformed message is logged and skipped
        without dropping the connection.
        """
        while True:
            try:
                async with websockets.connect(self.BINANCE_WS_URL) as websocket:
                    logger.info("✅ Connected to Binance WebSocket stream.")
                    while True:
                        message = await websocket.recv()
                        try:
                            price = self._parse_trade_price(message)
                        except (TypeError, ValueError) as e:
                            # One bad frame should not cost a reconnect cycle.
                            logger.warning("Ignoring malformed Binance message: %s", e)
                            continue
                        if price is None:
                            continue
                        async with self._lock:
                            self.last_binance_price = price
            except Exception as e:
                logger.error("❌ WebSocket connection error: %s. Reconnecting in 5 seconds...", e)
                await asyncio.sleep(5)