Spaces:
Sleeping
Sleeping
""" | |
A high-frequency, multi-exchange price fetcher. | |
It concurrently fetches prices from multiple sources for arbitrage detection. | |
""" | |
import asyncio | |
import logging | |
from typing import Dict, Optional | |
import httpx | |
logger = logging.getLogger(__name__) | |
class PriceFetcher: | |
SOURCES = { | |
"binance": "https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT", | |
"coinbase": "https://api.coinbase.com/v2/prices/BTC-USD/spot", | |
"kraken": "https://api.kraken.com/0/public/Ticker?pair=XBTUSDT", | |
} | |
def __init__(self, client: httpx.AsyncClient): | |
self.client = client | |
self._prices: Dict[str, Optional[float]] = {name: None for name in self.SOURCES} | |
self._lock = asyncio.Lock() | |
async def _fetch_from_source(self, name: str, url: str) -> Optional[float]: | |
try: | |
resp = await self.client.get(url, timeout=5) | |
resp.raise_for_status() | |
data = resp.json() | |
if name == "binance": | |
return float(data['price']) | |
elif name == "coinbase": | |
return float(data['data']['amount']) | |
elif name == "kraken": | |
# Kraken has a more complex structure | |
return float(data['result']['XXBTZUSD']['c'][0]) | |
return None | |
except Exception as e: | |
logger.error(f"β Failed to fetch from {name}: {e}") | |
return None | |
async def update_prices_async(self): | |
tasks = [self._fetch_from_source(name, url) for name, url in self.SOURCES.items()] | |
results = await asyncio.gather(*tasks) | |
async with self._lock: | |
for name, price in zip(self.SOURCES.keys(), results): | |
if price: | |
self._prices[name] = price | |
logger.info(f"β Prices updated: {self._prices}") | |
def get_current_prices(self) -> Dict[str, Optional[float]]: | |
return self._prices.copy() |