mgbam commited on
Commit
fd31489
Β·
verified Β·
1 Parent(s): aaab60e

Update app/price_fetcher.py

Browse files
Files changed (1) hide show
  1. app/price_fetcher.py +162 -57
app/price_fetcher.py CHANGED
@@ -1,70 +1,175 @@
1
  """
2
- Background price cache with multi-API fallback and rate-limit handling
 
 
 
 
 
 
 
3
  """
4
- import httpx
5
  import logging
 
6
 
7
- # Primary and secondary APIs for crypto prices
8
- COINGECKO_URL = (
9
- "https://api.coingecko.com/api/v3/simple/price"
10
- "?ids=bitcoin,ethereum,dogecoin&vs_currencies=usd"
11
- )
12
- COINCAP_URL = (
13
- "https://api.coincap.io/v2/assets?ids=bitcoin,ethereum,dogecoin"
14
  )
15
 
16
- # Shared price cache
17
- CURRENT_PRICES = {"bitcoin": "--", "ethereum": "--", "dogecoin": "--"}
 
 
 
 
 
 
 
 
18
 
19
- # How often to retry each API before falling back (seconds)
20
- RETRY_DELAY = 5
 
21
 
 
 
 
22
 
23
- def fetch_prices() -> None:
24
- """
25
- Try CoinGecko first; on 429 or error, fall back to CoinCap.
26
- Updates CURRENT_PRICES in-place.
27
- """
28
- global CURRENT_PRICES
29
- apis = [
30
- ("CoinGecko", COINGECKO_URL),
31
- ("CoinCap", COINCAP_URL)
32
- ]
33
 
34
- for name, url in apis:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  try:
36
- resp = httpx.get(url, timeout=10)
37
- # Handle CoinCap JSON structure separately
38
- if name == "CoinGecko":
39
- resp.raise_for_status()
40
- data = resp.json()
41
- prices = {
42
- "bitcoin": data["bitcoin"]["usd"],
43
- "ethereum": data["ethereum"]["usd"],
44
- "dogecoin": data["dogecoin"]["usd"]
45
- }
46
- else:
47
- resp.raise_for_status()
48
- data = resp.json().get("data", [])
49
- prices = {item["id"]: float(item["priceUsd"]) for item in data}
50
-
51
- CURRENT_PRICES.update(prices)
52
- logging.info("βœ… [%s] prices updated: %s", name, prices)
53
- return
54
-
55
- except httpx.HTTPStatusError as e:
56
- status = e.response.status_code
57
- if status == 429:
58
- logging.warning("⚠️ [%s] rate limit (429). Retrying fallback.", name)
59
- else:
60
- logging.warning("⚠️ [%s] HTTP error %s: %s", name, status, e)
61
- except Exception as e:
62
- logging.warning("⚠️ [%s] fetch error: %s", name, e)
63
-
64
- # If we reach here, wait a bit before trying next API
65
  try:
66
- import time; time.sleep(RETRY_DELAY)
67
- except Exception:
68
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
- logging.error("❌ All price APIs failed. Keeping previous prices: %s", CURRENT_PRICES)
 
1
  """
2
+ Provides a robust, asynchronous PriceFetcher class for caching cryptocurrency prices.
3
+
4
+ Features:
5
+ - Asynchronous fetching using httpx.AsyncClient.
6
+ - Multi-API fallback for high availability.
7
+ - Rate-limit (429) and error handling.
8
+ - Concurrency-safe in-memory cache.
9
+ - Decoupled design for easy extension with new data sources.
10
  """
11
+ import asyncio
12
  import logging
13
+ from typing import Callable, TypedDict, Awaitable
14
 
15
+ import httpx
16
+
17
+ # --- Configuration ---
18
+ # Set up a structured logger
19
+ logging.basicConfig(
20
+ level=logging.INFO,
21
+ format="%(asctime)s - %(levelname)s - %(message)s",
22
  )
23
 
24
+ # Define the structure for a price parsing function
25
+ PriceParser = Callable[[dict], Awaitable[dict[str, float]]]
26
+
27
+ # Define the structure for a data source, linking a URL to its parser
28
+ class PriceSource(TypedDict):
29
+ name: str
30
+ url: str
31
+ params: dict
32
+ parser: PriceParser
33
+
34
 
35
+ # --- Main Class: PriceFetcher ---
36
+ class PriceFetcher:
37
+ """Manages fetching and caching crypto prices from multiple APIs asynchronously."""
38
 
39
+ def __init__(self, client: httpx.AsyncClient, coins: list[str]):
40
+ """
41
+ Initializes the PriceFetcher.
42
 
43
+ Args:
44
+ client: An instance of httpx.AsyncClient for making API calls.
45
+ coins: A list of coin IDs to fetch (e.g., ['bitcoin', 'ethereum']).
46
+ """
47
+ self.client = client
48
+ self.coins = coins
49
+ self._prices: dict[str, float | str] = {coin: "--" for coin in coins}
50
+ self._lock = asyncio.Lock() # Lock to prevent race conditions on the cache
51
+ self.sources: list[PriceSource] = self._configure_sources()
 
52
 
53
+ def _configure_sources(self) -> list[PriceSource]:
54
+ """Defines the API sources and their parsers."""
55
+ return [
56
+ {
57
+ "name": "CoinGecko",
58
+ "url": "https://api.coingecko.com/api/v3/simple/price",
59
+ "params": {
60
+ "ids": ",".join(self.coins),
61
+ "vs_currencies": "usd"
62
+ },
63
+ "parser": self._parse_coingecko,
64
+ },
65
+ {
66
+ "name": "CoinCap",
67
+ "url": "https://api.coincap.io/v2/assets",
68
+ "params": {"ids": ",".join(self.coins)},
69
+ "parser": self._parse_coincap,
70
+ },
71
+ ]
72
+
73
+ async def _parse_coingecko(self, data: dict) -> dict[str, float]:
74
+ """Parses the JSON response from CoinGecko."""
75
  try:
76
+ return {
77
+ coin: float(data[coin]["usd"])
78
+ for coin in self.coins if coin in data
79
+ }
80
+ except (KeyError, TypeError) as e:
81
+ logging.error("❌ [CoinGecko] Failed to parse response: %s", e)
82
+ return {}
83
+
84
+ async def _parse_coincap(self, data: dict) -> dict[str, float]:
85
+ """Parses the JSON response from CoinCap."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  try:
87
+ # CoinCap returns a list under the 'data' key
88
+ return {
89
+ item["id"]: float(item["priceUsd"])
90
+ for item in data.get("data", []) if item.get("id") in self.coins
91
+ }
92
+ except (KeyError, TypeError, ValueError) as e:
93
+ logging.error("❌ [CoinCap] Failed to parse response: %s", e)
94
+ return {}
95
+
96
+ def get_current_prices(self) -> dict[str, float | str]:
97
+ """Returns a copy of the current price cache. Thread-safe read."""
98
+ return self._prices.copy()
99
+
100
+ async def update_prices_async(self) -> None:
101
+ """
102
+ Asynchronously fetches prices, trying each source until one succeeds.
103
+ Updates the internal price cache in a concurrency-safe manner.
104
+ """
105
+ for source in self.sources:
106
+ name, url, params, parser = source.values()
107
+ try:
108
+ resp = await self.client.get(url, params=params, timeout=10)
109
+ resp.raise_for_status()
110
+
111
+ new_prices = await parser(resp.json())
112
+
113
+ if not new_prices: # Parser failed to extract data
114
+ continue
115
+
116
+ async with self._lock:
117
+ self._prices.update(new_prices)
118
+
119
+ logging.info("βœ… [%s] Prices updated: %s", name, new_prices)
120
+ return # Success, so we exit the loop
121
+
122
+ except httpx.HTTPStatusError as e:
123
+ status = e.response.status_code
124
+ log_msg = f"⚠️ [{name}] HTTP error {status}"
125
+ if status == 429:
126
+ log_msg += " (Rate Limit). Trying next source..."
127
+ logging.warning(log_msg)
128
+
129
+ except (httpx.RequestError, asyncio.TimeoutError) as e:
130
+ logging.warning("⚠️ [%s] Request failed: %s. Trying next source...", name, e)
131
+
132
+ # Brief pause before trying the next API source
133
+ await asyncio.sleep(1)
134
+
135
+ logging.error("❌ All price APIs failed. Retaining stale prices.")
136
+
137
+
138
+ async def run_price_updates_periodically(fetcher: PriceFetcher, interval_seconds: int):
139
+ """A background task runner to keep prices updated."""
140
+ logging.info("πŸš€ Starting periodic price updates...")
141
+ while True:
142
+ await fetcher.update_prices_async()
143
+ await asyncio.sleep(interval_seconds)
144
+
145
+
146
+ # --- Example Usage ---
147
+ if __name__ == "__main__":
148
+ async def main():
149
+ """Demonstrates how to use the PriceFetcher."""
150
+ target_coins = ["bitcoin", "ethereum", "dogecoin"]
151
+
152
+ async with httpx.AsyncClient() as client:
153
+ price_fetcher = PriceFetcher(client, coins=target_coins)
154
+
155
+ # Run the price updates in the background
156
+ update_task = asyncio.create_task(
157
+ run_price_updates_periodically(price_fetcher, interval_seconds=10)
158
+ )
159
+
160
+ # In a real app, the server would be running. Here, we just print prices.
161
+ for i in range(5):
162
+ await asyncio.sleep(11)
163
+ current_prices = price_fetcher.get_current_prices()
164
+ print(f"--- Main App Reading Cache ({i+1}/5) ---")
165
+ for coin, price in current_prices.items():
166
+ print(f" {coin.capitalize()}: ${price}")
167
+
168
+ # Cleanly shut down the background task
169
+ update_task.cancel()
170
+ try:
171
+ await update_task
172
+ except asyncio.CancelledError:
173
+ logging.info("Shutdown complete.")
174
 
175
+ asyncio.run(main())