mgbam commited on
Commit
19f368e
·
verified ·
1 Parent(s): ea758f6

Update app/price_fetcher.py

Browse files
Files changed (1) hide show
  1. app/price_fetcher.py +41 -133
app/price_fetcher.py CHANGED
@@ -1,145 +1,53 @@
1
  """
2
- Provides a robust, asynchronous PriceFetcher class for caching cryptocurrency prices.
3
-
4
- Features:
5
- - Asynchronous fetching using httpx.AsyncClient.
6
- - Multi-API fallback for high availability.
7
- - Rate-limit (429) and error handling.
8
- - Concurrency-safe in-memory cache.
9
- - Decoupled design for easy extension with new data sources.
10
  """
11
  import asyncio
12
  import logging
13
- # ====================================================================
14
- # FIX APPLIED HERE (1 of 2)
15
- # ====================================================================
16
- # Import Union for Python 3.9 compatibility, and other necessary types.
17
- from typing import Callable, TypedDict, Awaitable, Union
18
- # ====================================================================
19
-
20
  import httpx
21
 
22
- # --- Configuration ---
23
- # Set up a structured logger
24
- logging.basicConfig(
25
- level=logging.INFO,
26
- format="%(asctime)s - %(levelname)s - %(message)s",
27
- )
28
-
29
- # Define the structure for a price parsing function
30
- PriceParser = Callable[[dict], Awaitable[dict[str, float]]]
31
-
32
- # Define the structure for a data source, linking a URL to its parser
33
- class PriceSource(TypedDict):
34
- name: str
35
- url: str
36
- params: dict
37
- parser: PriceParser
38
 
39
-
40
- # --- Main Class: PriceFetcher ---
41
  class PriceFetcher:
42
- """Manages fetching and caching crypto prices from multiple APIs asynchronously."""
43
-
44
- def __init__(self, client: httpx.AsyncClient, coins: list[str]):
45
- """
46
- Initializes the PriceFetcher.
47
 
48
- Args:
49
- client: An instance of httpx.AsyncClient for making API calls.
50
- coins: A list of coin IDs to fetch (e.g., ['bitcoin', 'ethereum']).
51
- """
52
  self.client = client
53
- self.coins = coins
54
- self._prices: dict[str, Union[float, str]] = {coin: "--" for coin in coins}
55
- self._lock = asyncio.Lock() # Lock to prevent race conditions on the cache
56
- self.sources: list[PriceSource] = self._configure_sources()
57
 
58
- def _configure_sources(self) -> list[PriceSource]:
59
- """Defines the API sources and their parsers."""
60
- return [
61
- {
62
- "name": "CoinGecko",
63
- "url": "https://api.coingecko.com/api/v3/simple/price",
64
- "params": {
65
- "ids": ",".join(self.coins),
66
- "vs_currencies": "usd"
67
- },
68
- "parser": self._parse_coingecko,
69
- },
70
- {
71
- "name": "CoinCap",
72
- "url": "https://api.coincap.io/v2/assets",
73
- "params": {"ids": ",".join(self.coins)},
74
- "parser": self._parse_coincap,
75
- },
76
- ]
77
-
78
- async def _parse_coingecko(self, data: dict) -> dict[str, float]:
79
- """Parses the JSON response from CoinGecko."""
80
  try:
81
- return {
82
- coin: float(data[coin]["usd"])
83
- for coin in self.coins if coin in data
84
- }
85
- except (KeyError, TypeError) as e:
86
- logging.error("❌ [CoinGecko] Failed to parse response: %s", e)
87
- return {}
88
-
89
- async def _parse_coincap(self, data: dict) -> dict[str, float]:
90
- """Parses the JSON response from CoinCap."""
91
- try:
92
- # CoinCap returns a list under the 'data' key
93
- return {
94
- item["id"]: float(item["priceUsd"])
95
- for item in data.get("data", []) if item.get("id") in self.coins
96
- }
97
- except (KeyError, TypeError, ValueError) as e:
98
- logging.error("❌ [CoinCap] Failed to parse response: %s", e)
99
- return {}
100
-
101
- # ====================================================================
102
- # FIX APPLIED HERE (2 of 2)
103
- # ====================================================================
104
- # Changed the type hint from `float | str` to `Union[float, str]`.
105
- def get_current_prices(self) -> dict[str, Union[float, str]]:
106
- # ====================================================================
107
- """Returns a copy of the current price cache. Thread-safe read."""
108
- return self._prices.copy()
109
-
110
- async def update_prices_async(self) -> None:
111
- """
112
- Asynchronously fetches prices, trying each source until one succeeds.
113
- Updates the internal price cache in a concurrency-safe manner.
114
- """
115
- for source in self.sources:
116
- name, url, params, parser = source.values()
117
- try:
118
- resp = await self.client.get(url, params=params, timeout=10)
119
- resp.raise_for_status()
120
-
121
- new_prices = await parser(resp.json())
122
-
123
- if not new_prices: # Parser failed to extract data
124
- continue
125
-
126
- async with self._lock:
127
- self._prices.update(new_prices)
128
-
129
- logging.info("✅ [%s] Prices updated: %s", name, new_prices)
130
- return # Success, so we exit the loop
131
-
132
- except httpx.HTTPStatusError as e:
133
- status = e.response.status_code
134
- log_msg = f"⚠️ [{name}] HTTP error {status}"
135
- if status == 429:
136
- log_msg += " (Rate Limit). Trying next source..."
137
- logging.warning(log_msg)
138
-
139
- except (httpx.RequestError, asyncio.TimeoutError) as e:
140
- logging.warning("⚠️ [%s] Request failed: %s. Trying next source...", name, e)
141
-
142
- # Brief pause before trying the next API source
143
- await asyncio.sleep(1)
144
-
145
- logging.error("❌ All price APIs failed. Retaining stale prices.")
 
1
  """
2
+ A high-frequency, multi-exchange price fetcher.
3
+ It concurrently fetches prices from multiple sources for arbitrage detection.
 
 
 
 
 
 
4
  """
5
  import asyncio
6
  import logging
7
+ from typing import Dict, Optional
 
 
 
 
 
 
8
  import httpx
9
 
10
+ logger = logging.getLogger(__name__)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
 
 
 
12
  class PriceFetcher:
13
+ SOURCES = {
14
+ "binance": "https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT",
15
+ "coinbase": "https://api.coinbase.com/v2/prices/BTC-USD/spot",
16
+ "kraken": "https://api.kraken.com/0/public/Ticker?pair=XBTUSDT",
17
+ }
18
 
19
+ def __init__(self, client: httpx.AsyncClient):
 
 
 
20
  self.client = client
21
+ self._prices: Dict[str, Optional[float]] = {name: None for name in self.SOURCES}
22
+ self._lock = asyncio.Lock()
 
 
23
 
24
+ async def _fetch_from_source(self, name: str, url: str) -> Optional[float]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  try:
26
+ resp = await self.client.get(url, timeout=5)
27
+ resp.raise_for_status()
28
+ data = resp.json()
29
+ if name == "binance":
30
+ return float(data['price'])
31
+ elif name == "coinbase":
32
+ return float(data['data']['amount'])
33
+ elif name == "kraken":
34
+ # Kraken has a more complex structure
35
+ return float(data['result']['XXBTZUSD']['c'][0])
36
+ return None
37
+ except Exception as e:
38
+ logger.error(f"❌ Failed to fetch from {name}: {e}")
39
+ return None
40
+
41
+ async def update_prices_async(self):
42
+ tasks = [self._fetch_from_source(name, url) for name, url in self.SOURCES.items()]
43
+ results = await asyncio.gather(*tasks)
44
+
45
+ async with self._lock:
46
+ for name, price in zip(self.SOURCES.keys(), results):
47
+ if price:
48
+ self._prices[name] = price
49
+
50
+ logger.info(f"✅ Prices updated: {self._prices}")
51
+
52
+ def get_current_prices(self) -> Dict[str, Optional[float]]:
53
+ return self._prices.copy()