import google.generativeai as genai import json import asyncio from typing import Dict, Any, List, Optional from src.api_clients import coingecko_client, cryptocompare_client from src.defillama_client import defillama_client from src.news_aggregator import news_aggregator from src.cache_manager import cache_manager from src.config import config class EnhancedResearchAgent: def __init__(self): if config.GEMINI_API_KEY: genai.configure(api_key=config.GEMINI_API_KEY) self.model = genai.GenerativeModel('gemini-1.5-flash') else: self.model = None self.symbol_map = { "btc": "bitcoin", "eth": "ethereum", "sol": "solana", "ada": "cardano", "dot": "polkadot", "bnb": "binancecoin", "usdc": "usd-coin", "usdt": "tether", "xrp": "ripple", "avax": "avalanche-2", "link": "chainlink", "matic": "matic-network", "uni": "uniswap", "atom": "cosmos", "near": "near", "icp": "internet-computer", "ftm": "fantom", "algo": "algorand", "xlm": "stellar" } def _format_coin_id(self, symbol: str) -> str: return self.symbol_map.get(symbol.lower(), symbol.lower()) async def get_comprehensive_market_data(self) -> Dict[str, Any]: cache_key = "comprehensive_market" cached = cache_manager.get(cache_key) if cached: return cached try: tasks = [ coingecko_client.get_market_data(per_page=50), coingecko_client.get_global_data(), coingecko_client.get_trending(), defillama_client.get_protocols(), defillama_client.get_tvl_data(), news_aggregator.get_crypto_news(5) ] results = await asyncio.gather(*tasks, return_exceptions=True) data = {} for i, result in enumerate(results): if not isinstance(result, Exception): if i == 0: data["market_data"] = result elif i == 1: data["global_data"] = result elif i == 2: data["trending"] = result elif i == 3: data["defi_protocols"] = result[:20] if isinstance(result, list) and result else [] elif i == 4: data["tvl_data"] = result elif i == 5: data["news"] = result cache_manager.set(cache_key, data, 180) return data except Exception as e: raise Exception(f"Failed to fetch comprehensive market data: {str(e)}") async def get_defi_analysis(self, protocol: Optional[str] = None) -> Dict[str, Any]: cache_key = f"defi_analysis_{protocol or 'overview'}" cached = cache_manager.get(cache_key) if cached: return cached try: if protocol: data = await defillama_client.get_protocol_data(protocol) else: protocols = await defillama_client.get_protocols() tvl_data = await defillama_client.get_tvl_data() yields_data = await defillama_client.get_yields() data = { "top_protocols": protocols[:20] if isinstance(protocols, list) and protocols else [], "tvl_overview": tvl_data, "top_yields": yields_data[:10] if isinstance(yields_data, list) and yields_data else [] } cache_manager.set(cache_key, data, 300) return data except Exception as e: raise Exception(f"Failed to get DeFi analysis: {str(e)}") async def get_price_history(self, symbol: str, days: int = 30) -> Dict[str, Any]: cache_key = f"price_history_{symbol}_{days}" cached = cache_manager.get(cache_key) if cached: return cached try: coin_id = self._format_coin_id(symbol) data = await coingecko_client.get_price_history(coin_id, days) cache_manager.set(cache_key, data, 900) return data except Exception as e: raise Exception(f"Failed to get price history for {symbol}: {str(e)}") async def get_advanced_coin_analysis(self, symbol: str) -> Dict[str, Any]: cache_key = f"advanced_analysis_{symbol.lower()}" cached = cache_manager.get(cache_key) if cached: return cached try: coin_id = self._format_coin_id(symbol) tasks = [ coingecko_client.get_coin_data(coin_id), coingecko_client.get_price_history(coin_id, days=30), cryptocompare_client.get_social_data(symbol.upper()), self._get_defi_involvement(symbol.upper()) ] results = await asyncio.gather(*tasks, return_exceptions=True) analysis = {} for i, result in enumerate(results): if not isinstance(result, Exception): if i == 0: analysis["coin_data"] = result elif i == 1: analysis["price_history"] = result elif i == 2: analysis["social_data"] = result elif i == 3: analysis["defi_data"] = result cache_manager.set(cache_key, analysis, 300) return analysis except Exception as e: raise Exception(f"Failed advanced analysis for {symbol}: {str(e)}") async def _get_defi_involvement(self, symbol: str) -> Dict[str, Any]: try: protocols = await defillama_client.get_protocols() if protocols: relevant_protocols = [p for p in protocols if symbol.lower() in p.get("name", "").lower()] return {"protocols": relevant_protocols[:5]} return {"protocols": []} except: return {"protocols": []} def _format_comprehensive_data(self, data: Dict[str, Any]) -> str: formatted = "📊 COMPREHENSIVE CRYPTO MARKET ANALYSIS\n\n" if "global_data" in data and data["global_data"].get("data"): global_info = data["global_data"]["data"] total_mcap = global_info.get("total_market_cap", {}).get("usd", 0) total_volume = global_info.get("total_volume", {}).get("usd", 0) btc_dominance = global_info.get("market_cap_percentage", {}).get("btc", 0) eth_dominance = global_info.get("market_cap_percentage", {}).get("eth", 0) formatted += f"💰 Total Market Cap: ${total_mcap/1e12:.2f}T\n" formatted += f"📈 24h Volume: ${total_volume/1e9:.1f}B\n" formatted += f"₿ Bitcoin Dominance: {btc_dominance:.1f}%\n" formatted += f"Ξ Ethereum Dominance: {eth_dominance:.1f}%\n\n" if "trending" in data and data["trending"].get("coins"): formatted += "🔥 TRENDING CRYPTOCURRENCIES\n" for i, coin in enumerate(data["trending"]["coins"][:5], 1): name = coin.get("item", {}).get("name", "Unknown") symbol = coin.get("item", {}).get("symbol", "") score = coin.get("item", {}).get("score", 0) formatted += f"{i}. {name} ({symbol.upper()}) - Score: {score}\n" formatted += "\n" if "defi_protocols" in data and data["defi_protocols"]: formatted += "🏦 TOP DeFi PROTOCOLS\n" for i, protocol in enumerate(data["defi_protocols"][:5], 1): name = protocol.get("name", "Unknown") tvl = protocol.get("tvl", 0) chain = protocol.get("chain", "Unknown") formatted += f"{i}. {name} ({chain}): ${tvl/1e9:.2f}B TVL\n" formatted += "\n" if "news" in data and data["news"]: formatted += "📰 LATEST CRYPTO NEWS\n" for i, article in enumerate(data["news"][:3], 1): title = article.get("title", "No title")[:60] + "..." source = article.get("source", "Unknown") formatted += f"{i}. {title} - {source}\n" formatted += "\n" if "market_data" in data and data["market_data"]: formatted += "💎 TOP PERFORMING COINS (24h)\n" valid_coins = [coin for coin in data["market_data"][:20] if coin.get("price_change_percentage_24h") is not None] sorted_coins = sorted(valid_coins, key=lambda x: x.get("price_change_percentage_24h", 0), reverse=True) for i, coin in enumerate(sorted_coins[:5], 1): name = coin.get("name", "Unknown") symbol = coin.get("symbol", "").upper() price = coin.get("current_price", 0) change = coin.get("price_change_percentage_24h", 0) formatted += f"{i}. {name} ({symbol}): ${price:,.4f} (+{change:.2f}%)\n" return formatted async def research_with_context(self, query: str) -> str: try: if not config.GEMINI_API_KEY or not self.model: return "❌ Gemini API key not configured. Please set GEMINI_API_KEY environment variable." system_prompt = """You are an advanced Web3 and DeFi research analyst with access to real-time market data, DeFi protocol information, social sentiment, and breaking news. Provide comprehensive, actionable insights that combine multiple data sources for superior analysis. Guidelines: - Synthesize data from multiple sources (price, DeFi, social, news) - Provide specific recommendations with risk assessments - Include both technical and fundamental analysis - Reference current market conditions and news events - Use clear, professional language with data-driven insights - Highlight opportunities and risks clearly """ market_context = "" try: if any(keyword in query.lower() for keyword in ["market", "overview", "analysis", "trending", "defi", "protocols"]): comprehensive_data = await self.get_comprehensive_market_data() market_context = f"\n\nCURRENT MARKET ANALYSIS:\n{self._format_comprehensive_data(comprehensive_data)}" for symbol in self.symbol_map.keys(): if symbol in query.lower() or symbol.upper() in query: analysis_data = await self.get_advanced_coin_analysis(symbol) if "coin_data" in analysis_data: coin_info = analysis_data["coin_data"] market_data = coin_info.get("market_data", {}) current_price = market_data.get("current_price", {}).get("usd", 0) price_change = market_data.get("price_change_percentage_24h", 0) market_cap = market_data.get("market_cap", {}).get("usd", 0) volume = market_data.get("total_volume", {}).get("usd", 0) ath = market_data.get("ath", {}).get("usd", 0) ath_change = market_data.get("ath_change_percentage", {}).get("usd", 0) market_context += f"\n\n{symbol.upper()} DETAILED ANALYSIS:\n" market_context += f"Current Price: ${current_price:,.4f}\n" market_context += f"24h Change: {price_change:+.2f}%\n" market_context += f"Market Cap: ${market_cap/1e9:.2f}B\n" market_context += f"24h Volume: ${volume/1e9:.2f}B\n" market_context += f"ATH: ${ath:,.4f} ({ath_change:+.2f}% from ATH)\n" break if "defi" in query.lower(): defi_data = await self.get_defi_analysis() if "top_protocols" in defi_data and defi_data["top_protocols"]: market_context += "\n\nTOP DeFi PROTOCOLS BY TVL:\n" for protocol in defi_data["top_protocols"][:5]: name = protocol.get("name", "Unknown") tvl = protocol.get("tvl", 0) change = protocol.get("change_1d", 0) market_context += f"• {name}: ${tvl/1e9:.2f}B TVL ({change:+.2f}%)\n" except Exception as e: market_context = f"\n\nNote: Some enhanced data unavailable ({str(e)})" full_prompt = f"{system_prompt}\n\nQuery: {query}\n\nReal-time Market Context:{market_context}" response = self.model.generate_content(full_prompt) return response.text if response.text else "❌ No response generated. Please try rephrasing your query." except Exception as e: return f"❌ Enhanced research failed: {str(e)}" async def close(self): await coingecko_client.close() await cryptocompare_client.close() await defillama_client.close() await news_aggregator.close()