mgbam commited on
Commit
2ac36a0
Β·
verified Β·
1 Parent(s): c3b443a

Update app/price_fetcher.py

Browse files
Files changed (1) hide show
  1. app/price_fetcher.py +46 -72
app/price_fetcher.py CHANGED
@@ -1,34 +1,41 @@
1
  """
2
- A high-throughput, fault-tolerant, multi-source price fetcher engine.
3
- Designed to bypass rate-limiting and ensure constant data flow.
 
 
 
4
  """
5
  import asyncio
 
6
  import logging
7
- from typing import Dict, Optional, List
8
  import httpx
9
- import os
10
- import statistics
11
 
12
  logger = logging.getLogger(__name__)
13
 
14
  class PriceFetcher:
15
- # Pyth for our on-chain data
16
- PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"
17
-
18
- # A list of our off-chain aggregator sources
19
- AGGREGATOR_SOURCES = {
20
- "coingecko": "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd",
21
- "coincap": "https://api.coincap.io/v2/assets/bitcoin",
22
- }
23
 
24
- # Use Pro API if a key is provided as a secret
25
- COINGECKO_PRO_URL = "https://pro-api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"
26
 
27
  def __init__(self, client: httpx.AsyncClient):
28
  self.client = client
29
- self._prices: Dict[str, Optional[float]] = {"on_chain_pyth": None, "off_chain_agg": None}
30
  self._lock = asyncio.Lock()
31
- self.coingecko_api_key = os.getenv("COINGECKO_API_KEY")
 
 
 
 
 
 
 
 
 
 
32
 
33
  async def _fetch_pyth(self) -> Optional[float]:
34
  try:
@@ -36,60 +43,27 @@ class PriceFetcher:
36
  resp.raise_for_status()
37
  price_data = resp.json()['parsed'][0]['price']
38
  return int(price_data['price']) / (10 ** abs(int(price_data['expo'])))
39
- except Exception:
40
- return None # Fail silently, we have other sources
41
-
42
- async def _fetch_aggregator(self, name: str, url: str) -> Optional[float]:
43
- try:
44
- headers = {}
45
- if name == "coingecko" and self.coingecko_api_key:
46
- url = self.COINGECKO_PRO_URL
47
- headers["X-Cg-Pro-Api-Key"] = self.coingecko_api_key
48
-
49
- resp = await self.client.get(url, timeout=5, headers=headers)
50
- if resp.status_code == 429: # Explicitly handle rate limiting
51
- logger.warning(f"⚠️ Rate limited by {name}. Skipping.")
52
- return None
53
- resp.raise_for_status()
54
- data = resp.json()
55
-
56
- if name == "coingecko": return float(data['bitcoin']['usd'])
57
- if name == "coincap": return float(data['data']['priceUsd'])
58
-
59
  except Exception as e:
60
- logger.error(f"❌ Failed to fetch from {name}: {e}")
61
- return None
62
-
63
- def _get_sane_average(self, prices: List[float]) -> Optional[float]:
64
- """Calculates the average price, discarding outliers."""
65
- if not prices: return None
66
- if len(prices) < 3: return statistics.mean(prices)
67
-
68
- # For 3 or more sources, discard any price that is more than 2%
69
- # different from the median, then average the rest.
70
- median = statistics.median(prices)
71
- sane_prices = [p for p in prices if abs(p - median) / median < 0.02]
72
-
73
- if not sane_prices: return median # Fallback to median if all are outliers
74
- return statistics.mean(sane_prices)
75
-
76
- async def update_prices_async(self):
77
- # Fetch all sources concurrently
78
- pyth_task = self._fetch_pyth()
79
- agg_tasks = [self._fetch_aggregator(name, url) for name, url in self.AGGREGATOR_SOURCES.items()]
80
-
81
- results = await asyncio.gather(pyth_task, *agg_tasks)
82
-
83
- pyth_price = results[0]
84
- agg_prices = [p for p in results[1:] if p is not None]
85
-
86
- sane_agg_price = self._get_sane_average(agg_prices)
87
-
88
- async with self._lock:
89
- self._prices["on_chain_pyth"] = pyth_price
90
- self._prices["off_chain_agg"] = sane_agg_price
91
-
92
- logger.info(f"βœ… Prices updated: On-Chain={pyth_price}, Off-Chain Agg={sane_agg_price} (from {len(agg_prices)} sources)")
93
 
94
- def get_current_prices(self) -> Dict[str, Optional[float]]:
95
- return self._prices.copy()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ A professional-grade, WebSocket-based, high-frequency price engine.
3
+
4
+ This engine establishes a persistent connection to the Binance real-time data
5
+ stream for BTC/USDT, providing millisecond-latency price updates. It uses
6
+ Pyth as a redundant, cross-checking data source.
7
  """
8
  import asyncio
9
+ import json
10
  import logging
11
+ from typing import Dict, Optional
12
  import httpx
13
+ import websockets
 
14
 
15
  logger = logging.getLogger(__name__)
16
 
17
  class PriceFetcher:
18
+ # Primary data source: Binance WebSocket for real-time trades
19
+ BINANCE_WS_URL = "wss://stream.binance.com:9443/ws/btcusdt@trade"
 
 
 
 
 
 
20
 
21
+ # Secondary source for sanity checks
22
+ PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"
23
 
24
  def __init__(self, client: httpx.AsyncClient):
25
  self.client = client
26
+ self._prices: Dict[str, Optional[float]] = {"realtime_binance": None, "on_chain_pyth": None}
27
  self._lock = asyncio.Lock()
28
+ self.last_binance_price: Optional[float] = None
29
+
30
+ async def get_current_prices(self) -> Dict[str, Optional[float]]:
31
+ """Returns a copy of the latest prices."""
32
+ # Fetch a fresh Pyth price for the most up-to-date comparison
33
+ pyth_price = await self._fetch_pyth()
34
+ async with self._lock:
35
+ self._prices["on_chain_pyth"] = pyth_price
36
+ # Use the last price received from the continuously running websocket
37
+ self._prices["realtime_binance"] = self.last_binance_price
38
+ return self._prices.copy()
39
 
40
  async def _fetch_pyth(self) -> Optional[float]:
41
  try:
 
43
  resp.raise_for_status()
44
  price_data = resp.json()['parsed'][0]['price']
45
  return int(price_data['price']) / (10 ** abs(int(price_data['expo'])))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
  except Exception as e:
47
+ logger.error(f"❌ Failed to fetch from Pyth: {e}")
48
+ return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
 
50
+ async def run_websocket_listener(self):
51
+ """
52
+ Connects to the Binance WebSocket and continuously updates the latest price.
53
+ This function runs forever in the background.
54
+ """
55
+ while True:
56
+ try:
57
+ async with websockets.connect(self.BINANCE_WS_URL) as websocket:
58
+ logger.info("βœ… Connected to Binance WebSocket stream.")
59
+ while True:
60
+ message = await websocket.recv()
61
+ data = json.loads(message)
62
+ # The 'p' key in the trade stream is the price
63
+ if 'p' in data:
64
+ price = float(data['p'])
65
+ async with self._lock:
66
+ self.last_binance_price = price
67
+ except Exception as e:
68
+ logger.error(f"❌ WebSocket connection error: {e}. Reconnecting in 5 seconds...")
69
+ await asyncio.sleep(5)