from langchain.tools import BaseTool from pydantic import BaseModel, Field from typing import Dict, Any, List, Optional import json import asyncio from src.utils.logger import get_logger logger = get_logger(__name__) class ChartDataInput(BaseModel): """Input schema for chart data requests""" chart_type: str = Field(description="Chart type: price_chart, market_overview, defi_tvl, portfolio_pie, gas_tracker") symbol: Optional[str] = Field(default=None, description="Asset symbol (e.g., bitcoin, ethereum)") timeframe: Optional[str] = Field(default="30d", description="Time range: 1d, 7d, 30d, 90d, 365d") protocols: Optional[List[str]] = Field(default=None, description="DeFi protocol names") network: Optional[str] = Field(default="ethereum", description="Blockchain network") class ChartDataTool(BaseTool): """ Chart Data Provider Tool This tool provides structured data that can be used to create charts. Instead of returning HTML, it returns clean JSON data for visualization. """ name: str = "chart_data_provider" description: str = """Provides structured data for creating cryptocurrency charts. Returns JSON data in this format: {{ "chart_type": "price_chart|market_overview|defi_tvl|portfolio_pie|gas_tracker", "data": {{...}}, "config": {{...}} }} Chart types: - price_chart: Bitcoin/crypto price and volume data - market_overview: Top cryptocurrencies market data - defi_tvl: DeFi protocol TVL comparison - portfolio_pie: Portfolio allocation breakdown - gas_tracker: Gas fees across networks """ args_schema: type[ChartDataInput] = ChartDataInput def _run(self, chart_type: str, symbol: str = None, timeframe: str = "30d", protocols: List[str] = None, network: str = "ethereum") -> str: """Synchronous execution""" return asyncio.run(self._arun(chart_type, symbol, timeframe, protocols, network)) async def _arun(self, chart_type: str, symbol: str = None, timeframe: str = "30d", protocols: List[str] = None, network: str = "ethereum") -> str: """Provide chart data based on request""" try: logger.info(f"Providing {chart_type} data for {symbol or 'general'}") # Convert timeframe to days days = self._parse_timeframe(timeframe) if chart_type == "price_chart": return await self._get_price_chart_data(symbol or "bitcoin", days) elif chart_type == "market_overview": return await self._get_market_overview_data() elif chart_type == "defi_tvl": return await self._get_defi_tvl_data(protocols or ["uniswap", "aave", "compound"]) elif chart_type == "portfolio_pie": return await self._get_portfolio_data() elif chart_type == "gas_tracker": return await self._get_gas_data(network) else: return json.dumps({ "chart_type": "error", "error": f"Unknown chart type: {chart_type}", "available_types": ["price_chart", "market_overview", "defi_tvl", "portfolio_pie", "gas_tracker"] }) except Exception as e: logger.error(f"Chart data error: {e}") return json.dumps({ "chart_type": "error", "error": str(e), "message": "Failed to generate chart data" }) async def _get_price_chart_data(self, symbol: str, days: int) -> str: """Get real price chart data from CoinGecko API""" try: # Import the CoinGecko tool to get real data from src.tools.coingecko_tool import CoinGeckoTool coingecko = CoinGeckoTool() # Map common symbols to CoinGecko IDs symbol_map = { "btc": "bitcoin", "bitcoin": "bitcoin", "eth": "ethereum", "ethereum": "ethereum", "sol": "solana", "solana": "solana", "ada": "cardano", "cardano": "cardano", "bnb": "binancecoin", "binance": "binancecoin", "matic": "matic-network", "polygon": "matic-network", "avax": "avalanche-2", "avalanche": "avalanche-2", "dot": "polkadot", "polkadot": "polkadot", "link": "chainlink", "chainlink": "chainlink", "uni": "uniswap", "uniswap": "uniswap" } coin_id = symbol_map.get(symbol.lower(), symbol.lower()) # Get price history from CoinGecko url = f"https://api.coingecko.com/api/v3/coins/{coin_id}/market_chart" params = {"vs_currency": "usd", "days": days, "interval": "daily" if days > 90 else "hourly"} data = await coingecko.make_request(url, params=params) if not data or "prices" not in data: # Fallback to mock data if API fails logger.warning(f"CoinGecko API failed for {symbol}, using fallback data") return await self._get_mock_price_data(symbol, days) # Format the real data price_data = data.get("prices", []) volume_data = data.get("total_volumes", []) # Get current coin info coin_info = await coingecko.make_request(f"https://api.coingecko.com/api/v3/coins/{coin_id}") coin_name = coin_info.get("name", symbol.title()) if coin_info else symbol.title() return json.dumps({ "chart_type": "price_chart", "data": { "prices": price_data, "total_volumes": volume_data, "symbol": symbol.upper(), "name": coin_name }, "config": { "title": f"{coin_name} Price Analysis ({days} days)", "timeframe": f"{days}d", "currency": "USD" } }) except Exception as e: logger.error(f"Real price data failed: {e}") return await self._get_mock_price_data(symbol, days) async def _get_mock_price_data(self, symbol: str, days: int) -> str: """Fallback mock price data""" import time import random base_price = 35000 if symbol.lower() == "bitcoin" else 1800 if symbol.lower() == "ethereum" else 100 base_timestamp = int(time.time() * 1000) - (days * 24 * 60 * 60 * 1000) price_data = [] volume_data = [] for i in range(days): timestamp = base_timestamp + (i * 24 * 60 * 60 * 1000) price_change = random.uniform(-0.05, 0.05) price = base_price * (1 + price_change * i / days) price += random.uniform(-price*0.02, price*0.02) volume = random.uniform(1000000000, 5000000000) price_data.append([timestamp, round(price, 2)]) volume_data.append([timestamp, int(volume)]) return json.dumps({ "chart_type": "price_chart", "data": { "prices": price_data, "total_volumes": volume_data, "symbol": symbol.upper(), "name": symbol.title() }, "config": { "title": f"{symbol.title()} Price Analysis ({days} days)", "timeframe": f"{days}d", "currency": "USD" } }) async def _get_market_overview_data(self) -> str: """Get real market overview data from CoinGecko API""" try: from src.tools.coingecko_tool import CoinGeckoTool coingecko = CoinGeckoTool() # Get top market cap coins url = "https://api.coingecko.com/api/v3/coins/markets" params = { "vs_currency": "usd", "order": "market_cap_desc", "per_page": 10, "page": 1, "sparkline": False } data = await coingecko.make_request(url, params=params) if not data: logger.warning("CoinGecko market data failed, using fallback") return await self._get_mock_market_data() # Format real market data coins = [] for coin in data[:10]: coins.append({ "name": coin.get("name", "Unknown"), "symbol": coin.get("symbol", "").upper(), "current_price": coin.get("current_price", 0), "market_cap_rank": coin.get("market_cap_rank", 0), "price_change_percentage_24h": coin.get("price_change_percentage_24h", 0), "market_cap": coin.get("market_cap", 0), "total_volume": coin.get("total_volume", 0) }) return json.dumps({ "chart_type": "market_overview", "data": {"coins": coins}, "config": { "title": "Top Cryptocurrencies Market Overview", "currency": "USD" } }) except Exception as e: logger.error(f"Market overview API failed: {e}") return await self._get_mock_market_data() async def _get_mock_market_data(self) -> str: """Fallback mock market data""" return json.dumps({ "chart_type": "market_overview", "data": { "coins": [ {"name": "Bitcoin", "symbol": "BTC", "current_price": 35000, "market_cap_rank": 1, "price_change_percentage_24h": 2.5}, {"name": "Ethereum", "symbol": "ETH", "current_price": 1800, "market_cap_rank": 2, "price_change_percentage_24h": -1.2}, {"name": "Cardano", "symbol": "ADA", "current_price": 0.25, "market_cap_rank": 3, "price_change_percentage_24h": 3.1}, {"name": "Solana", "symbol": "SOL", "current_price": 22.5, "market_cap_rank": 4, "price_change_percentage_24h": -2.8}, {"name": "Polygon", "symbol": "MATIC", "current_price": 0.52, "market_cap_rank": 5, "price_change_percentage_24h": 1.9} ] }, "config": { "title": "Top Cryptocurrencies Market Overview", "currency": "USD" } }) async def _get_defi_tvl_data(self, protocols: List[str]) -> str: """Get real DeFi TVL data from DeFiLlama API""" try: from src.tools.defillama_tool import DeFiLlamaTool defillama = DeFiLlamaTool() # Get protocols data data = await defillama.make_request(f"{defillama._base_url}/protocols") if not data: logger.warning("DeFiLlama API failed, using fallback") return await self._get_mock_defi_data(protocols) # Filter for requested protocols or top protocols if protocols: filtered_protocols = [] for protocol_name in protocols: for protocol in data: if protocol_name.lower() in protocol.get("name", "").lower(): filtered_protocols.append(protocol) break protocols_data = filtered_protocols[:8] # Limit to 8 else: # Get top protocols by TVL protocols_data = sorted([p for p in data if p.get("tvl", 0) > 0], key=lambda x: x.get("tvl", 0), reverse=True)[:8] if not protocols_data: return await self._get_mock_defi_data(protocols) # Format TVL data tvl_data = [] for protocol in protocols_data: tvl_data.append({ "name": protocol.get("name", "Unknown"), "tvl": protocol.get("tvl", 0), "change_1d": protocol.get("change_1d", 0), "chain": protocol.get("chain", "Multi-chain"), "category": protocol.get("category", "DeFi") }) return json.dumps({ "chart_type": "defi_tvl", "data": {"protocols": tvl_data}, "config": { "title": "DeFi Protocols by Total Value Locked", "currency": "USD" } }) except Exception as e: logger.error(f"DeFi TVL API failed: {e}") return await self._get_mock_defi_data(protocols) async def _get_mock_defi_data(self, protocols: List[str]) -> str: """Fallback mock DeFi data""" import random protocol_names = protocols or ["Uniswap", "Aave", "Compound", "Curve", "MakerDAO"] tvl_data = [] for protocol in protocol_names[:5]: tvl = random.uniform(500000000, 5000000000) change = random.uniform(-10, 15) tvl_data.append({ "name": protocol, "tvl": tvl, "change_1d": change, "chain": "Ethereum", "category": "DeFi" }) return json.dumps({ "chart_type": "defi_tvl", "data": {"protocols": tvl_data}, "config": { "title": "DeFi Protocols by Total Value Locked", "currency": "USD" } }) async def _get_portfolio_data(self) -> str: """Get portfolio allocation data""" return json.dumps({ "chart_type": "portfolio_pie", "data": { "allocations": [ {"name": "Bitcoin", "symbol": "BTC", "value": 40, "color": "#f7931a"}, {"name": "Ethereum", "symbol": "ETH", "value": 30, "color": "#627eea"}, {"name": "Cardano", "symbol": "ADA", "value": 15, "color": "#0033ad"}, {"name": "Solana", "symbol": "SOL", "value": 10, "color": "#9945ff"}, {"name": "Other", "symbol": "OTHER", "value": 5, "color": "#666666"} ] }, "config": { "title": "Sample Portfolio Allocation", "currency": "Percentage" } }) async def _get_gas_data(self, network: str) -> str: """Get gas fee data""" import random import time # Generate 24 hours of gas data gas_data = [] base_timestamp = int(time.time() * 1000) - (24 * 60 * 60 * 1000) for i in range(24): timestamp = base_timestamp + (i * 60 * 60 * 1000) gas_price = random.uniform(20, 100) if network == "ethereum" else random.uniform(1, 10) gas_data.append([timestamp, round(gas_price, 2)]) return json.dumps({ "chart_type": "gas_tracker", "data": { "gas_prices": gas_data, "network": network.title() }, "config": { "title": f"{network.title()} Gas Fee Tracker (24h)", "unit": "Gwei" } }) def _parse_timeframe(self, timeframe: str) -> int: """Convert timeframe string to days""" timeframe_map = { "1d": 1, "7d": 7, "30d": 30, "90d": 90, "365d": 365, "1y": 365 } return timeframe_map.get(timeframe, 30)