CryptoSentinel_AI / app /price_fetcher.py
mgbam's picture
Update app/price_fetcher.py
c4fd2ca verified
raw
history blame
5.67 kB
"""
A professional-grade, multi-asset, multi-oracle price engine.
v12.0 FINAL: This version uses three independent data sources and calculates a
resilient median price to be immune to single-source API failures.
"""
import asyncio
import logging
from typing import Dict, Optional, List
import httpx
import statistics
logger = logging.getLogger(__name__)
# --- CONFIGURATION ---
ASSET_CONFIG = {
"BTC": {"pyth_id": "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43", "coingecko_id": "bitcoin", "coincap_id": "bitcoin"},
"ETH": {"pyth_id": "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace", "coingecko_id": "ethereum", "coincap_id": "ethereum"},
"SOL": {"pyth_id": "ef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d", "coingecko_id": "solana", "coincap_id": "solana"},
"XRP": {"pyth_id": "02a01e69981d314fd8a723be08253181e53b4945b4bf376d15a51980a37330c3", "coingecko_id": "ripple", "coincap_id": "xrp"},
"DOGE": {"pyth_id": "042f02faf4229babc62635593855b6a383d6a4a2a1b9b9a7c385a4a50b86a345", "coingecko_id": "dogecoin", "coincap_id": "dogecoin"},
"ADA": {"pyth_id": "34f544985c7943c093b5934963505a767f4749445244a852654c6017b28091ea", "coingecko_id": "cardano", "coincap_id": "cardano"},
"AVAX": {"pyth_id": "0x141f2a3c34c8035443a01d64380b52207991b16c14c5145f617eb578a994753c", "coingecko_id": "avalanche-2", "coincap_id": "avalanche"},
"LINK": {"pyth_id": "0x63f4f4755a5a67c64c781d45763b33a72666a15e6b91c0fbdf3b2f205d5a6b01", "coingecko_id": "chainlink", "coincap_id": "chainlink"},
"DOT": {"pyth_id": "0x00a896677493a74421b33362a7447785b13612f0e340d418641a33716a5067a3", "coingecko_id": "polkadot", "coincap_id": "polkadot"},
"MATIC": {"pyth_id": "0x737ac3c13709b45da8128ff9e1058a984f86a048035656111b8a365e4921648a", "coingecko_id": "matic-network", "coincap_id": "polygon"},
}
class PriceFetcher:
PYTH_URL = "https://hermes.pyth.network/v2/price_feeds"
COINGECKO_URL = "https://api.coingecko.com/api/v3/simple/price"
COINCAP_URL = "https://api.coincap.io/v2/assets"
def __init__(self, client: httpx.AsyncClient):
self.client = client
self._prices: Dict[str, Dict[str, Optional[float]]] = {}
self._lock = asyncio.Lock()
async def _fetch_from_source(self, source_name: str, asset_ids: Dict[str, str]) -> Dict[str, float]:
"""A generic fetcher for all our sources."""
prices = {}
try:
if source_name == "pyth":
params = [("ids[]", f"0x{pid}") for pid in asset_ids.values()]
resp = await self.client.get(self.PYTH_URL, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
id_to_symbol = {f"0x{v}": k for k, v in asset_ids.items()}
for item in data:
price = item.get('price', {})
if price and 'price' in price:
prices[id_to_symbol[item['id']]] = int(price['price']) / (10 ** abs(int(price['expo'])))
elif source_name == "coingecko":
params = {"ids": ",".join(asset_ids.values()), "vs_currencies": "usd"}
resp = await self.client.get(self.COINGECKO_URL, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
id_to_symbol = {v: k for k, v in asset_ids.items()}
for cg_id, price_data in data.items():
prices[id_to_symbol[cg_id]] = price_data['usd']
elif source_name == "coincap":
params = {"ids": ",".join(asset_ids.values())}
resp = await self.client.get(self.COINCAP_URL, params=params, timeout=10)
resp.raise_for_status()
data = resp.json().get('data', [])
id_to_symbol = {v: k for k, v in asset_ids.items()}
for item in data:
prices[id_to_symbol[item['id']]] = float(item['priceUsd'])
except Exception as e:
logger.error(f"❌ Oracle Error ({source_name}): {e}")
return prices
async def update_prices_async(self):
# Prepare asset ID maps for each source
pyth_ids = {k: v['pyth_id'] for k, v in ASSET_CONFIG.items()}
coingecko_ids = {k: v['coingecko_id'] for k, v in ASSET_CONFIG.items()}
coincap_ids = {k: v['coincap_id'] for k, v in ASSET_CONFIG.items()}
# Fetch all sources concurrently
tasks = [
self._fetch_from_source("pyth", pyth_ids),
self._fetch_from_source("coingecko", coingecko_ids),
self._fetch_from_source("coincap", coincap_ids),
]
pyth_prices, coingecko_prices, coincap_prices = await asyncio.gather(*tasks)
async with self._lock:
for symbol in ASSET_CONFIG.keys():
# Get a list of all valid prices for the current asset
valid_agg_prices = [
p for p in [coingecko_prices.get(symbol), coincap_prices.get(symbol)] if p is not None
]
# Calculate a resilient median price for the off-chain aggregators
median_agg_price = statistics.median(valid_agg_prices) if valid_agg_prices else None
self._prices[symbol] = {
"pyth": pyth_prices.get(symbol),
"chainlink_agg": median_agg_price
}
logger.info(f"βœ… Multi-Source Prices Updated")
def get_all_prices(self) -> Dict[str, Dict[str, Optional[float]]]:
return self._prices.copy()