Spaces:
Sleeping
Sleeping
"""
Provides a robust, asynchronous PriceFetcher class for caching cryptocurrency prices.

Features:
- Asynchronous fetching using httpx.AsyncClient.
- Multi-API fallback for high availability.
- Rate-limit (429) and error handling.
- Concurrency-safe in-memory cache.
- Decoupled design for easy extension with new data sources.
"""
| import asyncio | |
| import logging | |
| # ==================================================================== | |
| # FIX APPLIED HERE (1 of 2) | |
| # ==================================================================== | |
| # Import Union for Python 3.9 compatibility, and other necessary types. | |
| from typing import Callable, TypedDict, Awaitable, Union | |
| # ==================================================================== | |
| import httpx | |
# --- Configuration ---
# Configure the root logger once at import time: records at INFO and above
# are emitted as "<timestamp> - <level> - <message>".
# NOTE: logging.basicConfig does nothing if the root logger already has
# handlers, so an application that configured logging earlier keeps its setup.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Signature shared by every response parser: an async callable that takes the
# decoded JSON payload and resolves to a mapping of coin id -> price.
PriceParser = Callable[[dict], Awaitable[dict[str, float]]]


# One entry per upstream API, pairing the request details with the parser
# that understands that API's response.
class PriceSource(TypedDict):
    """Describes a single price API and how to read its reply."""

    name: str  # label used to identify the source in log messages
    url: str  # endpoint that is requested
    params: dict  # query parameters sent along with the request
    parser: PriceParser  # coroutine function that extracts prices from the JSON
# --- Main Class: PriceFetcher ---
class PriceFetcher:
    """Fetches crypto prices from several APIs and caches the latest values.

    Sources are tried in the order returned by ``_configure_sources``. The
    first source that yields at least one price updates the cache and ends
    the attempt. If every source fails, the previously cached prices are kept.

    NOTE(review): several log messages below start with characters that look
    like mis-encoded emoji. They are reproduced unchanged; confirm the
    intended text against the original file.
    """

    def __init__(
        self,
        client: "httpx.AsyncClient",
        coins: list[str],
        *,
        request_timeout: float = 10.0,
        retry_delay: float = 1.0,
    ) -> None:
        """
        Initializes the PriceFetcher.

        Args:
            client: An instance of httpx.AsyncClient for making API calls.
            coins: A list of coin IDs to fetch (e.g., ['bitcoin', 'ethereum']).
            request_timeout: Per-request timeout in seconds passed to the client.
            retry_delay: Seconds to pause between one failed source and the next.
        """
        self.client = client
        self.coins = coins
        self.request_timeout = request_timeout
        self.retry_delay = retry_delay
        # Every requested coin starts with the "--" placeholder until a real
        # price arrives. Union (not `float | str`) keeps Python 3.9 support.
        self._prices: dict[str, Union[float, str]] = {coin: "--" for coin in coins}
        self._lock = asyncio.Lock()  # Serializes concurrent cache updates
        self.sources: list[PriceSource] = self._configure_sources()

    def _configure_sources(self) -> "list[PriceSource]":
        """Defines the API sources, in fallback order, and their parsers."""
        coin_ids = ",".join(self.coins)
        return [
            {
                "name": "CoinGecko",
                "url": "https://api.coingecko.com/api/v3/simple/price",
                "params": {
                    "ids": coin_ids,
                    "vs_currencies": "usd",
                },
                "parser": self._parse_coingecko,
            },
            {
                "name": "CoinCap",
                "url": "https://api.coincap.io/v2/assets",
                "params": {"ids": coin_ids},
                "parser": self._parse_coincap,
            },
        ]

    async def _parse_coingecko(self, data: dict) -> dict[str, float]:
        """Parses the JSON response from CoinGecko.

        Expects ``{coin_id: {"usd": price}, ...}``. Returns an empty dict
        (after logging) if any requested entry is malformed.
        """
        try:
            return {
                coin: float(data[coin]["usd"])
                for coin in self.coins
                if coin in data
            }
        except (KeyError, TypeError, ValueError) as e:
            # ValueError covers a non-numeric price string passed to float().
            logging.error("β [CoinGecko] Failed to parse response: %s", e)
            return {}

    async def _parse_coincap(self, data: dict) -> dict[str, float]:
        """Parses the JSON response from CoinCap.

        Expects ``{"data": [{"id": ..., "priceUsd": ...}, ...]}``. Returns an
        empty dict (after logging) if the payload is malformed.
        """
        try:
            # CoinCap returns a list under the 'data' key
            return {
                item["id"]: float(item["priceUsd"])
                for item in data.get("data", [])
                if item.get("id") in self.coins
            }
        except (AttributeError, KeyError, TypeError, ValueError) as e:
            # AttributeError covers a payload (or list item) that is not a dict.
            logging.error("β [CoinCap] Failed to parse response: %s", e)
            return {}

    def get_current_prices(self) -> dict[str, Union[float, str]]:
        """Returns a shallow copy of the current price cache.

        The copy is taken synchronously (no ``await``), so callers get a
        snapshot they can mutate without touching the cache itself.
        """
        return self._prices.copy()

    async def _fetch_from_source(self, source: "PriceSource") -> dict[str, float]:
        """Requests one source and returns its parsed prices, or {} on failure."""
        name = source["name"]
        try:
            resp = await self.client.get(
                source["url"],
                params=source["params"],
                timeout=self.request_timeout,
            )
            resp.raise_for_status()
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            log_msg = f"β οΈ [{name}] HTTP error {status}"
            if status == 429:
                log_msg += " (Rate Limit). Trying next source..."
            logging.warning(log_msg)
            return {}
        except (httpx.RequestError, asyncio.TimeoutError) as e:
            logging.warning("β οΈ [%s] Request failed: %s. Trying next source...", name, e)
            return {}

        try:
            payload = resp.json()
        except ValueError as e:
            # A non-JSON body (json.JSONDecodeError is a ValueError) must not
            # abort the whole update; treat it like any other source failure.
            logging.warning("[%s] Response was not valid JSON: %s", name, e)
            return {}

        new_prices = await source["parser"](payload)
        if not new_prices:
            logging.warning("[%s] No usable prices in response.", name)
        return new_prices

    async def update_prices_async(self) -> None:
        """
        Asynchronously fetches prices, trying each source until one succeeds.
        Updates the internal price cache in a concurrency-safe manner.
        """
        last_index = len(self.sources) - 1
        for index, source in enumerate(self.sources):
            new_prices = await self._fetch_from_source(source)
            if new_prices:
                async with self._lock:
                    self._prices.update(new_prices)
                    logging.info("β [%s] Prices updated: %s", source["name"], new_prices)
                return  # Success, so we stop trying further sources
            # Brief pause before the next source; pointless after the last one.
            if index < last_index:
                await asyncio.sleep(self.retry_delay)
        logging.error("β All price APIs failed. Retaining stale prices.")