CryptoSentinel_AI / app /price_fetcher.py
mgbam's picture
Update app/price_fetcher.py
19f368e verified
raw
history blame
1.93 kB
"""
A high-frequency, multi-exchange price fetcher.
It concurrently fetches prices from multiple sources for arbitrage detection.
"""
import asyncio
import logging
from typing import Dict, Optional
import httpx
logger = logging.getLogger(__name__)
class PriceFetcher:
    """Concurrently fetch the latest BTC spot price from several exchanges.

    The most recent successful price per source is cached on the instance;
    a source that has never been fetched successfully maps to ``None``.
    """

    # NOTE(review): the Binance and Kraken URLs request a USDT-quoted pair
    # while the Coinbase URL requests BTC-USD, so the cached prices are not
    # in a single quote currency -- confirm this is acceptable to consumers.
    SOURCES = {
        "binance": "https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT",
        "coinbase": "https://api.coinbase.com/v2/prices/BTC-USD/spot",
        "kraken": "https://api.kraken.com/0/public/Ticker?pair=XBTUSDT",
    }

    def __init__(self, client: "httpx.AsyncClient"):
        """Store the shared HTTP client and initialise an empty price cache.

        Args:
            client: Async HTTP client used for every request; this class
                never closes it.
        """
        self.client = client
        self._prices: Dict[str, Optional[float]] = {name: None for name in self.SOURCES}
        # Serialises writes to ``_prices`` between overlapping update calls.
        self._lock = asyncio.Lock()

    @staticmethod
    def _parse_price(name: str, data: dict) -> Optional[float]:
        """Extract the price from one exchange's decoded JSON payload.

        Args:
            name: Source key, one of the keys of ``SOURCES``.
            data: Decoded JSON body returned by that source.

        Returns:
            The price as a float, or ``None`` for an unrecognised source.

        Raises:
            KeyError, IndexError, TypeError, ValueError: If the payload does
                not have the expected shape or reports an API error.
        """
        if name == "binance":
            return float(data["price"])
        if name == "coinbase":
            return float(data["data"]["amount"])
        if name == "kraken":
            # Kraken reports API-level failures inside the JSON body, so
            # raise_for_status() alone does not catch them.
            if data.get("error"):
                raise ValueError(f"Kraken API error: {data['error']}")
            # The result is keyed by Kraken's own name for the pair.  The
            # previously hard-coded 'XXBTZUSD' is the key for XBT/USD, not
            # for the XBTUSDT pair requested in SOURCES, so read the single
            # ticker entry regardless of its key.
            ticker = next(iter(data["result"].values()), None)
            if ticker is None:
                raise ValueError("Kraken returned an empty ticker result")
            # 'c' holds the last trade as [price, lot volume].
            return float(ticker["c"][0])
        return None

    async def _fetch_from_source(self, name: str, url: str) -> Optional[float]:
        """Fetch and parse the current price from a single source.

        Args:
            name: Source key used to select the response parser.
            url: Endpoint to request.

        Returns:
            The parsed price, or ``None`` if the request or parsing failed
            (the failure is logged, never raised).
        """
        try:
            resp = await self.client.get(url, timeout=5)
            resp.raise_for_status()
            return self._parse_price(name, resp.json())
        except Exception as e:
            # Deliberately broad: one failing exchange must not abort the
            # gather() over the other sources.
            logger.error("❌ Failed to fetch from %s: %s", name, e)
            return None

    async def update_prices_async(self) -> None:
        """Refresh the cached prices from all sources concurrently.

        A source whose fetch fails keeps its previously cached price.
        """
        tasks = [self._fetch_from_source(name, url) for name, url in self.SOURCES.items()]
        results = await asyncio.gather(*tasks)
        async with self._lock:
            for name, price in zip(self.SOURCES, results):
                # Skips None (failed fetch) and 0.0, leaving the old value.
                if price:
                    self._prices[name] = price
        logger.info("✅ Prices updated: %s", self._prices)

    def get_current_prices(self) -> Dict[str, Optional[float]]:
        """Return a shallow copy of the cached prices keyed by source name."""
        return self._prices.copy()