web3-copilot / src /agent /research_agent.py
Priyanshi Saxena
feat: testing done
9b006e9
raw
history blame
7.37 kB
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferWindowMemory
from typing import List, Dict, Any
import asyncio
from datetime import datetime
from src.tools.coingecko_tool import CoinGeckoTool
from src.tools.defillama_tool import DeFiLlamaTool
from src.tools.etherscan_tool import EtherscanTool
from src.agent.query_planner import QueryPlanner
from src.utils.config import config
from src.utils.logger import get_logger
logger = get_logger(__name__)
class Web3ResearchAgent:
def __init__(self):
self.llm = None
self.tools = []
self.agent = None
self.executor = None
self.enabled = False
if not config.GEMINI_API_KEY:
logger.warning("GEMINI_API_KEY not configured - AI agent disabled")
return
try:
self.llm = ChatGoogleGenerativeAI(
model="gemini-1.5-flash",
google_api_key=config.GEMINI_API_KEY,
temperature=0.1,
max_tokens=2048
)
self.tools = self._initialize_tools()
self.query_planner = QueryPlanner(self.llm)
self.memory = ConversationBufferWindowMemory(
memory_key="chat_history", return_messages=True, k=10
)
self.agent = self._create_agent()
self.executor = AgentExecutor(
agent=self.agent, tools=self.tools, memory=self.memory,
verbose=False, max_iterations=5, handle_parsing_errors=True
)
self.enabled = True
logger.info("Web3ResearchAgent initialized successfully")
except Exception as e:
logger.error(f"Agent init failed: {e}")
self.enabled = False
def _initialize_tools(self):
tools = []
try:
tools.append(CoinGeckoTool())
logger.info("CoinGecko tool initialized")
except Exception as e:
logger.warning(f"CoinGecko tool failed: {e}")
try:
tools.append(DeFiLlamaTool())
logger.info("DeFiLlama tool initialized")
except Exception as e:
logger.warning(f"DeFiLlama tool failed: {e}")
try:
tools.append(EtherscanTool())
logger.info("Etherscan tool initialized")
except Exception as e:
logger.warning(f"Etherscan tool failed: {e}")
return tools
def _create_agent(self):
prompt = ChatPromptTemplate.from_messages([
("system", """You are an expert Web3 research assistant. Use available tools to provide accurate,
data-driven insights about cryptocurrency markets, DeFi protocols, and blockchain data.
Format responses with clear sections, emojis, and actionable insights."""),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
MessagesPlaceholder("agent_scratchpad")
])
return create_tool_calling_agent(self.llm, self.tools, prompt)
async def research_query(self, query: str) -> Dict[str, Any]:
if not self.enabled:
return {
"success": False,
"query": query,
"error": "AI agent not configured. Please set GEMINI_API_KEY environment variable.",
"result": "❌ **Service Unavailable**\n\nThe AI research agent requires a GEMINI_API_KEY to function.\n\nPlease:\n1. Get a free API key from [Google AI Studio](https://makersuite.google.com/app/apikey)\n2. Set environment variable: `export GEMINI_API_KEY='your_key'`\n3. Restart the application",
"sources": [],
"metadata": {"timestamp": datetime.now().isoformat()}
}
try:
logger.info(f"Processing: {query}")
research_plan = await self.query_planner.plan_research(query)
enhanced_query = f"""
Research Query: {query}
Research Plan: {research_plan.get('steps', [])}
Priority: {research_plan.get('priority', 'general')}
Execute systematic research and provide comprehensive analysis.
"""
result = await asyncio.to_thread(
self.executor.invoke, {"input": enhanced_query}
)
return {
"success": True,
"query": query,
"research_plan": research_plan,
"result": result.get("output", "No response"),
"sources": self._extract_sources(result.get("output", "")),
"metadata": {
"tools_used": [tool.name for tool in self.tools],
"timestamp": datetime.now().isoformat()
}
}
except Exception as e:
logger.error(f"Research error: {e}")
return {
"success": False,
"query": query,
"error": str(e),
"result": f"❌ **Research Error**: {str(e)}\n\nPlease try a different query or check your API configuration.",
"sources": [],
"metadata": {"timestamp": datetime.now().isoformat()}
}
async def get_price_history(self, symbol: str, days: int = 30) -> Dict[str, Any]:
try:
coingecko_tool = next(t for t in self.tools if isinstance(t, CoinGeckoTool))
return await coingecko_tool._arun(symbol, {"type": "price_history", "days": days})
except Exception as e:
logger.error(f"Price history error: {e}")
return {}
async def get_comprehensive_market_data(self) -> Dict[str, Any]:
try:
tasks = []
for tool in self.tools:
if isinstance(tool, CoinGeckoTool):
tasks.append(tool._arun("", {"type": "market_overview"}))
elif isinstance(tool, DeFiLlamaTool):
tasks.append(tool._arun("", {"type": "tvl_overview"}))
results = await asyncio.gather(*tasks, return_exceptions=True)
data = {}
for i, result in enumerate(results):
if not isinstance(result, Exception):
if i == 0:
data["market"] = result
elif i == 1:
data["defi"] = result
return data
except Exception as e:
logger.error(f"Market data error: {e}")
return {}
def _extract_sources(self, result_text: str) -> List[str]:
sources = []
if "CoinGecko" in result_text or "coingecko" in result_text.lower():
sources.append("CoinGecko API")
if "DeFiLlama" in result_text or "defillama" in result_text.lower():
sources.append("DeFiLlama API")
if "Etherscan" in result_text or "etherscan" in result_text.lower():
sources.append("Etherscan API")
return sources