# web3-copilot/src/api_clients.py
# "Transcendental-Programmer"
# feat: Initial commit with project structure and initial files
# 20eee66
# (file-viewer residue: raw / history blame / 6.04 kB)
import aiohttp
import asyncio
import time
from typing import Dict, Any, Optional, List
from src.config import config
import json
class RateLimiter:
    """Space out calls so consecutive ones are at least ``delay`` seconds apart.

    Each ``acquire()`` reserves the next free time slot *before* it starts
    waiting, so several coroutines calling ``acquire()`` concurrently are
    released one ``delay`` apart instead of all waking up together.
    """

    def __init__(self, delay: float):
        """Create a limiter.

        Args:
            delay: Minimum number of seconds between two granted calls.
        """
        self.delay = delay
        # Monotonic-clock time of the most recently reserved slot.
        # 0 means no call has been granted yet.
        self.last_call = 0

    async def acquire(self):
        """Wait until the caller's reserved slot is reached, then return."""
        # The monotonic clock is immune to wall-clock adjustments, which could
        # otherwise yield a negative elapsed time and a very long sleep.
        now = time.monotonic()
        if self.last_call:
            earliest = self.last_call + self.delay
        else:
            earliest = now
        slot = max(now, earliest)
        # Reserve the slot before awaiting: there is no suspension point
        # between reading and writing last_call, so concurrent callers can
        # never claim the same slot.
        self.last_call = slot
        wait = slot - now
        if wait > 0:
            await asyncio.sleep(wait)
class CoinGeckoClient:
    """Async HTTP client for the CoinGecko API.

    Every request first passes through ``self.rate_limiter`` and is sent on a
    lazily created ``aiohttp.ClientSession``. Base URL, API key, timeout,
    rate-limit delay and retry count are all read from ``config``.
    """

    def __init__(self):
        self.rate_limiter = RateLimiter(config.RATE_LIMIT_DELAY)
        # Opened on demand by get_session(); None while no session is open.
        self.session = None

    async def get_session(self):
        """Return the HTTP session, opening one if none is currently usable.

        A session that has already been closed is replaced, so the client
        keeps working after ``close()`` has been called.
        """
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=config.REQUEST_TIMEOUT)
            self.session = aiohttp.ClientSession(timeout=timeout)
        return self.session

    async def _make_request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """GET ``endpoint`` and return the decoded JSON body.

        Args:
            endpoint: Path appended to ``config.COINGECKO_BASE_URL``.
            params: Optional query parameters. The mapping is copied, so the
                caller's dict is never modified.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            Exception: On a status other than 200 or 429, on a timeout during
                the final attempt, or when every attempt was answered
                with 429.
        """
        await self.rate_limiter.acquire()
        url = f"{config.COINGECKO_BASE_URL}/{endpoint}"

        # Work on a copy so the API key is not written into the caller's dict.
        query = dict(params) if params else {}
        if config.COINGECKO_API_KEY:
            query["x_cg_demo_api_key"] = config.COINGECKO_API_KEY

        session = await self.get_session()
        final_attempt = config.MAX_RETRIES - 1

        for attempt in range(config.MAX_RETRIES):
            try:
                async with session.get(url, params=query) as response:
                    if response.status == 200:
                        return await response.json()
                    if response.status != 429:
                        raise Exception(f"API error: {response.status}")
            except asyncio.TimeoutError as exc:
                if attempt == final_attempt:
                    raise Exception("Request timeout") from exc
                await asyncio.sleep(1)
                continue

            # Only a 429 reaches this point. Back off exponentially, outside
            # the response context so the connection is released first, and
            # skip the wait when no attempt is left.
            if attempt < final_attempt:
                await asyncio.sleep(2 ** attempt)

        raise Exception("Max retries exceeded")

    async def get_price(self, coin_ids: str, vs_currencies: str = "usd") -> Dict[str, Any]:
        """Query ``simple/price`` including 24h change, 24h volume and market cap.

        Args:
            coin_ids: Sent unchanged as the ``ids`` query parameter
                (presumably a comma-separated list; confirm with the API docs).
            vs_currencies: Sent unchanged as ``vs_currencies``.
        """
        params = {
            "ids": coin_ids,
            "vs_currencies": vs_currencies,
            "include_24hr_change": "true",
            "include_24hr_vol": "true",
            "include_market_cap": "true",
        }
        return await self._make_request("simple/price", params)

    async def get_trending(self) -> Dict[str, Any]:
        """Query the ``search/trending`` endpoint."""
        return await self._make_request("search/trending")

    async def get_global_data(self) -> Dict[str, Any]:
        """Query the ``global`` endpoint."""
        return await self._make_request("global")

    async def get_coin_data(self, coin_id: str) -> Dict[str, Any]:
        """Query ``coins/{coin_id}`` with localization, tickers and community data off."""
        params = {"localization": "false", "tickers": "false", "community_data": "false"}
        return await self._make_request(f"coins/{coin_id}", params)

    async def get_market_data(self, vs_currency: str = "usd", per_page: int = 10) -> Dict[str, Any]:
        """Query page 1 of ``coins/markets`` ordered by descending market cap.

        Args:
            vs_currency: Sent unchanged as ``vs_currency``.
            per_page: Number of entries requested for the page.
        """
        # NOTE(review): this endpoint likely returns a JSON array rather than
        # an object, in which case the Dict return annotation is inaccurate.
        # Confirm before relying on it.
        params = {
            "vs_currency": vs_currency,
            "order": "market_cap_desc",
            "per_page": per_page,
            "page": 1,
            "sparkline": "false",
        }
        return await self._make_request("coins/markets", params)

    async def get_price_history(self, coin_id: str, days: int = 7) -> Dict[str, Any]:
        """Query ``coins/{coin_id}/market_chart`` in USD for the last ``days`` days."""
        params = {"vs_currency": "usd", "days": days}
        return await self._make_request(f"coins/{coin_id}/market_chart", params)

    async def close(self):
        """Close the HTTP session, if any, and forget it.

        The reference is dropped first, so a later request opens a fresh
        session and calling ``close()`` again is a no-op.
        """
        session, self.session = self.session, None
        if session is not None:
            await session.close()
class CryptoCompareClient:
    """Async HTTP client for the CryptoCompare API.

    Every request first passes through ``self.rate_limiter`` and is sent on a
    lazily created ``aiohttp.ClientSession``. Base URL, API key, timeout,
    rate-limit delay and retry count are all read from ``config``.
    """

    def __init__(self):
        self.rate_limiter = RateLimiter(config.RATE_LIMIT_DELAY)
        # Opened on demand by get_session(); None while no session is open.
        self.session = None

    async def get_session(self):
        """Return the HTTP session, opening one if none is currently usable.

        A session that has already been closed is replaced, so the client
        keeps working after ``close()`` has been called.
        """
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=config.REQUEST_TIMEOUT)
            self.session = aiohttp.ClientSession(timeout=timeout)
        return self.session

    async def _make_request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """GET ``endpoint`` and return the decoded JSON body.

        Args:
            endpoint: Path appended to ``config.CRYPTOCOMPARE_BASE_URL``.
            params: Optional query parameters. The mapping is copied, so the
                caller's dict is never modified.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            Exception: When a 200 body reports ``"Response": "Error"`` (the
                body's ``Message`` becomes the exception text), on a status
                other than 200 or 429, on a timeout during the final attempt,
                or when every attempt was answered with 429.
        """
        await self.rate_limiter.acquire()
        url = f"{config.CRYPTOCOMPARE_BASE_URL}/{endpoint}"

        # Work on a copy so the API key is not written into the caller's dict.
        query = dict(params) if params else {}
        if config.CRYPTOCOMPARE_API_KEY:
            query["api_key"] = config.CRYPTOCOMPARE_API_KEY

        session = await self.get_session()
        final_attempt = config.MAX_RETRIES - 1

        for attempt in range(config.MAX_RETRIES):
            try:
                async with session.get(url, params=query) as response:
                    if response.status == 200:
                        data = await response.json()
                        # Errors are reported inside a 200 body. Only a JSON
                        # object can carry that marker, so other payload
                        # shapes are returned as-is instead of crashing
                        # on .get().
                        if isinstance(data, dict) and data.get("Response") == "Error":
                            raise Exception(data.get("Message", "API error"))
                        return data
                    if response.status != 429:
                        raise Exception(f"API error: {response.status}")
            except asyncio.TimeoutError as exc:
                if attempt == final_attempt:
                    raise Exception("Request timeout") from exc
                await asyncio.sleep(1)
                continue

            # Only a 429 reaches this point. Back off exponentially, outside
            # the response context so the connection is released first, and
            # skip the wait when no attempt is left.
            if attempt < final_attempt:
                await asyncio.sleep(2 ** attempt)

        raise Exception("Max retries exceeded")

    async def get_price_multi(self, fsyms: str, tsyms: str = "USD") -> Dict[str, Any]:
        """Query ``pricemulti``; ``fsyms`` and ``tsyms`` are sent unchanged."""
        params = {"fsyms": fsyms, "tsyms": tsyms}
        return await self._make_request("pricemulti", params)

    async def get_social_data(self, coin_symbol: str) -> Dict[str, Any]:
        """Query ``social/coin/latest`` with ``coin_symbol`` as ``coinSymbol``."""
        # NOTE(review): this endpoint may expect a numeric ``coinId`` rather
        # than ``coinSymbol``. Confirm against the CryptoCompare API docs.
        params = {"coinSymbol": coin_symbol}
        return await self._make_request("social/coin/latest", params)

    async def get_news(self, categories: str = "blockchain") -> Dict[str, Any]:
        """Query ``news/`` with the given ``categories`` filter."""
        params = {"categories": categories}
        return await self._make_request("news/", params)

    async def close(self):
        """Close the HTTP session, if any, and forget it.

        The reference is dropped first, so a later request opens a fresh
        session and calling ``close()`` again is a no-op.
        """
        session, self.session = self.session, None
        if session is not None:
            await session.close()
# Module-level client instances, constructed once when this module is imported.
# Construction performs no network I/O: each client opens its HTTP session
# lazily on the first request.
coingecko_client = CoinGeckoClient()
cryptocompare_client = CryptoCompareClient()