# CryptoSentinel_AI / app/price_fetcher.py
# Last commit by mgbam: "Update app/price_fetcher.py" (2aa11ed, verified)
# File size: 2.5 kB
"""
A high-availability price fetcher using a decentralized oracle (Pyth)
and a reliable data aggregator to bypass geoblocking.
"""
import asyncio
import logging
from typing import Dict, Optional
import httpx
logger = logging.getLogger(__name__)
class PriceFetcher:
    """Fetch the BTC/USD price from two independent HTTP sources.

    One price comes from the Pyth Hermes API (``PYTH_URL``) and one from the
    CoinGecko simple-price API (``AGGREGATOR_URL``). The most recent result
    from each source is cached under the keys ``"on_chain_pyth"`` and
    ``"off_chain_agg"``; a value of ``None`` means the last fetch from that
    source failed (or no fetch has happened yet).
    """

    # Pyth provides real-time, on-chain prices through its public Hermes API.
    # The feed id in the query string selects BTC/USD.
    PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"
    # A reliable aggregator that is not typically geoblocked.
    AGGREGATOR_URL = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"

    # The annotation is quoted so it is not evaluated when the class is defined.
    def __init__(self, client: "httpx.AsyncClient"):
        """Store the shared HTTP client and initialise an empty price cache.

        Args:
            client: Async HTTP client used for every request. This class
                never closes it.
        """
        self.client = client
        self._prices: Dict[str, Optional[float]] = {
            "on_chain_pyth": None,
            "off_chain_agg": None,
        }
        self._lock = asyncio.Lock()

    @staticmethod
    def _scale_pyth_price(mantissa, exponent) -> float:
        """Convert a Pyth fixed-point price into a float.

        Pyth reports a price as an integer mantissa plus a base-10 exponent;
        the real value is ``mantissa * 10 ** exponent``
        (e.g. mantissa=119123, exponent=-2 -> 1191.23).

        Args:
            mantissa: Integer price, as an ``int`` or a decimal string.
            exponent: Base-10 exponent, as an ``int`` or a decimal string.

        Returns:
            The scaled price.

        Raises:
            ValueError, TypeError: If either argument cannot be converted
                with ``int()``.
        """
        mantissa = int(mantissa)
        exponent = int(exponent)
        if exponent < 0:
            # A single int / int division is correctly rounded, unlike
            # multiplying by the inexact float 10.0 ** exponent.
            return mantissa / (10 ** -exponent)
        # Non-negative exponents must scale the value up, not divide it.
        return float(mantissa * (10 ** exponent))

    async def _fetch_pyth(self) -> Optional[float]:
        """Fetch the price from Pyth.

        Returns:
            The scaled price, or ``None`` if the request or the parsing of
            the response fails for any reason (the error is logged).
        """
        try:
            resp = await self.client.get(self.PYTH_URL, timeout=5)
            resp.raise_for_status()
            data = resp.json()
            # The price is in the 'parsed' part of the response; the first
            # entry is used because a single feed id is requested.
            price_data = data['parsed'][0]['price']
            return self._scale_pyth_price(price_data['price'], price_data['expo'])
        except Exception as e:
            # Best effort: a failing source must not break the other one.
            logger.error("❌ Failed to fetch from Pyth: %s", e)
            return None

    async def _fetch_aggregator(self) -> Optional[float]:
        """Fetch the price from the aggregator.

        Returns:
            The price read from ``['bitcoin']['usd']`` in the JSON response,
            or ``None`` if the request or the parsing fails (the error is
            logged).
        """
        try:
            resp = await self.client.get(self.AGGREGATOR_URL, timeout=5)
            resp.raise_for_status()
            return float(resp.json()['bitcoin']['usd'])
        except Exception as e:
            # Best effort: a failing source must not break the other one.
            logger.error("❌ Failed to fetch from Aggregator: %s", e)
            return None

    async def update_prices_async(self):
        """Query both sources concurrently and replace the cached prices.

        A source that fails is stored as ``None``, overwriting any earlier
        value for that source.
        """
        pyth_price, agg_price = await asyncio.gather(
            self._fetch_pyth(),
            self._fetch_aggregator(),
        )
        async with self._lock:
            self._prices["on_chain_pyth"] = pyth_price
            self._prices["off_chain_agg"] = agg_price
            logger.info("✅ Prices updated: %s", self._prices)

    def get_current_prices(self) -> Dict[str, Optional[float]]:
        """Return a shallow copy of the cached prices.

        Returns:
            A new dict with the keys ``"on_chain_pyth"`` and
            ``"off_chain_agg"``; mutating it does not affect the cache.
        """
        return self._prices.copy()