mgbam commited on
Commit
76f2683
Β·
verified Β·
1 Parent(s): 5041ee3

Update app/price_fetcher.py

Browse files
Files changed (1) hide show
  1. app/price_fetcher.py +60 -23
app/price_fetcher.py CHANGED
@@ -1,7 +1,7 @@
1
  """
2
- A professional-grade, multi-oracle price engine.
3
- This engine fetches prices from two independent, globally-accessible
4
- decentralized oracles (Pyth and Chainlink) to detect market dislocations.
5
  """
6
  import asyncio
7
  import logging
@@ -10,43 +10,80 @@ import httpx
10
 
11
  logger = logging.getLogger(__name__)
12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  class PriceFetcher:
14
- PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43"
15
- CHAINLINK_URL = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"
16
 
17
  def __init__(self, client: httpx.AsyncClient):
18
  self.client = client
19
- self._prices: Dict[str, Optional[float]] = {"pyth": None, "chainlink_agg": None}
20
  self._lock = asyncio.Lock()
21
 
22
- async def _fetch_pyth(self) -> Optional[float]:
 
 
23
  try:
24
- resp = await self.client.get(self.PYTH_URL, timeout=5)
25
  resp.raise_for_status()
26
- price_data = resp.json()['parsed'][0]['price']
27
- return int(price_data['price']) / (10 ** abs(int(price_data['expo'])))
 
 
 
 
 
 
28
  except Exception as e:
29
  logger.error(f"❌ Oracle Error (Pyth): {e}")
30
- return None
31
 
32
- async def _fetch_chainlink_agg(self) -> Optional[float]:
 
 
33
  try:
34
- resp = await self.client.get(self.CHAINLINK_URL, timeout=5)
35
  resp.raise_for_status()
36
- return float(resp.json()['bitcoin']['usd'])
 
 
 
 
37
  except Exception as e:
38
- logger.error(f"❌ Oracle Error (Chainlink Agg): {e}")
39
- return None
40
 
41
  async def update_prices_async(self):
42
- pyth_task = self._fetch_pyth()
43
- chainlink_task = self._fetch_chainlink_agg()
44
- pyth_price, chainlink_price = await asyncio.gather(pyth_task, chainlink_task)
45
 
46
  async with self._lock:
47
- self._prices["pyth"] = pyth_price
48
- self._prices["chainlink_agg"] = chainlink_price
49
- logger.info(f"βœ… Oracle Prices Updated: Pyth=${pyth_price}, Chainlink Agg=${chainlink_price}")
 
 
 
 
 
50
 
51
- def get_current_prices(self) -> Dict[str, Optional[float]]:
52
  return self._prices.copy()
 
1
  """
2
+ A professional-grade, multi-asset, multi-oracle price engine.
3
+ This engine dynamically fetches prices for a configured list of assets
4
+ from decentralized oracles to detect market dislocations.
5
  """
6
  import asyncio
7
  import logging
 
10
 
11
  logger = logging.getLogger(__name__)
12
 
13
+ # --- CONFIGURATION ---
14
+ # Define the assets we want to track and their corresponding oracle IDs.
15
+ # Find more IDs at https://pyth.network/developers/price-feeds
16
+ ASSET_CONFIG = {
17
+ "BTC": {
18
+ "pyth_id": "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415B43",
19
+ "coingecko_id": "bitcoin",
20
+ },
21
+ "ETH": {
22
+ "pyth_id": "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace",
23
+ "coingecko_id": "ethereum",
24
+ },
25
+ "SOL": {
26
+ "pyth_id": "ef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d",
27
+ "coingecko_id": "solana",
28
+ },
29
+ }
30
+
31
  class PriceFetcher:
32
+ PYTH_URL = "https://hermes.pyth.network/v2/updates/price/latest"
33
+ AGGREGATOR_URL = "https://api.coingecko.com/api/v3/simple/price"
34
 
35
  def __init__(self, client: httpx.AsyncClient):
36
  self.client = client
37
+ self._prices: Dict[str, Dict[str, Optional[float]]] = {}
38
  self._lock = asyncio.Lock()
39
 
40
+ async def _fetch_pyth_prices(self) -> Dict[str, float]:
41
+ pyth_ids = [v['pyth_id'] for v in ASSET_CONFIG.values()]
42
+ params = [("ids[]", pid) for pid in pyth_ids]
43
  try:
44
+ resp = await self.client.get(self.PYTH_URL, params=params, timeout=5)
45
  resp.raise_for_status()
46
+ parsed_data = resp.json().get('parsed', [])
47
+
48
+ # Map the returned prices back to our asset symbols (BTC, ETH, etc.)
49
+ id_to_symbol = {v['pyth_id']: k for k, v in ASSET_CONFIG.items()}
50
+ return {
51
+ id_to_symbol[item['id']]: int(item['price']['price']) / (10 ** abs(int(item['price']['expo'])))
52
+ for item in parsed_data if item['id'] in id_to_symbol
53
+ }
54
  except Exception as e:
55
  logger.error(f"❌ Oracle Error (Pyth): {e}")
56
+ return {}
57
 
58
+ async def _fetch_aggregator_prices(self) -> Dict[str, float]:
59
+ coingecko_ids = ",".join([v['coingecko_id'] for v in ASSET_CONFIG.values()])
60
+ params = {"ids": coingecko_ids, "vs_currencies": "usd"}
61
  try:
62
+ resp = await self.client.get(self.AGGREGATOR_URL, params=params, timeout=5)
63
  resp.raise_for_status()
64
+ data = resp.json()
65
+
66
+ # Map the returned prices back to our asset symbols
67
+ id_to_symbol = {v['coingecko_id']: k for k, v in ASSET_CONFIG.items()}
68
+ return {id_to_symbol[cg_id]: prices['usd'] for cg_id, prices in data.items()}
69
  except Exception as e:
70
+ logger.error(f"❌ Oracle Error (Aggregator): {e}")
71
+ return {}
72
 
73
  async def update_prices_async(self):
74
+ pyth_task = self._fetch_pyth_prices()
75
+ agg_task = self._fetch_aggregator_prices()
76
+ pyth_prices, agg_prices = await asyncio.gather(pyth_task, agg_task)
77
 
78
  async with self._lock:
79
+ # Combine the results for each asset
80
+ for symbol in ASSET_CONFIG.keys():
81
+ self._prices[symbol] = {
82
+ "pyth": pyth_prices.get(symbol),
83
+ "chainlink_agg": agg_prices.get(symbol)
84
+ }
85
+
86
+ logger.info(f"βœ… Multi-Asset Prices Updated: {self._prices}")
87
 
88
+ def get_all_prices(self) -> Dict[str, Dict[str, Optional[float]]]:
89
  return self._prices.copy()