File size: 7,368 Bytes
f104fee
 
 
 
 
 
 
 
 
 
 
 
9b006e9
f104fee
 
 
 
 
 
9b006e9
 
 
 
 
 
 
 
 
 
f104fee
 
 
 
 
 
 
 
9b006e9
f104fee
 
 
 
 
 
 
 
 
 
9b006e9
 
 
f104fee
 
9b006e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f104fee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b006e9
 
 
 
 
 
 
 
 
 
f104fee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b006e9
 
f104fee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferWindowMemory
from typing import List, Dict, Any
import asyncio
from datetime import datetime

from src.tools.coingecko_tool import CoinGeckoTool
from src.tools.defillama_tool import DeFiLlamaTool
from src.tools.etherscan_tool import EtherscanTool
from src.agent.query_planner import QueryPlanner
from src.utils.config import config
from src.utils.logger import get_logger

logger = get_logger(__name__)

class Web3ResearchAgent:
    def __init__(self):
        self.llm = None
        self.tools = []
        self.agent = None
        self.executor = None
        self.enabled = False
        
        if not config.GEMINI_API_KEY:
            logger.warning("GEMINI_API_KEY not configured - AI agent disabled")
            return
        
        try:
            self.llm = ChatGoogleGenerativeAI(
                model="gemini-1.5-flash",
                google_api_key=config.GEMINI_API_KEY,
                temperature=0.1,
                max_tokens=2048
            )
            
            self.tools = self._initialize_tools()
            self.query_planner = QueryPlanner(self.llm)
            self.memory = ConversationBufferWindowMemory(
                memory_key="chat_history", return_messages=True, k=10
            )
            
            self.agent = self._create_agent()
            self.executor = AgentExecutor(
                agent=self.agent, tools=self.tools, memory=self.memory,
                verbose=False, max_iterations=5, handle_parsing_errors=True
            )
            self.enabled = True
            logger.info("Web3ResearchAgent initialized successfully")
            
        except Exception as e:
            logger.error(f"Agent init failed: {e}")
            self.enabled = False
    
    def _initialize_tools(self):
        tools = []
        
        try:
            tools.append(CoinGeckoTool())
            logger.info("CoinGecko tool initialized")
        except Exception as e:
            logger.warning(f"CoinGecko tool failed: {e}")
        
        try:
            tools.append(DeFiLlamaTool())
            logger.info("DeFiLlama tool initialized")
        except Exception as e:
            logger.warning(f"DeFiLlama tool failed: {e}")
        
        try:
            tools.append(EtherscanTool())
            logger.info("Etherscan tool initialized")
        except Exception as e:
            logger.warning(f"Etherscan tool failed: {e}")
        
        return tools
    
    def _create_agent(self):
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert Web3 research assistant. Use available tools to provide accurate, 
            data-driven insights about cryptocurrency markets, DeFi protocols, and blockchain data.
            
            Format responses with clear sections, emojis, and actionable insights."""),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder("agent_scratchpad")
        ])
        
        return create_tool_calling_agent(self.llm, self.tools, prompt)
    
    async def research_query(self, query: str) -> Dict[str, Any]:
        if not self.enabled:
            return {
                "success": False,
                "query": query,
                "error": "AI agent not configured. Please set GEMINI_API_KEY environment variable.",
                "result": "❌ **Service Unavailable**\n\nThe AI research agent requires a GEMINI_API_KEY to function.\n\nPlease:\n1. Get a free API key from [Google AI Studio](https://makersuite.google.com/app/apikey)\n2. Set environment variable: `export GEMINI_API_KEY='your_key'`\n3. Restart the application",
                "sources": [],
                "metadata": {"timestamp": datetime.now().isoformat()}
            }
        
        try:
            logger.info(f"Processing: {query}")
            
            research_plan = await self.query_planner.plan_research(query)
            
            enhanced_query = f"""
            Research Query: {query}
            Research Plan: {research_plan.get('steps', [])}
            Priority: {research_plan.get('priority', 'general')}
            
            Execute systematic research and provide comprehensive analysis.
            """
            
            result = await asyncio.to_thread(
                self.executor.invoke, {"input": enhanced_query}
            )
            
            return {
                "success": True,
                "query": query,
                "research_plan": research_plan,
                "result": result.get("output", "No response"),
                "sources": self._extract_sources(result.get("output", "")),
                "metadata": {
                    "tools_used": [tool.name for tool in self.tools],
                    "timestamp": datetime.now().isoformat()
                }
            }
            
        except Exception as e:
            logger.error(f"Research error: {e}")
            return {
                "success": False,
                "query": query,
                "error": str(e),
                "result": f"❌ **Research Error**: {str(e)}\n\nPlease try a different query or check your API configuration.",
                "sources": [],
                "metadata": {"timestamp": datetime.now().isoformat()}
            }
    
    async def get_price_history(self, symbol: str, days: int = 30) -> Dict[str, Any]:
        try:
            coingecko_tool = next(t for t in self.tools if isinstance(t, CoinGeckoTool))
            return await coingecko_tool._arun(symbol, {"type": "price_history", "days": days})
        except Exception as e:
            logger.error(f"Price history error: {e}")
            return {}
    
    async def get_comprehensive_market_data(self) -> Dict[str, Any]:
        try:
            tasks = []
            for tool in self.tools:
                if isinstance(tool, CoinGeckoTool):
                    tasks.append(tool._arun("", {"type": "market_overview"}))
                elif isinstance(tool, DeFiLlamaTool):
                    tasks.append(tool._arun("", {"type": "tvl_overview"}))
            
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            data = {}
            for i, result in enumerate(results):
                if not isinstance(result, Exception):
                    if i == 0:
                        data["market"] = result
                    elif i == 1:
                        data["defi"] = result
            
            return data
        except Exception as e:
            logger.error(f"Market data error: {e}")
            return {}
    
    def _extract_sources(self, result_text: str) -> List[str]:
        sources = []
        if "CoinGecko" in result_text or "coingecko" in result_text.lower():
            sources.append("CoinGecko API")
        if "DeFiLlama" in result_text or "defillama" in result_text.lower():
            sources.append("DeFiLlama API") 
        if "Etherscan" in result_text or "etherscan" in result_text.lower():
            sources.append("Etherscan API")
        return sources