Spaces:
Running
Running
from typing import Dict, Any, List | |
import asyncio | |
class QueryPlanner: | |
def __init__(self, llm): | |
self.llm = llm | |
async def plan_research(self, query: str) -> Dict[str, Any]: | |
try: | |
planning_prompt = f""" | |
Analyze this research query and create a structured plan: | |
Query: {query} | |
Determine: | |
1. What type of analysis is needed (price, market, defi, comparison, etc.) | |
2. Which data sources would be most relevant | |
3. What specific steps should be taken | |
4. Priority focus area | |
Respond in JSON format with keys: type, steps, priority, data_sources | |
""" | |
response = await asyncio.to_thread( | |
self.llm.invoke, | |
planning_prompt | |
) | |
# Simple categorization based on keywords | |
query_lower = query.lower() | |
plan = { | |
"type": self._categorize_query(query_lower), | |
"steps": self._generate_steps(query_lower), | |
"priority": self._determine_priority(query_lower), | |
"data_sources": self._identify_sources(query_lower) | |
} | |
return plan | |
except Exception: | |
return { | |
"type": "general", | |
"steps": ["Analyze query", "Gather data", "Provide insights"], | |
"priority": "general analysis", | |
"data_sources": ["coingecko", "defillama"] | |
} | |
def _categorize_query(self, query: str) -> str: | |
if any(word in query for word in ["price", "chart", "value"]): | |
return "price_analysis" | |
elif any(word in query for word in ["defi", "tvl", "protocol", "yield"]): | |
return "defi_analysis" | |
elif any(word in query for word in ["compare", "vs", "versus"]): | |
return "comparison" | |
elif any(word in query for word in ["market", "overview", "trending"]): | |
return "market_overview" | |
else: | |
return "general" | |
def _generate_steps(self, query: str) -> List[str]: | |
steps = ["Gather relevant data"] | |
if "price" in query: | |
steps.extend(["Get current price data", "Analyze price trends"]) | |
if "defi" in query: | |
steps.extend(["Fetch DeFi protocol data", "Analyze TVL trends"]) | |
if any(word in query for word in ["compare", "vs"]): | |
steps.append("Perform comparative analysis") | |
steps.append("Synthesize insights and recommendations") | |
return steps | |
def _determine_priority(self, query: str) -> str: | |
if "urgent" in query or "now" in query: | |
return "high" | |
elif "overview" in query: | |
return "comprehensive" | |
else: | |
return "standard" | |
def _identify_sources(self, query: str) -> List[str]: | |
sources = [] | |
if any(word in query for word in ["price", "market", "coin", "token"]): | |
sources.append("coingecko") | |
if any(word in query for word in ["defi", "tvl", "protocol"]): | |
sources.append("defillama") | |
if any(word in query for word in ["transaction", "address", "gas"]): | |
sources.append("etherscan") | |
return sources if sources else ["coingecko", "defillama"] | |