Spaces:
Running
Running
File size: 6,041 Bytes
20eee66 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
import aiohttp
import asyncio
import time
from typing import Dict, Any, Optional, List
from src.config import config
import json
class RateLimiter:
def __init__(self, delay: float):
self.delay = delay
self.last_call = 0
async def acquire(self):
now = time.time()
elapsed = now - self.last_call
if elapsed < self.delay:
await asyncio.sleep(self.delay - elapsed)
self.last_call = time.time()
class CoinGeckoClient:
def __init__(self):
self.rate_limiter = RateLimiter(config.RATE_LIMIT_DELAY)
self.session = None
async def get_session(self):
if self.session is None:
timeout = aiohttp.ClientTimeout(total=config.REQUEST_TIMEOUT)
self.session = aiohttp.ClientSession(timeout=timeout)
return self.session
async def _make_request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
await self.rate_limiter.acquire()
url = f"{config.COINGECKO_BASE_URL}/{endpoint}"
if params is None:
params = {}
if config.COINGECKO_API_KEY:
params["x_cg_demo_api_key"] = config.COINGECKO_API_KEY
session = await self.get_session()
for attempt in range(config.MAX_RETRIES):
try:
async with session.get(url, params=params) as response:
if response.status == 200:
return await response.json()
elif response.status == 429:
await asyncio.sleep(2 ** attempt)
continue
else:
raise Exception(f"API error: {response.status}")
except asyncio.TimeoutError:
if attempt == config.MAX_RETRIES - 1:
raise Exception("Request timeout")
await asyncio.sleep(1)
raise Exception("Max retries exceeded")
async def get_price(self, coin_ids: str, vs_currencies: str = "usd") -> Dict[str, Any]:
params = {
"ids": coin_ids,
"vs_currencies": vs_currencies,
"include_24hr_change": "true",
"include_24hr_vol": "true",
"include_market_cap": "true"
}
return await self._make_request("simple/price", params)
async def get_trending(self) -> Dict[str, Any]:
return await self._make_request("search/trending")
async def get_global_data(self) -> Dict[str, Any]:
return await self._make_request("global")
async def get_coin_data(self, coin_id: str) -> Dict[str, Any]:
params = {"localization": "false", "tickers": "false", "community_data": "false"}
return await self._make_request(f"coins/{coin_id}", params)
async def get_market_data(self, vs_currency: str = "usd", per_page: int = 10) -> Dict[str, Any]:
params = {
"vs_currency": vs_currency,
"order": "market_cap_desc",
"per_page": per_page,
"page": 1,
"sparkline": "false"
}
return await self._make_request("coins/markets", params)
async def get_price_history(self, coin_id: str, days: int = 7) -> Dict[str, Any]:
params = {"vs_currency": "usd", "days": days}
return await self._make_request(f"coins/{coin_id}/market_chart", params)
async def close(self):
if self.session:
await self.session.close()
class CryptoCompareClient:
def __init__(self):
self.rate_limiter = RateLimiter(config.RATE_LIMIT_DELAY)
self.session = None
async def get_session(self):
if self.session is None:
timeout = aiohttp.ClientTimeout(total=config.REQUEST_TIMEOUT)
self.session = aiohttp.ClientSession(timeout=timeout)
return self.session
async def _make_request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
await self.rate_limiter.acquire()
url = f"{config.CRYPTOCOMPARE_BASE_URL}/{endpoint}"
if params is None:
params = {}
if config.CRYPTOCOMPARE_API_KEY:
params["api_key"] = config.CRYPTOCOMPARE_API_KEY
session = await self.get_session()
for attempt in range(config.MAX_RETRIES):
try:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
if data.get("Response") == "Error":
raise Exception(data.get("Message", "API error"))
return data
elif response.status == 429:
await asyncio.sleep(2 ** attempt)
continue
else:
raise Exception(f"API error: {response.status}")
except asyncio.TimeoutError:
if attempt == config.MAX_RETRIES - 1:
raise Exception("Request timeout")
await asyncio.sleep(1)
raise Exception("Max retries exceeded")
async def get_price_multi(self, fsyms: str, tsyms: str = "USD") -> Dict[str, Any]:
params = {"fsyms": fsyms, "tsyms": tsyms}
return await self._make_request("pricemulti", params)
async def get_social_data(self, coin_symbol: str) -> Dict[str, Any]:
params = {"coinSymbol": coin_symbol}
return await self._make_request("social/coin/latest", params)
async def get_news(self, categories: str = "blockchain") -> Dict[str, Any]:
params = {"categories": categories}
return await self._make_request("news/", params)
async def close(self):
if self.session:
await self.session.close()
coingecko_client = CoinGeckoClient()
cryptocompare_client = CryptoCompareClient()
|