import json
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import requests

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)


class CryptoIntegration:
    """Cryptocurrency API integration for market data.

    Wraps the CoinGecko, CoinMarketCap and Binance HTTP APIs behind one
    interface and normalizes their responses into a common dictionary shape.
    Successful results are kept in an in-memory cache for the provider's
    ``cache_duration`` (seconds).
    """

    # Sentinel distinguishing "nothing cached" from a cached empty result.
    _CACHE_MISS = object()

    # Largest number of klines requested from Binance in one call.
    _BINANCE_MAX_KLINES = 1000

    # Candles per day for each Binance interval this class can produce.
    _KLINES_PER_DAY = {"1d": 1, "1h": 24, "1m": 1440}

    # Friendly interval names mapped to the provider interval codes.
    _INTERVAL_CODES = {"daily": "1d", "hourly": "1h", "minutely": "1m"}

    _MARKET_NUMERIC_FIELDS = (
        "current_price",
        "market_cap",
        "market_cap_rank",
        "total_volume",
        "high_24h",
        "low_24h",
        "price_change_24h",
        "price_change_percentage_24h",
        "market_cap_change_24h",
        "market_cap_change_percentage_24h",
        "circulating_supply",
        "total_supply",
        "max_supply",
    )
    _LINK_FIELDS = (
        "homepage",
        "blockchain_site",
        "official_forum_url",
        "chat_url",
        "announcement_url",
        "twitter_screen_name",
        "facebook_username",
        "github_url",
        "subreddit_url",
    )
    _DEVELOPER_FIELDS = (
        "forks",
        "stars",
        "subscribers",
        "total_issues",
        "closed_issues",
        "pull_requests_merged",
        "pull_request_contributors",
        "commit_count_4_weeks",
    )
    _COMMUNITY_FIELDS = (
        "twitter_followers",
        "reddit_subscribers",
        "reddit_average_posts_48h",
        "reddit_average_comments_48h",
        "telegram_channel_user_count",
    )

    def __init__(
        self,
        api_key: Optional[str] = None,
        provider: str = "coingecko",
        timeout: float = 10.0,
    ):
        """Initialize Cryptocurrency API integration

        Args:
            api_key: API key for the crypto provider (optional for some providers)
            provider: Crypto data provider (default: coingecko)
            timeout: Seconds to wait for each HTTP request before giving up
                (default: 10.0)
        """
        self.api_key = api_key
        self.provider = provider.lower()
        self.timeout = timeout
        self.cache = {}
        self.cache_expiry = {}

        # Set up provider-specific configurations
        self.providers = {
            "coingecko": {
                "base_url": "https://api.coingecko.com/api/v3",
                "pro_url": "https://pro-api.coingecko.com/api/v3",
                "cache_duration": 300,  # 5 minutes
            },
            "coinmarketcap": {
                "base_url": "https://pro-api.coinmarketcap.com/v1",
                "cache_duration": 300,  # 5 minutes
            },
            "binance": {
                "base_url": "https://api.binance.com/api/v3",
                "cache_duration": 60,  # 1 minute
            },
        }

    @handle_exceptions
    def set_api_key(self, api_key: str) -> None:
        """Set API key for the crypto provider

        Args:
            api_key: API key
        """
        self.api_key = api_key

        # Clear cache when API key changes
        self.cache = {}
        self.cache_expiry = {}

    @handle_exceptions
    def set_provider(self, provider: str) -> None:
        """Set crypto data provider

        Args:
            provider: Crypto data provider

        Raises:
            IntegrationError: If the provider is not one of the configured ones
        """
        provider = provider.lower()
        if provider not in self.providers:
            raise IntegrationError(f"Unsupported crypto provider: {provider}")

        self.provider = provider

        # Clear cache when provider changes
        self.cache = {}
        self.cache_expiry = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _base_url(self) -> str:
        """Return the base URL of the active provider."""
        return self.providers[self.provider]["base_url"]

    def _coingecko_url(self) -> str:
        """Return the CoinGecko URL: the pro endpoint when an API key is set."""
        config = self.providers["coingecko"]
        return config["pro_url"] if self.api_key else config["base_url"]

    def _cmc_headers(self) -> Dict[str, str]:
        """Return CoinMarketCap auth headers, raising if no API key is set."""
        if not self.api_key:
            raise IntegrationError("CoinMarketCap API key not set")
        return {"X-CMC_PRO_API_KEY": self.api_key}

    def _request_json(
        self,
        url: str,
        error_prefix: str,
        headers: Optional[Dict[str, str]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """GET ``url`` and return the decoded JSON body.

        Raises:
            IntegrationError: If the response status is not 200; the message
                is ``"<error_prefix>: <response text>"``.
        """
        # A timeout keeps a stalled provider from blocking the caller forever.
        response = requests.get(
            url, headers=headers, params=params, timeout=self.timeout
        )
        if response.status_code != 200:
            raise IntegrationError(f"{error_prefix}: {response.text}")
        return response.json()

    def _resolve_cmc_id(self, coin_id: str, headers: Dict[str, str]) -> Any:
        """Look up the CoinMarketCap numeric ID for a coin symbol."""
        data = self._request_json(
            f"{self._base_url()}/cryptocurrency/map",
            "Failed to get coin ID",
            headers=headers,
            params={"symbol": coin_id.upper()},
        )
        if not data.get("data"):
            raise IntegrationError(f"Coin not found: {coin_id}")
        return data["data"][0]["id"]

    def _cache_lookup(self, cache_key: str) -> Any:
        """Return the unexpired cached value, or ``_CACHE_MISS``."""
        if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
            return self.cache[cache_key]
        return self._CACHE_MISS

    def _cache_store(self, cache_key: str, value: Any) -> None:
        """Cache ``value`` for the active provider's ``cache_duration``."""
        self.cache[cache_key] = value
        self.cache_expiry[cache_key] = (
            time.time() + self.providers[self.provider]["cache_duration"]
        )

    @staticmethod
    def _first(values: Any, default: str = "") -> Any:
        """Return the first element of a list, or ``default`` when there is none.

        Providers may send an empty list or null for a link field; indexing
        ``[0]`` directly on those raises, so this guards against both.
        """
        if isinstance(values, (list, tuple)):
            return values[0] if values else default
        return values if values else default

    @staticmethod
    def _absolute_change(current_price: Any, percent_change: Any) -> Any:
        """Derive the absolute price change from the price and percent change.

        With ``previous = current / (1 + pct / 100)`` the change is
        ``current * pct / (100 + pct)``. Returns 0 when either input is
        missing or the percentage makes the formula undefined.
        """
        if not current_price or not percent_change or percent_change <= -100:
            return 0
        return current_price * percent_change / (100 + percent_change)

    @staticmethod
    def _timestamped_series(points: List[List[Any]], value_key: str) -> List[Dict[str, Any]]:
        """Convert ``[timestamp_ms, value]`` pairs into labelled dictionaries."""
        return [
            {
                "timestamp": point[0],
                "datetime": datetime.fromtimestamp(point[0] / 1000).isoformat(),
                value_key: point[1],
            }
            for point in points
            if len(point) >= 2
        ]

    def _blank_coin_details(self) -> Dict[str, Any]:
        """Return a fresh coin-details dictionary with every field defaulted.

        Used by providers that supply only part of the schema, so that all
        providers return the same keys in the same order.
        """
        details: Dict[str, Any] = {
            "id": "",
            "symbol": "",
            "name": "",
            "description": "",
            "image": "",
        }
        details.update(dict.fromkeys(self._MARKET_NUMERIC_FIELDS, 0))
        details.update(
            {
                "ath": 0,
                "ath_change_percentage": 0,
                "ath_date": "",
                "atl": 0,
                "atl_change_percentage": 0,
                "atl_date": "",
                "last_updated": "",
                "links": dict.fromkeys(self._LINK_FIELDS, ""),
                "categories": [],
                "genesis_date": "",
                "sentiment_votes_up_percentage": 0,
                "sentiment_votes_down_percentage": 0,
                "developer_data": dict.fromkeys(self._DEVELOPER_FIELDS, 0),
                "community_data": dict.fromkeys(self._COMMUNITY_FIELDS, 0),
            }
        )
        return details

    def _binance_kline_limit(self, days: Any, interval: str) -> int:
        """Return how many klines cover ``days`` at ``interval``, capped at the maximum."""
        per_day = self._KLINES_PER_DAY[self._convert_binance_interval(interval)]
        try:
            wanted = int(days) * per_day
        except (TypeError, ValueError):
            # Non-numeric spans such as "max": ask for as much as allowed.
            return self._BINANCE_MAX_KLINES
        return max(1, min(wanted, self._BINANCE_MAX_KLINES))

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test cryptocurrency API connection

        Returns:
            True if connection is successful, False otherwise
        """
        try:
            if self.provider in ("coingecko", "binance"):
                # Test with a simple ping request
                response = requests.get(
                    f"{self._base_url()}/ping", timeout=self.timeout
                )
                return response.status_code == 200

            if self.provider == "coinmarketcap":
                # API key is required for CoinMarketCap
                if not self.api_key:
                    logger.error("CoinMarketCap API key not set")
                    return False

                # Test with a simple status request
                response = requests.get(
                    f"{self._base_url()}/cryptocurrency/map",
                    headers={"X-CMC_PRO_API_KEY": self.api_key},
                    params={"limit": 1},
                    timeout=self.timeout,
                )
                return response.status_code == 200

            return False
        except Exception as e:
            logger.error(f"Crypto API connection test failed: {str(e)}")
            return False

    @handle_exceptions
    def get_market_data(
        self,
        coins: Optional[List[str]] = None,
        vs_currency: str = "usd",
        page: int = 1,
        per_page: int = 100,
    ) -> List[Dict[str, Any]]:
        """Get cryptocurrency market data

        Args:
            coins: List of coin IDs (optional)
            vs_currency: Currency to compare against (default: usd)
            page: Page number (default: 1)
            per_page: Number of results per page (default: 100)

        Returns:
            List of cryptocurrency market data

        Raises:
            IntegrationError: If the provider is unsupported, the request
                fails, or the response cannot be processed
        """
        # Check cache
        cache_key = f"market_{','.join(coins) if coins else 'all'}_{vs_currency}_{page}_{per_page}_{self.provider}"
        cached = self._cache_lookup(cache_key)
        if cached is not self._CACHE_MISS:
            return cached

        try:
            if self.provider == "coingecko":
                params = {
                    "vs_currency": vs_currency,
                    "page": page,
                    "per_page": per_page,
                    "order": "market_cap_desc",
                }
                # Add API key if available
                if self.api_key:
                    params["x_cg_pro_api_key"] = self.api_key
                # Add optional parameters
                if coins:
                    params["ids"] = ",".join(coins)

                data = self._request_json(
                    f"{self._coingecko_url()}/coins/markets",
                    "Failed to get market data",
                    params=params,
                )
                market_data = self._process_coingecko_market_data(data)

            elif self.provider == "coinmarketcap":
                headers = self._cmc_headers()
                # NOTE(review): `coins` is not applied for this provider; the
                # listings endpoint is paged by rank only.
                params = {
                    "start": (page - 1) * per_page + 1,
                    "limit": per_page,
                    "convert": vs_currency.upper(),
                }
                data = self._request_json(
                    f"{self._base_url()}/cryptocurrency/listings/latest",
                    "Failed to get market data",
                    headers=headers,
                    params=params,
                )
                market_data = self._process_coinmarketcap_market_data(data, vs_currency)

            elif self.provider == "binance":
                # Binance doesn't have a direct market data endpoint like CoinGecko,
                # so the 24hr ticker list is fetched and filtered/paged locally.
                data = self._request_json(
                    f"{self._base_url()}/ticker/24hr", "Failed to get market data"
                )
                market_data = self._process_binance_market_data(data, vs_currency)

                # Filter by coins if specified
                if coins:
                    wanted = set(coins)
                    market_data = [item for item in market_data if item["id"] in wanted]

                # Apply pagination
                start_idx = (page - 1) * per_page
                market_data = market_data[start_idx:start_idx + per_page]

            else:
                raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

            # Cache the result
            self._cache_store(cache_key, market_data)
            return market_data

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to get market data: {str(e)}")
            raise IntegrationError(f"Failed to get market data: {str(e)}") from e

    @handle_exceptions
    def get_coin_details(self, coin_id: str) -> Dict[str, Any]:
        """Get detailed information about a specific cryptocurrency

        Args:
            coin_id: Coin ID

        Returns:
            Detailed coin information

        Raises:
            IntegrationError: If the provider is unsupported, the coin is not
                found, the request fails, or the response cannot be processed
        """
        # Check cache
        cache_key = f"coin_{coin_id}_{self.provider}"
        cached = self._cache_lookup(cache_key)
        if cached is not self._CACHE_MISS:
            return cached

        try:
            if self.provider == "coingecko":
                params = {
                    "localization": "false",
                    "tickers": "false",
                    "market_data": "true",
                    "community_data": "true",
                    "developer_data": "true",
                }
                # Add API key if available
                if self.api_key:
                    params["x_cg_pro_api_key"] = self.api_key

                data = self._request_json(
                    f"{self._coingecko_url()}/coins/{coin_id}",
                    "Failed to get coin details",
                    params=params,
                )
                coin_details = self._process_coingecko_coin_details(data)

            elif self.provider == "coinmarketcap":
                headers = self._cmc_headers()

                # First, get the CMC ID for the coin
                cmc_id = self._resolve_cmc_id(coin_id, headers)

                # Now get the coin details
                data = self._request_json(
                    f"{self._base_url()}/cryptocurrency/info",
                    "Failed to get coin details",
                    headers=headers,
                    params={"id": cmc_id},
                )
                coin_details = self._process_coinmarketcap_coin_details(data, cmc_id)

            elif self.provider == "binance":
                # Binance doesn't have a detailed coin info endpoint,
                # so the 24hr stats of the USDT pair are used instead.
                data = self._request_json(
                    f"{self._base_url()}/ticker/24hr",
                    "Failed to get coin details",
                    params={"symbol": f"{coin_id.upper()}USDT"},  # Assuming USDT pair
                )
                coin_details = self._process_binance_coin_details(data, coin_id)

            else:
                raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

            # Cache the result
            self._cache_store(cache_key, coin_details)
            return coin_details

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to get coin details: {str(e)}")
            raise IntegrationError(f"Failed to get coin details: {str(e)}") from e

    @handle_exceptions
    def get_price_history(
        self,
        coin_id: str,
        vs_currency: str = "usd",
        days: str = "30",
        interval: str = "daily",
    ) -> Dict[str, Any]:
        """Get price history for a specific cryptocurrency

        Args:
            coin_id: Coin ID
            vs_currency: Currency to compare against (default: usd)
            days: Number of days of data to retrieve (default: 30)
            interval: Data interval (default: daily)

        Returns:
            Price history data with "prices", "market_caps" and
            "total_volumes" lists

        Raises:
            IntegrationError: If the provider is unsupported, the coin is not
                found, the request fails, or the response cannot be processed
        """
        # Check cache
        cache_key = f"history_{coin_id}_{vs_currency}_{days}_{interval}_{self.provider}"
        cached = self._cache_lookup(cache_key)
        if cached is not self._CACHE_MISS:
            return cached

        try:
            if self.provider == "coingecko":
                params = {
                    "vs_currency": vs_currency,
                    "days": days,
                    "interval": interval if days != "max" else "daily",
                }
                # Add API key if available
                if self.api_key:
                    params["x_cg_pro_api_key"] = self.api_key

                data = self._request_json(
                    f"{self._coingecko_url()}/coins/{coin_id}/market_chart",
                    "Failed to get price history",
                    params=params,
                )
                price_history = self._process_coingecko_price_history(data)

            elif self.provider == "coinmarketcap":
                headers = self._cmc_headers()

                # First, get the CMC ID for the coin
                cmc_id = self._resolve_cmc_id(coin_id, headers)

                # Convert days to a time range. The timestamps are formatted
                # with a "Z" suffix, so they must be taken in UTC.
                time_end = datetime.now(timezone.utc)
                time_start = time_end - timedelta(days=int(days))
                params = {
                    "id": cmc_id,
                    "time_start": time_start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                    "time_end": time_end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                    "interval": self._convert_interval(interval),
                    "convert": vs_currency.upper(),
                }
                data = self._request_json(
                    f"{self._base_url()}/cryptocurrency/quotes/historical",
                    "Failed to get price history",
                    headers=headers,
                    params=params,
                )
                price_history = self._process_coinmarketcap_price_history(data, vs_currency)

            elif self.provider == "binance":
                params = {
                    "symbol": f"{coin_id.upper()}USDT",  # Assuming USDT pair
                    # Convert interval to Binance format
                    "interval": self._convert_binance_interval(interval),
                    # Request only as many candles as `days` covers
                    "limit": self._binance_kline_limit(days, interval),
                }
                data = self._request_json(
                    f"{self._base_url()}/klines",
                    "Failed to get price history",
                    params=params,
                )
                price_history = self._process_binance_price_history(data)

            else:
                raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

            # Cache the result
            self._cache_store(cache_key, price_history)
            return price_history

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to get price history: {str(e)}")
            raise IntegrationError(f"Failed to get price history: {str(e)}") from e

    @handle_exceptions
    def search_coins(self, query: str) -> List[Dict[str, Any]]:
        """Search for cryptocurrencies

        Args:
            query: Search query

        Returns:
            List of matching cryptocurrencies

        Raises:
            IntegrationError: If the provider is unsupported, the request
                fails, or the response cannot be processed
        """
        # Check cache
        cache_key = f"search_{query}_{self.provider}"
        cached = self._cache_lookup(cache_key)
        if cached is not self._CACHE_MISS:
            return cached

        try:
            if self.provider == "coingecko":
                params = {"query": query}
                # Add API key if available
                if self.api_key:
                    params["x_cg_pro_api_key"] = self.api_key

                data = self._request_json(
                    f"{self._coingecko_url()}/search",
                    "Failed to search coins",
                    params=params,
                )
                search_results = [
                    {
                        "id": coin.get("id", ""),
                        "symbol": coin.get("symbol", "").upper(),
                        "name": coin.get("name", ""),
                        "market_cap_rank": coin.get("market_cap_rank", None),
                    }
                    for coin in data.get("coins", [])
                ]

            elif self.provider == "coinmarketcap":
                headers = self._cmc_headers()
                data = self._request_json(
                    f"{self._base_url()}/cryptocurrency/map",
                    "Failed to search coins",
                    headers=headers,
                )

                # Process data and filter by query
                needle = query.lower()
                search_results = [
                    {
                        "id": coin.get("symbol", "").lower(),
                        "symbol": coin.get("symbol", "").upper(),
                        "name": coin.get("name", ""),
                        "market_cap_rank": coin.get("rank", None),
                    }
                    for coin in data.get("data", [])
                    if needle in coin.get("name", "").lower()
                    or needle in coin.get("symbol", "").lower()
                ]

            elif self.provider == "binance":
                # Binance doesn't have a search endpoint, so the exchange
                # info is fetched and its USDT pairs are filtered locally.
                data = self._request_json(
                    f"{self._base_url()}/exchangeInfo", "Failed to search coins"
                )

                needle = query.lower()
                search_results = []
                seen_ids = set()
                for symbol in data.get("symbols", []):
                    base_asset = symbol.get("baseAsset", "")
                    if needle not in base_asset.lower() or symbol.get("quoteAsset") != "USDT":
                        continue
                    asset_id = base_asset.lower()
                    # Remove duplicates
                    if asset_id in seen_ids:
                        continue
                    seen_ids.add(asset_id)
                    search_results.append(
                        {
                            "id": asset_id,
                            "symbol": base_asset.upper(),
                            "name": base_asset.upper(),  # Binance doesn't provide full names
                            "market_cap_rank": None,
                        }
                    )

            else:
                raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

            # Cache the result
            self._cache_store(cache_key, search_results)
            return search_results

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to search coins: {str(e)}")
            raise IntegrationError(f"Failed to search coins: {str(e)}") from e

    # ------------------------------------------------------------------
    # Response normalization
    # ------------------------------------------------------------------

    def _process_coingecko_market_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process CoinGecko market data

        Args:
            data: Raw API response data

        Returns:
            Processed market data
        """
        market_data = []
        for coin in data:
            processed_coin = {
                "id": coin.get("id", ""),
                "symbol": coin.get("symbol", "").upper(),
                "name": coin.get("name", ""),
                "image": coin.get("image", ""),
            }
            for field in self._MARKET_NUMERIC_FIELDS:
                processed_coin[field] = coin.get(field, 0)
            processed_coin["last_updated"] = coin.get("last_updated", "")
            market_data.append(processed_coin)
        return market_data

    def _process_coinmarketcap_market_data(
        self, data: Dict[str, Any], vs_currency: str
    ) -> List[Dict[str, Any]]:
        """Process CoinMarketCap market data

        Args:
            data: Raw API response data
            vs_currency: Currency to compare against

        Returns:
            Processed market data
        """
        market_data = []
        vs_currency = vs_currency.upper()

        for coin in data.get("data", []):
            quote = (coin.get("quote") or {}).get(vs_currency) or {}
            price = quote.get("price", 0)
            percent_change = quote.get("percent_change_24h", 0)
            market_data.append(
                {
                    "id": coin.get("symbol", "").lower(),
                    "symbol": coin.get("symbol", "").upper(),
                    "name": coin.get("name", ""),
                    "image": "",  # CoinMarketCap doesn't provide images in this endpoint
                    "current_price": price,
                    "market_cap": quote.get("market_cap", 0),
                    "market_cap_rank": coin.get("cmc_rank", 0),
                    "total_volume": quote.get("volume_24h", 0),
                    "high_24h": 0,  # Not provided
                    "low_24h": 0,  # Not provided
                    # The quote carries only a percentage, so the absolute
                    # change is derived from it (not from the volume change).
                    "price_change_24h": self._absolute_change(price, percent_change),
                    "price_change_percentage_24h": percent_change,
                    "market_cap_change_24h": 0,  # Not provided
                    "market_cap_change_percentage_24h": quote.get(
                        "market_cap_change_percentage_24h", 0
                    ),
                    "circulating_supply": coin.get("circulating_supply", 0),
                    "total_supply": coin.get("total_supply", 0),
                    "max_supply": coin.get("max_supply", 0),
                    "last_updated": coin.get("last_updated", ""),
                }
            )
        return market_data

    def _process_binance_market_data(
        self, data: List[Dict[str, Any]], vs_currency: str
    ) -> List[Dict[str, Any]]:
        """Process Binance market data

        Args:
            data: Raw API response data
            vs_currency: Currency to compare against; "usd" is treated as the
                USDT quote asset, matching the USDT pairs used by the other
                Binance calls in this class

        Returns:
            Processed market data
        """
        quote_asset = vs_currency.upper()
        if quote_asset == "USD":
            # A plain "USD" suffix would match BUSD/TUSD pairs and mangle
            # the base asset (e.g. "BTCBUSD" -> "btcb").
            quote_asset = "USDT"

        market_data = []
        for ticker in data:
            symbol = ticker.get("symbol", "")

            # Only include pairs with the specified currency
            if not symbol.endswith(quote_asset):
                continue
            base_asset = symbol[:-len(quote_asset)].lower()
            if not base_asset:
                continue

            close_time_ms = int(ticker.get("closeTime", 0))
            market_data.append(
                {
                    "id": base_asset,
                    "symbol": base_asset.upper(),
                    "name": base_asset.upper(),  # Binance doesn't provide full names
                    "image": "",  # Not provided
                    "current_price": float(ticker.get("lastPrice", 0)),
                    "market_cap": 0,  # Not provided
                    "market_cap_rank": 0,  # Not provided
                    "total_volume": float(ticker.get("volume", 0)),
                    "high_24h": float(ticker.get("highPrice", 0)),
                    "low_24h": float(ticker.get("lowPrice", 0)),
                    "price_change_24h": float(ticker.get("priceChange", 0)),
                    "price_change_percentage_24h": float(ticker.get("priceChangePercent", 0)),
                    "market_cap_change_24h": 0,  # Not provided
                    "market_cap_change_percentage_24h": 0,  # Not provided
                    "circulating_supply": 0,  # Not provided
                    "total_supply": 0,  # Not provided
                    "max_supply": 0,  # Not provided
                    "last_updated": datetime.fromtimestamp(close_time_ms / 1000).isoformat(),
                }
            )
        return market_data

    def _process_coingecko_coin_details(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process CoinGecko coin details

        Args:
            data: Raw API response data

        Returns:
            Processed coin details (per-currency values are taken in USD)
        """
        market_data = data.get("market_data") or {}
        links = data.get("links") or {}
        developer = data.get("developer_data") or {}
        community = data.get("community_data") or {}

        def usd(field: str, default: Any = 0) -> Any:
            # Per-currency fields are dictionaries keyed by currency code.
            return (market_data.get(field) or {}).get("usd", default)

        return {
            "id": data.get("id", ""),
            "symbol": data.get("symbol", "").upper(),
            "name": data.get("name", ""),
            "description": (data.get("description") or {}).get("en", ""),
            "image": (data.get("image") or {}).get("large", ""),
            "current_price": usd("current_price"),
            "market_cap": usd("market_cap"),
            "market_cap_rank": market_data.get("market_cap_rank", 0),
            "total_volume": usd("total_volume"),
            "high_24h": usd("high_24h"),
            "low_24h": usd("low_24h"),
            "price_change_24h": market_data.get("price_change_24h", 0),
            "price_change_percentage_24h": market_data.get("price_change_percentage_24h", 0),
            "market_cap_change_24h": market_data.get("market_cap_change_24h", 0),
            "market_cap_change_percentage_24h": market_data.get(
                "market_cap_change_percentage_24h", 0
            ),
            "circulating_supply": market_data.get("circulating_supply", 0),
            "total_supply": market_data.get("total_supply", 0),
            "max_supply": market_data.get("max_supply", 0),
            "ath": usd("ath"),
            "ath_change_percentage": usd("ath_change_percentage"),
            "ath_date": usd("ath_date", ""),
            "atl": usd("atl"),
            "atl_change_percentage": usd("atl_change_percentage"),
            "atl_date": usd("atl_date", ""),
            "last_updated": data.get("last_updated", ""),
            "links": {
                "homepage": self._first(links.get("homepage")),
                "blockchain_site": self._first(links.get("blockchain_site")),
                "official_forum_url": self._first(links.get("official_forum_url")),
                "chat_url": self._first(links.get("chat_url")),
                "announcement_url": self._first(links.get("announcement_url")),
                "twitter_screen_name": links.get("twitter_screen_name", ""),
                "facebook_username": links.get("facebook_username", ""),
                "github_url": self._first((links.get("repos_url") or {}).get("github")),
                "subreddit_url": links.get("subreddit_url", ""),
            },
            "categories": data.get("categories", []),
            "genesis_date": data.get("genesis_date", ""),
            "sentiment_votes_up_percentage": data.get("sentiment_votes_up_percentage", 0),
            "sentiment_votes_down_percentage": data.get("sentiment_votes_down_percentage", 0),
            "developer_data": {
                field: developer.get(field, 0) for field in self._DEVELOPER_FIELDS
            },
            "community_data": {
                field: community.get(field, 0) for field in self._COMMUNITY_FIELDS
            },
        }

    def _process_coinmarketcap_coin_details(self, data: Dict[str, Any], cmc_id: str) -> Dict[str, Any]:
        """Process CoinMarketCap coin details

        Args:
            data: Raw API response data
            cmc_id: CoinMarketCap ID

        Returns:
            Processed coin details; market, developer and community figures
            are not provided by this endpoint and stay at their defaults
        """
        coin_data = (data.get("data") or {}).get(str(cmc_id)) or {}
        urls = coin_data.get("urls") or {}
        category = coin_data.get("category")

        details = self._blank_coin_details()
        details.update(
            {
                "id": coin_data.get("symbol", "").lower(),
                "symbol": coin_data.get("symbol", "").upper(),
                "name": coin_data.get("name", ""),
                "description": coin_data.get("description", ""),
                "image": coin_data.get("logo", ""),
                "last_updated": coin_data.get("last_updated", ""),
                "links": {
                    "homepage": self._first(urls.get("website")),
                    "blockchain_site": self._first(urls.get("explorer")),
                    "official_forum_url": self._first(urls.get("message_board")),
                    "chat_url": self._first(urls.get("chat")),
                    "announcement_url": self._first(urls.get("announcement")),
                    "twitter_screen_name": self._first(urls.get("twitter")),
                    "facebook_username": self._first(urls.get("facebook")),
                    "github_url": self._first(urls.get("source_code")),
                    "subreddit_url": self._first(urls.get("reddit")),
                },
                "categories": category.split(",") if category else [],
                "genesis_date": coin_data.get("date_added", ""),
            }
        )
        return details

    def _process_binance_coin_details(self, data: Dict[str, Any], coin_id: str) -> Dict[str, Any]:
        """Process Binance coin details

        Args:
            data: Raw API response data
            coin_id: Coin ID

        Returns:
            Processed coin details; only the 24hr ticker figures are provided
            by Binance, every other field stays at its default
        """
        close_time_ms = int(data.get("closeTime", 0))

        details = self._blank_coin_details()
        details.update(
            {
                "id": coin_id.lower(),
                "symbol": coin_id.upper(),
                "name": coin_id.upper(),  # Binance doesn't provide full names
                "current_price": float(data.get("lastPrice", 0)),
                "total_volume": float(data.get("volume", 0)),
                "high_24h": float(data.get("highPrice", 0)),
                "low_24h": float(data.get("lowPrice", 0)),
                "price_change_24h": float(data.get("priceChange", 0)),
                "price_change_percentage_24h": float(data.get("priceChangePercent", 0)),
                "last_updated": datetime.fromtimestamp(close_time_ms / 1000).isoformat(),
            }
        )
        return details

    def _process_coingecko_price_history(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process CoinGecko price history data

        Args:
            data: Raw API response data

        Returns:
            Processed price history data
        """
        return {
            "prices": self._timestamped_series(data.get("prices", []), "price"),
            "market_caps": self._timestamped_series(data.get("market_caps", []), "market_cap"),
            "total_volumes": self._timestamped_series(data.get("total_volumes", []), "volume"),
        }

    def _process_coinmarketcap_price_history(
        self, data: Dict[str, Any], vs_currency: str
    ) -> Dict[str, Any]:
        """Process CoinMarketCap price history data

        Args:
            data: Raw API response data
            vs_currency: Currency to compare against

        Returns:
            Processed price history data
        """
        prices = []
        market_caps = []
        total_volumes = []
        vs_currency = vs_currency.upper()

        for quote_data in data.get("data", {}).get("quotes", []):
            # fromisoformat needs an explicit offset instead of the "Z" suffix.
            iso_timestamp = quote_data.get("timestamp", "").replace("Z", "+00:00")
            timestamp = datetime.fromisoformat(iso_timestamp).timestamp() * 1000
            moment = datetime.fromtimestamp(timestamp / 1000).isoformat()
            quote = quote_data.get("quote", {}).get(vs_currency, {})

            prices.append(
                {"timestamp": timestamp, "datetime": moment, "price": quote.get("price", 0)}
            )
            market_caps.append(
                {
                    "timestamp": timestamp,
                    "datetime": moment,
                    "market_cap": quote.get("market_cap", 0),
                }
            )
            total_volumes.append(
                {
                    "timestamp": timestamp,
                    "datetime": moment,
                    "volume": quote.get("volume_24h", 0),
                }
            )

        return {
            "prices": prices,
            "market_caps": market_caps,
            "total_volumes": total_volumes,
        }

    def _process_binance_price_history(self, data: List[List[Any]]) -> Dict[str, Any]:
        """Process Binance price history data

        Args:
            data: Raw API response data (one list per kline)

        Returns:
            Processed price history data; each price entry also carries the
            open/high/low/close values of its kline
        """
        prices = []
        total_volumes = []

        for kline in data:
            if len(kline) < 8:
                continue

            timestamp = kline[0]
            moment = datetime.fromtimestamp(timestamp / 1000).isoformat()
            open_price, high_price, low_price, close_price, volume = (
                float(value) for value in kline[1:6]
            )

            prices.append(
                {
                    "timestamp": timestamp,
                    "datetime": moment,
                    "price": close_price,
                    "open": open_price,
                    "high": high_price,
                    "low": low_price,
                    "close": close_price,
                }
            )
            total_volumes.append(
                {"timestamp": timestamp, "datetime": moment, "volume": volume}
            )

        return {
            "prices": prices,
            "market_caps": [],  # Not provided by Binance
            "total_volumes": total_volumes,
        }

    def _convert_interval(self, interval: str) -> str:
        """Convert interval to CoinMarketCap format

        Args:
            interval: Interval string ("daily", "hourly" or "minutely")

        Returns:
            CoinMarketCap interval format ("1d" for unrecognized values)
        """
        return self._INTERVAL_CODES.get(interval, "1d")

    def _convert_binance_interval(self, interval: str) -> str:
        """Convert interval to Binance format

        Args:
            interval: Interval string ("daily", "hourly" or "minutely")

        Returns:
            Binance interval format ("1d" for unrecognized values)
        """
        return self._INTERVAL_CODES.get(interval, "1d")