"""Research agent that enriches a user query with market data and asks Gemini to analyse it."""

import asyncio
import json
import re
from typing import Any, Dict, List, Optional

from google import genai
from google.genai import types

from src.api_clients import coingecko_client, cryptocompare_client
from src.cache_manager import cache_manager
from src.config import config


def _or_zero(value: Any) -> Any:
    """Return ``value``, or 0 when it is ``None``.

    Upstream payloads can carry explicit nulls for numeric fields; formatting
    ``None`` with ``:,.4f`` or comparing it with ``>= 0`` raises ``TypeError``.
    """
    return 0 if value is None else value


class ResearchAgent:
    """Fetch crypto market data, cache it, and build Gemini prompts from it."""

    # Substrings of the lower-cased query that trigger a market overview fetch.
    _MARKET_KEYWORDS = ("market", "overview", "trending", "top")

    # Ticker -> whole words that select it. Insertion order is the lookup
    # priority (only the first matching coin is analysed per query).
    _COIN_ALIASES = {
        "btc": ("btc", "bitcoin"),
        "eth": ("eth", "ethereum"),
        "sol": ("sol", "solana"),
        "ada": ("ada", "cardano"),
        "dot": ("dot", "polkadot"),
        "bnb": ("bnb",),
        "avax": ("avax", "avalanche"),
        "link": ("link", "chainlink"),
    }

    # NOTE(review): the line breaks of the original prompt were lost in the
    # collapsed source; the wording is unchanged, the layout is reconstructed.
    _SYSTEM_PROMPT = (
        "You are an expert Web3 and cryptocurrency research analyst. "
        "Provide comprehensive, accurate, and actionable insights based on "
        "real market data.\n"
        "\n"
        "Guidelines:\n"
        "- Give specific, data-driven analysis\n"
        "- Include price targets and risk assessments when relevant\n"
        "- Explain technical concepts clearly\n"
        "- Provide actionable recommendations\n"
        "- Use emojis for better readability\n"
        "- Be concise but thorough\n"
    )

    def __init__(self):
        """Create the Gemini client (when a key is configured) and the ticker map."""
        # Only build the client when a key exists, so a missing key surfaces
        # as the friendly message in research() rather than at construction.
        self.client = (
            genai.Client(api_key=config.GEMINI_API_KEY)
            if config.GEMINI_API_KEY
            else None
        )
        self.symbol_map = {
            "btc": "bitcoin",
            "eth": "ethereum",
            "sol": "solana",
            "ada": "cardano",
            "dot": "polkadot",
            "bnb": "binancecoin",
            "usdc": "usd-coin",
            "usdt": "tether",
            "xrp": "ripple",
            "avax": "avalanche-2",
            "link": "chainlink",
            "matic": "matic-network",
        }

    def _get_client(self):
        """Return the Gemini client, creating it on first use if needed."""
        if self.client is None:
            self.client = genai.Client(api_key=config.GEMINI_API_KEY)
        return self.client

    def _format_coin_id(self, symbol: str) -> str:
        """Map a ticker to its ``symbol_map`` id; unknown symbols pass through lower-cased."""
        key = symbol.lower()
        return self.symbol_map.get(key, key)

    def _detect_symbol(self, query: str) -> Optional[str]:
        """Return the first supported ticker mentioned in ``query`` as a whole word.

        Matching is done on whole alphanumeric words so that ordinary English
        ("method", "solution", "Canada") does not select ETH, SOL or ADA.
        """
        words = set(re.findall(r"[a-z0-9]+", query.lower()))
        for symbol, aliases in self._COIN_ALIASES.items():
            if not words.isdisjoint(aliases):
                return symbol
        return None

    async def get_market_overview(self) -> Dict[str, Any]:
        """Return market, global and trending data, using the cache when populated.

        Returns:
            Dict with keys ``market_data``, ``global_data`` and ``trending``.

        Raises:
            Exception: if any fetch (or the cache write) fails; the original
                error is attached as ``__cause__``.
        """
        cache_key = "market_overview"
        cached = cache_manager.get(cache_key)
        if cached:
            return cached

        try:
            # The three requests are independent, so run them concurrently.
            market_data, global_data, trending = await asyncio.gather(
                coingecko_client.get_market_data(per_page=20),
                coingecko_client.get_global_data(),
                coingecko_client.get_trending(),
            )
            result = {
                "market_data": market_data,
                "global_data": global_data,
                "trending": trending,
            }
            cache_manager.set(cache_key, result)
            return result
        except Exception as e:
            raise Exception(f"Failed to fetch market overview: {str(e)}") from e

    async def get_price_history(self, symbol: str) -> Dict[str, Any]:
        """Return 30 days of price history for ``symbol``, using the cache when populated.

        Raises:
            Exception: if the fetch (or the cache write) fails; the original
                error is attached as ``__cause__``.
        """
        cache_key = f"price_history_{symbol.lower()}"
        cached = cache_manager.get(cache_key)
        if cached:
            return cached

        try:
            coin_id = self._format_coin_id(symbol)
            data = await coingecko_client.get_price_history(coin_id, days=30)
            cache_manager.set(cache_key, data)
            return data
        except Exception as e:
            raise Exception(
                f"Failed to fetch price history for {symbol}: {str(e)}"
            ) from e

    async def get_coin_analysis(self, symbol: str) -> Dict[str, Any]:
        """Gather coin data, 7-day price history and social data for ``symbol``.

        The three sources are fetched concurrently and each is best-effort: a
        source that fails is simply omitted from the result.

        Returns:
            Dict containing whichever of ``coin_data``, ``price_history`` and
            ``social_data`` were fetched successfully (possibly empty).

        Raises:
            Exception: if something other than the individual fetches fails;
                the original error is attached as ``__cause__``.
        """
        cache_key = f"coin_analysis_{symbol.lower()}"
        cached = cache_manager.get(cache_key)
        if cached:
            return cached

        try:
            coin_id = self._format_coin_id(symbol)
            outcomes = await asyncio.gather(
                coingecko_client.get_coin_data(coin_id),
                coingecko_client.get_price_history(coin_id, days=7),
                cryptocompare_client.get_social_data(symbol.upper()),
                return_exceptions=True,
            )
            labels = ("coin_data", "price_history", "social_data")
            # gather(return_exceptions=True) can hand back any BaseException
            # (e.g. CancelledError), not only Exception subclasses.
            result = {
                label: outcome
                for label, outcome in zip(labels, outcomes)
                if not isinstance(outcome, BaseException)
            }
            # Do not store a total failure in the cache.
            if result:
                cache_manager.set(cache_key, result)
            return result
        except Exception as e:
            raise Exception(f"Failed to analyze {symbol}: {str(e)}") from e

    def _format_market_data(self, data: Dict[str, Any]) -> str:
        """Render a market overview dict as plain text for the prompt.

        Args:
            data: Dict with optional ``global_data``, ``trending`` and
                ``market_data`` entries, as built by ``get_market_overview``.

        Returns:
            The formatted text, or ``"No market data available"`` when
            ``data`` is empty.
        """
        if not data:
            return "No market data available"

        parts = ["📊 MARKET OVERVIEW\n\n"]

        global_section = data.get("global_data") or {}
        if "data" in global_section:
            global_info = global_section["data"] or {}
            total_mcap = _or_zero(
                (global_info.get("total_market_cap") or {}).get("usd", 0)
            )
            total_volume = _or_zero(
                (global_info.get("total_volume") or {}).get("usd", 0)
            )
            btc_dominance = _or_zero(
                (global_info.get("market_cap_percentage") or {}).get("btc", 0)
            )
            parts.append(f"Total Market Cap: ${total_mcap:,.0f}\n")
            parts.append(f"24h Volume: ${total_volume:,.0f}\n")
            parts.append(f"Bitcoin Dominance: {btc_dominance:.1f}%\n\n")

        trending_section = data.get("trending") or {}
        if "coins" in trending_section:
            parts.append("🔥 TRENDING COINS\n")
            for rank, entry in enumerate((trending_section["coins"] or [])[:5], 1):
                item = entry.get("item") or {}
                name = item.get("name", "Unknown")
                ticker = (item.get("symbol") or "").upper()
                parts.append(f"{rank}. {name} ({ticker})\n")
            parts.append("\n")

        if "market_data" in data:
            parts.append("💰 TOP CRYPTOCURRENCIES\n")
            for rank, coin in enumerate((data["market_data"] or [])[:10], 1):
                name = coin.get("name", "Unknown")
                ticker = (coin.get("symbol") or "").upper()
                # Price and 24h change may be null in the payload; treat as 0.
                price = _or_zero(coin.get("current_price", 0))
                change = _or_zero(coin.get("price_change_percentage_24h", 0))
                arrow = "📈" if change >= 0 else "📉"
                parts.append(
                    f"{rank:2d}. {name} ({ticker}): "
                    f"${price:,.4f} {arrow} {change:+.2f}%\n"
                )

        return "".join(parts)

    def _format_coin_context(self, symbol: str, analysis_data: Dict[str, Any]) -> str:
        """Render the ``coin_data`` part of an analysis as prompt text ("" if absent)."""
        if "coin_data" not in analysis_data:
            return ""

        coin_info = analysis_data["coin_data"]
        market_data = coin_info.get("market_data") or {}
        current_price = _or_zero((market_data.get("current_price") or {}).get("usd", 0))
        price_change = _or_zero(market_data.get("price_change_percentage_24h", 0))
        market_cap = _or_zero((market_data.get("market_cap") or {}).get("usd", 0))
        volume = _or_zero((market_data.get("total_volume") or {}).get("usd", 0))

        return (
            f"\n\n{symbol.upper()} DATA:\n"
            f"Price: ${current_price:,.4f}\n"
            f"24h Change: {price_change:+.2f}%\n"
            f"Market Cap: ${market_cap:,.0f}\n"
            f"Volume: ${volume:,.0f}\n"
        )

    async def research(self, query: str) -> str:
        """Answer ``query`` with Gemini, adding market data context when relevant.

        A market overview is attached when the query contains one of the
        market keywords, and data for the first supported coin mentioned in
        the query is attached as well. Market data is best-effort: a failure
        there is noted in the prompt and does not abort the request.

        Returns:
            The model's text, or a message starting with ``❌`` when the API
            key is missing, the model returns no text, or anything fails.
            This method does not raise.
        """
        try:
            if not config.GEMINI_API_KEY:
                return "❌ Gemini API key not configured. Please set GEMINI_API_KEY environment variable."

            market_context = ""
            try:
                lowered = query.lower()
                if any(keyword in lowered for keyword in self._MARKET_KEYWORDS):
                    overview = await self.get_market_overview()
                    market_context = (
                        f"\n\nCURRENT MARKET DATA:\n{self._format_market_data(overview)}"
                    )

                symbol = self._detect_symbol(query)
                if symbol is not None:
                    analysis_data = await self.get_coin_analysis(symbol)
                    market_context += self._format_coin_context(symbol, analysis_data)
            except Exception as e:
                # Append rather than overwrite, so context gathered before the
                # failure (e.g. the overview) still reaches the model.
                market_context += f"\n\nNote: Some market data unavailable ({str(e)})"

            full_prompt = f"{query}{market_context}"

            # generate_content is a blocking call; run it in a worker thread
            # so the event loop stays responsive while the model answers.
            response = await asyncio.to_thread(
                self._get_client().models.generate_content,
                model="gemini-2.5-flash",
                contents=[
                    types.Content(
                        role="user",
                        parts=[types.Part(text=full_prompt)],
                    )
                ],
                config=types.GenerateContentConfig(
                    system_instruction=self._SYSTEM_PROMPT,
                    temperature=0.3,
                    max_output_tokens=2000,
                ),
            )

            if response.text:
                return response.text
            return "❌ No response generated. Please try rephrasing your query."

        except Exception as e:
            return f"❌ Research failed: {str(e)}"

    async def close(self):
        """Close both API clients; the second is closed even if the first fails."""
        try:
            await coingecko_client.close()
        finally:
            await cryptocompare_client.close()