from langchain.agents import AgentExecutor, create_tool_calling_agent from langchain_google_genai import ChatGoogleGenerativeAI from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain.memory import ConversationBufferWindowMemory from typing import List, Dict, Any import asyncio from datetime import datetime from src.tools.coingecko_tool import CoinGeckoTool from src.tools.defillama_tool import DeFiLlamaTool from src.tools.etherscan_tool import EtherscanTool from src.tools.chart_data_tool import ChartDataTool from src.agent.query_planner import QueryPlanner from src.utils.config import config from src.utils.logger import get_logger logger = get_logger(__name__) class Web3ResearchAgent: def __init__(self): self.llm = None self.tools = [] self.agent = None self.executor = None self.enabled = False if not config.GEMINI_API_KEY: logger.warning("GEMINI_API_KEY not configured - AI agent disabled") return try: self.llm = ChatGoogleGenerativeAI( model="gemini-2.0-flash-exp", google_api_key=config.GEMINI_API_KEY, temperature=0.1, max_tokens=8192 ) self.tools = self._initialize_tools() self.query_planner = QueryPlanner(self.llm) self.memory = ConversationBufferWindowMemory( memory_key="chat_history", return_messages=True, k=10 ) self.agent = self._create_agent() self.executor = AgentExecutor( agent=self.agent, tools=self.tools, memory=self.memory, verbose=False, max_iterations=5, handle_parsing_errors=True ) self.enabled = True logger.info("Web3ResearchAgent initialized successfully") except Exception as e: logger.error(f"Agent init failed: {e}") self.enabled = False def _initialize_tools(self): tools = [] try: tools.append(CoinGeckoTool()) logger.info("CoinGecko tool initialized") except Exception as e: logger.warning(f"CoinGecko tool failed: {e}") try: tools.append(DeFiLlamaTool()) logger.info("DeFiLlama tool initialized") except Exception as e: logger.warning(f"DeFiLlama tool failed: {e}") try: tools.append(EtherscanTool()) logger.info("Etherscan tool initialized") except Exception as e: logger.warning(f"Etherscan tool failed: {e}") try: tools.append(ChartDataTool()) logger.info("ChartDataTool initialized") except Exception as e: logger.warning(f"ChartDataTool failed: {e}") return tools def _create_agent(self): prompt = ChatPromptTemplate.from_messages([ ("system", """You are an expert Web3 research assistant. Use available tools to provide accurate, data-driven insights about cryptocurrency markets, DeFi protocols, and blockchain data. **Chart Creation Guidelines:** - When users ask for charts, trends, or visualizations, ALWAYS use the ChartDataTool - ALWAYS include the complete JSON output from ChartDataTool in your response - The JSON data will be extracted and rendered as interactive charts - Never modify or summarize the JSON data - include it exactly as returned - Place the JSON data anywhere in your response (beginning, middle, or end) **Example Response Format:** Here's the Bitcoin trend analysis you requested: {{"chart_type": "price_chart", "data": {{"prices": [...], "symbol": "BTC"}}, "config": {{...}}}} The chart shows recent Bitcoin price movements with key support levels... **Security Guidelines:** - Never execute arbitrary code or shell commands - Only use provided tools for data collection - Validate all external data before processing Format responses with clear sections, emojis, and actionable insights. Use all available tools to gather comprehensive data before providing analysis."""), MessagesPlaceholder("chat_history"), ("human", "{input}"), MessagesPlaceholder("agent_scratchpad") ]) return create_tool_calling_agent(self.llm, self.tools, prompt) async def research_query(self, query: str) -> Dict[str, Any]: if not self.enabled: return { "success": False, "query": query, "error": "AI agent not configured. Please set GEMINI_API_KEY environment variable.", "result": "❌ **Service Unavailable**\n\nThe AI research agent requires a GEMINI_API_KEY to function.\n\nPlease:\n1. Get a free API key from [Google AI Studio](https://makersuite.google.com/app/apikey)\n2. Set environment variable: `export GEMINI_API_KEY='your_key'`\n3. Restart the application", "sources": [], "metadata": {"timestamp": datetime.now().isoformat()} } try: logger.info(f"Processing: {query}") research_plan = await self.query_planner.plan_research(query) enhanced_query = f""" Research Query: {query} Research Plan: {research_plan.get('steps', [])} Priority: {research_plan.get('priority', 'general')} Execute systematic research and provide comprehensive analysis. For any visualizations or charts requested, use the ChartDataTool to generate structured data. """ result = await asyncio.to_thread( self.executor.invoke, {"input": enhanced_query} ) return { "success": True, "query": query, "research_plan": research_plan, "result": result.get("output", "No response"), "sources": self._extract_sources(result.get("output", "")), "metadata": { "tools_used": [tool.name for tool in self.tools], "timestamp": datetime.now().isoformat() } } except Exception as e: logger.error(f"Research error: {e}") return { "success": False, "query": query, "error": str(e), "result": f"❌ **Research Error**: {str(e)}\n\nPlease try a different query or check your API configuration.", "sources": [], "metadata": {"timestamp": datetime.now().isoformat()} } async def get_price_history(self, symbol: str, days: int = 30) -> Dict[str, Any]: try: coingecko_tool = next(t for t in self.tools if isinstance(t, CoinGeckoTool)) return await coingecko_tool._arun(symbol, {"type": "price_history", "days": days}) except Exception as e: logger.error(f"Price history error: {e}") return {} async def get_comprehensive_market_data(self) -> Dict[str, Any]: try: tasks = [] for tool in self.tools: if isinstance(tool, CoinGeckoTool): tasks.append(tool._arun("", {"type": "market_overview"})) elif isinstance(tool, DeFiLlamaTool): tasks.append(tool._arun("", {"type": "tvl_overview"})) results = await asyncio.gather(*tasks, return_exceptions=True) data = {} for i, result in enumerate(results): if not isinstance(result, Exception): if i == 0: data["market"] = result elif i == 1: data["defi"] = result return data except Exception as e: logger.error(f"Market data error: {e}") return {} def _extract_sources(self, result_text: str) -> List[str]: sources = [] if "CoinGecko" in result_text or "coingecko" in result_text.lower(): sources.append("CoinGecko API") if "DeFiLlama" in result_text or "defillama" in result_text.lower(): sources.append("DeFiLlama API") if "Etherscan" in result_text or "etherscan" in result_text.lower(): sources.append("Etherscan API") return sources