Spaces:
Sleeping
Sleeping
File size: 8,891 Bytes
f104fee 4498d8e f104fee 9b006e9 f104fee 9b006e9 f104fee 4498d8e f104fee 4498d8e f104fee 9b006e9 f104fee 9b006e9 f104fee 9b006e9 4498d8e 9b006e9 f104fee 4498d8e f104fee 9b006e9 f104fee 4498d8e f104fee 9b006e9 f104fee |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferWindowMemory
from typing import List, Dict, Any
import asyncio
from datetime import datetime
from src.tools.coingecko_tool import CoinGeckoTool
from src.tools.defillama_tool import DeFiLlamaTool
from src.tools.etherscan_tool import EtherscanTool
from src.tools.chart_data_tool import ChartDataTool
from src.agent.query_planner import QueryPlanner
from src.utils.config import config
from src.utils.logger import get_logger
# Module-level logger obtained from the project's logging helper, keyed by this module's name.
logger = get_logger(__name__)
class Web3ResearchAgent:
    """Tool-calling research agent for crypto / DeFi / on-chain questions.

    On construction the agent wires a Gemini chat model, the data tools that
    initialise successfully, a query planner, a windowed conversation memory
    and an ``AgentExecutor``. If ``config.GEMINI_API_KEY`` is not set, or any
    step of that wiring raises, the instance stays usable but ``enabled`` is
    ``False`` and ``research_query`` returns an explanatory error payload.
    """

    # System prompt handed to ChatPromptTemplate. Doubled braces are template
    # escapes for literal ``{`` / ``}`` in the JSON example.
    _SYSTEM_PROMPT = (
        "You are an expert Web3 research assistant. Use available tools to provide accurate,\n"
        "data-driven insights about cryptocurrency markets, DeFi protocols, and blockchain data.\n"
        "**Chart Creation Guidelines:**\n"
        "- When users ask for charts, trends, or visualizations, ALWAYS use the ChartDataTool\n"
        "- ALWAYS include the complete JSON output from ChartDataTool in your response\n"
        "- The JSON data will be extracted and rendered as interactive charts\n"
        "- Never modify or summarize the JSON data - include it exactly as returned\n"
        "- Place the JSON data anywhere in your response (beginning, middle, or end)\n"
        "**Example Response Format:**\n"
        "Here's the Bitcoin trend analysis you requested:\n"
        '{{"chart_type": "price_chart", "data": {{"prices": [...], "symbol": "BTC"}}, "config": {{...}}}}\n'
        "The chart shows recent Bitcoin price movements with key support levels...\n"
        "**Security Guidelines:**\n"
        "- Never execute arbitrary code or shell commands\n"
        "- Only use provided tools for data collection\n"
        "- Validate all external data before processing\n"
        "Format responses with clear sections, emojis, and actionable insights.\n"
        "Use all available tools to gather comprehensive data before providing analysis."
    )

    def __init__(self):
        """Build the agent; leave it disabled if the API key is missing or setup fails."""
        # Every attribute is defined up front so a disabled instance never
        # raises AttributeError when inspected.
        self.llm = None
        self.tools = []
        self.agent = None
        self.executor = None
        self.query_planner = None
        self.memory = None
        self.enabled = False

        if not config.GEMINI_API_KEY:
            logger.warning("GEMINI_API_KEY not configured - AI agent disabled")
            return

        try:
            self.llm = ChatGoogleGenerativeAI(
                model="gemini-2.0-flash-exp",
                google_api_key=config.GEMINI_API_KEY,
                temperature=0.1,
                max_tokens=8192,
            )
            self.tools = self._initialize_tools()
            self.query_planner = QueryPlanner(self.llm)
            self.memory = ConversationBufferWindowMemory(
                memory_key="chat_history", return_messages=True, k=10
            )
            self.agent = self._create_agent()
            self.executor = AgentExecutor(
                agent=self.agent,
                tools=self.tools,
                memory=self.memory,
                verbose=False,
                max_iterations=5,
                handle_parsing_errors=True,
            )
            self.enabled = True
            logger.info("Web3ResearchAgent initialized successfully")
        except Exception as e:
            # Construction must not crash the host application; the agent
            # simply reports itself as disabled.
            logger.error(f"Agent init failed: {e}")
            self.enabled = False

    def _initialize_tools(self):
        """Instantiate each data tool, skipping (and logging) any that fail.

        Returns:
            The list of successfully constructed tool instances, in the order
            CoinGecko, DeFiLlama, Etherscan, ChartData.
        """
        # (log label, factory) pairs; the label is the prefix of both log messages.
        tool_factories = (
            ("CoinGecko tool", CoinGeckoTool),
            ("DeFiLlama tool", DeFiLlamaTool),
            ("Etherscan tool", EtherscanTool),
            ("ChartDataTool", ChartDataTool),
        )
        tools = []
        for label, factory in tool_factories:
            try:
                tools.append(factory())
            except Exception as e:
                # Best effort: one broken tool must not prevent the others.
                logger.warning(f"{label} failed: {e}")
            else:
                logger.info(f"{label} initialized")
        return tools

    def _create_agent(self):
        """Create the tool-calling agent from the LLM, the tools and the chat prompt."""
        prompt = ChatPromptTemplate.from_messages([
            ("system", self._SYSTEM_PROMPT),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder("agent_scratchpad"),
        ])
        return create_tool_calling_agent(self.llm, self.tools, prompt)

    async def research_query(self, query: str) -> Dict[str, Any]:
        """Plan and run a research query through the agent executor.

        Args:
            query: The user's research question.

        Returns:
            A dict that always contains ``success``, ``query``, ``result``,
            ``sources`` and ``metadata``. On success it also contains
            ``research_plan``; on failure (agent disabled or an exception
            during processing) it contains ``error``. Exceptions are not
            propagated to the caller.
        """
        if not self.enabled:
            # NOTE(review): the leading "β" in the user-facing messages below
            # looks like a mis-encoded emoji - confirm against the intended text.
            return {
                "success": False,
                "query": query,
                "error": "AI agent not configured. Please set GEMINI_API_KEY environment variable.",
                "result": "β **Service Unavailable**\n\nThe AI research agent requires a GEMINI_API_KEY to function.\n\nPlease:\n1. Get a free API key from [Google AI Studio](https://makersuite.google.com/app/apikey)\n2. Set environment variable: `export GEMINI_API_KEY='your_key'`\n3. Restart the application",
                "sources": [],
                "metadata": {"timestamp": datetime.now().isoformat()},
            }

        try:
            logger.info(f"Processing: {query}")
            research_plan = await self.query_planner.plan_research(query)
            enhanced_query = (
                "\n"
                f"Research Query: {query}\n"
                f"Research Plan: {research_plan.get('steps', [])}\n"
                f"Priority: {research_plan.get('priority', 'general')}\n"
                "Execute systematic research and provide comprehensive analysis.\n"
                "For any visualizations or charts requested, use the ChartDataTool to generate structured data.\n"
            )
            # The executor's invoke() is synchronous; run it in a worker
            # thread so the event loop is not blocked.
            result = await asyncio.to_thread(
                self.executor.invoke, {"input": enhanced_query}
            )
            return {
                "success": True,
                "query": query,
                "research_plan": research_plan,
                "result": result.get("output", "No response"),
                "sources": self._extract_sources(result.get("output", "")),
                "metadata": {
                    # Lists every configured tool, not only the ones the
                    # agent actually called for this query.
                    "tools_used": [tool.name for tool in self.tools],
                    "timestamp": datetime.now().isoformat(),
                },
            }
        except Exception as e:
            logger.error(f"Research error: {e}")
            return {
                "success": False,
                "query": query,
                "error": str(e),
                "result": f"β **Research Error**: {str(e)}\n\nPlease try a different query or check your API configuration.",
                "sources": [],
                "metadata": {"timestamp": datetime.now().isoformat()},
            }

    async def get_price_history(self, symbol: str, days: int = 30) -> Dict[str, Any]:
        """Fetch price history for ``symbol`` through the CoinGecko tool.

        Args:
            symbol: Asset symbol forwarded to the tool.
            days: Number of days of history to request.

        Returns:
            Whatever the CoinGecko tool returns, or ``{}`` if the tool is not
            available or the call raises.
        """
        try:
            coingecko_tool = next(
                (t for t in self.tools if isinstance(t, CoinGeckoTool)), None
            )
            if coingecko_tool is None:
                # Previously surfaced as a StopIteration with an empty message.
                logger.error("Price history error: CoinGecko tool is not available")
                return {}
            return await coingecko_tool._arun(
                symbol, {"type": "price_history", "days": days}
            )
        except Exception as e:
            logger.error(f"Price history error: {e}")
            return {}

    async def get_comprehensive_market_data(self) -> Dict[str, Any]:
        """Concurrently fetch the market overview and the DeFi TVL overview.

        Returns:
            A dict with a ``"market"`` entry (CoinGecko) and/or a ``"defi"``
            entry (DeFiLlama) for each tool that is available and whose call
            succeeded; ``{}`` if neither is available or on unexpected error.
        """
        try:
            # Keep each task paired with its output key. Relying on position
            # would mislabel DeFiLlama data as "market" whenever the
            # CoinGecko tool is absent.
            keys = []
            tasks = []
            for tool in self.tools:
                if isinstance(tool, CoinGeckoTool):
                    keys.append("market")
                    tasks.append(tool._arun("", {"type": "market_overview"}))
                elif isinstance(tool, DeFiLlamaTool):
                    keys.append("defi")
                    tasks.append(tool._arun("", {"type": "tvl_overview"}))

            results = await asyncio.gather(*tasks, return_exceptions=True)

            data = {}
            for key, result in zip(keys, results):
                if isinstance(result, BaseException):
                    # One failing source must not discard the other.
                    logger.warning(f"Market data task '{key}' failed: {result}")
                    continue
                data[key] = result
            return data
        except Exception as e:
            logger.error(f"Market data error: {e}")
            return {}

    def _extract_sources(self, result_text: str) -> List[str]:
        """Return the data-source labels whose provider name appears in the text.

        Matching is a case-insensitive substring test for ``coingecko``,
        ``defillama`` and ``etherscan``; labels are returned in that order.
        """
        lowered = result_text.lower()
        known_sources = (
            ("coingecko", "CoinGecko API"),
            ("defillama", "DeFiLlama API"),
            ("etherscan", "Etherscan API"),
        )
        return [label for needle, label in known_sources if needle in lowered]
|