Spaces:
Sleeping
Sleeping
""" | |
A high-availability price fetcher using a decentralized oracle (Pyth) | |
and a reliable data aggregator to bypass geoblocking. | |
""" | |
import asyncio | |
import logging | |
from typing import Dict, Optional | |
import httpx | |
logger = logging.getLogger(__name__) | |
class PriceFetcher: | |
# Pyth provides real-time, on-chain prices. We will use their public API. | |
# We will fetch BTC/USD. | |
PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43" | |
# A reliable aggregator that is not typically geoblocked. | |
AGGREGATOR_URL = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd" | |
def __init__(self, client: httpx.AsyncClient): | |
self.client = client | |
self._prices: Dict[str, Optional[float]] = {"on_chain_pyth": None, "off_chain_agg": None} | |
self._lock = asyncio.Lock() | |
async def _fetch_pyth(self) -> Optional[float]: | |
try: | |
resp = await self.client.get(self.PYTH_URL, timeout=5) | |
resp.raise_for_status() | |
data = resp.json() | |
# The price is in the 'parsed' part of the response | |
price_data = data['parsed'][0]['price'] | |
# Price is given with an exponent, e.g., price=119123, expo=-2 -> 1191.23 | |
# But for BTC/USD, the expo is -8, and price is in sats. We need to adjust. | |
price = int(price_data['price']) / (10 ** abs(int(price_data['expo']))) | |
return price | |
except Exception as e: | |
logger.error(f"β Failed to fetch from Pyth: {e}") | |
return None | |
async def _fetch_aggregator(self) -> Optional[float]: | |
try: | |
resp = await self.client.get(self.AGGREGATOR_URL, timeout=5) | |
resp.raise_for_status() | |
return float(resp.json()['bitcoin']['usd']) | |
except Exception as e: | |
logger.error(f"β Failed to fetch from Aggregator: {e}") | |
return None | |
async def update_prices_async(self): | |
pyth_task = self._fetch_pyth() | |
agg_task = self._fetch_aggregator() | |
pyth_price, agg_price = await asyncio.gather(pyth_task, agg_task) | |
async with self._lock: | |
self._prices["on_chain_pyth"] = pyth_price | |
self._prices["off_chain_agg"] = agg_price | |
logger.info(f"β Prices updated: {self._prices}") | |
def get_current_prices(self) -> Dict[str, Optional[float]]: | |
return self._prices.copy() |