Spaces:
Running
Running
""" | |
A professional-grade, multi-asset, multi-oracle price engine. | |
This engine dynamically fetches prices for a configured list of assets | |
from decentralized oracles to detect market dislocations. | |
v11.1: Corrected Pyth Network API endpoint. | |
""" | |
import asyncio | |
import logging | |
from typing import Dict, Optional | |
import httpx | |
logger = logging.getLogger(__name__)

# --- CONFIGURATION ---
# Maps each tracked asset symbol to the identifiers used by the two price
# sources: "pyth_id" is the Pyth price-feed id and "coingecko_id" is the
# CoinGecko coin id.
#
# Pyth ids are stored uniformly as bare lowercase hex (64 characters, no "0x"
# prefix). PriceFetcher adds the "0x" prefix itself when it builds the
# request, so an id that already carried the prefix would be sent as "0x0x...".
#
# NOTE(review): confirm every pyth_id against Pyth's published price-feed id
# list before relying on these values.
ASSET_CONFIG = {
    "BTC": {
        "pyth_id": "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43",
        "coingecko_id": "bitcoin",
    },
    "ETH": {
        "pyth_id": "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace",
        "coingecko_id": "ethereum",
    },
    "SOL": {
        "pyth_id": "ef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d",
        "coingecko_id": "solana",
    },
    "XRP": {
        "pyth_id": "02a01e69981d314fd8a723be08253181e53b4945b4bf376d15a51980a37330c3",
        "coingecko_id": "ripple",
    },
    "DOGE": {
        "pyth_id": "042f02faf4229babc62635593855b6a383d6a4a2a1b9b9a7c385a4a50b86a345",
        "coingecko_id": "dogecoin",
    },
    "ADA": {
        "pyth_id": "34f544985c7943c093b5934963505a767f4749445244a852654c6017b28091ea",
        "coingecko_id": "cardano",
    },
    "AVAX": {
        "pyth_id": "141f2a3c34c8035443a01d64380b52207991b16c14c5145f617eb578a994753c",
        "coingecko_id": "avalanche-2",
    },
    "LINK": {
        "pyth_id": "63f4f4755a5a67c64c781d45763b33a72666a15e6b91c0fbdf3b2f205d5a6b01",
        "coingecko_id": "chainlink",
    },
    "DOT": {
        "pyth_id": "00a896677493a74421b33362a7447785b13612f0e340d418641a33716a5067a3",
        "coingecko_id": "polkadot",
    },
    "MATIC": {
        "pyth_id": "737ac3c13709b45da8128ff9e1058a984f86a048035656111b8a365e4921648a",
        "coingecko_id": "matic-network",
    },
}
class PriceFetcher:
    """Fetch and cache USD prices for every asset in ``ASSET_CONFIG``.

    Two sources are polled concurrently: the Pyth endpoint at ``PYTH_URL`` and
    the CoinGecko endpoint at ``AGGREGATOR_URL``. The latest result for each
    symbol is cached as ``{"pyth": ..., "chainlink_agg": ...}``; a source that
    failed, or that did not report a symbol, leaves ``None`` in its slot.
    The ``"chainlink_agg"`` slot holds the CoinGecko price; the key name is
    kept because callers read it.
    """

    # NOTE(review): the parser expects each returned item to carry a
    # ``price`` object with ``price`` and ``expo`` fields. Confirm that this
    # endpoint returns price data in that shape (Hermes also documents
    # ``/v2/updates/price/latest``); items without a ``price`` are skipped.
    PYTH_URL = "https://hermes.pyth.network/v2/price_feeds"
    AGGREGATOR_URL = "https://api.coingecko.com/api/v3/simple/price"

    def __init__(self, client: "httpx.AsyncClient"):
        """Store the shared HTTP client and start with an empty price cache.

        Args:
            client: Async HTTP client used for every request; the caller keeps
                ownership of it.
        """
        self.client = client
        self._prices: Dict[str, Dict[str, Optional[float]]] = {}
        self._lock = asyncio.Lock()

    @staticmethod
    def _normalize_feed_id(feed_id: str) -> str:
        """Return ``feed_id`` as bare lowercase hex without a ``0x`` prefix."""
        cleaned = feed_id.strip().lower()
        if cleaned.startswith("0x"):
            cleaned = cleaned[2:]
        return cleaned

    @staticmethod
    def _scale_pyth_price(price_obj: dict) -> float:
        """Convert a ``{"price": mantissa, "expo": exponent}`` pair to a float.

        The value is ``mantissa * 10 ** exponent``. The exponent's sign is
        honoured: a negative exponent divides, a positive one multiplies.
        """
        mantissa = int(price_obj["price"])
        expo = int(price_obj["expo"])
        if expo < 0:
            # Dividing by an exact integer power keeps the result correctly
            # rounded (multiplying by the float 10 ** expo would not).
            return mantissa / (10 ** -expo)
        return float(mantissa * (10 ** expo))

    @staticmethod
    def _parse_pyth_payload(payload: object, id_to_symbol: Dict[str, str]) -> Dict[str, float]:
        """Extract ``{symbol: price}`` from a decoded Pyth response.

        Accepts either a bare list of items or a dict wrapping the list under
        ``"parsed"``. Ids are compared in normalized form, so the response may
        use either prefix style or letter case. Unknown or malformed items are
        skipped so one bad entry cannot discard the rest.
        """
        if isinstance(payload, dict):
            payload = payload.get("parsed") or []
        if not isinstance(payload, list):
            return {}

        prices: Dict[str, float] = {}
        for item in payload:
            try:
                feed_id = PriceFetcher._normalize_feed_id(item["id"])
                symbol = id_to_symbol.get(feed_id)
                if symbol is None:
                    continue
                prices[symbol] = PriceFetcher._scale_pyth_price(item["price"])
            except (KeyError, TypeError, ValueError, AttributeError):
                continue
        return prices

    @staticmethod
    def _parse_aggregator_payload(payload: object, id_to_symbol: Dict[str, str]) -> Dict[str, float]:
        """Extract ``{symbol: usd_price}`` from a decoded CoinGecko response.

        Entries whose id is not configured, or that carry no numeric ``usd``
        quote, are skipped instead of aborting the whole batch.
        """
        if not isinstance(payload, dict):
            return {}

        prices: Dict[str, float] = {}
        for cg_id, quote in payload.items():
            symbol = id_to_symbol.get(cg_id)
            if symbol is None or not isinstance(quote, dict):
                continue
            usd = quote.get("usd")
            if isinstance(usd, (int, float)):
                prices[symbol] = float(usd)
        return prices

    async def _fetch_pyth_prices(self) -> Dict[str, float]:
        """Fetch the Pyth price of every configured asset.

        Returns:
            ``{symbol: price}`` for each feed that was returned and parsed;
            an empty dict if the request or decoding fails.
        """
        id_to_symbol = {
            self._normalize_feed_id(cfg["pyth_id"]): symbol
            for symbol, cfg in ASSET_CONFIG.items()
        }
        # Ids are normalized first so exactly one "0x" prefix is sent, whether
        # or not the configured id already had one.
        params = [("ids[]", f"0x{feed_id}") for feed_id in id_to_symbol]
        try:
            resp = await self.client.get(self.PYTH_URL, params=params, timeout=10)
            resp.raise_for_status()
            prices = self._parse_pyth_payload(resp.json(), id_to_symbol)
        except Exception as e:
            # Best-effort boundary: a failing oracle must not stop the update.
            logger.error(f"β Oracle Error (Pyth): {e}")
            return {}

        missing = sorted(set(ASSET_CONFIG) - set(prices))
        if missing:
            logger.warning("Pyth returned no usable price for: %s", ", ".join(missing))
        return prices

    async def _fetch_aggregator_prices(self) -> Dict[str, float]:
        """Fetch the CoinGecko USD price of every configured asset.

        Returns:
            ``{symbol: usd_price}`` for each asset that was returned; an empty
            dict if the request or decoding fails.
        """
        id_to_symbol = {cfg["coingecko_id"]: symbol for symbol, cfg in ASSET_CONFIG.items()}
        params = {"ids": ",".join(id_to_symbol), "vs_currencies": "usd"}
        try:
            resp = await self.client.get(self.AGGREGATOR_URL, params=params, timeout=10)
            resp.raise_for_status()
            prices = self._parse_aggregator_payload(resp.json(), id_to_symbol)
        except Exception as e:
            # Best-effort boundary: a failing source must not stop the update.
            logger.error(f"β Oracle Error (Aggregator): {e}")
            return {}

        missing = sorted(set(ASSET_CONFIG) - set(prices))
        if missing:
            logger.warning("Aggregator returned no usable price for: %s", ", ".join(missing))
        return prices

    async def update_prices_async(self):
        """Poll both sources concurrently and replace the cached prices."""
        pyth_prices, agg_prices = await asyncio.gather(
            self._fetch_pyth_prices(),
            self._fetch_aggregator_prices(),
        )
        async with self._lock:
            for symbol in ASSET_CONFIG:
                self._prices[symbol] = {
                    "pyth": pyth_prices.get(symbol),
                    "chainlink_agg": agg_prices.get(symbol),
                }
        logger.info("β Multi-Asset Prices Updated")

    def get_all_prices(self) -> Dict[str, Dict[str, Optional[float]]]:
        """Return a snapshot of the cached prices.

        The per-symbol dicts are copied too, so callers can mutate the result
        without touching the cache.
        """
        return {symbol: dict(sources) for symbol, sources in self._prices.items()}