CryptoSentinel_AI / app /price_fetcher.py
mgbam's picture
Update app/price_fetcher.py
3a73c60 verified
raw
history blame
2.13 kB
"""
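A professional-grade, multi-oracle price engine.
This engine fetches prices from two independent, globally-accessible
sources to detect market dislocations: the Pyth oracle (via its Hermes API)
and an aggregate price stored under the "chainlink_agg" key. Note that the
aggregate price is currently requested from the CoinGecko simple-price API,
not from a Chainlink feed.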
"""
import asyncio
import logging
from typing import Dict, Optional
import httpx
# Module-level logger, named after this module; used by PriceFetcher for
# fetch errors and price-update messages.
logger = logging.getLogger(__name__)
class PriceFetcher:
    """Fetch and cache the latest prices from two HTTP price sources.

    The cache is a dict with two keys, ``"pyth"`` and ``"chainlink_agg"``.
    Each value is the most recently fetched price as a float, or ``None``
    if no fetch has happened yet or the last fetch for that source failed.
    """

    # Pyth Hermes "latest price update" endpoint, queried for one feed id.
    # NOTE(review): the feed id ends in an upper-case "B43" while the rest is
    # lower-case hex; presumably Hermes treats ids case-insensitively --
    # confirm. Presumably this is the BTC/USD feed -- verify.
    PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"
    # Despite the attribute name, this URL targets the CoinGecko simple-price
    # API (bitcoin quoted in USD), not a Chainlink endpoint.
    CHAINLINK_URL = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"

    def __init__(self, client: "httpx.AsyncClient"):
        """Store the shared HTTP client and initialise an empty price cache.

        Args:
            client: Async HTTP client used for every request. It is not
                closed by this class.
        """
        self.client = client
        self._prices: Dict[str, Optional[float]] = {"pyth": None, "chainlink_agg": None}
        # Guards writes to ``_prices`` in ``update_prices_async``.
        self._lock = asyncio.Lock()

    async def _fetch_pyth(self) -> Optional[float]:
        """Return the latest Pyth price, or ``None`` if the fetch fails.

        The response carries an integer mantissa (``price``) and a base-10
        exponent (``expo``); the real price is ``price * 10 ** expo``.
        """
        try:
            resp = await self.client.get(self.PYTH_URL, timeout=5)
            resp.raise_for_status()
            price_data = resp.json()['parsed'][0]['price']
            mantissa = int(price_data['price'])
            exponent = int(price_data['expo'])
            if exponent < 0:
                # Divide by an exact integer power of ten rather than
                # multiplying by an inexact float such as 1e-8.
                return mantissa / (10 ** -exponent)
            # Non-negative exponents scale the mantissa up; taking abs() of
            # the exponent and dividing would shrink the price instead.
            return float(mantissa * (10 ** exponent))
        except Exception as e:
            # Best-effort: any failure (network, HTTP status, unexpected
            # payload shape) is logged and reported as "no price".
            logger.error("❌ Oracle Error (Pyth): %s", e)
            return None

    async def _fetch_chainlink_agg(self) -> Optional[float]:
        """Return the price from ``CHAINLINK_URL``, or ``None`` on failure.

        Reads ``['bitcoin']['usd']`` from the JSON response and converts it
        to a float.
        """
        try:
            resp = await self.client.get(self.CHAINLINK_URL, timeout=5)
            resp.raise_for_status()
            return float(resp.json()['bitcoin']['usd'])
        except Exception as e:
            # Best-effort: log and report "no price" on any failure.
            logger.error("❌ Oracle Error (Chainlink Agg): %s", e)
            return None

    async def update_prices_async(self):
        """Fetch both prices concurrently and overwrite the cached values.

        A failed fetch stores ``None`` for that source, replacing whatever
        value was cached before.
        """
        pyth_price, chainlink_price = await asyncio.gather(
            self._fetch_pyth(),
            self._fetch_chainlink_agg(),
        )
        async with self._lock:
            self._prices["pyth"] = pyth_price
            self._prices["chainlink_agg"] = chainlink_price
        logger.info(
            "✅ Oracle Prices Updated: Pyth=$%s, Chainlink Agg=$%s",
            pyth_price,
            chainlink_price,
        )

    def get_current_prices(self) -> Dict[str, Optional[float]]:
        """Return a shallow copy of the cached prices.

        Returns:
            A new dict with the keys ``"pyth"`` and ``"chainlink_agg"``;
            mutating it does not affect the internal cache.
        """
        return self._prices.copy()