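# NOTE: hosting-page residue captured together with the file (not part of the module):
#   uploader: mrradix | commit: "Upload 48 files" (8e4018d, verified) | size: 47.3 kB
#   page links: raw, history, blame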
import requests
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import time
from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data
# Initialize logger
logger = setup_logger(__name__)
class CryptoIntegration:
    """Market-data client for several cryptocurrency API providers"""

    def __init__(self, api_key: Optional[str] = None, provider: str = "coingecko"):
        """Create the integration with empty caches and the provider table

        Args:
            api_key: API key for the crypto provider (optional for some providers)
            provider: Crypto data provider, stored lower-cased (default: coingecko)
        """
        five_minutes = 300
        one_minute = 60

        self.api_key = api_key
        self.provider = provider.lower()

        # Responses and their expiry times (epoch seconds), both keyed by cache key
        self.cache, self.cache_expiry = {}, {}

        # Endpoint roots and cache lifetime in seconds for every known provider
        self.providers = {
            "coingecko": dict(
                base_url="https://api.coingecko.com/api/v3",
                pro_url="https://pro-api.coingecko.com/api/v3",
                cache_duration=five_minutes,
            ),
            "coinmarketcap": dict(
                base_url="https://pro-api.coinmarketcap.com/v1",
                cache_duration=five_minutes,
            ),
            "binance": dict(
                base_url="https://api.binance.com/api/v3",
                cache_duration=one_minute,
            ),
        }
@handle_exceptions
def set_api_key(self, api_key: str) -> None:
    """Store a new API key and drop everything cached so far

    Args:
        api_key: API key
    """
    self.api_key = api_key
    # Fresh dict objects (not .clear()) so the previous caches are left untouched
    self.cache, self.cache_expiry = {}, {}
@handle_exceptions
def set_provider(self, provider: str) -> None:
    """Switch to another data provider and drop everything cached so far

    Args:
        provider: Crypto data provider (matched case-insensitively)

    Raises:
        IntegrationError: If the provider is not a key of ``self.providers``
    """
    normalized = provider.lower()
    if normalized in self.providers:
        self.provider = normalized
        # Entries cached for the previous provider are discarded
        self.cache, self.cache_expiry = {}, {}
        return
    raise IntegrationError(f"Unsupported crypto provider: {normalized}")
@handle_exceptions
def test_connection(self) -> bool:
    """Test cryptocurrency API connection

    CoinGecko and Binance are probed through ``/ping`` on their ``base_url``;
    CoinMarketCap is probed with a one-item ``/cryptocurrency/map`` request,
    which needs an API key.

    Returns:
        True if the probe answered with HTTP 200, False otherwise. False is
        also returned for a provider without a probe, for a missing
        CoinMarketCap key, and when the request raises (including timeouts).
    """
    # Without a timeout, requests waits indefinitely on an unresponsive host
    timeout_seconds = 10
    try:
        if self.provider in ("coingecko", "binance"):
            # Test with a simple ping request
            url = f"{self.providers[self.provider]['base_url']}/ping"
            response = requests.get(url, timeout=timeout_seconds)
            return response.status_code == 200
        if self.provider == "coinmarketcap":
            # API key is required for CoinMarketCap
            if not self.api_key:
                logger.error("CoinMarketCap API key not set")
                return False
            # Test with a minimal listing request
            url = f"{self.providers[self.provider]['base_url']}/cryptocurrency/map"
            response = requests.get(
                url,
                headers={"X-CMC_PRO_API_KEY": self.api_key},
                params={"limit": 1},
                timeout=timeout_seconds,
            )
            return response.status_code == 200
        return False
    except Exception as e:
        # Best-effort probe: any failure is reported as "not connected"
        logger.error(f"Crypto API connection test failed: {str(e)}")
        return False
@handle_exceptions
def get_market_data(self, coins: Optional[List[str]] = None, vs_currency: str = "usd",
                    page: int = 1, per_page: int = 100) -> List[Dict[str, Any]]:
    """Get cryptocurrency market data

    Results are cached per argument combination and provider for the
    provider's ``cache_duration`` seconds.

    Args:
        coins: List of coin IDs (optional). Sent as ``ids`` to CoinGecko and
            used to filter Binance tickers.
        vs_currency: Currency to compare against (default: usd)
        page: Page number (default: 1)
        per_page: Number of results per page (default: 100)

    Returns:
        List of cryptocurrency market data in the standardized format

    Raises:
        IntegrationError: Unsupported provider, missing CoinMarketCap key,
            non-200 response, or any other failure (wrapped, with the original
            exception chained as the cause)
    """
    # Without a timeout, requests waits indefinitely on an unresponsive host
    timeout_seconds = 10

    # Check cache
    cache_key = f"market_{','.join(coins) if coins else 'all'}_{vs_currency}_{page}_{per_page}_{self.provider}"
    if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
        return self.cache[cache_key]

    try:
        if self.provider == "coingecko":
            config = self.providers[self.provider]
            # A configured key selects the Pro host, otherwise the public one
            root = config["pro_url"] if self.api_key else config["base_url"]
            params = {
                "vs_currency": vs_currency,
                "page": page,
                "per_page": per_page,
                "order": "market_cap_desc",
            }
            # Add API key if available
            if self.api_key:
                params["x_cg_pro_api_key"] = self.api_key
            # Add optional parameters
            if coins:
                params["ids"] = ','.join(coins)
            response = requests.get(f"{root}/coins/markets", params=params, timeout=timeout_seconds)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get market data: {response.text}")
            market_data = self._process_coingecko_market_data(response.json())
        elif self.provider == "coinmarketcap":
            if not self.api_key:
                raise IntegrationError("CoinMarketCap API key not set")
            root = self.providers[self.provider]["base_url"]
            # NOTE(review): `coins` is not applied in this branch; the listing is only paged
            params = {
                "start": (page - 1) * per_page + 1,  # the API's start index is 1-based
                "limit": per_page,
                "convert": vs_currency.upper(),
            }
            response = requests.get(
                f"{root}/cryptocurrency/listings/latest",
                headers={"X-CMC_PRO_API_KEY": self.api_key},
                params=params,
                timeout=timeout_seconds,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get market data: {response.text}")
            market_data = self._process_coinmarketcap_market_data(response.json(), vs_currency)
        elif self.provider == "binance":
            # Binance doesn't have a direct market data endpoint like CoinGecko,
            # so the 24hr ticker list is fetched and filtered/paged locally
            root = self.providers[self.provider]["base_url"]
            response = requests.get(f"{root}/ticker/24hr", timeout=timeout_seconds)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get market data: {response.text}")
            market_data = self._process_binance_market_data(response.json(), vs_currency)
            # Filter by coins if specified
            if coins:
                wanted = set(coins)
                market_data = [item for item in market_data if item["id"] in wanted]
            # Apply pagination
            start_idx = (page - 1) * per_page
            market_data = market_data[start_idx:start_idx + per_page]
        else:
            raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

        # Cache the result
        self.cache[cache_key] = market_data
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
        return market_data
    except IntegrationError:
        raise
    except Exception as e:
        logger.error(f"Failed to get market data: {str(e)}")
        # Keep the original exception attached as the cause for debugging
        raise IntegrationError(f"Failed to get market data: {str(e)}") from e
@handle_exceptions
def get_coin_details(self, coin_id: str) -> Dict[str, Any]:
    """Get detailed information about a specific cryptocurrency

    Results are cached per coin and provider for the provider's
    ``cache_duration`` seconds.

    Args:
        coin_id: Coin ID. Used as the path segment for CoinGecko, as the
            (upper-cased) symbol for CoinMarketCap, and as the base asset of
            a USDT pair for Binance.

    Returns:
        Detailed coin information in the standardized format

    Raises:
        IntegrationError: Unsupported provider, missing CoinMarketCap key,
            unknown coin, non-200 response, or any other failure (wrapped,
            with the original exception chained as the cause)
    """
    # Without a timeout, requests waits indefinitely on an unresponsive host
    timeout_seconds = 10

    # Check cache
    cache_key = f"coin_{coin_id}_{self.provider}"
    if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
        return self.cache[cache_key]

    try:
        if self.provider == "coingecko":
            config = self.providers[self.provider]
            # A configured key selects the Pro host, otherwise the public one
            root = config["pro_url"] if self.api_key else config["base_url"]
            params = {
                "localization": "false",
                "tickers": "false",
                "market_data": "true",
                "community_data": "true",
                "developer_data": "true",
            }
            # Add API key if available
            if self.api_key:
                params["x_cg_pro_api_key"] = self.api_key
            response = requests.get(f"{root}/coins/{coin_id}", params=params, timeout=timeout_seconds)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get coin details: {response.text}")
            coin_details = self._process_coingecko_coin_details(response.json())
        elif self.provider == "coinmarketcap":
            if not self.api_key:
                raise IntegrationError("CoinMarketCap API key not set")
            root = self.providers[self.provider]["base_url"]
            headers = {"X-CMC_PRO_API_KEY": self.api_key}

            # First, get the CMC ID for the coin
            response = requests.get(
                f"{root}/cryptocurrency/map",
                headers=headers,
                params={"symbol": coin_id.upper()},
                timeout=timeout_seconds,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get coin ID: {response.text}")
            matches = response.json().get("data")
            if not matches:
                raise IntegrationError(f"Coin not found: {coin_id}")
            cmc_id = matches[0]["id"]

            # Now get the coin details
            response = requests.get(
                f"{root}/cryptocurrency/info",
                headers=headers,
                params={"id": cmc_id},
                timeout=timeout_seconds,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get coin details: {response.text}")
            coin_details = self._process_coinmarketcap_coin_details(response.json(), cmc_id)
        elif self.provider == "binance":
            # Binance doesn't have a detailed coin info endpoint,
            # so the 24hr statistics of the USDT pair are used instead
            root = self.providers[self.provider]["base_url"]
            response = requests.get(
                f"{root}/ticker/24hr",
                params={"symbol": f"{coin_id.upper()}USDT"},  # Assuming USDT pair
                timeout=timeout_seconds,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get coin details: {response.text}")
            coin_details = self._process_binance_coin_details(response.json(), coin_id)
        else:
            raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

        # Cache the result
        self.cache[cache_key] = coin_details
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
        return coin_details
    except IntegrationError:
        raise
    except Exception as e:
        logger.error(f"Failed to get coin details: {str(e)}")
        # Keep the original exception attached as the cause for debugging
        raise IntegrationError(f"Failed to get coin details: {str(e)}") from e
@handle_exceptions
def get_price_history(self, coin_id: str, vs_currency: str = "usd",
                      days: str = "30", interval: str = "daily") -> Dict[str, Any]:
    """Get price history for a specific cryptocurrency

    Results are cached per argument combination and provider for the
    provider's ``cache_duration`` seconds.

    Args:
        coin_id: Coin ID
        vs_currency: Currency to compare against (default: usd)
        days: Number of days of data to retrieve (default: 30). CoinGecko
            also receives the literal ``"max"``; the CoinMarketCap branch
            needs a numeric value; the Binance branch falls back to the
            largest single request when the value is not numeric.
        interval: Data interval (default: daily)

    Returns:
        Price history data with ``prices``, ``market_caps`` and
        ``total_volumes`` series

    Raises:
        IntegrationError: Unsupported provider, missing CoinMarketCap key,
            unknown coin, non-200 response, or any other failure (wrapped,
            with the original exception chained as the cause)
    """
    # Local import: the module header only brings in datetime and timedelta
    from datetime import timezone

    # Without a timeout, requests waits indefinitely on an unresponsive host
    timeout_seconds = 10
    # Largest number of candles requested from Binance in one call
    max_klines = 1000

    # Check cache
    cache_key = f"history_{coin_id}_{vs_currency}_{days}_{interval}_{self.provider}"
    if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
        return self.cache[cache_key]

    try:
        if self.provider == "coingecko":
            config = self.providers[self.provider]
            # A configured key selects the Pro host, otherwise the public one
            root = config["pro_url"] if self.api_key else config["base_url"]
            params = {
                "vs_currency": vs_currency,
                "days": days,
                "interval": interval if days != "max" else "daily",
            }
            # Add API key if available
            if self.api_key:
                params["x_cg_pro_api_key"] = self.api_key
            response = requests.get(
                f"{root}/coins/{coin_id}/market_chart", params=params, timeout=timeout_seconds
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get price history: {response.text}")
            price_history = self._process_coingecko_price_history(response.json())
        elif self.provider == "coinmarketcap":
            if not self.api_key:
                raise IntegrationError("CoinMarketCap API key not set")
            root = self.providers[self.provider]["base_url"]
            headers = {"X-CMC_PRO_API_KEY": self.api_key}

            # First, get the CMC ID for the coin
            response = requests.get(
                f"{root}/cryptocurrency/map",
                headers=headers,
                params={"symbol": coin_id.upper()},
                timeout=timeout_seconds,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get coin ID: {response.text}")
            matches = response.json().get("data")
            if not matches:
                raise IntegrationError(f"Coin not found: {coin_id}")
            cmc_id = matches[0]["id"]

            # The format below ends in a literal "Z" (UTC), so the range has to be
            # computed from the current UTC time rather than local time
            time_end = datetime.now(timezone.utc)
            time_start = time_end - timedelta(days=int(days))
            params = {
                "id": cmc_id,
                "time_start": time_start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "time_end": time_end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "interval": self._convert_interval(interval),
                "convert": vs_currency.upper(),
            }
            # Now get the historical data
            response = requests.get(
                f"{root}/cryptocurrency/quotes/historical",
                headers=headers,
                params=params,
                timeout=timeout_seconds,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get price history: {response.text}")
            price_history = self._process_coinmarketcap_price_history(response.json(), vs_currency)
        elif self.provider == "binance":
            # Convert interval to Binance format
            binance_interval = self._convert_binance_interval(interval)
            # Honour `days` by asking only for the most recent candles that cover it
            candles_per_day = {"1d": 1, "1h": 24, "1m": 1440}.get(binance_interval, 1)
            try:
                wanted = int(days) * candles_per_day
            except (TypeError, ValueError):
                wanted = max_klines  # e.g. days="max": take as much as one request returns
            root = self.providers[self.provider]["base_url"]
            params = {
                "symbol": f"{coin_id.upper()}USDT",  # Assuming USDT pair
                "interval": binance_interval,
                "limit": max(1, min(wanted, max_klines)),
            }
            response = requests.get(f"{root}/klines", params=params, timeout=timeout_seconds)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get price history: {response.text}")
            price_history = self._process_binance_price_history(response.json())
        else:
            raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

        # Cache the result
        self.cache[cache_key] = price_history
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
        return price_history
    except IntegrationError:
        raise
    except Exception as e:
        logger.error(f"Failed to get price history: {str(e)}")
        # Keep the original exception attached as the cause for debugging
        raise IntegrationError(f"Failed to get price history: {str(e)}") from e
@handle_exceptions
def search_coins(self, query: str) -> List[Dict[str, Any]]:
    """Search for cryptocurrencies

    CoinGecko is queried through its search endpoint. For CoinMarketCap and
    Binance a listing is downloaded and filtered locally with a
    case-insensitive substring match. Results are cached per query and
    provider for the provider's ``cache_duration`` seconds.

    Args:
        query: Search query

    Returns:
        List of matching cryptocurrencies, each with ``id``, ``symbol``,
        ``name`` and ``market_cap_rank``

    Raises:
        IntegrationError: Unsupported provider, missing CoinMarketCap key,
            non-200 response, or any other failure (wrapped, with the original
            exception chained as the cause)
    """
    # Without a timeout, requests waits indefinitely on an unresponsive host
    timeout_seconds = 10

    # Check cache
    cache_key = f"search_{query}_{self.provider}"
    if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
        return self.cache[cache_key]

    try:
        if self.provider == "coingecko":
            config = self.providers[self.provider]
            # A configured key selects the Pro host, otherwise the public one
            root = config["pro_url"] if self.api_key else config["base_url"]
            params = {"query": query}
            # Add API key if available
            if self.api_key:
                params["x_cg_pro_api_key"] = self.api_key
            response = requests.get(f"{root}/search", params=params, timeout=timeout_seconds)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to search coins: {response.text}")
            search_results = [
                {
                    "id": coin.get("id", ""),
                    "symbol": coin.get("symbol", "").upper(),
                    "name": coin.get("name", ""),
                    "market_cap_rank": coin.get("market_cap_rank", None),
                }
                for coin in response.json().get("coins", [])
            ]
        elif self.provider == "coinmarketcap":
            if not self.api_key:
                raise IntegrationError("CoinMarketCap API key not set")
            root = self.providers[self.provider]["base_url"]
            response = requests.get(
                f"{root}/cryptocurrency/map",
                headers={"X-CMC_PRO_API_KEY": self.api_key},
                timeout=timeout_seconds,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to search coins: {response.text}")
            # Process data and filter by query (name or symbol)
            needle = query.lower()
            search_results = [
                {
                    "id": coin.get("symbol", "").lower(),
                    "symbol": coin.get("symbol", "").upper(),
                    "name": coin.get("name", ""),
                    "market_cap_rank": coin.get("rank", None),
                }
                for coin in response.json().get("data", [])
                if needle in coin.get("name", "").lower() or needle in coin.get("symbol", "").lower()
            ]
        elif self.provider == "binance":
            # Binance doesn't have a search endpoint,
            # so the exchange info listing is fetched and filtered locally
            root = self.providers[self.provider]["base_url"]
            response = requests.get(f"{root}/exchangeInfo", timeout=timeout_seconds)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to search coins: {response.text}")
            needle = query.lower()
            search_results = []
            seen_ids = set()
            for symbol in response.json().get("symbols", []):
                base_asset = symbol.get("baseAsset", "")
                if needle not in base_asset.lower() or symbol.get("quoteAsset") != "USDT":
                    continue
                asset_id = base_asset.lower()
                # Keep only the first entry per base asset
                if asset_id in seen_ids:
                    continue
                seen_ids.add(asset_id)
                search_results.append({
                    "id": asset_id,
                    "symbol": base_asset.upper(),
                    "name": base_asset.upper(),  # Binance doesn't provide full names
                    "market_cap_rank": None,
                })
        else:
            raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

        # Cache the result
        self.cache[cache_key] = search_results
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
        return search_results
    except IntegrationError:
        raise
    except Exception as e:
        logger.error(f"Failed to search coins: {str(e)}")
        # Keep the original exception attached as the cause for debugging
        raise IntegrationError(f"Failed to search coins: {str(e)}") from e
def _process_coingecko_market_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Process CoinGecko market data
Args:
data: Raw API response data
Returns:
Processed market data
"""
market_data = []
for coin in data:
processed_coin = {
"id": coin.get("id", ""),
"symbol": coin.get("symbol", "").upper(),
"name": coin.get("name", ""),
"image": coin.get("image", ""),
"current_price": coin.get("current_price", 0),
"market_cap": coin.get("market_cap", 0),
"market_cap_rank": coin.get("market_cap_rank", 0),
"total_volume": coin.get("total_volume", 0),
"high_24h": coin.get("high_24h", 0),
"low_24h": coin.get("low_24h", 0),
"price_change_24h": coin.get("price_change_24h", 0),
"price_change_percentage_24h": coin.get("price_change_percentage_24h", 0),
"market_cap_change_24h": coin.get("market_cap_change_24h", 0),
"market_cap_change_percentage_24h": coin.get("market_cap_change_percentage_24h", 0),
"circulating_supply": coin.get("circulating_supply", 0),
"total_supply": coin.get("total_supply", 0),
"max_supply": coin.get("max_supply", 0),
"last_updated": coin.get("last_updated", "")
}
market_data.append(processed_coin)
return market_data
def _process_coinmarketcap_market_data(self, data: Dict[str, Any], vs_currency: str) -> List[Dict[str, Any]]:
"""Process CoinMarketCap market data
Args:
data: Raw API response data
vs_currency: Currency to compare against
Returns:
Processed market data
"""
market_data = []
vs_currency = vs_currency.upper()
for coin in data.get("data", []):
quote = coin.get("quote", {}).get(vs_currency, {})
processed_coin = {
"id": coin.get("symbol", "").lower(),
"symbol": coin.get("symbol", "").upper(),
"name": coin.get("name", ""),
"image": "", # CoinMarketCap doesn't provide images in this endpoint
"current_price": quote.get("price", 0),
"market_cap": quote.get("market_cap", 0),
"market_cap_rank": coin.get("cmc_rank", 0),
"total_volume": quote.get("volume_24h", 0),
"high_24h": 0, # Not provided
"low_24h": 0, # Not provided
"price_change_24h": quote.get("volume_change_24h", 0),
"price_change_percentage_24h": quote.get("percent_change_24h", 0),
"market_cap_change_24h": 0, # Not provided
"market_cap_change_percentage_24h": quote.get("market_cap_change_percentage_24h", 0),
"circulating_supply": coin.get("circulating_supply", 0),
"total_supply": coin.get("total_supply", 0),
"max_supply": coin.get("max_supply", 0),
"last_updated": coin.get("last_updated", "")
}
market_data.append(processed_coin)
return market_data
def _process_binance_market_data(self, data: List[Dict[str, Any]], vs_currency: str) -> List[Dict[str, Any]]:
"""Process Binance market data
Args:
data: Raw API response data
vs_currency: Currency to compare against
Returns:
Processed market data
"""
market_data = []
vs_currency = vs_currency.upper()
for ticker in data:
symbol = ticker.get("symbol", "")
# Only include pairs with the specified currency
if not symbol.endswith(vs_currency):
continue
base_asset = symbol[:-len(vs_currency)].lower()
processed_coin = {
"id": base_asset,
"symbol": base_asset.upper(),
"name": base_asset.upper(), # Binance doesn't provide full names
"image": "", # Not provided
"current_price": float(ticker.get("lastPrice", 0)),
"market_cap": 0, # Not provided
"market_cap_rank": 0, # Not provided
"total_volume": float(ticker.get("volume", 0)),
"high_24h": float(ticker.get("highPrice", 0)),
"low_24h": float(ticker.get("lowPrice", 0)),
"price_change_24h": float(ticker.get("priceChange", 0)),
"price_change_percentage_24h": float(ticker.get("priceChangePercent", 0)),
"market_cap_change_24h": 0, # Not provided
"market_cap_change_percentage_24h": 0, # Not provided
"circulating_supply": 0, # Not provided
"total_supply": 0, # Not provided
"max_supply": 0, # Not provided
"last_updated": datetime.fromtimestamp(int(ticker.get("closeTime", 0)) / 1000).isoformat()
}
market_data.append(processed_coin)
return market_data
def _process_coingecko_coin_details(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Process CoinGecko coin details
Args:
data: Raw API response data
Returns:
Processed coin details
"""
market_data = data.get("market_data", {})
return {
"id": data.get("id", ""),
"symbol": data.get("symbol", "").upper(),
"name": data.get("name", ""),
"description": data.get("description", {}).get("en", ""),
"image": data.get("image", {}).get("large", ""),
"current_price": market_data.get("current_price", {}).get("usd", 0),
"market_cap": market_data.get("market_cap", {}).get("usd", 0),
"market_cap_rank": market_data.get("market_cap_rank", 0),
"total_volume": market_data.get("total_volume", {}).get("usd", 0),
"high_24h": market_data.get("high_24h", {}).get("usd", 0),
"low_24h": market_data.get("low_24h", {}).get("usd", 0),
"price_change_24h": market_data.get("price_change_24h", 0),
"price_change_percentage_24h": market_data.get("price_change_percentage_24h", 0),
"market_cap_change_24h": market_data.get("market_cap_change_24h", 0),
"market_cap_change_percentage_24h": market_data.get("market_cap_change_percentage_24h", 0),
"circulating_supply": market_data.get("circulating_supply", 0),
"total_supply": market_data.get("total_supply", 0),
"max_supply": market_data.get("max_supply", 0),
"ath": market_data.get("ath", {}).get("usd", 0),
"ath_change_percentage": market_data.get("ath_change_percentage", {}).get("usd", 0),
"ath_date": market_data.get("ath_date", {}).get("usd", ""),
"atl": market_data.get("atl", {}).get("usd", 0),
"atl_change_percentage": market_data.get("atl_change_percentage", {}).get("usd", 0),
"atl_date": market_data.get("atl_date", {}).get("usd", ""),
"last_updated": data.get("last_updated", ""),
"links": {
"homepage": data.get("links", {}).get("homepage", [""])[0],
"blockchain_site": data.get("links", {}).get("blockchain_site", [""])[0],
"official_forum_url": data.get("links", {}).get("official_forum_url", [""])[0],
"chat_url": data.get("links", {}).get("chat_url", [""])[0],
"announcement_url": data.get("links", {}).get("announcement_url", [""])[0],
"twitter_screen_name": data.get("links", {}).get("twitter_screen_name", ""),
"facebook_username": data.get("links", {}).get("facebook_username", ""),
"github_url": data.get("links", {}).get("repos_url", {}).get("github", [""])[0],
"subreddit_url": data.get("links", {}).get("subreddit_url", "")
},
"categories": data.get("categories", []),
"genesis_date": data.get("genesis_date", ""),
"sentiment_votes_up_percentage": data.get("sentiment_votes_up_percentage", 0),
"sentiment_votes_down_percentage": data.get("sentiment_votes_down_percentage", 0),
"developer_data": {
"forks": data.get("developer_data", {}).get("forks", 0),
"stars": data.get("developer_data", {}).get("stars", 0),
"subscribers": data.get("developer_data", {}).get("subscribers", 0),
"total_issues": data.get("developer_data", {}).get("total_issues", 0),
"closed_issues": data.get("developer_data", {}).get("closed_issues", 0),
"pull_requests_merged": data.get("developer_data", {}).get("pull_requests_merged", 0),
"pull_request_contributors": data.get("developer_data", {}).get("pull_request_contributors", 0),
"commit_count_4_weeks": data.get("developer_data", {}).get("commit_count_4_weeks", 0)
},
"community_data": {
"twitter_followers": data.get("community_data", {}).get("twitter_followers", 0),
"reddit_subscribers": data.get("community_data", {}).get("reddit_subscribers", 0),
"reddit_average_posts_48h": data.get("community_data", {}).get("reddit_average_posts_48h", 0),
"reddit_average_comments_48h": data.get("community_data", {}).get("reddit_average_comments_48h", 0),
"telegram_channel_user_count": data.get("community_data", {}).get("telegram_channel_user_count", 0)
}
}
def _process_coinmarketcap_coin_details(self, data: Dict[str, Any], cmc_id: str) -> Dict[str, Any]:
"""Process CoinMarketCap coin details
Args:
data: Raw API response data
cmc_id: CoinMarketCap ID
Returns:
Processed coin details
"""
coin_data = data.get("data", {}).get(str(cmc_id), {})
return {
"id": coin_data.get("symbol", "").lower(),
"symbol": coin_data.get("symbol", "").upper(),
"name": coin_data.get("name", ""),
"description": coin_data.get("description", ""),
"image": coin_data.get("logo", ""),
"current_price": 0, # Not provided in this endpoint
"market_cap": 0, # Not provided in this endpoint
"market_cap_rank": 0, # Not provided in this endpoint
"total_volume": 0, # Not provided in this endpoint
"high_24h": 0, # Not provided in this endpoint
"low_24h": 0, # Not provided in this endpoint
"price_change_24h": 0, # Not provided in this endpoint
"price_change_percentage_24h": 0, # Not provided in this endpoint
"market_cap_change_24h": 0, # Not provided in this endpoint
"market_cap_change_percentage_24h": 0, # Not provided in this endpoint
"circulating_supply": 0, # Not provided in this endpoint
"total_supply": 0, # Not provided in this endpoint
"max_supply": 0, # Not provided in this endpoint
"ath": 0, # Not provided in this endpoint
"ath_change_percentage": 0, # Not provided in this endpoint
"ath_date": "", # Not provided in this endpoint
"atl": 0, # Not provided in this endpoint
"atl_change_percentage": 0, # Not provided in this endpoint
"atl_date": "", # Not provided in this endpoint
"last_updated": coin_data.get("last_updated", ""),
"links": {
"homepage": coin_data.get("urls", {}).get("website", [""])[0],
"blockchain_site": coin_data.get("urls", {}).get("explorer", [""])[0],
"official_forum_url": coin_data.get("urls", {}).get("message_board", [""])[0],
"chat_url": coin_data.get("urls", {}).get("chat", [""])[0],
"announcement_url": coin_data.get("urls", {}).get("announcement", [""])[0],
"twitter_screen_name": coin_data.get("urls", {}).get("twitter", [""])[0],
"facebook_username": coin_data.get("urls", {}).get("facebook", [""])[0],
"github_url": coin_data.get("urls", {}).get("source_code", [""])[0],
"subreddit_url": coin_data.get("urls", {}).get("reddit", [""])[0]
},
"categories": coin_data.get("category", "").split(",") if coin_data.get("category") else [],
"genesis_date": coin_data.get("date_added", ""),
"sentiment_votes_up_percentage": 0, # Not provided
"sentiment_votes_down_percentage": 0, # Not provided
"developer_data": {
"forks": 0, # Not provided
"stars": 0, # Not provided
"subscribers": 0, # Not provided
"total_issues": 0, # Not provided
"closed_issues": 0, # Not provided
"pull_requests_merged": 0, # Not provided
"pull_request_contributors": 0, # Not provided
"commit_count_4_weeks": 0 # Not provided
},
"community_data": {
"twitter_followers": 0, # Not provided
"reddit_subscribers": 0, # Not provided
"reddit_average_posts_48h": 0, # Not provided
"reddit_average_comments_48h": 0, # Not provided
"telegram_channel_user_count": 0 # Not provided
}
}
def _process_binance_coin_details(self, data: Dict[str, Any], coin_id: str) -> Dict[str, Any]:
"""Process Binance coin details
Args:
data: Raw API response data
coin_id: Coin ID
Returns:
Processed coin details
"""
return {
"id": coin_id.lower(),
"symbol": coin_id.upper(),
"name": coin_id.upper(), # Binance doesn't provide full names
"description": "", # Not provided
"image": "", # Not provided
"current_price": float(data.get("lastPrice", 0)),
"market_cap": 0, # Not provided
"market_cap_rank": 0, # Not provided
"total_volume": float(data.get("volume", 0)),
"high_24h": float(data.get("highPrice", 0)),
"low_24h": float(data.get("lowPrice", 0)),
"price_change_24h": float(data.get("priceChange", 0)),
"price_change_percentage_24h": float(data.get("priceChangePercent", 0)),
"market_cap_change_24h": 0, # Not provided
"market_cap_change_percentage_24h": 0, # Not provided
"circulating_supply": 0, # Not provided
"total_supply": 0, # Not provided
"max_supply": 0, # Not provided
"ath": 0, # Not provided
"ath_change_percentage": 0, # Not provided
"ath_date": "", # Not provided
"atl": 0, # Not provided
"atl_change_percentage": 0, # Not provided
"atl_date": "", # Not provided
"last_updated": datetime.fromtimestamp(int(data.get("closeTime", 0)) / 1000).isoformat(),
"links": {
"homepage": "", # Not provided
"blockchain_site": "", # Not provided
"official_forum_url": "", # Not provided
"chat_url": "", # Not provided
"announcement_url": "", # Not provided
"twitter_screen_name": "", # Not provided
"facebook_username": "", # Not provided
"github_url": "", # Not provided
"subreddit_url": "" # Not provided
},
"categories": [], # Not provided
"genesis_date": "", # Not provided
"sentiment_votes_up_percentage": 0, # Not provided
"sentiment_votes_down_percentage": 0, # Not provided
"developer_data": {
"forks": 0, # Not provided
"stars": 0, # Not provided
"subscribers": 0, # Not provided
"total_issues": 0, # Not provided
"closed_issues": 0, # Not provided
"pull_requests_merged": 0, # Not provided
"pull_request_contributors": 0, # Not provided
"commit_count_4_weeks": 0 # Not provided
},
"community_data": {
"twitter_followers": 0, # Not provided
"reddit_subscribers": 0, # Not provided
"reddit_average_posts_48h": 0, # Not provided
"reddit_average_comments_48h": 0, # Not provided
"telegram_channel_user_count": 0 # Not provided
}
}
def _process_coingecko_price_history(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Process CoinGecko price history data
Args:
data: Raw API response data
Returns:
Processed price history data
"""
prices = []
market_caps = []
total_volumes = []
for price_data in data.get("prices", []):
if len(price_data) >= 2:
timestamp = price_data[0]
price = price_data[1]
prices.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"price": price
})
for market_cap_data in data.get("market_caps", []):
if len(market_cap_data) >= 2:
timestamp = market_cap_data[0]
market_cap = market_cap_data[1]
market_caps.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"market_cap": market_cap
})
for volume_data in data.get("total_volumes", []):
if len(volume_data) >= 2:
timestamp = volume_data[0]
volume = volume_data[1]
total_volumes.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"volume": volume
})
return {
"prices": prices,
"market_caps": market_caps,
"total_volumes": total_volumes
}
def _process_coinmarketcap_price_history(self, data: Dict[str, Any], vs_currency: str) -> Dict[str, Any]:
"""Process CoinMarketCap price history data
Args:
data: Raw API response data
vs_currency: Currency to compare against
Returns:
Processed price history data
"""
prices = []
market_caps = []
total_volumes = []
vs_currency = vs_currency.upper()
for quote_data in data.get("data", {}).get("quotes", []):
timestamp = datetime.fromisoformat(quote_data.get("timestamp", "").replace("Z", "+00:00")).timestamp() * 1000
quote = quote_data.get("quote", {}).get(vs_currency, {})
prices.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"price": quote.get("price", 0)
})
market_caps.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"market_cap": quote.get("market_cap", 0)
})
total_volumes.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"volume": quote.get("volume_24h", 0)
})
return {
"prices": prices,
"market_caps": market_caps,
"total_volumes": total_volumes
}
def _process_binance_price_history(self, data: List[List[Any]]) -> Dict[str, Any]:
"""Process Binance price history data
Args:
data: Raw API response data
Returns:
Processed price history data
"""
prices = []
total_volumes = []
for kline in data:
if len(kline) >= 8:
timestamp = kline[0]
open_price = float(kline[1])
high_price = float(kline[2])
low_price = float(kline[3])
close_price = float(kline[4])
volume = float(kline[5])
prices.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"price": close_price,
"open": open_price,
"high": high_price,
"low": low_price,
"close": close_price
})
total_volumes.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"volume": volume
})
return {
"prices": prices,
"market_caps": [], # Not provided by Binance
"total_volumes": total_volumes
}
def _convert_interval(self, interval: str) -> str:
"""Convert interval to CoinMarketCap format
Args:
interval: Interval string
Returns:
CoinMarketCap interval format
"""
if interval == "daily":
return "1d"
elif interval == "hourly":
return "1h"
elif interval == "minutely":
return "1m"
else:
return "1d"
def _convert_binance_interval(self, interval: str) -> str:
"""Convert interval to Binance format
Args:
interval: Interval string
Returns:
Binance interval format
"""
if interval == "daily":
return "1d"
elif interval == "hourly":
return "1h"
elif interval == "minutely":
return "1m"
else:
return "1d"