Spaces:
Running
Running
""" | |
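A professional-grade, multi-source price engine.
This engine fetches BTC/USD prices from two independent, globally accessible
sources: the Pyth Network oracle and the CoinGecko aggregate price, which stands
in for a Chainlink-style aggregated feed. Comparing the two lets callers detect
market dislocations. It never calls a centralized-exchange API directly, so CEX
geoblocking does not affect it.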
""" | |
import asyncio | |
import logging | |
from typing import Dict, Optional | |
import httpx | |
logger = logging.getLogger(__name__) | |
class PriceFetcher:
    """Fetch and cache the latest BTC/USD price from two HTTP sources.

    Two prices are tracked under fixed keys:

    * ``"pyth"`` - the price returned by the Pyth Network endpoint.
    * ``"chainlink_agg"`` - the price returned by ``CHAINLINK_URL`` (a
      CoinGecko endpoint, see the note on that constant).

    Each value is ``None`` until the first successful refresh, and is set
    back to ``None`` whenever the most recent fetch for that source failed.
    """

    # Pyth Network data feed for BTC/USD.
    PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"
    # NOTE: despite its name, this is CoinGecko's simple-price endpoint, not a
    # Chainlink feed. CoinGecko is used as a non-geoblocked proxy for an
    # aggregated, "Chainlink-like" CEX price. The constant name is kept so
    # existing references keep working.
    CHAINLINK_URL = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"

    def __init__(self, client: "httpx.AsyncClient"):
        """Store the shared HTTP client and initialise an empty price cache.

        Args:
            client: Async HTTP client used for every request. This class
                never closes it.
        """
        self.client = client
        self._prices: Dict[str, Optional[float]] = {"pyth": None, "chainlink_agg": None}
        # Serialises writes to ``_prices`` between concurrent refreshes.
        self._lock = asyncio.Lock()

    @staticmethod
    def _scale_price(mantissa: int, exponent: int) -> float:
        """Convert a fixed-point ``mantissa * 10 ** exponent`` pair to a float.

        The sign of ``exponent`` is honoured: a negative exponent divides, a
        non-negative exponent multiplies. Dividing by an exact integer power
        of ten (instead of multiplying by a float ``10 ** -n``) keeps the
        result correctly rounded.
        """
        if exponent < 0:
            return mantissa / (10 ** -exponent)
        return float(mantissa * 10 ** exponent)

    async def _fetch_pyth(self) -> Optional[float]:
        """Return the price from ``PYTH_URL``, or ``None`` on any failure.

        Reads ``parsed[0]["price"]`` from the JSON body and scales its
        ``price`` field by ``10 ** expo``. HTTP errors, timeouts and
        malformed payloads are logged and reported as ``None``.
        """
        try:
            resp = await self.client.get(self.PYTH_URL, timeout=5)
            resp.raise_for_status()
            price_data = resp.json()['parsed'][0]['price']
            return self._scale_price(int(price_data['price']), int(price_data['expo']))
        except Exception as e:
            # Best-effort source: a failure must not break the other fetch.
            logger.error("β Oracle Error (Pyth): %s", e)
            return None

    async def _fetch_chainlink_agg(self) -> Optional[float]:
        """Return the price from ``CHAINLINK_URL``, or ``None`` on any failure.

        Reads ``["bitcoin"]["usd"]`` from the JSON body. HTTP errors,
        timeouts and malformed payloads are logged and reported as ``None``.
        """
        try:
            resp = await self.client.get(self.CHAINLINK_URL, timeout=5)
            resp.raise_for_status()
            return float(resp.json()['bitcoin']['usd'])
        except Exception as e:
            # Best-effort source: a failure must not break the other fetch.
            logger.error("β Oracle Error (Chainlink Agg): %s", e)
            return None

    async def update_prices_async(self) -> None:
        """Fetch both sources concurrently and replace the cached prices.

        A source whose fetch failed is stored as ``None``, overwriting any
        previously cached value for that source.
        """
        pyth_price, chainlink_price = await asyncio.gather(
            self._fetch_pyth(),
            self._fetch_chainlink_agg(),
        )
        async with self._lock:
            self._prices["pyth"] = pyth_price
            self._prices["chainlink_agg"] = chainlink_price
        logger.info(
            "β Oracle Prices Updated: Pyth=$%s, Chainlink Agg=$%s",
            pyth_price,
            chainlink_price,
        )

    def get_current_prices(self) -> Dict[str, Optional[float]]:
        """Return a shallow copy of the cached prices.

        The copy means callers cannot mutate the internal cache.
        """
        return self._prices.copy()