Spaces:
Running
Running
import aiohttp | |
import asyncio | |
import time | |
from typing import Dict, Any, Optional, List | |
from src.config import config | |
import json | |
class RateLimiter:
    """Space out calls so that consecutive ones are at least ``delay`` seconds apart.

    The limiter may be shared by several coroutines on the same event loop:
    every caller reserves its own release time *before* it starts waiting, so
    concurrent callers are released one ``delay`` apart instead of all at once.
    """

    def __init__(self, delay: float):
        """Create a limiter.

        Args:
            delay: Minimum number of seconds between two released calls.
        """
        self.delay = delay
        # Monotonic-clock time at which the most recent caller was (or will
        # be) released. -inf means "no call yet", so the first call never waits.
        self.last_call = float("-inf")

    async def acquire(self):
        """Wait until it is this caller's turn, then return.

        The first call returns immediately; later calls sleep just long
        enough to keep ``delay`` seconds between consecutive releases.
        If the awaiting task is cancelled while sleeping, its reserved slot
        stays consumed.
        """
        # time.monotonic() is used instead of time.time() so that system
        # clock adjustments cannot stretch or shrink the interval.
        now = time.monotonic()
        release_at = max(now, self.last_call + self.delay)
        # Reserve the slot before awaiting. There is no suspension point
        # between reading and writing last_call, so two coroutines can never
        # claim the same slot.
        self.last_call = release_at
        wait = release_at - now
        if wait > 0:
            await asyncio.sleep(wait)
class CoinGeckoClient:
    """Rate-limited asynchronous client for the CoinGecko HTTP API.

    One ``aiohttp.ClientSession`` is created lazily on first use and reused
    for every request until :meth:`close` is called. After ``close()`` the
    client can be used again; a fresh session is opened on demand.
    """

    def __init__(self):
        self.rate_limiter = RateLimiter(config.RATE_LIMIT_DELAY)
        # Opened on demand by get_session(); None while no session exists.
        self.session = None

    async def get_session(self):
        """Return the shared HTTP session, creating one if none is open.

        A session that has already been closed is replaced, so requests made
        after ``close()`` do not fail on a dead session.
        """
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=config.REQUEST_TIMEOUT)
            self.session = aiohttp.ClientSession(timeout=timeout)
        return self.session

    async def _make_request(
        self, endpoint: str, params: Optional[Dict[str, Any]] = None
    ) -> Any:
        """GET ``endpoint`` and return the decoded JSON body.

        Up to ``config.MAX_RETRIES`` attempts are made. HTTP 429 is retried
        with exponential backoff (1s, 2s, 4s, ...), a timeout is retried after
        one second, and any other non-200 status fails immediately.

        Args:
            endpoint: Path relative to ``config.COINGECKO_BASE_URL``.
            params: Optional query parameters. The mapping is copied, so the
                caller's dict is never modified.

        Returns:
            The parsed JSON response (a dict or a list, depending on the
            endpoint).

        Raises:
            Exception: ``"API error: <status>"`` for a status other than 200
                or 429, ``"Request timeout"`` when the final attempt times
                out, ``"Max retries exceeded"`` when all attempts are used up
                without a successful response.
        """
        # Work on a copy so the API key is not written into the caller's dict.
        query = dict(params) if params else {}
        if config.COINGECKO_API_KEY:
            query["x_cg_demo_api_key"] = config.COINGECKO_API_KEY
        url = f"{config.COINGECKO_BASE_URL}/{endpoint}"
        session = await self.get_session()
        final_attempt = config.MAX_RETRIES - 1

        for attempt in range(config.MAX_RETRIES):
            # Every attempt is a real HTTP call, so each one takes its own
            # rate-limit slot (not only the first).
            await self.rate_limiter.acquire()
            try:
                async with session.get(url, params=query) as response:
                    if response.status == 200:
                        return await response.json()
                    if response.status != 429:
                        raise Exception(f"API error: {response.status}")
            except asyncio.TimeoutError as exc:
                if attempt == final_attempt:
                    raise Exception("Request timeout") from exc
                await asyncio.sleep(1)
                continue
            # Only HTTP 429 reaches this point. Back off after the response
            # has been released, and skip the pointless sleep once no
            # attempts remain.
            if attempt < final_attempt:
                await asyncio.sleep(2 ** attempt)
        raise Exception("Max retries exceeded")

    async def get_price(self, coin_ids: str, vs_currencies: str = "usd") -> Dict[str, Any]:
        """Query ``simple/price`` for ``coin_ids`` quoted in ``vs_currencies``.

        24h change, 24h volume and market cap are requested as well.
        """
        params = {
            "ids": coin_ids,
            "vs_currencies": vs_currencies,
            "include_24hr_change": "true",
            "include_24hr_vol": "true",
            "include_market_cap": "true",
        }
        return await self._make_request("simple/price", params)

    async def get_trending(self) -> Dict[str, Any]:
        """Query the ``search/trending`` endpoint."""
        return await self._make_request("search/trending")

    async def get_global_data(self) -> Dict[str, Any]:
        """Query the ``global`` endpoint."""
        return await self._make_request("global")

    async def get_coin_data(self, coin_id: str) -> Dict[str, Any]:
        """Query ``coins/<coin_id>`` with localization, tickers and community data disabled."""
        params = {"localization": "false", "tickers": "false", "community_data": "false"}
        return await self._make_request(f"coins/{coin_id}", params)

    async def get_market_data(
        self, vs_currency: str = "usd", per_page: int = 10
    ) -> List[Dict[str, Any]]:
        """Query the first page of ``coins/markets`` ordered by descending market cap.

        Args:
            vs_currency: Quote currency passed as ``vs_currency``.
            per_page: Number of entries requested for the page.

        Returns:
            The decoded response; this endpoint answers with a JSON array,
            one object per coin.
        """
        params = {
            "vs_currency": vs_currency,
            "order": "market_cap_desc",
            "per_page": per_page,
            "page": 1,
            "sparkline": "false",
        }
        return await self._make_request("coins/markets", params)

    async def get_price_history(self, coin_id: str, days: int = 7) -> Dict[str, Any]:
        """Query ``coins/<coin_id>/market_chart`` in USD for the last ``days`` days."""
        params = {"vs_currency": "usd", "days": days}
        return await self._make_request(f"coins/{coin_id}/market_chart", params)

    async def close(self):
        """Close the HTTP session if one is open.

        The session reference is cleared, so calling ``close()`` twice is
        harmless and a later request transparently opens a new session.
        """
        session, self.session = self.session, None
        if session is not None and not session.closed:
            await session.close()
class CryptoCompareClient:
    """Rate-limited asynchronous client for the CryptoCompare HTTP API.

    One ``aiohttp.ClientSession`` is created lazily on first use and reused
    for every request until :meth:`close` is called. After ``close()`` the
    client can be used again; a fresh session is opened on demand.
    """

    def __init__(self):
        self.rate_limiter = RateLimiter(config.RATE_LIMIT_DELAY)
        # Opened on demand by get_session(); None while no session exists.
        self.session = None

    async def get_session(self):
        """Return the shared HTTP session, creating one if none is open.

        A session that has already been closed is replaced, so requests made
        after ``close()`` do not fail on a dead session.
        """
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=config.REQUEST_TIMEOUT)
            self.session = aiohttp.ClientSession(timeout=timeout)
        return self.session

    async def _make_request(
        self, endpoint: str, params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """GET ``endpoint`` and return the decoded JSON body.

        Up to ``config.MAX_RETRIES`` attempts are made. HTTP 429 is retried
        with exponential backoff (1s, 2s, 4s, ...), a timeout is retried after
        one second, and any other non-200 status fails immediately. A 200
        response whose JSON object carries ``"Response": "Error"`` is treated
        as a failure.

        Args:
            endpoint: Path relative to ``config.CRYPTOCOMPARE_BASE_URL``.
            params: Optional query parameters. The mapping is copied, so the
                caller's dict is never modified.

        Returns:
            The parsed JSON response.

        Raises:
            Exception: the API's ``"Message"`` (or ``"API error"``) when the
                body reports an error, ``"API error: <status>"`` for a status
                other than 200 or 429, ``"Request timeout"`` when the final
                attempt times out, ``"Max retries exceeded"`` when all
                attempts are used up without a successful response.
        """
        # Work on a copy so the API key is not written into the caller's dict.
        query = dict(params) if params else {}
        if config.CRYPTOCOMPARE_API_KEY:
            query["api_key"] = config.CRYPTOCOMPARE_API_KEY
        url = f"{config.CRYPTOCOMPARE_BASE_URL}/{endpoint}"
        session = await self.get_session()
        final_attempt = config.MAX_RETRIES - 1

        for attempt in range(config.MAX_RETRIES):
            # Every attempt is a real HTTP call, so each one takes its own
            # rate-limit slot (not only the first).
            await self.rate_limiter.acquire()
            try:
                async with session.get(url, params=query) as response:
                    if response.status == 200:
                        data = await response.json()
                        # Only JSON objects can carry the error envelope;
                        # guard so a non-dict payload does not break on .get().
                        if isinstance(data, dict) and data.get("Response") == "Error":
                            raise Exception(data.get("Message", "API error"))
                        return data
                    if response.status != 429:
                        raise Exception(f"API error: {response.status}")
            except asyncio.TimeoutError as exc:
                if attempt == final_attempt:
                    raise Exception("Request timeout") from exc
                await asyncio.sleep(1)
                continue
            # Only HTTP 429 reaches this point. Back off after the response
            # has been released, and skip the pointless sleep once no
            # attempts remain.
            if attempt < final_attempt:
                await asyncio.sleep(2 ** attempt)
        raise Exception("Max retries exceeded")

    async def get_price_multi(self, fsyms: str, tsyms: str = "USD") -> Dict[str, Any]:
        """Query ``pricemulti`` with the given ``fsyms`` and ``tsyms`` values."""
        params = {"fsyms": fsyms, "tsyms": tsyms}
        return await self._make_request("pricemulti", params)

    async def get_social_data(self, coin_symbol: str) -> Dict[str, Any]:
        """Query ``social/coin/latest`` for ``coin_symbol`` (sent as ``coinSymbol``)."""
        params = {"coinSymbol": coin_symbol}
        return await self._make_request("social/coin/latest", params)

    async def get_news(self, categories: str = "blockchain") -> Dict[str, Any]:
        """Query ``news/`` filtered by ``categories``."""
        params = {"categories": categories}
        return await self._make_request("news/", params)

    async def close(self):
        """Close the HTTP session if one is open.

        The session reference is cleared, so calling ``close()`` twice is
        harmless and a later request transparently opens a new session.
        """
        session, self.session = self.session, None
        if session is not None and not session.closed:
            await session.close()
# Shared module-level client instances, constructed once when this module is
# imported. Construction performs no network I/O: each client opens its HTTP
# session on the first request. Call close() on each one when finished.
coingecko_client: CoinGeckoClient = CoinGeckoClient()
cryptocompare_client: CryptoCompareClient = CryptoCompareClient()