Spaces:
Running
Running
""" | |
Provides a robust, asynchronous PriceFetcher class for caching cryptocurrency prices. | |
Features: | |
- Asynchronous fetching using httpx.AsyncClient. | |
- Multi-API fallback for high availability. | |
- Rate-limit (429) and error handling. | |
- Concurrency-safe in-memory cache. | |
- Decoupled design for easy extension with new data sources. | |
""" | |
import asyncio | |
import logging | |
# ==================================================================== | |
# FIX APPLIED HERE (1 of 2) | |
# ==================================================================== | |
# Import Union for Python 3.9 compatibility, and other necessary types. | |
from typing import Callable, TypedDict, Awaitable, Union | |
# ==================================================================== | |
import httpx | |
# --- Configuration --- | |
# Set up a structured logger | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - %(message)s", | |
) | |
# Define the structure for a price parsing function | |
PriceParser = Callable[[dict], Awaitable[dict[str, float]]] | |
# Define the structure for a data source, linking a URL to its parser | |
class PriceSource(TypedDict): | |
name: str | |
url: str | |
params: dict | |
parser: PriceParser | |
# --- Main Class: PriceFetcher --- | |
class PriceFetcher: | |
"""Manages fetching and caching crypto prices from multiple APIs asynchronously.""" | |
def __init__(self, client: httpx.AsyncClient, coins: list[str]): | |
""" | |
Initializes the PriceFetcher. | |
Args: | |
client: An instance of httpx.AsyncClient for making API calls. | |
coins: A list of coin IDs to fetch (e.g., ['bitcoin', 'ethereum']). | |
""" | |
self.client = client | |
self.coins = coins | |
self._prices: dict[str, Union[float, str]] = {coin: "--" for coin in coins} | |
self._lock = asyncio.Lock() # Lock to prevent race conditions on the cache | |
self.sources: list[PriceSource] = self._configure_sources() | |
def _configure_sources(self) -> list[PriceSource]: | |
"""Defines the API sources and their parsers.""" | |
return [ | |
{ | |
"name": "CoinGecko", | |
"url": "https://api.coingecko.com/api/v3/simple/price", | |
"params": { | |
"ids": ",".join(self.coins), | |
"vs_currencies": "usd" | |
}, | |
"parser": self._parse_coingecko, | |
}, | |
{ | |
"name": "CoinCap", | |
"url": "https://api.coincap.io/v2/assets", | |
"params": {"ids": ",".join(self.coins)}, | |
"parser": self._parse_coincap, | |
}, | |
] | |
async def _parse_coingecko(self, data: dict) -> dict[str, float]: | |
"""Parses the JSON response from CoinGecko.""" | |
try: | |
return { | |
coin: float(data[coin]["usd"]) | |
for coin in self.coins if coin in data | |
} | |
except (KeyError, TypeError) as e: | |
logging.error("β [CoinGecko] Failed to parse response: %s", e) | |
return {} | |
async def _parse_coincap(self, data: dict) -> dict[str, float]: | |
"""Parses the JSON response from CoinCap.""" | |
try: | |
# CoinCap returns a list under the 'data' key | |
return { | |
item["id"]: float(item["priceUsd"]) | |
for item in data.get("data", []) if item.get("id") in self.coins | |
} | |
except (KeyError, TypeError, ValueError) as e: | |
logging.error("β [CoinCap] Failed to parse response: %s", e) | |
return {} | |
# ==================================================================== | |
# FIX APPLIED HERE (2 of 2) | |
# ==================================================================== | |
# Changed the type hint from `float | str` to `Union[float, str]`. | |
def get_current_prices(self) -> dict[str, Union[float, str]]: | |
# ==================================================================== | |
"""Returns a copy of the current price cache. Thread-safe read.""" | |
return self._prices.copy() | |
async def update_prices_async(self) -> None: | |
""" | |
Asynchronously fetches prices, trying each source until one succeeds. | |
Updates the internal price cache in a concurrency-safe manner. | |
""" | |
for source in self.sources: | |
name, url, params, parser = source.values() | |
try: | |
resp = await self.client.get(url, params=params, timeout=10) | |
resp.raise_for_status() | |
new_prices = await parser(resp.json()) | |
if not new_prices: # Parser failed to extract data | |
continue | |
async with self._lock: | |
self._prices.update(new_prices) | |
logging.info("β [%s] Prices updated: %s", name, new_prices) | |
return # Success, so we exit the loop | |
except httpx.HTTPStatusError as e: | |
status = e.response.status_code | |
log_msg = f"β οΈ [{name}] HTTP error {status}" | |
if status == 429: | |
log_msg += " (Rate Limit). Trying next source..." | |
logging.warning(log_msg) | |
except (httpx.RequestError, asyncio.TimeoutError) as e: | |
logging.warning("β οΈ [%s] Request failed: %s. Trying next source...", name, e) | |
# Brief pause before trying the next API source | |
await asyncio.sleep(1) | |
logging.error("β All price APIs failed. Retaining stale prices.") |