File size: 8,891 Bytes
f104fee
 
 
 
 
 
 
 
 
 
 
4498d8e
f104fee
9b006e9
f104fee
 
 
 
 
 
9b006e9
 
 
 
 
 
 
 
 
 
f104fee
 
4498d8e
f104fee
 
4498d8e
f104fee
 
9b006e9
f104fee
 
 
 
 
 
 
 
 
 
9b006e9
 
 
f104fee
 
9b006e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4498d8e
 
 
 
 
 
9b006e9
f104fee
 
 
 
 
 
4498d8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f104fee
 
 
 
 
 
 
 
9b006e9
 
 
 
 
 
 
 
 
 
f104fee
 
 
 
 
 
 
 
 
 
 
4498d8e
f104fee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b006e9
 
f104fee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferWindowMemory
from typing import List, Dict, Any
import asyncio
from datetime import datetime

from src.tools.coingecko_tool import CoinGeckoTool
from src.tools.defillama_tool import DeFiLlamaTool
from src.tools.etherscan_tool import EtherscanTool
from src.tools.chart_data_tool import ChartDataTool
from src.agent.query_planner import QueryPlanner
from src.utils.config import config
from src.utils.logger import get_logger

logger = get_logger(__name__)

class Web3ResearchAgent:
    def __init__(self):
        self.llm = None
        self.tools = []
        self.agent = None
        self.executor = None
        self.enabled = False
        
        if not config.GEMINI_API_KEY:
            logger.warning("GEMINI_API_KEY not configured - AI agent disabled")
            return
        
        try:
            self.llm = ChatGoogleGenerativeAI(
                model="gemini-2.0-flash-exp",
                google_api_key=config.GEMINI_API_KEY,
                temperature=0.1,
                max_tokens=8192
            )
            
            self.tools = self._initialize_tools()
            self.query_planner = QueryPlanner(self.llm)
            self.memory = ConversationBufferWindowMemory(
                memory_key="chat_history", return_messages=True, k=10
            )
            
            self.agent = self._create_agent()
            self.executor = AgentExecutor(
                agent=self.agent, tools=self.tools, memory=self.memory,
                verbose=False, max_iterations=5, handle_parsing_errors=True
            )
            self.enabled = True
            logger.info("Web3ResearchAgent initialized successfully")
            
        except Exception as e:
            logger.error(f"Agent init failed: {e}")
            self.enabled = False
    
    def _initialize_tools(self):
        tools = []
        
        try:
            tools.append(CoinGeckoTool())
            logger.info("CoinGecko tool initialized")
        except Exception as e:
            logger.warning(f"CoinGecko tool failed: {e}")
        
        try:
            tools.append(DeFiLlamaTool())
            logger.info("DeFiLlama tool initialized")
        except Exception as e:
            logger.warning(f"DeFiLlama tool failed: {e}")
        
        try:
            tools.append(EtherscanTool())
            logger.info("Etherscan tool initialized")
        except Exception as e:
            logger.warning(f"Etherscan tool failed: {e}")
        
        try:
            tools.append(ChartDataTool())
            logger.info("ChartDataTool initialized")
        except Exception as e:
            logger.warning(f"ChartDataTool failed: {e}")
        
        return tools
    
    def _create_agent(self):
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert Web3 research assistant. Use available tools to provide accurate, 
            data-driven insights about cryptocurrency markets, DeFi protocols, and blockchain data.
            
            **Chart Creation Guidelines:**
            - When users ask for charts, trends, or visualizations, ALWAYS use the ChartDataTool
            - ALWAYS include the complete JSON output from ChartDataTool in your response
            - The JSON data will be extracted and rendered as interactive charts
            - Never modify or summarize the JSON data - include it exactly as returned
            - Place the JSON data anywhere in your response (beginning, middle, or end)
            
            **Example Response Format:**
            Here's the Bitcoin trend analysis you requested:
            
            {{"chart_type": "price_chart", "data": {{"prices": [...], "symbol": "BTC"}}, "config": {{...}}}}
            
            The chart shows recent Bitcoin price movements with key support levels...
            
            **Security Guidelines:**
            - Never execute arbitrary code or shell commands
            - Only use provided tools for data collection
            - Validate all external data before processing
            
            Format responses with clear sections, emojis, and actionable insights.
            Use all available tools to gather comprehensive data before providing analysis."""),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder("agent_scratchpad")
        ])
        
        return create_tool_calling_agent(self.llm, self.tools, prompt)
    
    async def research_query(self, query: str) -> Dict[str, Any]:
        if not self.enabled:
            return {
                "success": False,
                "query": query,
                "error": "AI agent not configured. Please set GEMINI_API_KEY environment variable.",
                "result": "❌ **Service Unavailable**\n\nThe AI research agent requires a GEMINI_API_KEY to function.\n\nPlease:\n1. Get a free API key from [Google AI Studio](https://makersuite.google.com/app/apikey)\n2. Set environment variable: `export GEMINI_API_KEY='your_key'`\n3. Restart the application",
                "sources": [],
                "metadata": {"timestamp": datetime.now().isoformat()}
            }
        
        try:
            logger.info(f"Processing: {query}")
            
            research_plan = await self.query_planner.plan_research(query)
            
            enhanced_query = f"""
            Research Query: {query}
            Research Plan: {research_plan.get('steps', [])}
            Priority: {research_plan.get('priority', 'general')}
            
            Execute systematic research and provide comprehensive analysis.
            For any visualizations or charts requested, use the ChartDataTool to generate structured data.
            """
            
            result = await asyncio.to_thread(
                self.executor.invoke, {"input": enhanced_query}
            )
            
            return {
                "success": True,
                "query": query,
                "research_plan": research_plan,
                "result": result.get("output", "No response"),
                "sources": self._extract_sources(result.get("output", "")),
                "metadata": {
                    "tools_used": [tool.name for tool in self.tools],
                    "timestamp": datetime.now().isoformat()
                }
            }
            
        except Exception as e:
            logger.error(f"Research error: {e}")
            return {
                "success": False,
                "query": query,
                "error": str(e),
                "result": f"❌ **Research Error**: {str(e)}\n\nPlease try a different query or check your API configuration.",
                "sources": [],
                "metadata": {"timestamp": datetime.now().isoformat()}
            }
    
    async def get_price_history(self, symbol: str, days: int = 30) -> Dict[str, Any]:
        try:
            coingecko_tool = next(t for t in self.tools if isinstance(t, CoinGeckoTool))
            return await coingecko_tool._arun(symbol, {"type": "price_history", "days": days})
        except Exception as e:
            logger.error(f"Price history error: {e}")
            return {}
    
    async def get_comprehensive_market_data(self) -> Dict[str, Any]:
        try:
            tasks = []
            for tool in self.tools:
                if isinstance(tool, CoinGeckoTool):
                    tasks.append(tool._arun("", {"type": "market_overview"}))
                elif isinstance(tool, DeFiLlamaTool):
                    tasks.append(tool._arun("", {"type": "tvl_overview"}))
            
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            data = {}
            for i, result in enumerate(results):
                if not isinstance(result, Exception):
                    if i == 0:
                        data["market"] = result
                    elif i == 1:
                        data["defi"] = result
            
            return data
        except Exception as e:
            logger.error(f"Market data error: {e}")
            return {}
    
    def _extract_sources(self, result_text: str) -> List[str]:
        sources = []
        if "CoinGecko" in result_text or "coingecko" in result_text.lower():
            sources.append("CoinGecko API")
        if "DeFiLlama" in result_text or "defillama" in result_text.lower():
            sources.append("DeFiLlama API") 
        if "Etherscan" in result_text or "etherscan" in result_text.lower():
            sources.append("Etherscan API")
        return sources