Spaces:
Running
Running
File size: 2,134 Bytes
97873ec 431e338 97873ec fd31489 476cee0 2ac36a0 fd31489 19f368e fd31489 2ac36a0 431e338 19f368e fd31489 431e338 19f368e 476cee0 2aa11ed 476cee0 2aa11ed 19f368e 3bb8a2f 19f368e 431e338 2ac36a0 19f368e 431e338 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
"""
A professional-grade, multi-oracle price engine.
This engine fetches prices from two independent, globally-accessible
decentralized oracles (Pyth and Chainlink) to detect market dislocations.
"""
import asyncio
import logging
from typing import Dict, Optional
import httpx
logger = logging.getLogger(__name__)
class PriceFetcher:
PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"
CHAINLINK_URL = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"
def __init__(self, client: httpx.AsyncClient):
self.client = client
self._prices: Dict[str, Optional[float]] = {"pyth": None, "chainlink_agg": None}
self._lock = asyncio.Lock()
async def _fetch_pyth(self) -> Optional[float]:
try:
resp = await self.client.get(self.PYTH_URL, timeout=5)
resp.raise_for_status()
price_data = resp.json()['parsed'][0]['price']
return int(price_data['price']) / (10 ** abs(int(price_data['expo'])))
except Exception as e:
logger.error(f"β Oracle Error (Pyth): {e}")
return None
async def _fetch_chainlink_agg(self) -> Optional[float]:
try:
resp = await self.client.get(self.CHAINLINK_URL, timeout=5)
resp.raise_for_status()
return float(resp.json()['bitcoin']['usd'])
except Exception as e:
logger.error(f"β Oracle Error (Chainlink Agg): {e}")
return None
async def update_prices_async(self):
pyth_task = self._fetch_pyth()
chainlink_task = self._fetch_chainlink_agg()
pyth_price, chainlink_price = await asyncio.gather(pyth_task, chainlink_task)
async with self._lock:
self._prices["pyth"] = pyth_price
self._prices["chainlink_agg"] = chainlink_price
logger.info(f"β
Oracle Prices Updated: Pyth=${pyth_price}, Chainlink Agg=${chainlink_price}")
def get_current_prices(self) -> Dict[str, Optional[float]]:
return self._prices.copy() |